mirror of
https://github.com/argoproj/argo-cd.git
synced 2026-02-20 01:28:45 +01:00
Signed-off-by: Matthieu MOREL <matthieu.morel35@gmail.com> Co-authored-by: Blake Pettersson <blake.pettersson@gmail.com>
512 lines
20 KiB
Go
512 lines
20 KiB
Go
package sharding
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
stderrors "errors"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"math"
|
|
"os"
|
|
"slices"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
|
|
"github.com/argoproj/argo-cd/v3/common"
|
|
"github.com/argoproj/argo-cd/v3/controller/sharding/consistent"
|
|
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
|
|
"github.com/argoproj/argo-cd/v3/util/db"
|
|
"github.com/argoproj/argo-cd/v3/util/env"
|
|
"github.com/argoproj/argo-cd/v3/util/errors"
|
|
"github.com/argoproj/argo-cd/v3/util/settings"
|
|
)
|
|
|
|
// Make it overridable for testing
|
|
var osHostnameFunction = os.Hostname
|
|
|
|
// Make it overridable for testing
|
|
var heartbeatCurrentTime = metav1.Now
|
|
|
|
var (
|
|
HeartbeatDuration = env.ParseNumFromEnv(common.EnvControllerHeartbeatTime, 10, 10, 60)
|
|
HeartbeatTimeout = 3 * HeartbeatDuration
|
|
)
|
|
|
|
const ShardControllerMappingKey = "shardControllerMapping"
|
|
|
|
type (
|
|
DistributionFunction func(c *v1alpha1.Cluster) int
|
|
ClusterFilterFunction func(c *v1alpha1.Cluster) bool
|
|
clusterAccessor func() []*v1alpha1.Cluster
|
|
appAccessor func() []*v1alpha1.Application
|
|
)
|
|
|
|
// shardApplicationControllerMapping stores the mapping of Shard Number to Application Controller in ConfigMap.
|
|
// It also stores the heartbeat of last synced time of the application controller.
|
|
type shardApplicationControllerMapping struct {
|
|
ShardNumber int
|
|
ControllerName string
|
|
HeartbeatTime metav1.Time
|
|
}
|
|
|
|
// GetClusterFilter returns a ClusterFilterFunction which is a function taking a cluster as a parameter
|
|
// and returns whether or not the cluster should be processed by a given shard. It calls the distributionFunction
|
|
// to determine which shard will process the cluster, and if the given shard is equal to the calculated shard
|
|
// the function will return true.
|
|
func GetClusterFilter(_ db.ArgoDB, distributionFunction DistributionFunction, replicas, shard int) ClusterFilterFunction {
|
|
return func(c *v1alpha1.Cluster) bool {
|
|
clusterShard := 0
|
|
if c != nil && c.Shard != nil {
|
|
requestedShard := int(*c.Shard)
|
|
if requestedShard < replicas {
|
|
clusterShard = requestedShard
|
|
} else {
|
|
log.Warnf("Specified cluster shard (%d) for cluster: %s is greater than the number of available shard. Assigning automatically.", requestedShard, c.Name)
|
|
}
|
|
} else {
|
|
clusterShard = distributionFunction(c)
|
|
}
|
|
return clusterShard == shard
|
|
}
|
|
}
|
|
|
|
// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and
|
|
// the current datas.
|
|
func GetDistributionFunction(clusters clusterAccessor, apps appAccessor, shardingAlgorithm string, replicasCount int) DistributionFunction {
|
|
log.Debugf("Using filter function: %s", shardingAlgorithm)
|
|
distributionFunction := LegacyDistributionFunction(replicasCount)
|
|
switch shardingAlgorithm {
|
|
case common.RoundRobinShardingAlgorithm:
|
|
distributionFunction = RoundRobinDistributionFunction(clusters, replicasCount)
|
|
case common.LegacyShardingAlgorithm:
|
|
distributionFunction = LegacyDistributionFunction(replicasCount)
|
|
case common.ConsistentHashingWithBoundedLoadsAlgorithm:
|
|
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, apps, replicasCount)
|
|
default:
|
|
log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
|
|
}
|
|
return distributionFunction
|
|
}
|
|
|
|
// LegacyDistributionFunction returns a DistributionFunction using a stable distribution algorithm:
|
|
// for a given cluster the function will return the shard number based on the cluster id. This function
|
|
// is lightweight and can be distributed easily, however, it does not ensure an homogenous distribution as
|
|
// some shards may get assigned more clusters than others. It is the legacy function distribution that is
|
|
// kept for compatibility reasons
|
|
func LegacyDistributionFunction(replicas int) DistributionFunction {
|
|
return func(c *v1alpha1.Cluster) int {
|
|
if replicas == 0 {
|
|
log.Debugf("Replicas count is : %d, returning -1", replicas)
|
|
return -1
|
|
}
|
|
if c == nil {
|
|
log.Debug("In-cluster: returning 0")
|
|
return 0
|
|
}
|
|
// if Shard is manually set and the assigned value is lower than the number of replicas,
|
|
// then its value is returned otherwise it is the default calculated value
|
|
if c.Shard != nil && int(*c.Shard) < replicas {
|
|
return int(*c.Shard)
|
|
}
|
|
id := c.ID
|
|
log.Debugf("Calculating cluster shard for cluster id: %s", id)
|
|
if id == "" {
|
|
return 0
|
|
}
|
|
h := fnv.New32a()
|
|
_, _ = h.Write([]byte(id))
|
|
shard := int32(h.Sum32() % uint32(replicas))
|
|
log.Debugf("Cluster with id=%s will be processed by shard %d", id, shard)
|
|
return int(shard)
|
|
}
|
|
}
|
|
|
|
// RoundRobinDistributionFunction returns a DistributionFunction using an homogeneous distribution algorithm:
|
|
// for a given cluster the function will return the shard number based on the modulo of the cluster rank in
|
|
// the cluster's list sorted by uid on the shard number.
|
|
// This function ensures an homogenous distribution: each shards got assigned the same number of
|
|
// clusters +/-1 , but with the drawback of a reshuffling of clusters across shards in case of some changes
|
|
// in the cluster list
|
|
|
|
func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction {
|
|
return func(c *v1alpha1.Cluster) int {
|
|
if replicas > 0 {
|
|
if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here.
|
|
return 0
|
|
}
|
|
// if Shard is manually set and the assigned value is lower than the number of replicas,
|
|
// then its value is returned otherwise it is the default calculated value
|
|
if c.Shard != nil && int(*c.Shard) < replicas {
|
|
return int(*c.Shard)
|
|
}
|
|
clusterIndexdByClusterIdMap := createClusterIndexByClusterIdMap(clusters)
|
|
clusterIndex, ok := clusterIndexdByClusterIdMap[c.ID]
|
|
if !ok {
|
|
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
|
|
return -1
|
|
}
|
|
shard := int(clusterIndex % replicas)
|
|
log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard)
|
|
return shard
|
|
}
|
|
log.Warnf("The number of replicas (%d) is lower than 1", replicas)
|
|
return -1
|
|
}
|
|
}
|
|
|
|
// ConsistentHashingWithBoundedLoadsDistributionFunction returns a DistributionFunction using an almost homogeneous distribution algorithm:
|
|
// for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm.
|
|
// This function ensures an almost homogenous distribution: each shards got assigned the fairly similar number of
|
|
// clusters +/-10% , but with it is resilient to sharding and/or number of clusters changes.
|
|
func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, apps appAccessor, replicas int) DistributionFunction {
|
|
return func(c *v1alpha1.Cluster) int {
|
|
if replicas > 0 {
|
|
if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here.
|
|
return 0
|
|
}
|
|
|
|
// if Shard is manually set and the assigned value is lower than the number of replicas,
|
|
// then its value is returned otherwise it is the default calculated value
|
|
if c.Shard != nil && int(*c.Shard) < replicas {
|
|
return int(*c.Shard)
|
|
}
|
|
// if the cluster is not in the clusters list anymore, we should unassign it from any shard, so we
|
|
// return the reserved value of -1
|
|
if !slices.Contains(clusters(), c) {
|
|
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
|
|
return -1
|
|
}
|
|
shardIndexedByCluster := createConsistentHashingWithBoundLoads(replicas, clusters, apps)
|
|
shard, ok := shardIndexedByCluster[c.ID]
|
|
if !ok {
|
|
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
|
|
return -1
|
|
}
|
|
log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard)
|
|
return shard
|
|
}
|
|
log.Warnf("The number of replicas (%d) is lower than 1", replicas)
|
|
return -1
|
|
}
|
|
}
|
|
|
|
func createConsistentHashingWithBoundLoads(replicas int, getCluster clusterAccessor, getApp appAccessor) map[string]int {
|
|
clusters := getSortedClustersList(getCluster)
|
|
appDistribution := getAppDistribution(getCluster, getApp)
|
|
shardIndexedByCluster := make(map[string]int)
|
|
appsIndexedByShard := make(map[string]int64)
|
|
consistentHashing := consistent.New()
|
|
// Adding a shard with id "-1" as a reserved value for clusters that does not have an assigned shard
|
|
// this happens for clusters that are removed for the clusters list
|
|
// consistentHashing.Add("-1")
|
|
for i := range replicas {
|
|
shard := strconv.Itoa(i)
|
|
consistentHashing.Add(shard)
|
|
appsIndexedByShard[shard] = 0
|
|
}
|
|
|
|
for _, c := range clusters {
|
|
clusterIndex, err := consistentHashing.GetLeast(c.ID)
|
|
if err != nil {
|
|
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
|
|
}
|
|
shardIndexedByCluster[c.ID], err = strconv.Atoi(clusterIndex)
|
|
if err != nil {
|
|
log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %d", err)
|
|
}
|
|
numApps, ok := appDistribution[c.Server]
|
|
if !ok {
|
|
numApps = 0
|
|
}
|
|
appsIndexedByShard[clusterIndex] += numApps
|
|
consistentHashing.UpdateLoad(clusterIndex, appsIndexedByShard[clusterIndex])
|
|
}
|
|
|
|
return shardIndexedByCluster
|
|
}
|
|
|
|
func getAppDistribution(getCluster clusterAccessor, getApps appAccessor) map[string]int64 {
|
|
apps := getApps()
|
|
clusters := getCluster()
|
|
appDistribution := make(map[string]int64, len(clusters))
|
|
|
|
for _, a := range apps {
|
|
if _, ok := appDistribution[a.Spec.Destination.Server]; !ok {
|
|
appDistribution[a.Spec.Destination.Server] = 0
|
|
}
|
|
appDistribution[a.Spec.Destination.Server]++
|
|
}
|
|
return appDistribution
|
|
}
|
|
|
|
// NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0
|
|
// the function is created for API compatibility purposes and is not supposed to be activated.
|
|
func NoShardingDistributionFunction() DistributionFunction {
|
|
return func(_ *v1alpha1.Cluster) int { return 0 }
|
|
}
|
|
|
|
// InferShard extracts the shard index based on its hostname.
|
|
func InferShard() (int, error) {
|
|
hostname, err := osHostnameFunction()
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
parts := strings.Split(hostname, "-")
|
|
if len(parts) == 0 {
|
|
log.Warnf("hostname should end with shard number separated by '-' but got: %s", hostname)
|
|
return 0, nil
|
|
}
|
|
shard, err := strconv.Atoi(parts[len(parts)-1])
|
|
if err != nil {
|
|
log.Warnf("hostname should end with shard number separated by '-' but got: %s", hostname)
|
|
return 0, nil
|
|
}
|
|
return int(shard), nil
|
|
}
|
|
|
|
func getSortedClustersList(getCluster clusterAccessor) []*v1alpha1.Cluster {
|
|
clusters := getCluster()
|
|
sort.Slice(clusters, func(i, j int) bool {
|
|
return clusters[i].ID < clusters[j].ID
|
|
})
|
|
return clusters
|
|
}
|
|
|
|
func createClusterIndexByClusterIdMap(getCluster clusterAccessor) map[string]int {
|
|
clusters := getSortedClustersList(getCluster)
|
|
log.Debugf("ClustersList has %d items", len(clusters))
|
|
clusterById := make(map[string]*v1alpha1.Cluster)
|
|
clusterIndexedByClusterId := make(map[string]int)
|
|
for i, cluster := range clusters {
|
|
log.Debugf("Adding cluster with id=%s and name=%s to cluster's map", cluster.ID, cluster.Name)
|
|
clusterById[cluster.ID] = cluster
|
|
clusterIndexedByClusterId[cluster.ID] = i
|
|
}
|
|
return clusterIndexedByClusterId
|
|
}
|
|
|
|
// GetOrUpdateShardFromConfigMap finds the shard number from the shard mapping configmap. If the shard mapping configmap does not exist,
|
|
// the function creates the shard mapping configmap.
|
|
// The function takes the shard number from the environment variable (default value -1, if not set) and passes it to this function.
|
|
// If the shard value passed to this function is -1, that is, the shard was not set as an environment variable,
|
|
// we default the shard number to 0 for computing the default config map.
|
|
func GetOrUpdateShardFromConfigMap(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) {
|
|
hostname, err := osHostnameFunction()
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
// fetch the shard mapping configMap
|
|
shardMappingCM, err := kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Get(context.Background(), common.ArgoCDAppControllerShardConfigMapName, metav1.GetOptions{})
|
|
if err != nil {
|
|
if !apierrors.IsNotFound(err) {
|
|
return -1, fmt.Errorf("error getting sharding config map: %w", err)
|
|
}
|
|
log.Infof("shard mapping configmap %s not found. Creating default shard mapping configmap.", common.ArgoCDAppControllerShardConfigMapName)
|
|
|
|
// if the shard is not set as an environment variable, set the default value of shard to 0 for generating default CM
|
|
if shard == -1 {
|
|
shard = 0
|
|
}
|
|
shardMappingCM, err = generateDefaultShardMappingCM(settingsMgr.GetNamespace(), hostname, replicas, shard)
|
|
if err != nil {
|
|
return -1, fmt.Errorf("error generating default shard mapping configmap %w", err)
|
|
}
|
|
if _, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Create(context.Background(), shardMappingCM, metav1.CreateOptions{}); err != nil {
|
|
return -1, fmt.Errorf("error creating shard mapping configmap %w", err)
|
|
}
|
|
// return 0 as the controller is assigned to shard 0 while generating default shard mapping ConfigMap
|
|
return shard, nil
|
|
}
|
|
// Identify the available shard and update the ConfigMap
|
|
data := shardMappingCM.Data[ShardControllerMappingKey]
|
|
var shardMappingData []shardApplicationControllerMapping
|
|
err = json.Unmarshal([]byte(data), &shardMappingData)
|
|
if err != nil {
|
|
return -1, fmt.Errorf("error unmarshalling shard config map data: %w", err)
|
|
}
|
|
|
|
shard, shardMappingData = getOrUpdateShardNumberForController(shardMappingData, hostname, replicas, shard)
|
|
updatedShardMappingData, err := json.Marshal(shardMappingData)
|
|
if err != nil {
|
|
return -1, fmt.Errorf("error marshalling data of shard mapping ConfigMap: %w", err)
|
|
}
|
|
shardMappingCM.Data[ShardControllerMappingKey] = string(updatedShardMappingData)
|
|
|
|
_, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Update(context.Background(), shardMappingCM, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
return shard, nil
|
|
}
|
|
|
|
// getOrUpdateShardNumberForController takes list of shardApplicationControllerMapping and performs computation to find the matching or empty shard number
|
|
func getOrUpdateShardNumberForController(shardMappingData []shardApplicationControllerMapping, hostname string, replicas, shard int) (int, []shardApplicationControllerMapping) {
|
|
// if current length of shardMappingData in shard mapping configMap is less than the number of replicas,
|
|
// create additional empty entries for missing shard numbers in shardMappingDataconfigMap
|
|
if len(shardMappingData) < replicas {
|
|
// generate extra default mappings
|
|
for currentShard := len(shardMappingData); currentShard < replicas; currentShard++ {
|
|
shardMappingData = append(shardMappingData, shardApplicationControllerMapping{
|
|
ShardNumber: currentShard,
|
|
})
|
|
}
|
|
}
|
|
|
|
// if current length of shardMappingData in shard mapping configMap is more than the number of replicas,
|
|
// we replace the config map with default config map and let controllers self assign the new shard to itself
|
|
if len(shardMappingData) > replicas {
|
|
shardMappingData = getDefaultShardMappingData(replicas)
|
|
}
|
|
|
|
if shard != -1 && shard < replicas {
|
|
log.Debugf("update heartbeat for shard %d", shard)
|
|
for i := range shardMappingData {
|
|
shardMapping := shardMappingData[i]
|
|
if shardMapping.ShardNumber == shard {
|
|
log.Debugf("Shard found. Updating heartbeat!!")
|
|
shardMapping.ControllerName = hostname
|
|
shardMapping.HeartbeatTime = heartbeatCurrentTime()
|
|
shardMappingData[i] = shardMapping
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
// find the matching shard with assigned controllerName
|
|
for i := range shardMappingData {
|
|
shardMapping := shardMappingData[i]
|
|
if shardMapping.ControllerName == hostname {
|
|
log.Debugf("Shard matched. Updating heartbeat!!")
|
|
shard = int(shardMapping.ShardNumber)
|
|
shardMapping.HeartbeatTime = heartbeatCurrentTime()
|
|
shardMappingData[i] = shardMapping
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// at this point, we have still not found a shard with matching hostname.
|
|
// So, find a shard with either no controller assigned or assigned controller
|
|
// with heartbeat past threshold
|
|
if shard == -1 {
|
|
for i := range shardMappingData {
|
|
shardMapping := shardMappingData[i]
|
|
if (shardMapping.ControllerName == "") || (metav1.Now().After(shardMapping.HeartbeatTime.Add(time.Duration(HeartbeatTimeout) * time.Second))) {
|
|
shard = int(shardMapping.ShardNumber)
|
|
log.Debugf("Empty shard found %d", shard)
|
|
shardMapping.ControllerName = hostname
|
|
shardMapping.HeartbeatTime = heartbeatCurrentTime()
|
|
shardMappingData[i] = shardMapping
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return shard, shardMappingData
|
|
}
|
|
|
|
// generateDefaultShardMappingCM creates a default shard mapping configMap. Assigns current controller to shard 0.
|
|
func generateDefaultShardMappingCM(namespace, hostname string, replicas, shard int) (*corev1.ConfigMap, error) {
|
|
shardingCM := &corev1.ConfigMap{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: common.ArgoCDAppControllerShardConfigMapName,
|
|
Namespace: namespace,
|
|
},
|
|
Data: map[string]string{},
|
|
}
|
|
|
|
shardMappingData := getDefaultShardMappingData(replicas)
|
|
|
|
// if shard is not assigned to a controller, we use shard 0
|
|
if shard == -1 || shard > replicas {
|
|
shard = 0
|
|
}
|
|
shardMappingData[shard].ControllerName = hostname
|
|
shardMappingData[shard].HeartbeatTime = heartbeatCurrentTime()
|
|
|
|
data, err := json.Marshal(shardMappingData)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error generating default ConfigMap: %w", err)
|
|
}
|
|
shardingCM.Data[ShardControllerMappingKey] = string(data)
|
|
|
|
return shardingCM, nil
|
|
}
|
|
|
|
func getDefaultShardMappingData(replicas int) []shardApplicationControllerMapping {
|
|
shardMappingData := make([]shardApplicationControllerMapping, 0)
|
|
|
|
for i := range replicas {
|
|
mapping := shardApplicationControllerMapping{
|
|
ShardNumber: i,
|
|
}
|
|
shardMappingData = append(shardMappingData, mapping)
|
|
}
|
|
return shardMappingData
|
|
}
|
|
|
|
func GetClusterSharding(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (ClusterShardingCache, error) {
|
|
var replicasCount int
|
|
if enableDynamicClusterDistribution {
|
|
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
|
|
appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})
|
|
// if app controller deployment is not found when dynamic cluster distribution is enabled error out
|
|
if err != nil {
|
|
return nil, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment: %w", err)
|
|
}
|
|
|
|
if appControllerDeployment == nil || appControllerDeployment.Spec.Replicas == nil {
|
|
return nil, stderrors.New("(dynamic cluster distribution) failed to get app controller deployment replica count")
|
|
}
|
|
replicasCount = int(*appControllerDeployment.Spec.Replicas)
|
|
} else {
|
|
replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
|
|
}
|
|
shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
|
|
if replicasCount > 1 {
|
|
// check for shard mapping using configmap if application-controller is a deployment
|
|
// else use existing logic to infer shard from pod name if application-controller is a statefulset
|
|
if enableDynamicClusterDistribution {
|
|
var err error
|
|
// retry 3 times if we find a conflict while updating shard mapping configMap.
|
|
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
|
|
for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ {
|
|
shardNumber, err = GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber)
|
|
if err != nil && !apierrors.IsConflict(err) {
|
|
err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %w", err)
|
|
break
|
|
}
|
|
// if `err == nil`, should not log the following warning message
|
|
if err != nil {
|
|
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
|
|
}
|
|
}
|
|
errors.CheckError(err)
|
|
} else {
|
|
if shardNumber < 0 {
|
|
var err error
|
|
shardNumber, err = InferShard()
|
|
errors.CheckError(err)
|
|
}
|
|
if shardNumber > replicasCount {
|
|
log.Warnf("Calculated shard number %d is greated than the number of replicas count. Defaulting to 0", shardNumber)
|
|
shardNumber = 0
|
|
}
|
|
}
|
|
} else {
|
|
log.Info("Processing all cluster shards")
|
|
shardNumber = 0
|
|
}
|
|
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
|
|
return NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil
|
|
}
|