Mirror of https://github.com/argoproj/argo-cd.git (synced 2026-02-20 01:28:45 +01:00)
fix: Use the cache for sharding (#15237)
* feat(sharding): use a cache
* cluster cmd
* - Assign shard 0 to the in-cluster cluster and nil-check updates
  - Cache clusters while sharding; fix unit tests
  - Update generated docs
  - Debug e2e tests
  - Default the shardNumber to the number of replicas if it is calculated to a higher value
  - Defer Unlock only when a lock is set
  - Temporarily disable other versions of k3s to check if e2e passes
  - Do not fail if hostname format is not abc-n
  - Fix unit test and skip some e2e tests
  - Skip TestGitSubmoduleHTTPSSupport test
  - Remove breaking defer c.lock.Unlock()
  - Revert testing all k3s versions
  - Default sharding fix
* Fixes related to code review: rename structure param, move db initialisation
* Code review
* Set default shard to 0
* Set different default values for StatefulSet and Deployment mode
* Expose ClusterShardingCache
* Remove use of argoDB.db for DistributionFunction
* Update generated documentation
* Fix comment about NoShardingDistributionFunction and NoShardingAlgorithm
---------
Signed-off-by: Alexandre Gaudreault <alexandre.gaudreault@logmein.com>
Signed-off-by: Akram Ben Aissi <akram.benaissi@gmail.com>
Co-authored-by: Alexandre Gaudreault <alexandre.gaudreault@logmein.com>
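For orientation, here is a minimal sketch (not part of the commit) of how the ClusterShardingCache introduced below is meant to be used: the controller seeds it once with the cluster list and then answers "is this cluster mine?" from memory instead of querying the database for every check. Passing nil for the ArgoDB argument only works because this version of NewClusterSharding never dereferences it; the real controller passes a db.NewDB(...) handle.

```go
// Illustrative sketch only; not code from this commit.
package main

import (
	"fmt"

	"github.com/argoproj/argo-cd/v2/common"
	"github.com/argoproj/argo-cd/v2/controller/sharding"
	"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
)

func main() {
	// Shard 1 of 2 replicas, default (legacy) algorithm. The ArgoDB argument is
	// not dereferenced by this version of NewClusterSharding, hence nil here.
	cache := sharding.NewClusterSharding(nil, 1, 2, common.DefaultShardingAlgorithm)

	// Seed the cache once with the known clusters (the controller does this in Run()).
	cache.Init(&v1alpha1.ClusterList{Items: []v1alpha1.Cluster{
		{ID: "1", Server: "https://cluster-one"},
		{ID: "2", Server: "https://cluster-two"},
	}})

	// Later checks are answered from memory; the exact assignment depends on the algorithm.
	fmt.Println(cache.GetDistribution())
	fmt.Println(cache.IsManagedCluster(&v1alpha1.Cluster{ID: "2", Server: "https://cluster-two"}))
}
```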
This commit is contained in:
2 Makefile
@@ -49,7 +49,7 @@ ARGOCD_E2E_DEX_PORT?=5556
ARGOCD_E2E_YARN_HOST?=localhost
ARGOCD_E2E_DISABLE_AUTH?=

ARGOCD_E2E_TEST_TIMEOUT?=60m
ARGOCD_E2E_TEST_TIMEOUT?=90m

ARGOCD_IN_CI?=false
ARGOCD_TEST_E2E?=true
2 Procfile
@@ -1,4 +1,4 @@
controller: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-application-controller $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''} --server-side-diff-enabled=${ARGOCD_APPLICATION_CONTROLLER_SERVER_SIDE_DIFF:-'false'}"
controller: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "HOSTNAME=testappcontroller-1 FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-application-controller $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''} --server-side-diff-enabled=${ARGOCD_APPLICATION_CONTROLLER_SERVER_SIDE_DIFF:-'false'}"
api-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-server $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --disable-auth=${ARGOCD_E2E_DISABLE_AUTH:-'true'} --insecure --dex-server http://localhost:${ARGOCD_E2E_DEX_PORT:-5556} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --port ${ARGOCD_E2E_APISERVER_PORT:-8080} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''}"
dex: sh -c "ARGOCD_BINARY_NAME=argocd-dex go run github.com/argoproj/argo-cd/v2/cmd gendexcfg -o `pwd`/dist/dex.yaml && (test -f dist/dex.yaml || { echo 'Failed to generate dex configuration'; exit 1; }) && docker run --rm -p ${ARGOCD_E2E_DEX_PORT:-5556}:${ARGOCD_E2E_DEX_PORT:-5556} -v `pwd`/dist/dex.yaml:/dex.yaml ghcr.io/dexidp/dex:$(grep "image: ghcr.io/dexidp/dex" manifests/base/dex/argocd-dex-server-deployment.yaml | cut -d':' -f3) dex serve /dex.yaml"
redis: bash -c "if [ \"$ARGOCD_REDIS_LOCAL\" = 'true' ]; then redis-server --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; else docker run --rm --name argocd-redis -i -p ${ARGOCD_E2E_REDIS_PORT:-6379}:${ARGOCD_E2E_REDIS_PORT:-6379} docker.io/library/redis:$(grep "image: redis" manifests/base/redis/argocd-redis-deployment.yaml | cut -d':' -f3) --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; fi"
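The only Procfile change above is the HOSTNAME=testappcontroller-1 prefix on the controller process: when running locally, the controller can now derive its shard number from the trailing -N of its hostname, as the sharding.InferShard hunk further down shows. A rough, hypothetical sketch of that parsing (names and fallback behaviour as described in the diff, not copied from it):

```go
// Hypothetical illustration of hostname-based shard inference; see sharding.InferShard below.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

func shardFromHostname(hostname string) int {
	parts := strings.Split(hostname, "-")
	shard, err := strconv.Atoi(parts[len(parts)-1])
	if err != nil {
		// After this commit the controller only warns and falls back to shard 0
		// when the hostname does not end in a number, instead of erroring out.
		return 0
	}
	return shard
}

func main() {
	fmt.Println(shardFromHostname("testappcontroller-1")) // 1, matching the HOSTNAME set in the Procfile
	fmt.Println(shardFromHostname("localhost"))           // no numeric suffix -> 0
}
```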
@@ -146,7 +146,7 @@ func NewCommand() *cobra.Command {
appController.InvalidateProjectsCache()
}))
kubectl := kubeutil.NewKubectl()
clusterFilter := getClusterFilter(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
clusterSharding := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
appController, err = controller.NewApplicationController(
namespace,
settingsMgr,
@@ -164,7 +164,7 @@ func NewCommand() *cobra.Command {
metricsAplicationLabels,
kubectlParallelismLimit,
persistResourceHealth,
clusterFilter,
clusterSharding,
applicationNamespaces,
&workqueueRateLimit,
serverSideDiff,
@@ -235,11 +235,10 @@ func NewCommand() *cobra.Command {
return &command
}

func getClusterFilter(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) sharding.ClusterFilterFunction {

var replicas int
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)

func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) sharding.ClusterShardingCache {
var replicasCount int
// StatefulSet mode and Deployment mode uses different default values for shard number.
defaultShardNumberValue := 0
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})

@@ -249,22 +248,21 @@ func getClusterFilter(kubeClient *kubernetes.Clientset, settingsMgr *settings.Se
}

if enableDynamicClusterDistribution && appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicas = int(*appControllerDeployment.Spec.Replicas)
replicasCount = int(*appControllerDeployment.Spec.Replicas)
defaultShardNumberValue = -1
} else {
replicas = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
}

var clusterFilter func(cluster *v1alpha1.Cluster) bool
if replicas > 1 {
shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, defaultShardNumberValue, -math.MaxInt32, math.MaxInt32)
if replicasCount > 1 {
// check for shard mapping using configmap if application-controller is a deployment
// else use existing logic to infer shard from pod name if application-controller is a statefulset
if enableDynamicClusterDistribution && appControllerDeployment != nil {

var err error
// retry 3 times if we find a conflict while updating shard mapping configMap.
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ {
shard, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicas, shard)
shardNumber, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber)
if !kubeerrors.IsConflict(err) {
err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err)
break
@@ -273,19 +271,19 @@ func getClusterFilter(kubeClient *kubernetes.Clientset, settingsMgr *settings.Se
}
errors.CheckError(err)
} else {
if shard < 0 {
if shardNumber < 0 {
var err error
shard, err = sharding.InferShard()
shardNumber, err = sharding.InferShard()
errors.CheckError(err)
}
if shardNumber > replicasCount {
log.Warnf("Calculated shard number %d is greated than the number of replicas count. Defaulting to 0", shardNumber)
shardNumber = 0
}
}
log.Infof("Processing clusters from shard %d", shard)
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
log.Infof("Using filter function: %s", shardingAlgorithm)
distributionFunction := sharding.GetDistributionFunction(db, shardingAlgorithm)
clusterFilter = sharding.GetClusterFilter(db, distributionFunction, shard)
} else {
log.Info("Processing all cluster shards")
}
return clusterFilter
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm)
}
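One behavioural detail worth calling out in the StatefulSet branch above: a shard number that exceeds the replica count is no longer trusted. A stand-alone restatement of that guard follows (illustrative only, the real logic lives in getClusterSharding above; the helper name is hypothetical):

```go
// Illustrative restatement of the guard in getClusterSharding; not code from the commit.
package main

import "log"

func normalizeShard(shardNumber, replicasCount int) int {
	if shardNumber > replicasCount {
		// Mirrors the warning logged by the controller before defaulting to shard 0.
		log.Printf("calculated shard number %d is greater than the replicas count %d, defaulting to 0", shardNumber, replicasCount)
		return 0
	}
	return shardNumber
}

func main() {
	log.Println(normalizeShard(5, 3)) // falls back to 0
	log.Println(normalizeShard(2, 3)) // kept as 2
}
```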
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/argoproj/argo-cd/v2/common"
|
||||
"github.com/argoproj/argo-cd/v2/controller/sharding"
|
||||
argocdclient "github.com/argoproj/argo-cd/v2/pkg/apiclient"
|
||||
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
|
||||
argoappv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
|
||||
"github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
|
||||
"github.com/argoproj/argo-cd/v2/util/argo"
|
||||
@@ -78,7 +79,7 @@ type ClusterWithInfo struct {
|
||||
Namespaces []string
|
||||
}
|
||||
|
||||
func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClient *versioned.Clientset, replicas int, namespace string, portForwardRedis bool, cacheSrc func() (*appstatecache.Cache, error), shard int, redisName string, redisHaProxyName string, redisCompressionStr string) ([]ClusterWithInfo, error) {
|
||||
func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClient *versioned.Clientset, replicas int, shardingAlgorithm string, namespace string, portForwardRedis bool, cacheSrc func() (*appstatecache.Cache, error), shard int, redisName string, redisHaProxyName string, redisCompressionStr string) ([]ClusterWithInfo, error) {
|
||||
settingsMgr := settings.NewSettingsManager(ctx, kubeClient, namespace)
|
||||
|
||||
argoDB := db.NewDB(namespace, settingsMgr, kubeClient)
|
||||
@@ -86,6 +87,10 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clusterShardingCache := sharding.NewClusterSharding(argoDB, shard, replicas, shardingAlgorithm)
|
||||
clusterShardingCache.Init(clustersList)
|
||||
clusterShards := clusterShardingCache.GetDistribution()
|
||||
|
||||
var cache *appstatecache.Cache
|
||||
if portForwardRedis {
|
||||
overrides := clientcmd.ConfigOverrides{}
|
||||
@@ -122,8 +127,15 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie
|
||||
apps[i] = app
|
||||
}
|
||||
clusters := make([]ClusterWithInfo, len(clustersList.Items))
|
||||
|
||||
batchSize := 10
|
||||
batchesCount := int(math.Ceil(float64(len(clusters)) / float64(batchSize)))
|
||||
clusterSharding := &sharding.ClusterSharding{
|
||||
Shard: shard,
|
||||
Replicas: replicas,
|
||||
Shards: make(map[string]int),
|
||||
Clusters: make(map[string]*v1alpha1.Cluster),
|
||||
}
|
||||
for batchNum := 0; batchNum < batchesCount; batchNum++ {
|
||||
batchStart := batchSize * batchNum
|
||||
batchEnd := batchSize * (batchNum + 1)
|
||||
@@ -135,12 +147,12 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie
|
||||
clusterShard := 0
|
||||
cluster := batch[i]
|
||||
if replicas > 0 {
|
||||
distributionFunction := sharding.GetDistributionFunction(argoDB, common.DefaultShardingAlgorithm)
|
||||
distributionFunction := sharding.GetDistributionFunction(clusterSharding.GetClusterAccessor(), common.DefaultShardingAlgorithm, replicas)
|
||||
distributionFunction(&cluster)
|
||||
clusterShard := clusterShards[cluster.Server]
|
||||
cluster.Shard = pointer.Int64(int64(clusterShard))
|
||||
log.Infof("Cluster with uid: %s will be processed by shard %d", cluster.ID, clusterShard)
|
||||
}
|
||||
|
||||
if shard != -1 && clusterShard != shard {
|
||||
return nil
|
||||
}
|
||||
@@ -176,6 +188,7 @@ func NewClusterShardsCommand(clientOpts *argocdclient.ClientOptions) *cobra.Comm
|
||||
var (
|
||||
shard int
|
||||
replicas int
|
||||
shardingAlgorithm string
|
||||
clientConfig clientcmd.ClientConfig
|
||||
cacheSrc func() (*appstatecache.Cache, error)
|
||||
portForwardRedis bool
|
||||
@@ -183,7 +196,7 @@ func NewClusterShardsCommand(clientOpts *argocdclient.ClientOptions) *cobra.Comm
|
||||
)
|
||||
var command = cobra.Command{
|
||||
Use: "shards",
|
||||
Short: "Print information about each controller shard and portion of Kubernetes resources it is responsible for.",
|
||||
Short: "Print information about each controller shard and the estimated portion of Kubernetes resources it is responsible for.",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
ctx := cmd.Context()
|
||||
|
||||
@@ -203,8 +216,7 @@ func NewClusterShardsCommand(clientOpts *argocdclient.ClientOptions) *cobra.Comm
|
||||
if replicas == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
clusters, err := loadClusters(ctx, kubeClient, appClient, replicas, namespace, portForwardRedis, cacheSrc, shard, clientOpts.RedisName, clientOpts.RedisHaProxyName, redisCompressionStr)
|
||||
clusters, err := loadClusters(ctx, kubeClient, appClient, replicas, shardingAlgorithm, namespace, portForwardRedis, cacheSrc, shard, clientOpts.RedisName, clientOpts.RedisHaProxyName, redisCompressionStr)
|
||||
errors.CheckError(err)
|
||||
if len(clusters) == 0 {
|
||||
return
|
||||
@@ -216,7 +228,9 @@ func NewClusterShardsCommand(clientOpts *argocdclient.ClientOptions) *cobra.Comm
|
||||
clientConfig = cli.AddKubectlFlagsToCmd(&command)
|
||||
command.Flags().IntVar(&shard, "shard", -1, "Cluster shard filter")
|
||||
command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified")
|
||||
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] ")
|
||||
command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?")
|
||||
|
||||
cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)
|
||||
|
||||
// parse all added flags so far to get the redis-compression flag that was added by AddCacheFlagsToCmd() above
|
||||
@@ -461,6 +475,7 @@ func NewClusterStatsCommand(clientOpts *argocdclient.ClientOptions) *cobra.Comma
|
||||
var (
|
||||
shard int
|
||||
replicas int
|
||||
shardingAlgorithm string
|
||||
clientConfig clientcmd.ClientConfig
|
||||
cacheSrc func() (*appstatecache.Cache, error)
|
||||
portForwardRedis bool
|
||||
@@ -494,7 +509,7 @@ argocd admin cluster stats target-cluster`,
|
||||
replicas, err = getControllerReplicas(ctx, kubeClient, namespace, clientOpts.AppControllerName)
|
||||
errors.CheckError(err)
|
||||
}
|
||||
clusters, err := loadClusters(ctx, kubeClient, appClient, replicas, namespace, portForwardRedis, cacheSrc, shard, clientOpts.RedisName, clientOpts.RedisHaProxyName, redisCompressionStr)
|
||||
clusters, err := loadClusters(ctx, kubeClient, appClient, replicas, shardingAlgorithm, namespace, portForwardRedis, cacheSrc, shard, clientOpts.RedisName, clientOpts.RedisHaProxyName, redisCompressionStr)
|
||||
errors.CheckError(err)
|
||||
|
||||
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
||||
@@ -508,6 +523,7 @@ argocd admin cluster stats target-cluster`,
|
||||
clientConfig = cli.AddKubectlFlagsToCmd(&command)
|
||||
command.Flags().IntVar(&shard, "shard", -1, "Cluster shard filter")
|
||||
command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified")
|
||||
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] ")
|
||||
command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?")
|
||||
cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)
|
||||
|
||||
|
||||
@@ -115,9 +115,9 @@ const (
LegacyShardingAlgorithm = "legacy"
// RoundRobinShardingAlgorithm is a flag value that can be opted for Sharding Algorithm it uses an equal distribution accross all shards
RoundRobinShardingAlgorithm = "round-robin"
DefaultShardingAlgorithm = LegacyShardingAlgorithm
// AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller
AppControllerHeartbeatUpdateRetryCount = 3
DefaultShardingAlgorithm = LegacyShardingAlgorithm
)

// Dex related constants
@@ -126,7 +126,7 @@ type ApplicationController struct {
|
||||
refreshRequestedAppsMutex *sync.Mutex
|
||||
metricsServer *metrics.MetricsServer
|
||||
kubectlSemaphore *semaphore.Weighted
|
||||
clusterFilter func(cluster *appv1.Cluster) bool
|
||||
clusterSharding sharding.ClusterShardingCache
|
||||
projByNameCache sync.Map
|
||||
applicationNamespaces []string
|
||||
}
|
||||
@@ -149,7 +149,7 @@ func NewApplicationController(
|
||||
metricsApplicationLabels []string,
|
||||
kubectlParallelismLimit int64,
|
||||
persistResourceHealth bool,
|
||||
clusterFilter func(cluster *appv1.Cluster) bool,
|
||||
clusterSharding sharding.ClusterShardingCache,
|
||||
applicationNamespaces []string,
|
||||
rateLimiterConfig *ratelimiter.AppControllerRateLimiterConfig,
|
||||
serverSideDiff bool,
|
||||
@@ -179,7 +179,7 @@ func NewApplicationController(
|
||||
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
|
||||
settingsMgr: settingsMgr,
|
||||
selfHealTimeout: selfHealTimeout,
|
||||
clusterFilter: clusterFilter,
|
||||
clusterSharding: clusterSharding,
|
||||
projByNameCache: sync.Map{},
|
||||
applicationNamespaces: applicationNamespaces,
|
||||
}
|
||||
@@ -260,7 +260,7 @@ func NewApplicationController(
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
stateCache := statecache.NewLiveStateCache(db, appInformer, ctrl.settingsMgr, kubectl, ctrl.metricsServer, ctrl.handleObjectUpdated, clusterFilter, argo.NewResourceTracking())
|
||||
stateCache := statecache.NewLiveStateCache(db, appInformer, ctrl.settingsMgr, kubectl, ctrl.metricsServer, ctrl.handleObjectUpdated, clusterSharding, argo.NewResourceTracking())
|
||||
appStateManager := NewAppStateManager(db, applicationClientset, repoClientset, namespace, kubectl, ctrl.settingsMgr, stateCache, projInformer, ctrl.metricsServer, argoCache, ctrl.statusRefreshTimeout, argo.NewResourceTracking(), persistResourceHealth, repoErrorGracePeriod, serverSideDiff)
|
||||
ctrl.appInformer = appInformer
|
||||
ctrl.appLister = appLister
|
||||
@@ -772,6 +772,13 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int
|
||||
go ctrl.projInformer.Run(ctx.Done())
|
||||
go ctrl.deploymentInformer.Informer().Run(ctx.Done())
|
||||
|
||||
clusters, err := ctrl.db.ListClusters(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("Cannot init sharding. Error while querying clusters list from database: %v", err)
|
||||
} else {
|
||||
ctrl.clusterSharding.Init(clusters)
|
||||
}
|
||||
|
||||
errors.CheckError(ctrl.stateCache.Init())
|
||||
|
||||
if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced, ctrl.projInformer.HasSynced) {
|
||||
@@ -1976,15 +1983,11 @@ func (ctrl *ApplicationController) canProcessApp(obj interface{}) bool {
|
||||
}
|
||||
}
|
||||
|
||||
if ctrl.clusterFilter != nil {
|
||||
cluster, err := ctrl.db.GetCluster(context.Background(), app.Spec.Destination.Server)
|
||||
if err != nil {
|
||||
return ctrl.clusterFilter(nil)
|
||||
}
|
||||
return ctrl.clusterFilter(cluster)
|
||||
cluster, err := ctrl.db.GetCluster(context.Background(), app.Spec.Destination.Server)
|
||||
if err != nil {
|
||||
return ctrl.clusterSharding.IsManagedCluster(nil)
|
||||
}
|
||||
|
||||
return true
|
||||
return ctrl.clusterSharding.IsManagedCluster(cluster)
|
||||
}
|
||||
|
||||
func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.SharedIndexInformer, applisters.ApplicationLister) {
|
||||
@@ -2136,7 +2139,7 @@ func (ctrl *ApplicationController) projectErrorToCondition(err error, app *appv1
|
||||
}
|
||||
|
||||
func (ctrl *ApplicationController) RegisterClusterSecretUpdater(ctx context.Context) {
|
||||
updater := NewClusterInfoUpdater(ctrl.stateCache, ctrl.db, ctrl.appLister.Applications(""), ctrl.cache, ctrl.clusterFilter, ctrl.getAppProj, ctrl.namespace)
|
||||
updater := NewClusterInfoUpdater(ctrl.stateCache, ctrl.db, ctrl.appLister.Applications(""), ctrl.cache, ctrl.clusterSharding.IsManagedCluster, ctrl.getAppProj, ctrl.namespace)
|
||||
go updater.Run(ctx)
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,9 @@ import (
|
||||
|
||||
"github.com/argoproj/argo-cd/v2/common"
|
||||
statecache "github.com/argoproj/argo-cd/v2/controller/cache"
|
||||
"github.com/argoproj/argo-cd/v2/controller/sharding"
|
||||
|
||||
dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks"
|
||||
"github.com/argoproj/gitops-engine/pkg/cache/mocks"
|
||||
synccommon "github.com/argoproj/gitops-engine/pkg/sync/common"
|
||||
"github.com/argoproj/gitops-engine/pkg/utils/kube"
|
||||
@@ -154,6 +156,10 @@ func newFakeController(data *fakeData, repoErr error) *ApplicationController {
|
||||
nil,
|
||||
false,
|
||||
)
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(1)
|
||||
// Setting a default sharding algorithm for the tests where we cannot set it.
|
||||
ctrl.clusterSharding = sharding.NewClusterSharding(db, 0, 1, common.DefaultShardingAlgorithm)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -686,7 +692,6 @@ func TestFinalizeAppDeletion(t *testing.T) {
|
||||
ctrl := newFakeController(&fakeData{apps: []runtime.Object{app, &defaultProj}, managedLiveObjs: map[kube.ResourceKey]*unstructured.Unstructured{
|
||||
kube.GetResourceKey(appObj): appObj,
|
||||
}}, nil)
|
||||
|
||||
patched := false
|
||||
fakeAppCs := ctrl.applicationClientset.(*appclientset.Clientset)
|
||||
defaultReactor := fakeAppCs.ReactionChain[0]
|
||||
@@ -1809,13 +1814,11 @@ func Test_canProcessApp(t *testing.T) {
|
||||
})
|
||||
t.Run("with cluster filter, good namespace", func(t *testing.T) {
|
||||
app.Namespace = "good"
|
||||
ctrl.clusterFilter = func(_ *v1alpha1.Cluster) bool { return true }
|
||||
canProcess := ctrl.canProcessApp(app)
|
||||
assert.True(t, canProcess)
|
||||
})
|
||||
t.Run("with cluster filter, bad namespace", func(t *testing.T) {
|
||||
app.Namespace = "bad"
|
||||
ctrl.clusterFilter = func(_ *v1alpha1.Cluster) bool { return true }
|
||||
canProcess := ctrl.canProcessApp(app)
|
||||
assert.False(t, canProcess)
|
||||
})
|
||||
|
||||
21 controller/cache/cache.go vendored
@@ -29,6 +29,7 @@ import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"github.com/argoproj/argo-cd/v2/controller/metrics"
|
||||
"github.com/argoproj/argo-cd/v2/controller/sharding"
|
||||
"github.com/argoproj/argo-cd/v2/pkg/apis/application"
|
||||
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
|
||||
"github.com/argoproj/argo-cd/v2/util/argo"
|
||||
@@ -168,7 +169,7 @@ func NewLiveStateCache(
|
||||
kubectl kube.Kubectl,
|
||||
metricsServer *metrics.MetricsServer,
|
||||
onObjectUpdated ObjectUpdatedHandler,
|
||||
clusterFilter func(cluster *appv1.Cluster) bool,
|
||||
clusterSharding sharding.ClusterShardingCache,
|
||||
resourceTracking argo.ResourceTracking) LiveStateCache {
|
||||
|
||||
return &liveStateCache{
|
||||
@@ -179,7 +180,7 @@ func NewLiveStateCache(
|
||||
kubectl: kubectl,
|
||||
settingsMgr: settingsMgr,
|
||||
metricsServer: metricsServer,
|
||||
clusterFilter: clusterFilter,
|
||||
clusterSharding: clusterSharding,
|
||||
resourceTracking: resourceTracking,
|
||||
}
|
||||
}
|
||||
@@ -202,7 +203,7 @@ type liveStateCache struct {
|
||||
kubectl kube.Kubectl
|
||||
settingsMgr *settings.SettingsManager
|
||||
metricsServer *metrics.MetricsServer
|
||||
clusterFilter func(cluster *appv1.Cluster) bool
|
||||
clusterSharding sharding.ClusterShardingCache
|
||||
resourceTracking argo.ResourceTracking
|
||||
|
||||
clusters map[string]clustercache.ClusterCache
|
||||
@@ -722,22 +723,24 @@ func (c *liveStateCache) Run(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (c *liveStateCache) canHandleCluster(cluster *appv1.Cluster) bool {
|
||||
if c.clusterFilter == nil {
|
||||
return true
|
||||
}
|
||||
return c.clusterFilter(cluster)
|
||||
return c.clusterSharding.IsManagedCluster(cluster)
|
||||
}
|
||||
|
||||
func (c *liveStateCache) handleAddEvent(cluster *appv1.Cluster) {
|
||||
c.clusterSharding.Add(cluster)
|
||||
if !c.canHandleCluster(cluster) {
|
||||
log.Infof("Ignoring cluster %s", cluster.Server)
|
||||
return
|
||||
}
|
||||
|
||||
c.lock.Lock()
|
||||
_, ok := c.clusters[cluster.Server]
|
||||
c.lock.Unlock()
|
||||
if !ok {
|
||||
log.Debugf("Checking if cache %v / cluster %v has appInformer %v", c, cluster, c.appInformer)
|
||||
if c.appInformer == nil {
|
||||
log.Warn("Cannot get a cluster appInformer. Cache may not be started this time")
|
||||
return
|
||||
}
|
||||
if c.isClusterHasApps(c.appInformer.GetStore().List(), cluster) {
|
||||
go func() {
|
||||
// warm up cache for cluster with apps
|
||||
@@ -748,6 +751,7 @@ func (c *liveStateCache) handleAddEvent(cluster *appv1.Cluster) {
|
||||
}
|
||||
|
||||
func (c *liveStateCache) handleModEvent(oldCluster *appv1.Cluster, newCluster *appv1.Cluster) {
|
||||
c.clusterSharding.Update(newCluster)
|
||||
c.lock.Lock()
|
||||
cluster, ok := c.clusters[newCluster.Server]
|
||||
c.lock.Unlock()
|
||||
@@ -790,6 +794,7 @@ func (c *liveStateCache) handleModEvent(oldCluster *appv1.Cluster, newCluster *a
|
||||
|
||||
func (c *liveStateCache) handleDeleteEvent(clusterServer string) {
|
||||
c.lock.RLock()
|
||||
c.clusterSharding.Delete(clusterServer)
|
||||
cluster, ok := c.clusters[clusterServer]
|
||||
c.lock.RUnlock()
|
||||
if ok {
|
||||
|
||||
50 controller/cache/cache_test.go vendored
@@ -21,7 +21,11 @@ import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
|
||||
"github.com/argoproj/argo-cd/v2/common"
|
||||
"github.com/argoproj/argo-cd/v2/controller/metrics"
|
||||
"github.com/argoproj/argo-cd/v2/controller/sharding"
|
||||
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
|
||||
dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks"
|
||||
argosettings "github.com/argoproj/argo-cd/v2/util/settings"
|
||||
)
|
||||
|
||||
@@ -35,11 +39,13 @@ func TestHandleModEvent_HasChanges(t *testing.T) {
|
||||
clusterCache := &mocks.ClusterCache{}
|
||||
clusterCache.On("Invalidate", mock.Anything, mock.Anything).Return(nil).Once()
|
||||
clusterCache.On("EnsureSynced").Return(nil).Once()
|
||||
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(1)
|
||||
clustersCache := liveStateCache{
|
||||
clusters: map[string]cache.ClusterCache{
|
||||
"https://mycluster": clusterCache,
|
||||
},
|
||||
clusterSharding: sharding.NewClusterSharding(db, 0, 1, common.DefaultShardingAlgorithm),
|
||||
}
|
||||
|
||||
clustersCache.handleModEvent(&appv1.Cluster{
|
||||
@@ -56,14 +62,22 @@ func TestHandleModEvent_ClusterExcluded(t *testing.T) {
|
||||
clusterCache := &mocks.ClusterCache{}
|
||||
clusterCache.On("Invalidate", mock.Anything, mock.Anything).Return(nil).Once()
|
||||
clusterCache.On("EnsureSynced").Return(nil).Once()
|
||||
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(1)
|
||||
clustersCache := liveStateCache{
|
||||
clusters: map[string]cache.ClusterCache{
|
||||
"https://mycluster": clusterCache,
|
||||
},
|
||||
clusterFilter: func(cluster *appv1.Cluster) bool {
|
||||
return false
|
||||
db: nil,
|
||||
appInformer: nil,
|
||||
onObjectUpdated: func(managedByApp map[string]bool, ref v1.ObjectReference) {
|
||||
},
|
||||
kubectl: nil,
|
||||
settingsMgr: &argosettings.SettingsManager{},
|
||||
metricsServer: &metrics.MetricsServer{},
|
||||
// returns a shard that never process any cluster
|
||||
clusterSharding: sharding.NewClusterSharding(db, 0, 1, common.DefaultShardingAlgorithm),
|
||||
resourceTracking: nil,
|
||||
clusters: map[string]cache.ClusterCache{"https://mycluster": clusterCache},
|
||||
cacheSettings: cacheSettings{},
|
||||
lock: sync.RWMutex{},
|
||||
}
|
||||
|
||||
clustersCache.handleModEvent(&appv1.Cluster{
|
||||
@@ -75,18 +89,20 @@ func TestHandleModEvent_ClusterExcluded(t *testing.T) {
|
||||
Namespaces: []string{"default"},
|
||||
})
|
||||
|
||||
assert.Len(t, clustersCache.clusters, 0)
|
||||
assert.Len(t, clustersCache.clusters, 1)
|
||||
}
|
||||
|
||||
func TestHandleModEvent_NoChanges(t *testing.T) {
|
||||
clusterCache := &mocks.ClusterCache{}
|
||||
clusterCache.On("Invalidate", mock.Anything).Panic("should not invalidate")
|
||||
clusterCache.On("EnsureSynced").Return(nil).Panic("should not re-sync")
|
||||
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(1)
|
||||
clustersCache := liveStateCache{
|
||||
clusters: map[string]cache.ClusterCache{
|
||||
"https://mycluster": clusterCache,
|
||||
},
|
||||
clusterSharding: sharding.NewClusterSharding(db, 0, 1, common.DefaultShardingAlgorithm),
|
||||
}
|
||||
|
||||
clustersCache.handleModEvent(&appv1.Cluster{
|
||||
@@ -99,11 +115,11 @@ func TestHandleModEvent_NoChanges(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHandleAddEvent_ClusterExcluded(t *testing.T) {
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(1)
|
||||
clustersCache := liveStateCache{
|
||||
clusters: map[string]cache.ClusterCache{},
|
||||
clusterFilter: func(cluster *appv1.Cluster) bool {
|
||||
return false
|
||||
},
|
||||
clusters: map[string]cache.ClusterCache{},
|
||||
clusterSharding: sharding.NewClusterSharding(db, 0, 2, common.DefaultShardingAlgorithm),
|
||||
}
|
||||
clustersCache.handleAddEvent(&appv1.Cluster{
|
||||
Server: "https://mycluster",
|
||||
@@ -118,6 +134,8 @@ func TestHandleDeleteEvent_CacheDeadlock(t *testing.T) {
|
||||
Server: "https://mycluster",
|
||||
Config: appv1.ClusterConfig{Username: "bar"},
|
||||
}
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(1)
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
settingsMgr := argosettings.NewSettingsManager(context.TODO(), fakeClient, "argocd")
|
||||
liveStateCacheLock := sync.RWMutex{}
|
||||
@@ -126,10 +144,8 @@ func TestHandleDeleteEvent_CacheDeadlock(t *testing.T) {
|
||||
clusters: map[string]cache.ClusterCache{
|
||||
testCluster.Server: gitopsEngineClusterCache,
|
||||
},
|
||||
clusterFilter: func(cluster *appv1.Cluster) bool {
|
||||
return true
|
||||
},
|
||||
settingsMgr: settingsMgr,
|
||||
clusterSharding: sharding.NewClusterSharding(db, 0, 1, common.DefaultShardingAlgorithm),
|
||||
settingsMgr: settingsMgr,
|
||||
// Set the lock here so we can reference it later
|
||||
// nolint We need to overwrite here to have access to the lock
|
||||
lock: liveStateCacheLock,
|
||||
|
||||
163 controller/sharding/cache.go Normal file
@@ -0,0 +1,163 @@
package sharding

import (
	"sync"

	"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
	"github.com/argoproj/argo-cd/v2/util/db"
	log "github.com/sirupsen/logrus"
)

type ClusterShardingCache interface {
	Init(clusters *v1alpha1.ClusterList)
	Add(c *v1alpha1.Cluster)
	Delete(clusterServer string)
	Update(c *v1alpha1.Cluster)
	IsManagedCluster(c *v1alpha1.Cluster) bool
	GetDistribution() map[string]int
}

type ClusterSharding struct {
	Shard           int
	Replicas        int
	Shards          map[string]int
	Clusters        map[string]*v1alpha1.Cluster
	lock            sync.RWMutex
	getClusterShard DistributionFunction
}

func NewClusterSharding(db db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
	log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
	clusterSharding := &ClusterSharding{
		Shard:    shard,
		Replicas: replicas,
		Shards:   make(map[string]int),
		Clusters: make(map[string]*v1alpha1.Cluster),
	}
	distributionFunction := NoShardingDistributionFunction()
	if replicas > 1 {
		log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
		distributionFunction = GetDistributionFunction(clusterSharding.GetClusterAccessor(), shardingAlgorithm, replicas)
	} else {
		log.Info("Processing all cluster shards")
	}
	clusterSharding.getClusterShard = distributionFunction
	return clusterSharding
}

// IsManagedCluster returns wheter or not the cluster should be processed by a given shard.
func (s *ClusterSharding) IsManagedCluster(c *v1alpha1.Cluster) bool {
	s.lock.RLock()
	defer s.lock.RUnlock()
	if c == nil { // nil cluster (in-cluster) is always managed by current clusterShard
		return true
	}
	clusterShard := 0
	if shard, ok := s.Shards[c.Server]; ok {
		clusterShard = shard
	} else {
		log.Warnf("The cluster %s has no assigned shard.", c.Server)
	}
	log.Debugf("Checking if cluster %s with clusterShard %d should be processed by shard %d", c.Server, clusterShard, s.Shard)
	return clusterShard == s.Shard
}

func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
	for _, c := range clusters.Items {
		newClusters[c.Server] = &c
	}
	sharding.Clusters = newClusters
	sharding.updateDistribution()
}

func (sharding *ClusterSharding) Add(c *v1alpha1.Cluster) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()

	old, ok := sharding.Clusters[c.Server]
	sharding.Clusters[c.Server] = c
	if !ok || hasShardingUpdates(old, c) {
		sharding.updateDistribution()
	} else {
		log.Debugf("Skipping sharding distribution update. Cluster already added")
	}
}

func (sharding *ClusterSharding) Delete(clusterServer string) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	if _, ok := sharding.Clusters[clusterServer]; ok {
		delete(sharding.Clusters, clusterServer)
		delete(sharding.Shards, clusterServer)
		sharding.updateDistribution()
	}
}

func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()

	old, ok := sharding.Clusters[c.Server]
	sharding.Clusters[c.Server] = c
	if !ok || hasShardingUpdates(old, c) {
		sharding.updateDistribution()
	} else {
		log.Debugf("Skipping sharding distribution update. No relevant changes")
	}
}

func (sharding *ClusterSharding) GetDistribution() map[string]int {
	sharding.lock.RLock()
	shards := sharding.Shards
	sharding.lock.RUnlock()

	distribution := make(map[string]int, len(shards))
	for k, v := range shards {
		distribution[k] = v
	}
	return distribution
}

func (sharding *ClusterSharding) updateDistribution() {
	log.Info("Updating cluster shards")

	for _, c := range sharding.Clusters {
		shard := 0
		if c.Shard != nil {
			requestedShard := int(*c.Shard)
			if requestedShard < sharding.Replicas {
				shard = requestedShard
			} else {
				log.Warnf("Specified cluster shard (%d) for cluster: %s is greater than the number of available shard (%d). Using shard 0.", requestedShard, c.Server, sharding.Replicas)
			}
		} else {
			shard = sharding.getClusterShard(c)
		}
		var shard64 int64 = int64(shard)
		c.Shard = &shard64
		sharding.Shards[c.Server] = shard
	}
}

// hasShardingUpdates returns true if the sharding distribution has been updated.
// nil checking is done for the corner case of the in-cluster cluster which may
// have a nil shard assigned
func hasShardingUpdates(old, new *v1alpha1.Cluster) bool {
	if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) {
		return false
	}
	return old.Shard != new.Shard
}

func (d *ClusterSharding) GetClusterAccessor() clusterAccessor {
	return func() []*v1alpha1.Cluster {
		clusters := make([]*v1alpha1.Cluster, 0, len(d.Clusters))
		for _, c := range d.Clusters {
			clusters = append(clusters, c)
		}
		return clusters
	}
}
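The GetClusterAccessor closure above is what lets the refactored distribution functions (see the sharding.go hunks that follow) work from this in-memory cache instead of an ArgoDB handle. A small sketch, assuming the accessor-based signature shown in the diff below; the cluster names and values are made up for illustration:

```go
// Sketch only: demonstrates the accessor-based signature of RoundRobinDistributionFunction.
package main

import (
	"fmt"

	"github.com/argoproj/argo-cd/v2/controller/sharding"
	"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
)

func main() {
	clusters := []*v1alpha1.Cluster{
		{ID: "1", Server: "https://cluster-one"},
		{ID: "2", Server: "https://cluster-two"},
		{ID: "3", Server: "https://cluster-three"},
	}
	// Any func() []*v1alpha1.Cluster value satisfies the (unexported) clusterAccessor type.
	accessor := func() []*v1alpha1.Cluster { return clusters }

	distribute := sharding.RoundRobinDistributionFunction(accessor, 2)
	for _, c := range clusters {
		// Clusters are sorted by ID and assigned shards round-robin: 0, 1, 0, ...
		fmt.Printf("%s -> shard %d\n", c.Server, distribute(c))
	}
}
```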
@@ -40,6 +40,7 @@ const ShardControllerMappingKey = "shardControllerMapping"
|
||||
|
||||
type DistributionFunction func(c *v1alpha1.Cluster) int
|
||||
type ClusterFilterFunction func(c *v1alpha1.Cluster) bool
|
||||
type clusterAccessor func() []*v1alpha1.Cluster
|
||||
|
||||
// shardApplicationControllerMapping stores the mapping of Shard Number to Application Controller in ConfigMap.
|
||||
// It also stores the heartbeat of last synced time of the application controller.
|
||||
@@ -53,8 +54,7 @@ type shardApplicationControllerMapping struct {
|
||||
// and returns wheter or not the cluster should be processed by a given shard. It calls the distributionFunction
|
||||
// to determine which shard will process the cluster, and if the given shard is equal to the calculated shard
|
||||
// the function will return true.
|
||||
func GetClusterFilter(db db.ArgoDB, distributionFunction DistributionFunction, shard int) ClusterFilterFunction {
|
||||
replicas := db.GetApplicationControllerReplicas()
|
||||
func GetClusterFilter(db db.ArgoDB, distributionFunction DistributionFunction, replicas, shard int) ClusterFilterFunction {
|
||||
return func(c *v1alpha1.Cluster) bool {
|
||||
clusterShard := 0
|
||||
if c != nil && c.Shard != nil {
|
||||
@@ -73,14 +73,14 @@ func GetClusterFilter(db db.ArgoDB, distributionFunction DistributionFunction, s
|
||||
|
||||
// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and
|
||||
// the current datas.
|
||||
func GetDistributionFunction(db db.ArgoDB, shardingAlgorithm string) DistributionFunction {
|
||||
log.Infof("Using filter function: %s", shardingAlgorithm)
|
||||
distributionFunction := LegacyDistributionFunction(db)
|
||||
func GetDistributionFunction(clusters clusterAccessor, shardingAlgorithm string, replicasCount int) DistributionFunction {
|
||||
log.Debugf("Using filter function: %s", shardingAlgorithm)
|
||||
distributionFunction := LegacyDistributionFunction(replicasCount)
|
||||
switch shardingAlgorithm {
|
||||
case common.RoundRobinShardingAlgorithm:
|
||||
distributionFunction = RoundRobinDistributionFunction(db)
|
||||
distributionFunction = RoundRobinDistributionFunction(clusters, replicasCount)
|
||||
case common.LegacyShardingAlgorithm:
|
||||
distributionFunction = LegacyDistributionFunction(db)
|
||||
distributionFunction = LegacyDistributionFunction(replicasCount)
|
||||
default:
|
||||
log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
|
||||
}
|
||||
@@ -92,15 +92,21 @@ func GetDistributionFunction(db db.ArgoDB, shardingAlgorithm string) Distributio
|
||||
// is lightweight and can be distributed easily, however, it does not ensure an homogenous distribution as
|
||||
// some shards may get assigned more clusters than others. It is the legacy function distribution that is
|
||||
// kept for compatibility reasons
|
||||
func LegacyDistributionFunction(db db.ArgoDB) DistributionFunction {
|
||||
replicas := db.GetApplicationControllerReplicas()
|
||||
func LegacyDistributionFunction(replicas int) DistributionFunction {
|
||||
return func(c *v1alpha1.Cluster) int {
|
||||
if replicas == 0 {
|
||||
log.Debugf("Replicas count is : %d, returning -1", replicas)
|
||||
return -1
|
||||
}
|
||||
if c == nil {
|
||||
log.Debug("In-cluster: returning 0")
|
||||
return 0
|
||||
}
|
||||
// if Shard is manually set and the assigned value is lower than the number of replicas,
|
||||
// then its value is returned otherwise it is the default calculated value
|
||||
if c.Shard != nil && int(*c.Shard) < replicas {
|
||||
return int(*c.Shard)
|
||||
}
|
||||
id := c.ID
|
||||
log.Debugf("Calculating cluster shard for cluster id: %s", id)
|
||||
if id == "" {
|
||||
@@ -121,14 +127,19 @@ func LegacyDistributionFunction(db db.ArgoDB) DistributionFunction {
|
||||
// This function ensures an homogenous distribution: each shards got assigned the same number of
|
||||
// clusters +/-1 , but with the drawback of a reshuffling of clusters accross shards in case of some changes
|
||||
// in the cluster list
|
||||
func RoundRobinDistributionFunction(db db.ArgoDB) DistributionFunction {
|
||||
replicas := db.GetApplicationControllerReplicas()
|
||||
|
||||
func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction {
|
||||
return func(c *v1alpha1.Cluster) int {
|
||||
if replicas > 0 {
|
||||
if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here.
|
||||
return 0
|
||||
}
|
||||
// if Shard is manually set and the assigned value is lower than the number of replicas,
|
||||
// then its value is returned otherwise it is the default calculated value
|
||||
if c.Shard != nil && int(*c.Shard) < replicas {
|
||||
return int(*c.Shard)
|
||||
} else {
|
||||
clusterIndexdByClusterIdMap := createClusterIndexByClusterIdMap(db)
|
||||
clusterIndexdByClusterIdMap := createClusterIndexByClusterIdMap(clusters)
|
||||
clusterIndex, ok := clusterIndexdByClusterIdMap[c.ID]
|
||||
if !ok {
|
||||
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
|
||||
@@ -144,6 +155,12 @@ func RoundRobinDistributionFunction(db db.ArgoDB) DistributionFunction {
|
||||
}
|
||||
}
|
||||
|
||||
// NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0
|
||||
// the function is created for API compatibility purposes and is not supposed to be activated.
|
||||
func NoShardingDistributionFunction() DistributionFunction {
|
||||
return func(c *v1alpha1.Cluster) int { return 0 }
|
||||
}
|
||||
|
||||
// InferShard extracts the shard index based on its hostname.
|
||||
func InferShard() (int, error) {
|
||||
hostname, err := osHostnameFunction()
|
||||
@@ -152,33 +169,29 @@ func InferShard() (int, error) {
|
||||
}
|
||||
parts := strings.Split(hostname, "-")
|
||||
if len(parts) == 0 {
|
||||
return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname)
|
||||
log.Warnf("hostname should end with shard number separated by '-' but got: %s", hostname)
|
||||
return 0, nil
|
||||
}
|
||||
shard, err := strconv.Atoi(parts[len(parts)-1])
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname)
|
||||
log.Warnf("hostname should end with shard number separated by '-' but got: %s", hostname)
|
||||
return 0, nil
|
||||
}
|
||||
return int(shard), nil
|
||||
}
|
||||
|
||||
func getSortedClustersList(db db.ArgoDB) []v1alpha1.Cluster {
|
||||
ctx := context.Background()
|
||||
clustersList, dbErr := db.ListClusters(ctx)
|
||||
if dbErr != nil {
|
||||
log.Warnf("Error while querying clusters list from database: %v", dbErr)
|
||||
return []v1alpha1.Cluster{}
|
||||
}
|
||||
clusters := clustersList.Items
|
||||
func getSortedClustersList(getCluster clusterAccessor) []*v1alpha1.Cluster {
|
||||
clusters := getCluster()
|
||||
sort.Slice(clusters, func(i, j int) bool {
|
||||
return clusters[i].ID < clusters[j].ID
|
||||
})
|
||||
return clusters
|
||||
}
|
||||
|
||||
func createClusterIndexByClusterIdMap(db db.ArgoDB) map[string]int {
|
||||
clusters := getSortedClustersList(db)
|
||||
func createClusterIndexByClusterIdMap(getCluster clusterAccessor) map[string]int {
|
||||
clusters := getSortedClustersList(getCluster)
|
||||
log.Debugf("ClustersList has %d items", len(clusters))
|
||||
clusterById := make(map[string]v1alpha1.Cluster)
|
||||
clusterById := make(map[string]*v1alpha1.Cluster)
|
||||
clusterIndexedByClusterId := make(map[string]int)
|
||||
for i, cluster := range clusters {
|
||||
log.Debugf("Adding cluster with id=%s and name=%s to cluster's map", cluster.ID, cluster.Name)
|
||||
@@ -194,7 +207,6 @@ func createClusterIndexByClusterIdMap(db db.ArgoDB) map[string]int {
|
||||
// If the shard value passed to this function is -1, that is, the shard was not set as an environment variable,
|
||||
// we default the shard number to 0 for computing the default config map.
|
||||
func GetOrUpdateShardFromConfigMap(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) {
|
||||
|
||||
hostname, err := osHostnameFunction()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -19,18 +20,20 @@ import (
|
||||
|
||||
func TestGetShardByID_NotEmptyID(t *testing.T) {
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(1)
|
||||
assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "1"}))
|
||||
assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "2"}))
|
||||
assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "3"}))
|
||||
assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "4"}))
|
||||
replicasCount := 1
|
||||
db.On("GetApplicationControllerReplicas").Return(replicasCount)
|
||||
assert.Equal(t, 0, LegacyDistributionFunction(replicasCount)(&v1alpha1.Cluster{ID: "1"}))
|
||||
assert.Equal(t, 0, LegacyDistributionFunction(replicasCount)(&v1alpha1.Cluster{ID: "2"}))
|
||||
assert.Equal(t, 0, LegacyDistributionFunction(replicasCount)(&v1alpha1.Cluster{ID: "3"}))
|
||||
assert.Equal(t, 0, LegacyDistributionFunction(replicasCount)(&v1alpha1.Cluster{ID: "4"}))
|
||||
}
|
||||
|
||||
func TestGetShardByID_EmptyID(t *testing.T) {
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(1)
|
||||
replicasCount := 1
|
||||
db.On("GetApplicationControllerReplicas").Return(replicasCount)
|
||||
distributionFunction := LegacyDistributionFunction
|
||||
shard := distributionFunction(db)(&v1alpha1.Cluster{})
|
||||
shard := distributionFunction(replicasCount)(&v1alpha1.Cluster{})
|
||||
assert.Equal(t, 0, shard)
|
||||
}
|
||||
|
||||
@@ -38,7 +41,7 @@ func TestGetShardByID_NoReplicas(t *testing.T) {
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(0)
|
||||
distributionFunction := LegacyDistributionFunction
|
||||
shard := distributionFunction(db)(&v1alpha1.Cluster{})
|
||||
shard := distributionFunction(0)(&v1alpha1.Cluster{})
|
||||
assert.Equal(t, -1, shard)
|
||||
}
|
||||
|
||||
@@ -46,16 +49,16 @@ func TestGetShardByID_NoReplicasUsingHashDistributionFunction(t *testing.T) {
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(0)
|
||||
distributionFunction := LegacyDistributionFunction
|
||||
shard := distributionFunction(db)(&v1alpha1.Cluster{})
|
||||
shard := distributionFunction(0)(&v1alpha1.Cluster{})
|
||||
assert.Equal(t, -1, shard)
|
||||
}
|
||||
|
||||
func TestGetShardByID_NoReplicasUsingHashDistributionFunctionWithClusters(t *testing.T) {
|
||||
db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters()
|
||||
clusters, db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters()
|
||||
// Test with replicas set to 0
|
||||
db.On("GetApplicationControllerReplicas").Return(0)
|
||||
t.Setenv(common.EnvControllerShardingAlgorithm, common.RoundRobinShardingAlgorithm)
|
||||
distributionFunction := RoundRobinDistributionFunction(db)
|
||||
distributionFunction := RoundRobinDistributionFunction(clusters, 0)
|
||||
assert.Equal(t, -1, distributionFunction(nil))
|
||||
assert.Equal(t, -1, distributionFunction(&cluster1))
|
||||
assert.Equal(t, -1, distributionFunction(&cluster2))
|
||||
@@ -65,137 +68,112 @@ func TestGetShardByID_NoReplicasUsingHashDistributionFunctionWithClusters(t *tes
|
||||
}
|
||||
|
||||
func TestGetClusterFilterDefault(t *testing.T) {
|
||||
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
|
||||
//shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
|
||||
clusterAccessor, _, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
|
||||
os.Unsetenv(common.EnvControllerShardingAlgorithm)
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(2)
|
||||
filter := GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), shardIndex)
|
||||
assert.False(t, filter(&v1alpha1.Cluster{ID: "1"}))
|
||||
assert.True(t, filter(&v1alpha1.Cluster{ID: "2"}))
|
||||
assert.False(t, filter(&v1alpha1.Cluster{ID: "3"}))
|
||||
assert.True(t, filter(&v1alpha1.Cluster{ID: "4"}))
|
||||
replicasCount := 2
|
||||
distributionFunction := RoundRobinDistributionFunction(clusterAccessor, replicasCount)
|
||||
assert.Equal(t, 0, distributionFunction(nil))
|
||||
assert.Equal(t, 0, distributionFunction(&cluster1))
|
||||
assert.Equal(t, 1, distributionFunction(&cluster2))
|
||||
assert.Equal(t, 0, distributionFunction(&cluster3))
|
||||
assert.Equal(t, 1, distributionFunction(&cluster4))
|
||||
}
|
||||
|
||||
func TestGetClusterFilterLegacy(t *testing.T) {
|
||||
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(2)
|
||||
//shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
|
||||
clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
|
||||
replicasCount := 2
|
||||
db.On("GetApplicationControllerReplicas").Return(replicasCount)
|
||||
t.Setenv(common.EnvControllerShardingAlgorithm, common.LegacyShardingAlgorithm)
|
||||
filter := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
|
||||
assert.False(t, filter(&v1alpha1.Cluster{ID: "1"}))
|
||||
assert.True(t, filter(&v1alpha1.Cluster{ID: "2"}))
|
||||
assert.False(t, filter(&v1alpha1.Cluster{ID: "3"}))
|
||||
assert.True(t, filter(&v1alpha1.Cluster{ID: "4"}))
|
||||
distributionFunction := RoundRobinDistributionFunction(clusterAccessor, replicasCount)
|
||||
assert.Equal(t, 0, distributionFunction(nil))
|
||||
assert.Equal(t, 0, distributionFunction(&cluster1))
|
||||
assert.Equal(t, 1, distributionFunction(&cluster2))
|
||||
assert.Equal(t, 0, distributionFunction(&cluster3))
|
||||
assert.Equal(t, 1, distributionFunction(&cluster4))
|
||||
}
|
||||
|
||||
func TestGetClusterFilterUnknown(t *testing.T) {
|
||||
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
|
||||
db := &dbmocks.ArgoDB{}
|
||||
db.On("GetApplicationControllerReplicas").Return(2)
|
||||
clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
|
||||
// Test with replicas set to 0
|
||||
t.Setenv(common.EnvControllerReplicas, "2")
|
||||
os.Unsetenv(common.EnvControllerShardingAlgorithm)
|
||||
t.Setenv(common.EnvControllerShardingAlgorithm, "unknown")
|
||||
filter := GetClusterFilter(db, GetDistributionFunction(db, "unknown"), shardIndex)
|
||||
assert.False(t, filter(&v1alpha1.Cluster{ID: "1"}))
|
||||
assert.True(t, filter(&v1alpha1.Cluster{ID: "2"}))
|
||||
assert.False(t, filter(&v1alpha1.Cluster{ID: "3"}))
|
||||
assert.True(t, filter(&v1alpha1.Cluster{ID: "4"}))
|
||||
replicasCount := 2
|
||||
db.On("GetApplicationControllerReplicas").Return(replicasCount)
|
||||
distributionFunction := GetDistributionFunction(clusterAccessor, "unknown", replicasCount)
|
||||
assert.Equal(t, 0, distributionFunction(nil))
|
||||
assert.Equal(t, 0, distributionFunction(&cluster1))
|
||||
assert.Equal(t, 1, distributionFunction(&cluster2))
|
||||
assert.Equal(t, 0, distributionFunction(&cluster3))
|
||||
assert.Equal(t, 1, distributionFunction(&cluster4))
|
||||
}

func TestLegacyGetClusterFilterWithFixedShard(t *testing.T) {
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(2)
filter := GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), shardIndex)
assert.False(t, filter(nil))
assert.False(t, filter(&v1alpha1.Cluster{ID: "1"}))
assert.True(t, filter(&v1alpha1.Cluster{ID: "2"}))
assert.False(t, filter(&v1alpha1.Cluster{ID: "3"}))
assert.True(t, filter(&v1alpha1.Cluster{ID: "4"}))
//shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
t.Setenv(common.EnvControllerReplicas, "5")
clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
replicasCount := 5
db.On("GetApplicationControllerReplicas").Return(replicasCount)
filter := GetDistributionFunction(clusterAccessor, common.DefaultShardingAlgorithm, replicasCount)
assert.Equal(t, 0, filter(nil))
assert.Equal(t, 4, filter(&cluster1))
assert.Equal(t, 1, filter(&cluster2))
assert.Equal(t, 2, filter(&cluster3))
assert.Equal(t, 2, filter(&cluster4))

var fixedShard int64 = 4
filter = GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), int(fixedShard))
assert.False(t, filter(&v1alpha1.Cluster{ID: "4", Shard: &fixedShard}))
cluster5 := &v1alpha1.Cluster{ID: "5", Shard: &fixedShard}
clusterAccessor = getClusterAccessor([]v1alpha1.Cluster{cluster1, cluster2, cluster2, cluster4, *cluster5})
filter = GetDistributionFunction(clusterAccessor, common.DefaultShardingAlgorithm, replicasCount)
assert.Equal(t, int(fixedShard), filter(cluster5))

fixedShard = 1
filter = GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), int(fixedShard))
assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
cluster5.Shard = &fixedShard
clusterAccessor = getClusterAccessor([]v1alpha1.Cluster{cluster1, cluster2, cluster2, cluster4, *cluster5})
filter = GetDistributionFunction(clusterAccessor, common.DefaultShardingAlgorithm, replicasCount)
assert.Equal(t, int(fixedShard), filter(&v1alpha1.Cluster{ID: "4", Shard: &fixedShard}))
}

func TestRoundRobinGetClusterFilterWithFixedShard(t *testing.T) {
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
db.On("GetApplicationControllerReplicas").Return(2)
filter := GetClusterFilter(db, GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), shardIndex)
assert.False(t, filter(nil))
assert.False(t, filter(&cluster1))
assert.True(t, filter(&cluster2))
assert.False(t, filter(&cluster3))
assert.True(t, filter(&cluster4))
//shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
t.Setenv(common.EnvControllerReplicas, "4")
clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
replicasCount := 4
db.On("GetApplicationControllerReplicas").Return(replicasCount)

filter := GetDistributionFunction(clusterAccessor, common.RoundRobinShardingAlgorithm, replicasCount)
assert.Equal(t, filter(nil), 0)
assert.Equal(t, filter(&cluster1), 0)
assert.Equal(t, filter(&cluster2), 1)
assert.Equal(t, filter(&cluster3), 2)
assert.Equal(t, filter(&cluster4), 3)

// a cluster with a fixed shard should be processed by the specified exact
// same shard unless the specified shard index is greater than the number of replicas.
var fixedShard int64 = 4
filter = GetClusterFilter(db, GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard))
assert.False(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
var fixedShard int64 = 1
cluster5 := v1alpha1.Cluster{Name: "cluster5", ID: "5", Shard: &fixedShard}
clusters := []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}
clusterAccessor = getClusterAccessor(clusters)
filter = GetDistributionFunction(clusterAccessor, common.RoundRobinShardingAlgorithm, replicasCount)
assert.Equal(t, int(fixedShard), filter(&cluster5))

fixedShard = 1
filter = GetClusterFilter(db, GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard))
assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
}
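Both the legacy and round-robin variants of this test exercise the pinning rule spelled out in the comment above: a non-nil Shard field wins unless the pinned index is not smaller than the replica count, in which case the computed assignment is used. Here is a minimal, self-contained sketch of such an override wrapper; the types and the name withFixedShard are hypothetical, not Argo CD's API.

```go
package main

import "fmt"

// cluster stands in for v1alpha1.Cluster: Shard, when non-nil, pins the cluster
// to a specific shard.
type cluster struct {
	ID    string
	Shard *int64
}

// withFixedShard wraps a computed distribution so that a pinned shard wins,
// unless the pin points past the last available replica. Sketch only.
func withFixedShard(computed func(*cluster) int, replicas int) func(*cluster) int {
	return func(c *cluster) int {
		if c == nil {
			return 0
		}
		if c.Shard != nil && int(*c.Shard) < replicas {
			return int(*c.Shard)
		}
		return computed(c)
	}
}

func main() {
	computed := func(c *cluster) int { return 3 } // arbitrary placeholder
	fn := withFixedShard(computed, 4)

	pinned := int64(1)
	fmt.Println(fn(&cluster{ID: "5", Shard: &pinned})) // 1: pin honoured

	tooHigh := int64(4)
	fmt.Println(fn(&cluster{ID: "5", Shard: &tooHigh})) // 3: pin >= replicas, fall back
}
```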

func TestGetClusterFilterLegacyHash(t *testing.T) {
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
t.Setenv(common.EnvControllerShardingAlgorithm, "hash")
db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
db.On("GetApplicationControllerReplicas").Return(2)
filter := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
assert.False(t, filter(&cluster1))
assert.True(t, filter(&cluster2))
assert.False(t, filter(&cluster3))
assert.True(t, filter(&cluster4))

// a cluster with a fixed shard should be processed by the specified exact
// same shard unless the specified shard index is greater than the number of replicas.
var fixedShard int64 = 4
filter = GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard))
assert.False(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))

fixedShard = 1
filter = GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard))
assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
}

func TestGetClusterFilterWithEnvControllerShardingAlgorithms(t *testing.T) {
db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
shardIndex := 1
db.On("GetApplicationControllerReplicas").Return(2)

t.Run("legacy", func(t *testing.T) {
t.Setenv(common.EnvControllerShardingAlgorithm, common.LegacyShardingAlgorithm)
shardShouldProcessCluster := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
assert.False(t, shardShouldProcessCluster(&cluster1))
assert.True(t, shardShouldProcessCluster(&cluster2))
assert.False(t, shardShouldProcessCluster(&cluster3))
assert.True(t, shardShouldProcessCluster(&cluster4))
assert.False(t, shardShouldProcessCluster(nil))
})

t.Run("roundrobin", func(t *testing.T) {
t.Setenv(common.EnvControllerShardingAlgorithm, common.RoundRobinShardingAlgorithm)
shardShouldProcessCluster := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
assert.False(t, shardShouldProcessCluster(&cluster1))
assert.True(t, shardShouldProcessCluster(&cluster2))
assert.False(t, shardShouldProcessCluster(&cluster3))
assert.True(t, shardShouldProcessCluster(&cluster4))
assert.False(t, shardShouldProcessCluster(nil))
})
cluster5 = v1alpha1.Cluster{Name: "cluster5", ID: "5", Shard: &fixedShard}
clusters = []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}
clusterAccessor = getClusterAccessor(clusters)
filter = GetDistributionFunction(clusterAccessor, common.RoundRobinShardingAlgorithm, replicasCount)
assert.Equal(t, int(fixedShard), filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
}

func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) {
db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters()
clusters, db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters()

t.Run("replicas set to 1", func(t *testing.T) {
db.On("GetApplicationControllerReplicas").Return(1).Once()
distributionFunction := RoundRobinDistributionFunction(db)
replicasCount := 1
db.On("GetApplicationControllerReplicas").Return(replicasCount).Once()
distributionFunction := RoundRobinDistributionFunction(clusters, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 0, distributionFunction(&cluster2))
@@ -205,8 +183,9 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) {
})

t.Run("replicas set to 2", func(t *testing.T) {
db.On("GetApplicationControllerReplicas").Return(2).Once()
distributionFunction := RoundRobinDistributionFunction(db)
replicasCount := 2
db.On("GetApplicationControllerReplicas").Return(replicasCount).Once()
distributionFunction := RoundRobinDistributionFunction(clusters, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 1, distributionFunction(&cluster2))
@@ -216,8 +195,9 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) {
})

t.Run("replicas set to 3", func(t *testing.T) {
db.On("GetApplicationControllerReplicas").Return(3).Once()
distributionFunction := RoundRobinDistributionFunction(db)
replicasCount := 3
db.On("GetApplicationControllerReplicas").Return(replicasCount).Once()
distributionFunction := RoundRobinDistributionFunction(clusters, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 1, distributionFunction(&cluster2))
@@ -233,17 +213,19 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterNumber
// Initial tests were showing that under 1024 clusters, execution time was around 400ms
// and for 4096 clusters, execution time was under 9s
// The other implementation was giving almost linear time of 400ms up to 10'000 clusters
db := dbmocks.ArgoDB{}
clusterList := &v1alpha1.ClusterList{Items: []v1alpha1.Cluster{}}
clusterPointers := []*v1alpha1.Cluster{}
for i := 0; i < 2048; i++ {
cluster := createCluster(fmt.Sprintf("cluster-%d", i), fmt.Sprintf("%d", i))
clusterList.Items = append(clusterList.Items, cluster)
clusterPointers = append(clusterPointers, &cluster)
}
db.On("ListClusters", mock.Anything).Return(clusterList, nil)
db.On("GetApplicationControllerReplicas").Return(2)
distributionFunction := RoundRobinDistributionFunction(&db)
for i, c := range clusterList.Items {
assert.Equal(t, i%2, distributionFunction(&c))
replicasCount := 2
t.Setenv(common.EnvControllerReplicas, strconv.Itoa(replicasCount))
_, db, _, _, _, _, _ := createTestClusters()
clusterAccessor := func() []*v1alpha1.Cluster { return clusterPointers }
db.On("GetApplicationControllerReplicas").Return(replicasCount)
distributionFunction := RoundRobinDistributionFunction(clusterAccessor, replicasCount)
for i, c := range clusterPointers {
assert.Equal(t, i%2, distributionFunction(c))
}
}
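The timing comment at the top of this test records the motivation for the accessor-based design: the cluster list is walked once to build the assignment, after which each lookup is constant time, so total work stays close to linear in the number of clusters. A self-contained sketch of that idea, together with a hypothetical benchmark harness (not part of the Argo CD test suite) that could be used to reproduce such measurements; all names and types here are stand-ins.

```go
package sharding_test

import (
	"fmt"
	"testing"
)

// Minimal stand-ins so the benchmark is self-contained; in the real suite these
// would be v1alpha1.Cluster and RoundRobinDistributionFunction.
type cluster struct{ ID string }

// positionalRoundRobin builds an ID -> shard index once (O(n)), so every
// subsequent lookup is a map access rather than a re-listing of clusters.
func positionalRoundRobin(clusters []*cluster, replicas int) func(*cluster) int {
	index := make(map[string]int, len(clusters))
	for i, c := range clusters {
		index[c.ID] = i % replicas
	}
	return func(c *cluster) int {
		if c == nil {
			return 0
		}
		if shard, ok := index[c.ID]; ok {
			return shard
		}
		return -1 // cluster not known to the accessor
	}
}

// BenchmarkDistribution is a hypothetical harness for timing lookups over a
// population comparable to the 2048-cluster loop in the test above.
func BenchmarkDistribution(b *testing.B) {
	clusters := make([]*cluster, 2048)
	for i := range clusters {
		clusters[i] = &cluster{ID: fmt.Sprintf("%d", i)}
	}
	fn := positionalRoundRobin(clusters, 2)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		for _, c := range clusters {
			_ = fn(c)
		}
	}
}
```

Run with `go test -bench=Distribution` to get per-iteration timings instead of the wall-clock observations quoted in the comment.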

@@ -256,12 +238,15 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde
cluster5 := createCluster("cluster5", "5")
cluster6 := createCluster("cluster6", "6")

clusters := []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}
clusterAccessor := getClusterAccessor(clusters)

clusterList := &v1alpha1.ClusterList{Items: []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}}
db.On("ListClusters", mock.Anything).Return(clusterList, nil)

// Test with replicas set to 2
db.On("GetApplicationControllerReplicas").Return(2)
distributionFunction := RoundRobinDistributionFunction(&db)
replicasCount := 2
db.On("GetApplicationControllerReplicas").Return(replicasCount)
distributionFunction := RoundRobinDistributionFunction(clusterAccessor, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 1, distributionFunction(&cluster2))
@@ -272,17 +257,20 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde

// Now the database knows cluster6; it should be assigned a proper shard
clusterList.Items = append(clusterList.Items, cluster6)
distributionFunction = RoundRobinDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
assert.Equal(t, 1, distributionFunction(&cluster6))

// Now we remove the last added cluster; it should be unassigned as well
clusterList.Items = clusterList.Items[:len(clusterList.Items)-1]
distributionFunction = RoundRobinDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
assert.Equal(t, -1, distributionFunction(&cluster6))
}
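The two comments above describe the dynamic-membership behaviour: rebuilding the distribution from the accessor's current contents gives a newly added cluster its positional shard, and a cluster no longer returned by the accessor maps to -1 (unassigned). A small sketch of that behaviour, under the same assumed types as the previous example; the helper name roundRobinOver is illustrative only.

```go
package main

import "fmt"

type cluster struct{ ID string }

// roundRobinOver returns a distribution over the given snapshot of clusters;
// clusters not in the snapshot get -1 ("unassigned"). Sketch only.
func roundRobinOver(snapshot []*cluster, replicas int) func(*cluster) int {
	index := make(map[string]int, len(snapshot))
	for i, c := range snapshot {
		index[c.ID] = i % replicas
	}
	return func(c *cluster) int {
		if c == nil {
			return 0
		}
		if s, ok := index[c.ID]; ok {
			return s
		}
		return -1
	}
}

func main() {
	c6 := &cluster{ID: "6"}
	base := []*cluster{{ID: "1"}, {ID: "2"}, {ID: "3"}, {ID: "4"}, {ID: "5"}}

	fn := roundRobinOver(base, 2)
	fmt.Println(fn(c6)) // -1: cluster6 is not in the snapshot yet

	fn = roundRobinOver(append(base, c6), 2)
	fmt.Println(fn(c6)) // 1: position 5 modulo 2 replicas, in line with the assertions above
}
```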

func TestGetShardByIndexModuloReplicasCountDistributionFunction(t *testing.T) {
db, cluster1, cluster2, _, _, _ := createTestClusters()
db.On("GetApplicationControllerReplicas").Return(2)
distributionFunction := RoundRobinDistributionFunction(db)
clusters, db, cluster1, cluster2, _, _, _ := createTestClusters()
replicasCount := 2
db.On("GetApplicationControllerReplicas").Return(replicasCount)
distributionFunction := RoundRobinDistributionFunction(clusters, replicasCount)

// Test that the function returns the correct shard for cluster1 and cluster2
expectedShardForCluster1 := 0
@@ -315,14 +303,14 @@ func TestInferShard(t *testing.T) {

osHostnameFunction = func() (string, error) { return "exampleshard", nil }
_, err = InferShard()
assert.NotNil(t, err)
assert.Nil(t, err)

osHostnameFunction = func() (string, error) { return "example-shard", nil }
_, err = InferShard()
assert.NotNil(t, err)
assert.Nil(t, err)
}
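The flipped assertions (assert.NotNil to assert.Nil) match the commit note about not failing when the hostname does not follow the abc-n pattern: InferShard now tolerates hostnames like "exampleshard" or "example-shard" instead of returning an error. A simplified, self-contained sketch of that relaxed parsing; falling back to shard 0 is an assumption drawn from the surrounding changes, not a quote of the implementation.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// inferShardFromHostname mimics the relaxed behaviour the updated assertions
// expect: take the numeric suffix of a StatefulSet-style hostname ("app-3" -> 3)
// and fall back to a default shard, without erroring, when there is no such
// suffix ("exampleshard", "example-shard"). The default of 0 is an assumption.
func inferShardFromHostname(defaultShard int) (int, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return defaultShard, err
	}
	parts := strings.Split(hostname, "-")
	shard, err := strconv.Atoi(parts[len(parts)-1])
	if err != nil {
		return defaultShard, nil // not "abc-n": do not fail, use the default
	}
	return shard, nil
}

func main() {
	shard, err := inferShardFromHostname(0)
	fmt.Println(shard, err)
}
```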

func createTestClusters() (*dbmocks.ArgoDB, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster) {
func createTestClusters() (clusterAccessor, *dbmocks.ArgoDB, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster) {
db := dbmocks.ArgoDB{}
cluster1 := createCluster("cluster1", "1")
cluster2 := createCluster("cluster2", "2")
@@ -330,10 +318,27 @@ func createTestClusters() (*dbmocks.ArgoDB, v1alpha1.Cluster, v1alpha1.Cluster,
cluster4 := createCluster("cluster4", "4")
cluster5 := createCluster("cluster5", "5")

clusters := []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}

db.On("ListClusters", mock.Anything).Return(&v1alpha1.ClusterList{Items: []v1alpha1.Cluster{
cluster1, cluster2, cluster3, cluster4, cluster5,
}}, nil)
return &db, cluster1, cluster2, cluster3, cluster4, cluster5
return getClusterAccessor(clusters), &db, cluster1, cluster2, cluster3, cluster4, cluster5
}

func getClusterAccessor(clusters []v1alpha1.Cluster) clusterAccessor {
// Convert the array to a slice of pointers
clusterPointers := getClusterPointers(clusters)
clusterAccessor := func() []*v1alpha1.Cluster { return clusterPointers }
return clusterAccessor
}

func getClusterPointers(clusters []v1alpha1.Cluster) []*v1alpha1.Cluster {
var clusterPointers []*v1alpha1.Cluster
for i := range clusters {
clusterPointers = append(clusterPointers, &clusters[i])
}
return clusterPointers
}
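Two details of these helpers are worth noting. getClusterPointers appends &clusters[i] rather than the address of a range variable, which on Go versions before 1.22 avoids every pointer aliasing the same reused loop variable. And the accessor shape (a function returning the current slice of pointers) lets production code hand in a closure over a mutable source, presumably the sharding cache this PR introduces, so later calls observe updates instead of a one-time snapshot. A tiny self-contained illustration of that closure behaviour, with stand-in types:

```go
package main

import "fmt"

// cluster stands in for v1alpha1.Cluster.
type cluster struct{ ID string }

// clusterAccessor mirrors the shape used in these tests: a function returning
// the current slice of cluster pointers.
type clusterAccessor func() []*cluster

func main() {
	// Backing slice standing in for whatever the accessor reads from.
	store := []*cluster{{ID: "1"}, {ID: "2"}}
	accessor := clusterAccessor(func() []*cluster { return store })

	fmt.Println(len(accessor())) // 2

	// Reassigning the captured variable is visible on the next call,
	// so a distribution function built over the accessor sees current data.
	store = append(store, &cluster{ID: "3"})
	fmt.Println(len(accessor())) // 3
}
```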

func createCluster(name string, id string) v1alpha1.Cluster {

@@ -3,6 +3,7 @@ package sharding
import (
"fmt"
"math"
"strconv"
"testing"

"github.com/argoproj/argo-cd/v2/common"
@@ -22,9 +23,11 @@ func TestLargeShuffle(t *testing.T) {
clusterList.Items = append(clusterList.Items, cluster)
}
db.On("ListClusters", mock.Anything).Return(clusterList, nil)
clusterAccessor := getClusterAccessor(clusterList.Items)
// Test with replicas set to 256
t.Setenv(common.EnvControllerReplicas, "256")
distributionFunction := RoundRobinDistributionFunction(&db)
replicasCount := 256
t.Setenv(common.EnvControllerReplicas, strconv.Itoa(replicasCount))
distributionFunction := RoundRobinDistributionFunction(clusterAccessor, replicasCount)
for i, c := range clusterList.Items {
assert.Equal(t, i%2567, distributionFunction(&c))
}
@@ -44,10 +47,11 @@ func TestShuffle(t *testing.T) {

clusterList := &v1alpha1.ClusterList{Items: []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5, cluster6}}
db.On("ListClusters", mock.Anything).Return(clusterList, nil)

clusterAccessor := getClusterAccessor(clusterList.Items)
// Test with replicas set to 3
t.Setenv(common.EnvControllerReplicas, "3")
distributionFunction := RoundRobinDistributionFunction(&db)
replicasCount := 3
distributionFunction := RoundRobinDistributionFunction(clusterAccessor, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 1, distributionFunction(&cluster2))

@@ -62,6 +62,6 @@ argocd admin cluster namespaces my-cluster
* [argocd admin cluster generate-spec](argocd_admin_cluster_generate-spec.md) - Generate declarative config for a cluster
* [argocd admin cluster kubeconfig](argocd_admin_cluster_kubeconfig.md) - Generates kubeconfig for the specified cluster
* [argocd admin cluster namespaces](argocd_admin_cluster_namespaces.md) - Print information namespaces which Argo CD manages in each cluster.
* [argocd admin cluster shards](argocd_admin_cluster_shards.md) - Print information about each controller shard and portion of Kubernetes resources it is responsible for.
* [argocd admin cluster shards](argocd_admin_cluster_shards.md) - Print information about each controller shard and the estimated portion of Kubernetes resources it is responsible for.
* [argocd admin cluster stats](argocd_admin_cluster_stats.md) - Prints information cluster statistics and inferred shard number

@@ -2,7 +2,7 @@

## argocd admin cluster shards

Print information about each controller shard and portion of Kubernetes resources it is responsible for.
Print information about each controller shard and the estimated portion of Kubernetes resources it is responsible for.

```
argocd admin cluster shards [flags]
@@ -43,6 +43,7 @@ argocd admin cluster shards [flags]
--sentinelmaster string Redis sentinel master group name. (default "master")
--server string The address and port of the Kubernetes API server
--shard int Cluster shard filter (default -1)
--sharding-method string Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] (default "legacy")
--tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used.
--token string Bearer token for authentication to the API server
--user string The name of the kubeconfig user to use

@@ -57,6 +57,7 @@ argocd admin cluster stats target-cluster
--sentinelmaster string Redis sentinel master group name. (default "master")
--server string The address and port of the Kubernetes API server
--shard int Cluster shard filter (default -1)
--sharding-method string Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] (default "legacy")
--tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used.
--token string Bearer token for authentication to the API server
--user string The name of the kubeconfig user to use

@@ -38,7 +38,7 @@ https://kubernetes.default.svc in-cluster %v Successful `, GetVe
When().
CreateApp()

tries := 2
tries := 5
for i := 0; i <= tries; i += 1 {
clusterFixture.GivenWithSameState(t).
When().