/*
Copyright 2021 Alibaba Group Holding Limited.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package instance

import (
    "fmt"
    "sort"
    "strconv"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"

    polardbxv1 "github.com/alibaba/polardbx-operator/api/v1"
    polardbxv1polardbx "github.com/alibaba/polardbx-operator/api/v1/polardbx"
    polardbxv1xstore "github.com/alibaba/polardbx-operator/api/v1/xstore"
    "github.com/alibaba/polardbx-operator/pkg/featuregate"
    "github.com/alibaba/polardbx-operator/pkg/k8s/control"
    k8shelper "github.com/alibaba/polardbx-operator/pkg/k8s/helper"
    "github.com/alibaba/polardbx-operator/pkg/meta/core/gms"
    "github.com/alibaba/polardbx-operator/pkg/operator/v1/polardbx/convention"
    "github.com/alibaba/polardbx-operator/pkg/operator/v1/polardbx/factory"
    "github.com/alibaba/polardbx-operator/pkg/operator/v1/polardbx/helper"
    polardbxmeta "github.com/alibaba/polardbx-operator/pkg/operator/v1/polardbx/meta"
    polardbxv1reconcile "github.com/alibaba/polardbx-operator/pkg/operator/v1/polardbx/reconcile"
    xstoremeta "github.com/alibaba/polardbx-operator/pkg/operator/v1/xstore/meta"
)
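// CreateSecretsIfNotFound creates the account secret (built from the restore
// source when the cluster is being restored) and the security (encode key)
// secret if they do not exist yet.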
var CreateSecretsIfNotFound = polardbxv1reconcile.NewStepBinder("CreateSecretsIfNotFound",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polarDBX := rc.MustGetPolarDBX()
        accountSecret, err := rc.GetPolarDBXSecret(convention.SecretTypeAccount)
        if client.IgnoreNotFound(err) != nil {
            return flow.Error(err, "Unable to get account secret.")
        }
        if accountSecret == nil {
            if polarDBX.Spec.Restore != nil {
                if client.IgnoreNotFound(err) != nil {
                    return flow.Error(err, "Unable to get original secret for restore")
                }
                accountSecret, err = factory.NewObjectFactory(rc).NewSecretForRestore()
                if err != nil {
                    return flow.Error(err, "Unable to new account secret during restoring.")
                }
            } else {
                accountSecret, err = factory.NewObjectFactory(rc).NewSecret()
                if err != nil {
                    return flow.Error(err, "Unable to new account secret.")
                }
            }
            err = rc.SetControllerRefAndCreate(accountSecret)
            if err != nil {
                return flow.Error(err, "Unable to create account secret.")
            }
        }

        keySecret, err := rc.GetPolarDBXSecret(convention.SecretTypeSecurity)
        if client.IgnoreNotFound(err) != nil {
            return flow.Error(err, "Unable to get encode key secret.")
        }
        if keySecret == nil {
            keySecret, err = factory.NewObjectFactory(rc).NewSecuritySecret()
            if err != nil {
                return flow.Error(err, "Unable to new encode key secret.")
            }
            err = rc.SetControllerRefAndCreate(keySecret)
            if err != nil {
                return flow.Error(err, "Unable to create encode key secret.")
            }
        }
        return flow.Pass()
    },
)
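// CreateServicesIfNotFound ensures the cluster services exist: the read-only
// service for readonly clusters, or the read-write and CDC metrics services
// for primary clusters.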
var CreateServicesIfNotFound = polardbxv1reconcile.NewStepBinder("CreateServicesIfNotFound",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        objectFactory := factory.NewObjectFactory(rc)

        polardbx := rc.MustGetPolarDBX()

        serviceTypes := []convention.ServiceType{
            convention.ServiceTypeReadOnly,
        }

        if !polardbx.Spec.Readonly {
            serviceTypes = []convention.ServiceType{
                convention.ServiceTypeCDCMetrics,
                convention.ServiceTypeReadWrite,
            }
        }

        for _, serviceType := range serviceTypes {
            service, err := rc.GetPolarDBXService(serviceType)
            if client.IgnoreNotFound(err) != nil {
                return flow.Error(err, "Unable to get service.", "service-type", serviceType)
            }

            if service == nil {
                switch serviceType {
                case convention.ServiceTypeReadWrite:
                    service, err = objectFactory.NewService()
                case convention.ServiceTypeReadOnly:
                    service, err = objectFactory.NewReadOnlyService()
                case convention.ServiceTypeCDCMetrics:
                    service, err = objectFactory.NewCDCMetricsService()
                default:
                    panic("unimplemented")
                }

                if err != nil {
                    return flow.Error(err, "Unable to new service.", "service-type", serviceType)
                }

                err = rc.SetControllerRefAndCreate(service)
                if err != nil {
                    return flow.Error(err, "Unable to create service.", "service-type", serviceType)
                }
            }
        }

        return flow.Pass()
    },
)
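// CreateConfigMapsIfNotFound creates the config and task config maps if they
// are missing.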
var CreateConfigMapsIfNotFound = polardbxv1reconcile.NewStepBinder("CreateConfigMapsIfNotFound",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        objectFactory := factory.NewObjectFactory(rc)

        for _, cmType := range []convention.ConfigMapType{
            convention.ConfigMapTypeConfig,
            convention.ConfigMapTypeTask,
        } {
            cm, err := rc.GetPolarDBXConfigMap(cmType)
            if client.IgnoreNotFound(err) != nil {
                return flow.Error(err, "Unable to get config map.", "configmap-type", cmType)
            }

            if cm == nil {
                cm, err := objectFactory.NewConfigMap(cmType)
                if err != nil {
                    return flow.Error(err, "Unable to new config map.", "configmap-type", cmType)
                }

                err = rc.SetControllerRefAndCreate(cm)
                if err != nil {
                    return flow.Error(err, "Unable to create config map.", "configmap-type", cmType)
                }
            }
        }

        return flow.Pass()
    },
)
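// CreateOrReconcileGMS creates the GMS xstore when it is missing, or updates it
// when the observed generation is newer than the one recorded on the xstore.
// It is skipped for readonly clusters and clusters that share GMS.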
var CreateOrReconcileGMS = polardbxv1reconcile.NewStepBinder("CreateOrReconcileGMS",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polardbx := rc.MustGetPolarDBX()

        if polardbx.Spec.Readonly {
            return flow.Continue("Readonly pxc, skip.")
        }

        if polardbx.Spec.ShareGMS {
            return flow.Continue("GMS shared, skip.")
        }

        gmsStore, err := rc.GetGMS()
        if client.IgnoreNotFound(err) != nil {
            return flow.Error(err, "Unable to get xstore of GMS.")
        }
        if gmsStore == nil {
            // Only valid when creating/restoring. Consider the cluster broken when
            // GMS is not found in other phases, so transfer the phase into failed.
            if !helper.IsPhaseIn(polardbx, polardbxv1polardbx.PhaseCreating, polardbxv1polardbx.PhaseRestoring) {
                helper.TransferPhase(polardbx, polardbxv1polardbx.PhaseFailed)
                return flow.Retry("GMS not found, transfer into failed.")
            }

            objectFactory := factory.NewObjectFactory(rc)
            gmsStore, err := objectFactory.NewXStoreGMS()
            if err != nil {
                return flow.Error(err, "Unable to new xstore of GMS.")
            }

            err = rc.SetControllerRefAndCreate(gmsStore)
            if err != nil {
                return flow.Error(err, "Unable to create xstore of GMS.")
            }

            return flow.Continue("GMS xstore created!")
        } else {
            gmsStoreGeneration, err := convention.GetGenerationLabelValue(gmsStore)
            if err != nil {
                return flow.Error(err, "Unable to get generation from xstore of GMS.")
            }

            // The observed generation is newer than the one recorded on the GMS xstore, so update it.
            if gmsStoreGeneration < polardbx.Status.ObservedGeneration {
                // Skip upgrade if feature gate isn't enabled.
                if !featuregate.StoreUpgrade.Enabled() {
                    return flow.Continue("Feature 'StoreUpgrade' not enabled, skip upgrade.", "xstore", gmsStore.Name)
                }

                objectFactory := factory.NewObjectFactory(rc)
                newGmsStore, err := objectFactory.NewXStoreGMS()
                if err != nil {
                    return flow.Error(err, "Unable to new xstore of GMS.")
                }

                convention.CopyMetadataForUpdate(&newGmsStore.ObjectMeta, &gmsStore.ObjectMeta,
                    polardbx.Status.ObservedGeneration)

                err = rc.Client().Update(rc.Context(), newGmsStore)
                if err != nil {
                    return flow.Error(err, "Unable to update xstore of GMS.", "generation", polardbx.Status.ObservedGeneration)
                }

                return flow.Continue("GMS xstore updated!")
            }

            return flow.Pass()
        }
    },
)
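// SyncDnReplicasAndCheckControllerRef lets a readonly cluster track its primary:
// it sets the controller reference to the primary pxc and keeps the DN replicas
// in sync with the primary's topology.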
var SyncDnReplicasAndCheckControllerRef = polardbxv1reconcile.NewStepBinder("SyncDnReplicasAndCheckControllerRef",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polardbx := rc.MustGetPolarDBX()
        if !polardbx.Spec.Readonly {
            return flow.Pass()
        }

        primaryPolardbx, err := rc.GetPrimaryPolarDBX()
        if err != nil {
            return flow.RetryErr(err, "Failed to get primary pxc.")
        }

        shouldUpdatePolardbx := false

        if err = k8shelper.CheckControllerReference(polardbx, primaryPolardbx); err != nil {
            err = ctrl.SetControllerReference(primaryPolardbx, polardbx, rc.Scheme())
            if err != nil {
                return flow.Error(err, "Failed to set controller reference.")
            }
            shouldUpdatePolardbx = true
        }

        primaryDnReplicas := primaryPolardbx.Spec.Topology.Nodes.DN.Replicas

        if polardbx.Spec.Topology.Nodes.DN.Replicas != primaryDnReplicas {
            polardbx.Spec.Topology.Nodes.DN.Replicas = primaryDnReplicas
            shouldUpdatePolardbx = true
        }

        if shouldUpdatePolardbx {
            err = rc.Client().Update(rc.Context(), polardbx)
            if err != nil {
                return flow.Error(err, "Failed to update dn replicas.")
            }
            return flow.Continue("DN replicas updated.")
        }
        return flow.Pass()
    },
)
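// CreateOrReconcileReadonlyPolardbx concurrently creates the readonly instances
// declared in spec.initReadonly that do not exist yet.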
var CreateOrReconcileReadonlyPolardbx = polardbxv1reconcile.NewStepBinder("CreateOrReconcileReadonlyPolardbx",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polardbx := rc.MustGetPolarDBX()
        if polardbx.Spec.Readonly {
            return flow.Pass()
        }

        objectFactory := factory.NewObjectFactory(rc)
        wg := &sync.WaitGroup{}
        readonlyPolardbxList := polardbx.Spec.InitReadonly
        errs := make([]error, len(readonlyPolardbxList))
        var errCnt uint32

        for i, readonlyInst := range readonlyPolardbxList {
            newReadonlyPolardbx, err := objectFactory.NewReadonlyPolardbx(readonlyInst)

            readonlyName := readonlyInst.Name

            if err != nil {
                return flow.Error(err, "Unable to new readonly pxc.", "name", readonlyName)
            }

            key := types.NamespacedName{
                Namespace: polardbx.Namespace,
                Name:      polardbx.Name + "-" + readonlyName,
            }

            if rc.CheckPolarDBXExist(key) {
                // Skip readonly pxc that already exists.
                continue
            } else {
                wg.Add(1)
                logger, idx := flow.Logger(), i
                go func() {
                    defer wg.Done()
                    err := rc.SetControllerRefAndCreate(newReadonlyPolardbx)
                    if err != nil {
                        logger.Error(err, "Unable to create readonly pxc.", "name", readonlyName)
                        errs[idx] = err
                        atomic.AddUint32(&errCnt, 1)
                    }
                }()
            }
        }

        wg.Wait()
        if errCnt > 0 {
            var firstErr error
            for _, err := range errs {
                if err != nil {
                    firstErr = err
                    break
                }
            }
            return flow.Error(firstErr, "Unable to create or reconcile readonly pxc.", "error-cnt", errCnt)
        }

        return flow.Pass()
    },
)
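// CreateOrReconcileDNs creates missing DN xstores, updates outdated ones when the
// 'StoreUpgrade' feature gate is enabled, and deletes trailing DN xstores that are
// not registered in GMS.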
var CreateOrReconcileDNs = polardbxv1reconcile.NewStepBinder("CreateOrReconcileDNs",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polardbx := rc.MustGetPolarDBX()
        readonly := polardbx.Spec.Readonly
        topology := polardbx.Status.SpecSnapshot.Topology
        observedGeneration := polardbx.Status.ObservedGeneration
        replicas := int(topology.Nodes.DN.Replicas)

        dnStores, err := rc.GetDNMap()
        if err != nil {
            return flow.Error(err, "Unable to get xstores of DN.")
        }

        // Ensure DNs are sorted and their indexes are incremental
        // when phase is not in creating or restoring.
        if !helper.IsPhaseIn(polardbx, polardbxv1polardbx.PhaseCreating, polardbxv1polardbx.PhaseRestoring) {
            lastIndex := 0
            for ; lastIndex < replicas; lastIndex++ {
                if _, ok := dnStores[lastIndex]; !ok {
                    break
                }
            }

            if lastIndex != replicas && lastIndex != len(dnStores) {
                helper.TransferPhase(polardbx, polardbxv1polardbx.PhaseFailed)
                return flow.Retry("Found broken DN, transfer into failed.")
            }
        }

        objectFactory := factory.NewObjectFactory(rc)
        anyChanged := false
        wg := &sync.WaitGroup{}
        errs := make([]error, replicas)
        var errCnt uint32
        for i := 0; i < replicas; i++ {
            observedDnStore, ok := dnStores[i]

            if !ok {
                newDnStore, err := objectFactory.NewXStoreDN(i)
                if err != nil {
                    return flow.Error(err, "Unable to new xstore of DN.", "index", i)
                }

                wg.Add(1)
                logger, idx := flow.Logger(), i
                go func() {
                    defer wg.Done()
                    err := rc.SetControllerRefAndCreate(newDnStore)
                    if err != nil {
                        logger.Error(err, "Unable to create xstore of DN.", "index", idx)
                        errs[idx] = err
                        atomic.AddUint32(&errCnt, 1)
                    }
                }()

                anyChanged = true
            } else {
                generation, err := convention.GetGenerationLabelValue(observedDnStore)
                if err != nil {
                    return flow.Error(err, "Unable to get generation from xstore of DN.", "index", i)
                }

                if generation < observedGeneration {
                    // Skip upgrade if feature gate isn't enabled.
                    if !featuregate.StoreUpgrade.Enabled() {
                        flow.Logger().Info("Feature 'StoreUpgrade' not enabled, skip upgrade.", "xstore", observedDnStore.Name)
                        continue
                    }

                    newDnStore, err := objectFactory.NewXStoreDN(i)
                    if err != nil {
                        return flow.Error(err, "Unable to new xstore of DN.", "index", i)
                    }

                    newDnStoreHash := newDnStore.Labels[xstoremeta.LabelHash]
                    observedDnStoreHash := observedDnStore.Labels[xstoremeta.LabelHash]
                    if newDnStoreHash != observedDnStoreHash {
                        convention.CopyMetadataForUpdate(&newDnStore.ObjectMeta, &observedDnStore.ObjectMeta, observedGeneration)
                        newDnStore.SetLabels(k8shelper.PatchLabels(newDnStore.Labels, map[string]string{
                            xstoremeta.LabelHash: newDnStoreHash,
                        }))
                        wg.Add(1)
                        logger, idx := flow.Logger(), i
                        go func() {
                            defer wg.Done()
                            err := rc.Client().Update(rc.Context(), newDnStore)
                            if err != nil {
                                logger.Error(err, "Unable to update xstore of DN.", "index", idx)
                                errs[idx] = err
                                atomic.AddUint32(&errCnt, 1)
                            }
                        }()
                        anyChanged = true
                    }
                }
            }
        }

        wg.Wait()
        if errCnt > 0 {
            var firstErr error
            for _, err := range errs {
                if err != nil {
                    firstErr = err
                    break
                }
            }
            return flow.Error(firstErr, "Unable to create or reconcile xstores of DN.", "error-cnt", errCnt)
        }

        // Remove DNs that are not registered in GMS and whose indexes are beyond the
        // current target replicas (safe because they are not in use).
        toRemoveStores := make(map[int]*polardbxv1.XStore)
        for index, xstore := range dnStores {
            if index >= replicas {
                toRemoveStores[index] = xstore
            }
        }
        if len(toRemoveStores) > 0 {
            mgr, err := rc.GetPolarDBXGMSManager()
            if err != nil {
                return flow.Error(err, "Unable to get manager for GMS.")
            }

            storageNodeIds := make(map[string]int)
            if initialized, err := mgr.IsMetaDBInitialized(polardbx.Name); err != nil {
                return flow.Error(err, "Unable to determine if GMS is initialized.")
            } else if initialized {
                storageKind := gms.StorageKindMaster
                if readonly {
                    storageKind = gms.StorageKindSlave
                }
                storageNodes, err := mgr.ListStorageNodes(storageKind)
                if err != nil {
                    return flow.Error(err, "Unable to list storage nodes in GMS.")
                }
                for _, s := range storageNodes {
                    storageNodeIds[s.Id] = 1
                }
            } else {
                // GMS is not initialized yet, so no storage nodes are registered.
            }

            canRemoveStoreIndices := make([]int, 0)
            for index, xstore := range toRemoveStores {
                if _, found := storageNodeIds[xstore.Name]; !found {
                    canRemoveStoreIndices = append(canRemoveStoreIndices, index)
                }
            }
            if len(canRemoveStoreIndices) > 0 {
                sort.Slice(canRemoveStoreIndices, func(i, j int) bool {
                    return canRemoveStoreIndices[i] > canRemoveStoreIndices[j]
                })
                flow.Logger().Info("Trying to remove trailing xstores (not in use)",
                    "trailing-indices", canRemoveStoreIndices)
                for _, index := range canRemoveStoreIndices {
                    xstore := dnStores[index]
                    err := rc.Client().Delete(rc.Context(), xstore, client.PropagationPolicy(metav1.DeletePropagationBackground))
                    if err != nil {
                        return flow.Error(err, "Unable to delete unused trailing xstore.", "xstore", xstore.Name)
                    }
                }
                flow.Logger().Info("Unused trailing xstores are all removed.")
            }
        }

        if anyChanged {
            return flow.Retry("DNs created or updated!")
        }

        return flow.Pass()
    },
)
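// RemoveTrailingDNs deletes DN xstores whose indexes are beyond the target
// replica count recorded in the spec snapshot.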
var RemoveTrailingDNs = polardbxv1reconcile.NewStepBinder("RemoveTrailingDNs",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polardbx := rc.MustGetPolarDBX()
        topology := polardbx.Status.SpecSnapshot.Topology
        replicas := int(topology.Nodes.DN.Replicas)

        dnStores, err := rc.GetOrderedDNList()
        if err != nil {
            return flow.Error(err, "Unable to get xstores of DN.")
        }

        for i := len(dnStores) - 1; i >= replicas; i-- {
            if dnStores[i].DeletionTimestamp.IsZero() {
                err := rc.Client().Delete(rc.Context(), dnStores[i],
                    client.PropagationPolicy(metav1.DeletePropagationBackground))
                if err != nil {
                    return flow.Error(err, "Unable to remove trailing DN.", "xstore", dnStores[i].Name)
                }
            }
        }

        if len(dnStores) > replicas {
            return flow.Retry("Trailing DNs are deleted.")
        }
        return flow.Pass()
    },
)
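// reconcileGroupedDeployments reconciles the grouped deployments of the given
// role (CN, CDC or Columnar): outdated deployments are updated or deleted, and
// missing ones are created.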
func reconcileGroupedDeployments(rc *polardbxv1reconcile.Context, flow control.Flow, role string) (reconcile.Result, error) {
    polardbxmeta.AssertRoleIn(role, polardbxmeta.RoleCN, polardbxmeta.RoleCDC, polardbxmeta.RoleColumnar)

    flow = flow.WithLoggerValues("role", role)

    polardbx := rc.MustGetPolarDBX()
    observedGeneration := polardbx.Status.ObservedGeneration

    observedDeployments, err := rc.GetDeploymentMap(role)
    if err != nil {
        return flow.Error(err, "Unable to get deployments.")
    }

    var deployments map[string]appsv1.Deployment
    if role == polardbxmeta.RoleCN {
        if !rc.HasCNs() {
            return flow.Pass()
        }
        deployments, err = factory.NewObjectFactory(rc).NewDeployments4CN()
    } else if role == polardbxmeta.RoleCDC {
        if polardbx.Spec.Readonly {
            return flow.Pass()
        }
        deployments, err = factory.NewObjectFactory(rc).NewDeployments4CDC()
    } else if role == polardbxmeta.RoleColumnar {
        deployments, err = factory.NewObjectFactory(rc).NewDeployments4Columnar()
    }
    if err != nil {
        return flow.Error(err, "Unable to new deployments.")
    }

    anyChanged := false

    // Update or delete outdated deployments.
    for group, observedDeployment := range observedDeployments {
        generation, err := convention.GetGenerationLabelValue(observedDeployment)
        if err != nil {
            return flow.Error(err, "Unable to get generation from deployment.",
                "deployment", observedDeployment.Name)
        }

        if generation < observedGeneration {
            newDeployment, ok := deployments[group]
            if !ok {
                err := rc.Client().Delete(rc.Context(), observedDeployment,
                    client.PropagationPolicy(metav1.DeletePropagationBackground))
                if err != nil {
                    return flow.Error(err, "Unable to delete deployment.",
                        "deployment", observedDeployment.Name)
                }
                anyChanged = true
            } else {
                newDeploymentLabelHash := newDeployment.Labels[polardbxmeta.LabelHash]
                if newDeploymentLabelHash != observedDeployment.Labels[polardbxmeta.LabelHash] {
                    convention.CopyMetadataForUpdate(&newDeployment.ObjectMeta, &observedDeployment.ObjectMeta, observedGeneration)
                    newDeployment.SetLabels(k8shelper.PatchLabels(newDeployment.Labels, map[string]string{
                        polardbxmeta.LabelHash: newDeploymentLabelHash,
                    }))
                    err := rc.Client().Update(rc.Context(), &newDeployment)
                    if err != nil {
                        return flow.Error(err, "Unable to update deployment.",
                            "deployment", observedDeployment.Name)
                    }
                    anyChanged = true
                }
            }
        }
    }

    // Create new deployments if not found.
    for group, deployment := range deployments {
        if _, ok := observedDeployments[group]; ok {
            continue
        }

        err := rc.SetControllerRefAndCreate(&deployment)
        if err != nil {
            return flow.Error(err, "Unable to create deployment.", "deployment", deployment.Name)
        }

        anyChanged = true
    }

    if anyChanged {
        return flow.Retry("Deployments reconciled.")
    }

    return flow.Pass()
}
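// CreateFileStorage creates the cold data file storages declared in the CN
// config when the cluster supports file storage and they do not exist yet.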
var CreateFileStorage = polardbxv1reconcile.NewStepBinder("CreateFileStorage",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polardbx := rc.MustGetPolarDBX()
        groupManager, err := rc.GetPolarDBXGroupManager()
        if err != nil {
            return flow.Error(err, "Failed to get CN group manager.")
        }

        supportFileStorage, err := groupManager.CheckFileStorageCompatibility()
        if err != nil {
            return flow.Error(err, "Failed to check compatibility of file storage.")
        }
        if !supportFileStorage {
            return flow.Continue("Current pxc does not support file storage.")
        }

        fileStorageInfoList, err := groupManager.ListFileStorage()
        if err != nil {
            return flow.Error(err, "Failed to get file storage list.")
        }

        config := rc.Config()

        for _, info := range polardbx.Spec.Config.CN.ColdDataFileStorage {
            // Check whether the file storage already exists.
            if info.CheckEngineExists(fileStorageInfoList) {
                continue
            }

            // Create the file storage.
            err = groupManager.CreateFileStorage(info, config)
            if err != nil {
                return flow.Error(err, fmt.Sprintf("Failed to create file storage: %s.", info.Engine))
            }
        }
        return flow.Pass()
    },
)
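// CreateOrReconcileCNs reconciles the grouped deployments of CN.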
var CreateOrReconcileCNs = polardbxv1reconcile.NewStepBinder("CreateOrReconcileCNs",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        return reconcileGroupedDeployments(rc, flow, polardbxmeta.RoleCN)
    },
)
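// CreateOrReconcileCDCs reconciles the grouped deployments of CDC.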
var CreateOrReconcileCDCs = polardbxv1reconcile.NewStepBinder("CreateOrReconcileCDCs",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        return reconcileGroupedDeployments(rc, flow, polardbxmeta.RoleCDC)
    },
)
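// CreateOrReconcileColumnars reconciles the grouped deployments of Columnar.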
var CreateOrReconcileColumnars = polardbxv1reconcile.NewStepBinder("CreateOrReconcileColumnars",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        return reconcileGroupedDeployments(rc, flow, polardbxmeta.RoleColumnar)
    },
)
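// isXStoreReady reports whether the xstore has observed its latest generation
// and is in the running phase.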
func isXStoreReady(xstore *polardbxv1.XStore) bool {
    return xstore.Status.ObservedGeneration == xstore.Generation &&
        xstore.Status.Phase == polardbxv1xstore.PhaseRunning
}
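// WaitUntilCNCDCPodsReady waits until all CN and CDC pods are ready.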
var WaitUntilCNCDCPodsReady = polardbxv1reconcile.NewStepBinder("WaitUntilCNCDCPodsReady",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        cnPods, err := rc.GetPods(polardbxmeta.RoleCN)
        if err != nil {
            return flow.Error(err, "Unable to get pods for CN")
        }

        unready := k8shelper.FilterPodsBy(cnPods, func(pod *corev1.Pod) bool {
            return !k8shelper.IsPodReady(pod)
        })

        if len(unready) > 0 {
            return flow.Wait("Found unready cn pods, keep waiting...", "unready-pods",
                strings.Join(k8shelper.ToObjectNames(unready), ","))
        }

        cdcPods, err := rc.GetPods(polardbxmeta.RoleCDC)
        if err != nil {
            return flow.Error(err, "Unable to get pods for CDC")
        }

        unready = k8shelper.FilterPodsBy(cdcPods, func(pod *corev1.Pod) bool {
            return !k8shelper.IsPodReady(pod)
        })

        if len(unready) > 0 {
            return flow.Wait("Found unready cdc pods, keep waiting...", "unready-pods",
                strings.Join(k8shelper.ToObjectNames(unready), ","))
        }

        return flow.Pass()
    },
)
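// WaitUntilGMSReady waits until the GMS xstore is running; a failed GMS xstore
// transfers the cluster into the failed phase.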
var WaitUntilGMSReady = polardbxv1reconcile.NewStepBinder("WaitUntilGMSReady",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        gmsStore, err := rc.GetGMS()
        if client.IgnoreNotFound(err) != nil {
            return flow.Error(err, "Unable to get xstore of GMS.")
        }
        if gmsStore == nil {
            return flow.Wait("XStore of GMS not created.")
        }

        if gmsStore.Status.Phase == polardbxv1xstore.PhaseFailed {
            helper.TransferPhase(rc.MustGetPolarDBX(), polardbxv1polardbx.PhaseFailed)
            return flow.Retry("XStore of GMS is failed, transfer phase into failed.")
        }

        if isXStoreReady(gmsStore) {
            return flow.Continue("XStore of GMS is ready.")
        } else {
            return flow.Wait("XStore of GMS isn't ready, wait.", "xstore", gmsStore.Name, "xstore.phase", gmsStore.Status.Phase)
        }
    },
)
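// WaitUntilDNsReady waits until all non-trailing DN xstores are running; a failed
// DN xstore transfers the cluster into the failed phase.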
var WaitUntilDNsReady = polardbxv1reconcile.NewStepBinder("WaitUntilDNsReady",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polardbx := rc.MustGetPolarDBX()
        topology := &polardbx.Status.SpecSnapshot.Topology

        dnStores, err := rc.GetDNMap()
        if err != nil {
            return flow.Error(err, "Unable to list xstores of DNs.")
        }

        notReadyCnt, skipCnt := 0, 0
        for i, dnStore := range dnStores {
            // Skip trailing DNs.
            if i >= int(topology.Nodes.DN.Replicas) {
                skipCnt++
                continue
            }

            if dnStore.Status.Phase == polardbxv1xstore.PhaseFailed {
                helper.TransferPhase(rc.MustGetPolarDBX(), polardbxv1polardbx.PhaseFailed)
                return flow.Retry("XStore of DN is failed, transfer phase into failed.", "xstore", dnStore.Name)
            }

            if !isXStoreReady(dnStore) {
                notReadyCnt++
            }
        }

        if notReadyCnt > 0 {
            return flow.Wait("Some xstore of DN is not ready, wait.", "not-ready", notReadyCnt, "skip", skipCnt)
        }

        return flow.Continue("XStores of DNs are ready.", "skip", skipCnt)
    },
)
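// areDeploymentsRolledOut reports whether all given deployments have completed
// their rollout.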
func areDeploymentsRolledOut(deployments map[string]*appsv1.Deployment) bool {
    for _, deploy := range deployments {
        if !k8shelper.IsDeploymentRolledOut(deploy) {
            return false
        }
    }
    return true
}
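// WaitUntilCNDeploymentsRolledOut waits until all CN deployments are rolled out.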
var WaitUntilCNDeploymentsRolledOut = polardbxv1reconcile.NewStepBinder("WaitUntilCNDeploymentsRolledOut",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        cnDeployments, err := rc.GetDeploymentMap(polardbxmeta.RoleCN)
        if err != nil {
            return flow.Error(err, "Unable to get deployments of CN.")
        }

        if areDeploymentsRolledOut(cnDeployments) {
            return flow.Continue("Deployments of CN are rolled out.")
        }
        return flow.Wait("Some deployment of CN is rolling.")
    },
)
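// WaitUntilPrimaryCNDeploymentsRolledOut waits until the primary cluster's CN
// deployments exist and are rolled out, retrying every 5 seconds otherwise.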
var WaitUntilPrimaryCNDeploymentsRolledOut = polardbxv1reconcile.NewStepBinder("WaitUntilPrimaryCNDeploymentsRolledOut",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        cnDeployments, err := rc.GetPrimaryDeploymentMap(polardbxmeta.RoleCN)
        if err != nil {
            return flow.Error(err, "Unable to get deployments of primary CN.")
        }

        if len(cnDeployments) > 0 && areDeploymentsRolledOut(cnDeployments) {
            return flow.Continue("Deployments of primary CN are rolled out.")
        }
        return flow.RetryAfter(5*time.Second, "Some deployment of primary CN is rolling.")
    },
)
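// WaitUntilCNPodsStable waits until the number of CN pods carrying finalizers
// equals the CN replica count in the spec snapshot.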
var WaitUntilCNPodsStable = polardbxv1reconcile.NewStepBinder("WaitUntilCNPodsStable",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polardbx := rc.MustGetPolarDBX()

        cnPods, err := rc.GetPods(polardbxmeta.RoleCN)
        if err != nil {
            return flow.Error(err, "Unable to get pods of CN.")
        }

        podsWithFinalizers := k8shelper.FilterPodsBy(cnPods, func(pod *corev1.Pod) bool {
            return len(pod.Finalizers) > 0
        })

        cnTemplate := &polardbx.Status.SpecSnapshot.Topology.Nodes.CN
        if len(podsWithFinalizers) == int(*cnTemplate.Replicas) {
            return flow.Pass()
        }
        return flow.Wait("Waiting for pods to be finalized.")
    },
)
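// WaitUntilCDCPodsStable waits until the number of CDC pods carrying finalizers
// equals the total CDC replica count (replicas plus x-replicas).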
var WaitUntilCDCPodsStable = polardbxv1reconcile.NewStepBinder("WaitUntilCDCPodsStable",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polardbx := rc.MustGetPolarDBX()

        if polardbx.Spec.Readonly {
            return flow.Pass()
        }

        cdcPods, err := rc.GetPods(polardbxmeta.RoleCDC)
        if err != nil {
            return flow.Error(err, "Unable to get pods of CDC.")
        }

        podsWithFinalizers := k8shelper.FilterPodsBy(cdcPods, func(pod *corev1.Pod) bool {
            return len(pod.Finalizers) > 0
        })

        cdcTemplate := polardbx.Status.SpecSnapshot.Topology.Nodes.CDC
        cdcReplicas := 0
        if cdcTemplate != nil {
            cdcReplicas = int(cdcTemplate.Replicas + cdcTemplate.XReplicas)
        }
        if len(podsWithFinalizers) == cdcReplicas {
            return flow.Pass()
        }
        return flow.Wait("Waiting for pods to be finalized.")
    },
)
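// WaitUntilCDCDeploymentsRolledOut waits until all CDC deployments are rolled out.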
var WaitUntilCDCDeploymentsRolledOut = polardbxv1reconcile.NewStepBinder("WaitUntilCDCDeploymentsRolledOut",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        cdcDeployments, err := rc.GetDeploymentMap(polardbxmeta.RoleCDC)
        if err != nil {
            return flow.Error(err, "Unable to get deployments of CDC.")
        }

        if areDeploymentsRolledOut(cdcDeployments) {
            return flow.Continue("Deployments of CDC are rolled out.")
        }
        return flow.Wait("Some deployment of CDC is rolling.")
    },
)
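// WaitUntilColumnarPodsStable waits until the number of Columnar pods carrying
// finalizers equals the Columnar replica count.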
var WaitUntilColumnarPodsStable = polardbxv1reconcile.NewStepBinder("WaitUntilColumnarPodsStable",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        polardbx := rc.MustGetPolarDBX()

        columnarPods, err := rc.GetPods(polardbxmeta.RoleColumnar)
        if err != nil {
            return flow.Error(err, "Unable to get pods of Columnar.")
        }

        podsWithFinalizers := k8shelper.FilterPodsBy(columnarPods, func(pod *corev1.Pod) bool {
            return len(pod.Finalizers) > 0
        })

        columnarTemplate := polardbx.Status.SpecSnapshot.Topology.Nodes.Columnar
        columnarReplicas := 0
        if columnarTemplate != nil {
            columnarReplicas = int(columnarTemplate.Replicas)
        }
        if len(podsWithFinalizers) == columnarReplicas {
            return flow.Pass()
        }
        return flow.Wait("Waiting for pods to be finalized.")
    },
)
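// WaitUntilColumnarDeploymentsRolledOut waits until all Columnar deployments are
// rolled out.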
var WaitUntilColumnarDeploymentsRolledOut = polardbxv1reconcile.NewStepBinder("WaitUntilColumnarDeploymentsRolledOut",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        columnarDeployments, err := rc.GetDeploymentMap(polardbxmeta.RoleColumnar)
        if err != nil {
            return flow.Error(err, "Unable to get deployments of Columnar.")
        }

        if areDeploymentsRolledOut(columnarDeployments) {
            return flow.Continue("Deployments of Columnar are rolled out.")
        }
        return flow.Wait("Some deployment of Columnar is rolling.")
    },
)
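// TrySyncCnLabelToPodsDirectly syncs the polardbx audit log label onto CN pods
// whose label value differs from the current spec.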
var TrySyncCnLabelToPodsDirectly = polardbxv1reconcile.NewStepBinder("TrySyncCnLabelToPodsDirectly",
    func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
        changedCount := 0
        cnPods, err := rc.GetPods(polardbxmeta.RoleCN)
        if err != nil {
            return flow.Error(err, "Failed to get CN pods.")
        }
        enableAuditLog := strconv.FormatBool(rc.MustGetPolarDBX().Spec.Config.CN.EnableAuditLog)
        for _, pod := range cnPods {
            if pod.Labels[polardbxmeta.LabelAuditLog] != enableAuditLog {
                pod.SetLabels(k8shelper.PatchLabels(pod.Labels, map[string]string{
                    polardbxmeta.LabelAuditLog: enableAuditLog,
                }))
                if err := rc.Client().Update(rc.Context(), &pod); err != nil {
                    return flow.RetryErr(err, "Failed to update pod for polardbx/enableAuditLog")
                }
                changedCount++
            }
        }
        return flow.Continue("CN pod labels are synced.", "count", changedCount)
    },
)