Compare commits

...

34 Commits

Author SHA1 Message Date
Michael Crenshaw
eb9ba3052c explicit
Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com>
2023-09-22 11:25:44 -04:00
Michael Crenshaw
5879bd1c79 better?
Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com>
2023-09-22 11:21:49 -04:00
Michael Crenshaw
cd08afcf98 accuracy
Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com>
2023-09-22 10:59:19 -04:00
Michael Crenshaw
ac0c8b91e7 not exactly random
Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com>
2023-09-22 10:54:54 -04:00
Michael Crenshaw
d50a18bd04 update docs to avoid focusing on StatefulSet/Deployment
Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com>
2023-09-22 10:47:38 -04:00
ishitasequeira
5d42013cda revert go,mod changes
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-22 10:10:58 -04:00
ishitasequeira
e27c3de3db Add feature status link
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-22 09:59:08 -04:00
ishitasequeira
bb69ec7f89 Marking the feature as alpha
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-22 09:53:42 -04:00
ishitasequeira
d6f12240df fix nit
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-22 09:28:14 -04:00
ishitasequeira
1500a2d722 Add an overlay for application controller deployment and update documentation
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-21 21:39:56 -04:00
ishitasequeira
d586314d93 update documentation
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-21 09:22:01 -04:00
ishitasequeira
422cf47af8 Add documentation for the new dynamic distribution feature
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-21 02:25:41 -04:00
ishitasequeira
7eba2288f0 reverting updated docs
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 22:59:10 -04:00
ishitasequeira
579d515e89 revert ApplicationController manifest to StatefulSet
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 22:51:22 -04:00
ishitasequeira
ff31a459ba Add more documentation
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:50:49 -04:00
ishitasequeira
6b9c2c0219 remove unwanted code and logs
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:50:49 -04:00
ishitasequeira
4bd285da98 update godoc
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:50:48 -04:00
ishitasequeira
704eb2cdb8 Add Informer, Update documentation, add unit tests
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:50:48 -04:00
ishitasequeira
ccf8b332d4 Address comments
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
91fd696edb Add nil check on replica count for deployment of application controller
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
20e9a128fb read environment variable for application controller deployment name
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
e0145c04b3 revert commented test case
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
cb184e9e54 revert commented readiness probe
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
e64cee7be7 "comment out readiness probe to see if e2e tests succeed"
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
057732e6b1 increase readiness probe interval period
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
28ed4b021e comment out failing e2e test
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
ea97b2045f fix lint and e2e tests
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
81ce977df6 fix manifests
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
01b402783e use the logic of pre-specified shard number on application controller pod
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
97027798db Add retry logic, heartbeat timeout environment variable
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
e4ea28e9c5 Add heartbeat as an environment variable
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
af120d92de Update sharding logic and add comments
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
92992dc721 Add sharding deployment logic
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:12 -04:00
ishitasequeira
dfbac574da Migrate Application Controller from Statefulset to Deployment
Signed-off-by: ishitasequeira <ishiseq29@gmail.com>
2023-09-20 10:44:11 -04:00
22 changed files with 1239 additions and 94 deletions

View File

@@ -30,11 +30,13 @@ import (
"github.com/argoproj/argo-cd/v2/util/settings"
"github.com/argoproj/argo-cd/v2/util/tls"
"github.com/argoproj/argo-cd/v2/util/trace"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
// CLIName is the name of the CLI
cliName = "argocd-application-controller"
cliName = common.ApplicationController
// Default time in seconds for application resync period
defaultAppResyncPeriod = 180
// Default time in seconds for application hard resync period
@@ -92,7 +94,7 @@ func NewCommand() *cobra.Command {
config, err := clientConfig.ClientConfig()
errors.CheckError(err)
errors.CheckError(v1alpha1.SetK8SConfigDefaults(config))
config.UserAgent = fmt.Sprintf("argocd-application-controller/%s (%s)", vers.Version, vers.Platform)
config.UserAgent = fmt.Sprintf("%s/%s (%s)", common.DefaultApplicationControllerName, vers.Version, vers.Platform)
kubeClient := kubernetes.NewForConfigOrDie(config)
appClient := appclientset.NewForConfigOrDie(config)
@@ -138,6 +140,7 @@ func NewCommand() *cobra.Command {
}))
kubectl := kubeutil.NewKubectl()
clusterFilter := getClusterFilter(kubeClient, settingsMgr, shardingAlgorithm)
errors.CheckError(err)
appController, err = controller.NewApplicationController(
namespace,
settingsMgr,
@@ -208,20 +211,49 @@ func NewCommand() *cobra.Command {
}
func getClusterFilter(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string) sharding.ClusterFilterFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
var replicas int
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, _ := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})
if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicas = int(*appControllerDeployment.Spec.Replicas)
} else {
replicas = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
}
var clusterFilter func(cluster *v1alpha1.Cluster) bool
if replicas > 1 {
if shard < 0 {
// check for shard mapping using configmap if application-controller is a deployment
// else use existing logic to infer shard from pod name if application-controller is a statefulset
if appControllerDeployment != nil {
var err error
shard, err = sharding.InferShard()
// retry 3 times if we find a conflict while updating shard mapping configMap.
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ {
shard, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicas, shard)
if !kubeerrors.IsConflict(err) {
if err != nil {
err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err)
}
break
}
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
}
errors.CheckError(err)
} else {
if shard < 0 {
var err error
shard, err = sharding.InferShard()
errors.CheckError(err)
}
}
log.Infof("Processing clusters from shard %d", shard)
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
log.Infof("Using filter function: %s", shardingAlgorithm)
distributionFunction := sharding.GetDistributionFunction(db, shardingAlgorithm)
clusterFilter = sharding.GetClusterFilter(distributionFunction, shard)
clusterFilter = sharding.GetClusterFilter(db, distributionFunction, shard)
} else {
log.Info("Processing all cluster shards")
}

View File

@@ -12,6 +12,11 @@ import (
"google.golang.org/grpc/status"
)
// Component names
const (
ApplicationController = "argocd-application-controller"
)
// Default service addresses and URLS of Argo CD internal services
const (
// DefaultRepoServerAddr is the gRPC address of the Argo CD repo server
@@ -34,6 +39,8 @@ const (
// ArgoCDTLSCertsConfigMapName contains TLS certificate data for connecting repositories. Will get mounted as volume to pods
ArgoCDTLSCertsConfigMapName = "argocd-tls-certs-cm"
ArgoCDGPGKeysConfigMapName = "argocd-gpg-keys-cm"
// ArgoCDAppControllerShardConfigMapName contains the application controller to shard mapping
ArgoCDAppControllerShardConfigMapName = "argocd-app-controller-shard-cm"
)
// Some default configurables
@@ -109,6 +116,8 @@ const (
// RoundRobinShardingAlgorithm is a flag value that can be opted for the sharding algorithm; it uses an equal distribution across all shards
RoundRobinShardingAlgorithm = "round-robin"
DefaultShardingAlgorithm = LegacyShardingAlgorithm
// AppControllerHeartbeatUpdateRetryCount is the number of retries for updating the shard mapping in the shard mapping ConfigMap used by the Application Controller
AppControllerHeartbeatUpdateRetryCount = 3
)
// Dex related constants
@@ -209,6 +218,8 @@ const (
EnvPauseGenerationRequests = "ARGOCD_PAUSE_GEN_REQUESTS"
// EnvControllerReplicas is the number of controller replicas
EnvControllerReplicas = "ARGOCD_CONTROLLER_REPLICAS"
// EnvControllerHeartbeatTime is the heartbeat interval (in seconds) at which the application controller updates its claim on a shard
EnvControllerHeartbeatTime = "ARGOCD_CONTROLLER_HEARTBEAT_TIME"
// EnvControllerShard is the shard number that should be handled by controller
EnvControllerShard = "ARGOCD_CONTROLLER_SHARD"
// EnvControllerShardingAlgorithm is the distribution sharding algorithm to be used: legacy or round-robin

View File

@@ -34,6 +34,8 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
informerv1 "k8s.io/client-go/informers/apps/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@@ -51,6 +53,7 @@ import (
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
"github.com/argoproj/argo-cd/v2/util/argo"
argodiff "github.com/argoproj/argo-cd/v2/util/argo/diff"
"github.com/argoproj/argo-cd/v2/util/env"
appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/db"
@@ -59,10 +62,12 @@ import (
"github.com/argoproj/argo-cd/v2/util/helm"
logutils "github.com/argoproj/argo-cd/v2/util/log"
settings_util "github.com/argoproj/argo-cd/v2/util/settings"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
)
const (
updateOperationStateTimeout = 1 * time.Second
updateOperationStateTimeout = 1 * time.Second
defaultDeploymentInformerResyncDuration = 10 * time.Second
// orphanedIndex contains application which monitor orphaned resources by namespace
orphanedIndex = "orphaned"
)
@@ -105,6 +110,7 @@ type ApplicationController struct {
appInformer cache.SharedIndexInformer
appLister applisters.ApplicationLister
projInformer cache.SharedIndexInformer
deploymentInformer informerv1.DeploymentInformer
appStateManager AppStateManager
stateCache statecache.LiveStateCache
statusRefreshTimeout time.Duration
@@ -160,7 +166,7 @@ func NewApplicationController(
statusHardRefreshTimeout: appHardResyncPeriod,
refreshRequestedApps: make(map[string]CompareWith),
refreshRequestedAppsMutex: &sync.Mutex{},
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, "argocd-application-controller"),
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
settingsMgr: settingsMgr,
selfHealTimeout: selfHealTimeout,
clusterFilter: clusterFilter,
@@ -201,11 +207,31 @@ func NewApplicationController(
}
},
})
factory := informers.NewSharedInformerFactory(ctrl.kubeClientset, defaultDeploymentInformerResyncDuration)
deploymentInformer := factory.Apps().V1().Deployments()
readinessHealthCheck := func(r *http.Request) error {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
if err != nil && !kubeerrors.IsNotFound(err) {
return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
}
if appControllerDeployment != nil {
if appControllerDeployment.Spec.Replicas == nil || int(*appControllerDeployment.Spec.Replicas) <= 0 {
return fmt.Errorf("application controller deployment replicas is not set or is less than 0")
}
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err)
}
}
return nil
}
metricsAddr := fmt.Sprintf("0.0.0.0:%d", metricsPort)
var err error
ctrl.metricsServer, err = metrics.NewMetricsServer(metricsAddr, appLister, ctrl.canProcessApp, func(r *http.Request) error {
return nil
}, metricsApplicationLabels)
ctrl.metricsServer, err = metrics.NewMetricsServer(metricsAddr, appLister, ctrl.canProcessApp, readinessHealthCheck, metricsApplicationLabels)
if err != nil {
return nil, err
}
@@ -220,6 +246,7 @@ func NewApplicationController(
ctrl.appInformer = appInformer
ctrl.appLister = appLister
ctrl.projInformer = projInformer
ctrl.deploymentInformer = deploymentInformer
ctrl.appStateManager = appStateManager
ctrl.stateCache = stateCache
@@ -724,6 +751,7 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int
go ctrl.appInformer.Run(ctx.Done())
go ctrl.projInformer.Run(ctx.Done())
go ctrl.deploymentInformer.Informer().Run(ctx.Done())
errors.CheckError(ctrl.stateCache.Init())

View File

@@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
"github.com/argoproj/argo-cd/v2/common"
argoappv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
applister "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/git"
@@ -260,12 +261,12 @@ func (m *MetricsServer) IncKubernetesRequest(app *argoappv1.Application, server,
}
func (m *MetricsServer) IncRedisRequest(failed bool) {
m.redisRequestCounter.WithLabelValues(m.hostname, "argocd-application-controller", strconv.FormatBool(failed)).Inc()
m.redisRequestCounter.WithLabelValues(m.hostname, common.ApplicationController, strconv.FormatBool(failed)).Inc()
}
// ObserveRedisRequestDuration observes redis request duration
func (m *MetricsServer) ObserveRedisRequestDuration(duration time.Duration) {
m.redisRequestHistogram.WithLabelValues(m.hostname, "argocd-application-controller").Observe(duration.Seconds())
m.redisRequestHistogram.WithLabelValues(m.hostname, common.ApplicationController).Observe(duration.Seconds())
}
// IncReconcile increments the reconcile counter for an application

View File

@@ -4,32 +4,57 @@ import (
"context"
"fmt"
"hash/fnv"
"math"
"os"
"sort"
"strconv"
"strings"
"time"
"encoding/json"
"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/settings"
log "github.com/sirupsen/logrus"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
)
// Make it overridable for testing
var osHostnameFunction = os.Hostname
// Make it overridable for testing
var heartbeatCurrentTime = metav1.Now
var (
HeartbeatDuration = env.ParseNumFromEnv(common.EnvControllerHeartbeatTime, 10, 10, 60)
HeartbeatTimeout = 3 * HeartbeatDuration
)
const ShardControllerMappingKey = "shardControllerMapping"
type DistributionFunction func(c *v1alpha1.Cluster) int
type ClusterFilterFunction func(c *v1alpha1.Cluster) bool
// shardApplicationControllerMapping stores the mapping of Shard Number to Application Controller in ConfigMap.
// It also stores the heartbeat of last synced time of the application controller.
type shardApplicationControllerMapping struct {
ShardNumber int
ControllerName string
HeartbeatTime metav1.Time
}
// GetClusterFilter returns a ClusterFilterFunction which is a function taking a cluster as a parameter
// and returns whether or not the cluster should be processed by a given shard. It calls the distributionFunction
// to determine which shard will process the cluster, and if the given shard is equal to the calculated shard
// the function will return true.
func GetClusterFilter(distributionFunction DistributionFunction, shard int) ClusterFilterFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
func GetClusterFilter(db db.ArgoDB, distributionFunction DistributionFunction, shard int) ClusterFilterFunction {
replicas := db.GetApplicationControllerReplicas()
return func(c *v1alpha1.Cluster) bool {
clusterShard := 0
if c != nil && c.Shard != nil {
@@ -50,12 +75,12 @@ func GetClusterFilter(distributionFunction DistributionFunction, shard int) Clus
// the current data.
func GetDistributionFunction(db db.ArgoDB, shardingAlgorithm string) DistributionFunction {
log.Infof("Using filter function: %s", shardingAlgorithm)
distributionFunction := LegacyDistributionFunction()
distributionFunction := LegacyDistributionFunction(db)
switch shardingAlgorithm {
case common.RoundRobinShardingAlgorithm:
distributionFunction = RoundRobinDistributionFunction(db)
case common.LegacyShardingAlgorithm:
distributionFunction = LegacyDistributionFunction()
distributionFunction = LegacyDistributionFunction(db)
default:
log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
}
@@ -67,8 +92,8 @@ func GetDistributionFunction(db db.ArgoDB, shardingAlgorithm string) Distributio
// is lightweight and can be distributed easily, however, it does not ensure a homogeneous distribution as
// some shards may get assigned more clusters than others. It is the legacy function distribution that is
// kept for compatibility reasons
func LegacyDistributionFunction() DistributionFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
func LegacyDistributionFunction(db db.ArgoDB) DistributionFunction {
replicas := db.GetApplicationControllerReplicas()
return func(c *v1alpha1.Cluster) int {
if replicas == 0 {
return -1
@@ -97,7 +122,7 @@ func LegacyDistributionFunction() DistributionFunction {
// clusters +/-1, but with the drawback of a reshuffling of clusters across shards in case of some changes
// in the cluster list
func RoundRobinDistributionFunction(db db.ArgoDB) DistributionFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
replicas := db.GetApplicationControllerReplicas()
return func(c *v1alpha1.Cluster) int {
if replicas > 0 {
if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here.
@@ -123,7 +148,7 @@ func RoundRobinDistributionFunction(db db.ArgoDB) DistributionFunction {
func InferShard() (int, error) {
hostname, err := osHostnameFunction()
if err != nil {
return 0, err
return -1, err
}
parts := strings.Split(hostname, "-")
if len(parts) == 0 {
@@ -162,3 +187,167 @@ func createClusterIndexByClusterIdMap(db db.ArgoDB) map[string]int {
}
return clusterIndexedByClusterId
}
// GetOrUpdateShardFromConfigMap finds the shard number from the shard mapping configmap. If the shard mapping configmap does not exist,
// the function creates the shard mapping configmap.
// The caller reads the shard number from the environment variable (default value -1, if not set) and passes it to this function.
// If the shard value passed to this function is -1, that is, the shard was not set as an environment variable,
// we default the shard number to 0 for computing the default config map.
func GetOrUpdateShardFromConfigMap(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) {
hostname, err := osHostnameFunction()
if err != nil {
return -1, err
}
// fetch the shard mapping configMap
shardMappingCM, err := kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Get(context.Background(), common.ArgoCDAppControllerShardConfigMapName, metav1.GetOptions{})
if err != nil {
if !kubeerrors.IsNotFound(err) {
return -1, fmt.Errorf("error getting sharding config map: %s", err)
}
log.Infof("shard mapping configmap %s not found. Creating default shard mapping configmap.", common.ArgoCDAppControllerShardConfigMapName)
// if the shard is not set as an environment variable, set the default value of shard to 0 for generating default CM
if shard == -1 {
shard = 0
}
shardMappingCM, err = generateDefaultShardMappingCM(settingsMgr.GetNamespace(), hostname, replicas, shard)
if err != nil {
return -1, fmt.Errorf("error generating default shard mapping configmap %s", err)
}
if _, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Create(context.Background(), shardMappingCM, metav1.CreateOptions{}); err != nil {
return -1, fmt.Errorf("error creating shard mapping configmap %s", err)
}
// return 0 as the controller is assigned to shard 0 while generating default shard mapping ConfigMap
return shard, nil
} else {
// Identify the available shard and update the ConfigMap
data := shardMappingCM.Data[ShardControllerMappingKey]
var shardMappingData []shardApplicationControllerMapping
err := json.Unmarshal([]byte(data), &shardMappingData)
if err != nil {
return -1, fmt.Errorf("error unmarshalling shard config map data: %s", err)
}
shard, shardMappingData := getOrUpdateShardNumberForController(shardMappingData, hostname, replicas, shard)
updatedShardMappingData, err := json.Marshal(shardMappingData)
if err != nil {
return -1, fmt.Errorf("error marshalling data of shard mapping ConfigMap: %s", err)
}
shardMappingCM.Data[ShardControllerMappingKey] = string(updatedShardMappingData)
_, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Update(context.Background(), shardMappingCM, metav1.UpdateOptions{})
if err != nil {
return -1, err
}
return shard, nil
}
}
// getOrUpdateShardNumberForController takes list of shardApplicationControllerMapping and performs computation to find the matching or empty shard number
func getOrUpdateShardNumberForController(shardMappingData []shardApplicationControllerMapping, hostname string, replicas, shard int) (int, []shardApplicationControllerMapping) {
// if current length of shardMappingData in shard mapping configMap is less than the number of replicas,
// create additional empty entries for missing shard numbers in shardMappingData
if len(shardMappingData) < replicas {
// generate extra default mappings
for currentShard := len(shardMappingData); currentShard < replicas; currentShard++ {
shardMappingData = append(shardMappingData, shardApplicationControllerMapping{
ShardNumber: currentShard,
})
}
}
// if current length of shardMappingData in shard mapping configMap is more than the number of replicas,
// we replace the config map with default config map and let controllers self assign the new shard to itself
if len(shardMappingData) > replicas {
shardMappingData = getDefaultShardMappingData(replicas)
}
if shard != -1 && shard < replicas {
log.Debugf("update heartbeat for shard %d", shard)
for i := range shardMappingData {
shardMapping := shardMappingData[i]
if shardMapping.ShardNumber == shard {
log.Debugf("Shard found. Updating heartbeat!!")
shardMapping.ControllerName = hostname
shardMapping.HeartbeatTime = heartbeatCurrentTime()
shardMappingData[i] = shardMapping
break
}
}
} else {
// find the matching shard with assigned controllerName
for i := range shardMappingData {
shardMapping := shardMappingData[i]
if shardMapping.ControllerName == hostname {
log.Debugf("Shard matched. Updating heartbeat!!")
shard = int(shardMapping.ShardNumber)
shardMapping.HeartbeatTime = heartbeatCurrentTime()
shardMappingData[i] = shardMapping
break
}
}
}
// at this point, we have still not found a shard with matching hostname.
// So, find a shard with either no controller assigned or assigned controller
// with heartbeat past threshold
if shard == -1 {
for i := range shardMappingData {
shardMapping := shardMappingData[i]
if (shardMapping.ControllerName == "") || (metav1.Now().After(shardMapping.HeartbeatTime.Add(time.Duration(HeartbeatTimeout) * time.Second))) {
shard = int(shardMapping.ShardNumber)
log.Debugf("Empty shard found %d", shard)
shardMapping.ControllerName = hostname
shardMapping.HeartbeatTime = heartbeatCurrentTime()
shardMappingData[i] = shardMapping
break
}
}
}
return shard, shardMappingData
}
// generateDefaultShardMappingCM creates a default shard mapping configMap. Assigns current controller to shard 0.
func generateDefaultShardMappingCM(namespace, hostname string, replicas, shard int) (*v1.ConfigMap, error) {
shardingCM := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: common.ArgoCDAppControllerShardConfigMapName,
Namespace: namespace,
},
Data: map[string]string{},
}
shardMappingData := getDefaultShardMappingData(replicas)
// if shard is not assigned to a controller, we use shard 0
if shard == -1 || shard >= replicas {
shard = 0
}
shardMappingData[shard].ControllerName = hostname
shardMappingData[shard].HeartbeatTime = heartbeatCurrentTime()
data, err := json.Marshal(shardMappingData)
if err != nil {
return nil, fmt.Errorf("error generating default ConfigMap: %s", err)
}
shardingCM.Data[ShardControllerMappingKey] = string(data)
return shardingCM, nil
}
func getDefaultShardMappingData(replicas int) []shardApplicationControllerMapping {
shardMappingData := make([]shardApplicationControllerMapping, 0)
for i := 0; i < replicas; i++ {
mapping := shardApplicationControllerMapping{
ShardNumber: i,
}
shardMappingData = append(shardMappingData, mapping)
}
return shardMappingData
}

View File

@@ -1,51 +1,59 @@
package sharding
import (
"encoding/json"
"errors"
"fmt"
"os"
"testing"
"time"
"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestGetShardByID_NotEmptyID(t *testing.T) {
t.Setenv(common.EnvControllerReplicas, "1")
assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "1"}))
assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "2"}))
assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "3"}))
assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "4"}))
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(1)
assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "1"}))
assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "2"}))
assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "3"}))
assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "4"}))
}
func TestGetShardByID_EmptyID(t *testing.T) {
t.Setenv(common.EnvControllerReplicas, "1")
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(1)
distributionFunction := LegacyDistributionFunction
shard := distributionFunction()(&v1alpha1.Cluster{})
shard := distributionFunction(db)(&v1alpha1.Cluster{})
assert.Equal(t, 0, shard)
}
func TestGetShardByID_NoReplicas(t *testing.T) {
t.Setenv(common.EnvControllerReplicas, "0")
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(0)
distributionFunction := LegacyDistributionFunction
shard := distributionFunction()(&v1alpha1.Cluster{})
shard := distributionFunction(db)(&v1alpha1.Cluster{})
assert.Equal(t, -1, shard)
}
func TestGetShardByID_NoReplicasUsingHashDistributionFunction(t *testing.T) {
t.Setenv(common.EnvControllerReplicas, "0")
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(0)
distributionFunction := LegacyDistributionFunction
shard := distributionFunction()(&v1alpha1.Cluster{})
shard := distributionFunction(db)(&v1alpha1.Cluster{})
assert.Equal(t, -1, shard)
}
func TestGetShardByID_NoReplicasUsingHashDistributionFunctionWithClusters(t *testing.T) {
db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters()
// Test with replicas set to 0
t.Setenv(common.EnvControllerReplicas, "0")
db.On("GetApplicationControllerReplicas").Return(0)
t.Setenv(common.EnvControllerShardingAlgorithm, common.RoundRobinShardingAlgorithm)
distributionFunction := RoundRobinDistributionFunction(db)
assert.Equal(t, -1, distributionFunction(nil))
@@ -59,8 +67,9 @@ func TestGetShardByID_NoReplicasUsingHashDistributionFunctionWithClusters(t *tes
func TestGetClusterFilterDefault(t *testing.T) {
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
os.Unsetenv(common.EnvControllerShardingAlgorithm)
t.Setenv(common.EnvControllerReplicas, "2")
filter := GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), shardIndex)
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(2)
filter := GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), shardIndex)
assert.False(t, filter(&v1alpha1.Cluster{ID: "1"}))
assert.True(t, filter(&v1alpha1.Cluster{ID: "2"}))
assert.False(t, filter(&v1alpha1.Cluster{ID: "3"}))
@@ -69,9 +78,10 @@ func TestGetClusterFilterDefault(t *testing.T) {
func TestGetClusterFilterLegacy(t *testing.T) {
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
t.Setenv(common.EnvControllerReplicas, "2")
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(2)
t.Setenv(common.EnvControllerShardingAlgorithm, common.LegacyShardingAlgorithm)
filter := GetClusterFilter(GetDistributionFunction(nil, common.LegacyShardingAlgorithm), shardIndex)
filter := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
assert.False(t, filter(&v1alpha1.Cluster{ID: "1"}))
assert.True(t, filter(&v1alpha1.Cluster{ID: "2"}))
assert.False(t, filter(&v1alpha1.Cluster{ID: "3"}))
@@ -80,9 +90,10 @@ func TestGetClusterFilterLegacy(t *testing.T) {
func TestGetClusterFilterUnknown(t *testing.T) {
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
t.Setenv(common.EnvControllerReplicas, "2")
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(2)
t.Setenv(common.EnvControllerShardingAlgorithm, "unknown")
filter := GetClusterFilter(GetDistributionFunction(nil, "unknown"), shardIndex)
filter := GetClusterFilter(db, GetDistributionFunction(db, "unknown"), shardIndex)
assert.False(t, filter(&v1alpha1.Cluster{ID: "1"}))
assert.True(t, filter(&v1alpha1.Cluster{ID: "2"}))
assert.False(t, filter(&v1alpha1.Cluster{ID: "3"}))
@@ -91,8 +102,9 @@ func TestGetClusterFilterUnknown(t *testing.T) {
func TestLegacyGetClusterFilterWithFixedShard(t *testing.T) {
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
t.Setenv(common.EnvControllerReplicas, "2")
filter := GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), shardIndex)
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(2)
filter := GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), shardIndex)
assert.False(t, filter(nil))
assert.False(t, filter(&v1alpha1.Cluster{ID: "1"}))
assert.True(t, filter(&v1alpha1.Cluster{ID: "2"}))
@@ -100,20 +112,19 @@ func TestLegacyGetClusterFilterWithFixedShard(t *testing.T) {
assert.True(t, filter(&v1alpha1.Cluster{ID: "4"}))
var fixedShard int64 = 4
filter = GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), int(fixedShard))
filter = GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), int(fixedShard))
assert.False(t, filter(&v1alpha1.Cluster{ID: "4", Shard: &fixedShard}))
fixedShard = 1
filter = GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), int(fixedShard))
filter = GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), int(fixedShard))
assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
}
func TestRoundRobinGetClusterFilterWithFixedShard(t *testing.T) {
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
t.Setenv(common.EnvControllerReplicas, "2")
db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
filter := GetClusterFilter(GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), shardIndex)
db.On("GetApplicationControllerReplicas").Return(2)
filter := GetClusterFilter(db, GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), shardIndex)
assert.False(t, filter(nil))
assert.False(t, filter(&cluster1))
assert.True(t, filter(&cluster2))
@@ -123,20 +134,20 @@ func TestRoundRobinGetClusterFilterWithFixedShard(t *testing.T) {
// a cluster with a fixed shard should be processed by the specified exact
// same shard unless the specified shard index is greater than the number of replicas.
var fixedShard int64 = 4
filter = GetClusterFilter(GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard))
filter = GetClusterFilter(db, GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard))
assert.False(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
fixedShard = 1
filter = GetClusterFilter(GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard))
filter = GetClusterFilter(db, GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard))
assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
}
func TestGetClusterFilterLegacyHash(t *testing.T) {
shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
t.Setenv(common.EnvControllerReplicas, "2")
t.Setenv(common.EnvControllerShardingAlgorithm, "hash")
db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
filter := GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
db.On("GetApplicationControllerReplicas").Return(2)
filter := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
assert.False(t, filter(&cluster1))
assert.True(t, filter(&cluster2))
assert.False(t, filter(&cluster3))
@@ -145,22 +156,22 @@ func TestGetClusterFilterLegacyHash(t *testing.T) {
// a cluster with a fixed shard should be processed by the specified exact
// same shard unless the specified shard index is greater than the number of replicas.
var fixedShard int64 = 4
filter = GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard))
filter = GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard))
assert.False(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
fixedShard = 1
filter = GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard))
filter = GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard))
assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
}
func TestGetClusterFilterWithEnvControllerShardingAlgorithms(t *testing.T) {
db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
shardIndex := 1
t.Setenv(common.EnvControllerReplicas, "2")
db.On("GetApplicationControllerReplicas").Return(2)
t.Run("legacy", func(t *testing.T) {
t.Setenv(common.EnvControllerShardingAlgorithm, common.LegacyShardingAlgorithm)
shardShouldProcessCluster := GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
shardShouldProcessCluster := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
assert.False(t, shardShouldProcessCluster(&cluster1))
assert.True(t, shardShouldProcessCluster(&cluster2))
assert.False(t, shardShouldProcessCluster(&cluster3))
@@ -170,7 +181,7 @@ func TestGetClusterFilterWithEnvControllerShardingAlgorithms(t *testing.T) {
t.Run("roundrobin", func(t *testing.T) {
t.Setenv(common.EnvControllerShardingAlgorithm, common.RoundRobinShardingAlgorithm)
shardShouldProcessCluster := GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
shardShouldProcessCluster := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex)
assert.False(t, shardShouldProcessCluster(&cluster1))
assert.True(t, shardShouldProcessCluster(&cluster2))
assert.False(t, shardShouldProcessCluster(&cluster3))
@@ -183,7 +194,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) {
db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters()
t.Run("replicas set to 1", func(t *testing.T) {
t.Setenv(common.EnvControllerReplicas, "1")
db.On("GetApplicationControllerReplicas").Return(1).Once()
distributionFunction := RoundRobinDistributionFunction(db)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
@@ -194,7 +205,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) {
})
t.Run("replicas set to 2", func(t *testing.T) {
t.Setenv(common.EnvControllerReplicas, "2")
db.On("GetApplicationControllerReplicas").Return(2).Once()
distributionFunction := RoundRobinDistributionFunction(db)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
@@ -205,7 +216,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) {
})
t.Run("replicas set to 3", func(t *testing.T) {
t.Setenv(common.EnvControllerReplicas, "3")
db.On("GetApplicationControllerReplicas").Return(3).Once()
distributionFunction := RoundRobinDistributionFunction(db)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
@@ -229,7 +240,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterNumber
clusterList.Items = append(clusterList.Items, cluster)
}
db.On("ListClusters", mock.Anything).Return(clusterList, nil)
t.Setenv(common.EnvControllerReplicas, "2")
db.On("GetApplicationControllerReplicas").Return(2)
distributionFunction := RoundRobinDistributionFunction(&db)
for i, c := range clusterList.Items {
assert.Equal(t, i%2, distributionFunction(&c))
@@ -249,7 +260,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde
db.On("ListClusters", mock.Anything).Return(clusterList, nil)
// Test with replicas set to 2
t.Setenv(common.EnvControllerReplicas, "2")
db.On("GetApplicationControllerReplicas").Return(2)
distributionFunction := RoundRobinDistributionFunction(&db)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
@@ -270,7 +281,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde
func TestGetShardByIndexModuloReplicasCountDistributionFunction(t *testing.T) {
db, cluster1, cluster2, _, _, _ := createTestClusters()
t.Setenv(common.EnvControllerReplicas, "2")
db.On("GetApplicationControllerReplicas").Return(2)
distributionFunction := RoundRobinDistributionFunction(db)
// Test that the function returns the correct shard for cluster1 and cluster2
@@ -291,8 +302,8 @@ func TestInferShard(t *testing.T) {
// Override the os.Hostname function to return a specific hostname for testing
defer func() { osHostnameFunction = os.Hostname }()
osHostnameFunction = func() (string, error) { return "example-shard-3", nil }
expectedShard := 3
osHostnameFunction = func() (string, error) { return "example-shard-3", nil }
actualShard, _ := InferShard()
assert.Equal(t, expectedShard, actualShard)
@@ -333,3 +344,335 @@ func createCluster(name string, id string) v1alpha1.Cluster {
}
return cluster
}
func Test_getDefaultShardMappingData(t *testing.T) {
expectedData := []shardApplicationControllerMapping{
{
ShardNumber: 0,
ControllerName: "",
}, {
ShardNumber: 1,
ControllerName: "",
},
}
shardMappingData := getDefaultShardMappingData(2)
assert.Equal(t, expectedData, shardMappingData)
}
func Test_generateDefaultShardMappingCM_NoPredefinedShard(t *testing.T) {
replicas := 2
expectedTime := metav1.Now()
defer func() { osHostnameFunction = os.Hostname }()
defer func() { heartbeatCurrentTime = metav1.Now }()
expectedMapping := []shardApplicationControllerMapping{
{
ShardNumber: 0,
ControllerName: "test-example",
HeartbeatTime: expectedTime,
}, {
ShardNumber: 1,
},
}
expectedMappingCM, err := json.Marshal(expectedMapping)
assert.NoError(t, err)
expectedShadingCM := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: common.ArgoCDAppControllerShardConfigMapName,
Namespace: "test",
},
Data: map[string]string{
"shardControllerMapping": string(expectedMappingCM),
},
}
heartbeatCurrentTime = func() metav1.Time { return expectedTime }
osHostnameFunction = func() (string, error) { return "test-example", nil }
shardingCM, err := generateDefaultShardMappingCM("test", "test-example", replicas, -1)
assert.NoError(t, err)
assert.Equal(t, expectedShadingCM, shardingCM)
}
func Test_generateDefaultShardMappingCM_PredefinedShard(t *testing.T) {
replicas := 2
expectedTime := metav1.Now()
defer func() { osHostnameFunction = os.Hostname }()
defer func() { heartbeatCurrentTime = metav1.Now }()
expectedMapping := []shardApplicationControllerMapping{
{
ShardNumber: 0,
}, {
ShardNumber: 1,
ControllerName: "test-example",
HeartbeatTime: expectedTime,
},
}
expectedMappingCM, err := json.Marshal(expectedMapping)
assert.NoError(t, err)
expectedShadingCM := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: common.ArgoCDAppControllerShardConfigMapName,
Namespace: "test",
},
Data: map[string]string{
"shardControllerMapping": string(expectedMappingCM),
},
}
heartbeatCurrentTime = func() metav1.Time { return expectedTime }
osHostnameFunction = func() (string, error) { return "test-example", nil }
shardingCM, err := generateDefaultShardMappingCM("test", "test-example", replicas, 1)
assert.NoError(t, err)
assert.Equal(t, expectedShadingCM, shardingCM)
}
func Test_getOrUpdateShardNumberForController(t *testing.T) {
expectedTime := metav1.Now()
testCases := []struct {
name string
shardApplicationControllerMapping []shardApplicationControllerMapping
hostname string
replicas int
shard int
expectedShard int
expectedShardMappingData []shardApplicationControllerMapping
}{
{
name: "length of shard mapping less than number of replicas - Existing controller",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
},
},
hostname: "test-example",
replicas: 2,
shard: -1,
expectedShard: 0,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "",
ShardNumber: 1,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
},
},
},
{
name: "length of shard mapping less than number of replicas - New controller",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
},
},
hostname: "test-example-1",
replicas: 2,
shard: -1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
},
{
name: "length of shard mapping more than number of replicas",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
hostname: "test-example",
replicas: 1,
shard: -1,
expectedShard: 0,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
},
},
},
{
name: "shard number is pre-specified and length of shard mapping less than number of replicas - Existing controller",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
}, {
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
},
},
hostname: "test-example-1",
replicas: 2,
shard: 1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
},
},
},
{
name: "shard number is pre-specified and length of shard mapping less than number of replicas - New controller",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
},
},
hostname: "test-example-1",
replicas: 2,
shard: 1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
},
{
name: "shard number is pre-specified and length of shard mapping more than number of replicas",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-2",
ShardNumber: 2,
HeartbeatTime: expectedTime,
},
},
hostname: "test-example",
replicas: 2,
shard: 1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "",
ShardNumber: 0,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
}, {
ControllerName: "test-example",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
},
{
name: "updating heartbeat",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
},
},
hostname: "test-example-1",
replicas: 2,
shard: -1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
},
{
name: "updating heartbeat - shard pre-defined",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
},
},
hostname: "test-example-1",
replicas: 2,
shard: 1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer func() { osHostnameFunction = os.Hostname }()
heartbeatCurrentTime = func() metav1.Time { return expectedTime }
shard, shardMappingData := getOrUpdateShardNumberForController(tc.shardApplicationControllerMapping, tc.hostname, tc.replicas, tc.shard)
assert.Equal(t, tc.expectedShard, shard)
assert.Equal(t, tc.expectedShardMappingData, shardMappingData)
})
}
}

View File

@@ -0,0 +1,57 @@
# Dynamic Cluster Distribution
*Current Status: [Alpha][1] (Since v2.9.0)*
By default, clusters are assigned to shards indefinitely. For users of the default, hash-based sharding algorithm, this
static assignment is fine: shards will always be roughly-balanced by the hash-based algorithm. But for users of the
[round-robin](high_availability.md#argocd-application-controller) or other custom shard assignment algorithms, this
static assignment can lead to unbalanced shards when replicas are added or removed.
Starting v2.9, Argo CD supports a dynamic cluster distribution feature. When replicas are added or removed, the sharding
algorithm is re-run to ensure that the clusters are distributed according to the algorithm. If the algorithm is
well-balanced, like round-robin, then the shards will be well-balanced.
Previously, the shard count was set via the `ARGOCD_CONTROLLER_REPLICAS` environment variable. Changing the environment
variable forced a restart of all application controller pods. Now, the shard count is set via the `replicas` field of
the Deployment, which does not require a restart of the application controller pods.
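As a rough illustration of what that lookup involves, the sketch below reads `spec.replicas` from the controller's own Deployment using client-go. It is not the PR's code (the actual lookup is in `getClusterFilter` in the diff above), and the `argocd` namespace and Deployment name are assumptions for illustration:

```go
package main

import (
	"context"
	"fmt"
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// getReplicasFromDeployment reads spec.replicas from the controller's own
// Deployment, so scaling the Deployment does not require restarting every pod.
func getReplicasFromDeployment(kubeClient kubernetes.Interface, namespace, name string) (int, error) {
	deploy, err := kubeClient.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{})
	if err != nil {
		return 0, err
	}
	if deploy.Spec.Replicas == nil {
		return 0, fmt.Errorf("deployment %s/%s has no replica count set", namespace, name)
	}
	return int(*deploy.Spec.Replicas), nil
}

func main() {
	// Assumes the process runs in-cluster; namespace and Deployment name are illustrative.
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}
	kubeClient := kubernetes.NewForConfigOrDie(config)
	replicas, err := getReplicasFromDeployment(kubeClient, "argocd", "argocd-application-controller")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("controller replicas (shard count): %d\n", replicas)
}
```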
## Enabling Dynamic Distribution of Clusters
In order to utilize the feature, the manifests `manifests/ha/base/controller-deployment/` can be applied as a Kustomize
overlay. This overlay sets the StatefulSet replicas to `0` and deploys the application controller as a Deployment. The
dynamic distribution code automatically kicks in when the controller is deployed as a Deployment.
!!! important
The use of a Deployment instead of a StatefulSet is an implementation detail which may change in future versions of
this feature. Therefore, the directory name of the Kustomize overlay may change as well. Monitor the release notes
to avoid issues.
Note the introduction of the new environment variable `ARGOCD_CONTROLLER_HEARTBEAT_TIME`. The environment variable is explained in [Working of Dynamic Distribution](#working-of-dynamic-distribution).
## Working of Dynamic Distribution
To accomplish runtime distribution of clusters, the Application Controller uses a ConfigMap to associate a controller
pod with a shard number and a heartbeat to ensure that controller pods are still alive and handling their shard, in
effect, their share of the work.
The Application Controller creates a new ConfigMap named `argocd-app-controller-shard-cm` to store the Controller <-> Shard mapping. For each shard, the mapping looks like the following (a serialization sketch follows the field descriptions below):
```yaml
ShardNumber : 0
ControllerName : "argocd-application-controller-hydrxyt"
HeartbeatTime : "2009-11-17 20:34:58.651387237 +0000 UTC"
```
* `ControllerName`: Stores the hostname of the Application Controller pod
* `ShardNumber` : Stores the shard number managed by the controller pod
* `HeartbeatTime`: Stores the last time this heartbeat was updated.
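For illustration only, here is a minimal Go sketch (not the PR's code; the field names mirror the `shardApplicationControllerMapping` struct from the diff above) of how such entries could end up serialized under the `shardControllerMapping` key of the ConfigMap:

```go
package main

import (
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// shardMapping mirrors the per-shard record described above: which controller
// pod currently owns the shard and when it last reported a heartbeat.
type shardMapping struct {
	ShardNumber    int
	ControllerName string
	HeartbeatTime  metav1.Time
}

func main() {
	mappings := []shardMapping{
		{ShardNumber: 0, ControllerName: "argocd-application-controller-hydrxyt", HeartbeatTime: metav1.Now()},
		{ShardNumber: 1}, // not yet claimed by any controller pod
	}
	data, err := json.Marshal(mappings)
	if err != nil {
		panic(err)
	}
	// The marshalled JSON array is the value stored under the
	// "shardControllerMapping" key of argocd-app-controller-shard-cm.
	fmt.Println(string(data))
}
```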
The controller-to-shard mapping is updated in the ConfigMap during each readiness probe check of the pod, i.e. every 10 seconds (or as otherwise configured). On every readiness probe check, the controller acquires its shard and tries to update the ConfigMap with the current `HeartbeatTime`. The default `HeartbeatDuration`, after which the heartbeat should be updated, is `10` seconds. If the ConfigMap is not updated for a controller pod for more than `3 * HeartbeatDuration`, the readiness probe for that controller pod is marked as `Unhealthy`. To increase the default `HeartbeatDuration`, set the environment variable `ARGOCD_CONTROLLER_HEARTBEAT_TIME` to the desired value.
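A simplified sketch of that timing logic, assuming the same defaults (a 10-second heartbeat and a timeout of three heartbeats) and reading `ARGOCD_CONTROLLER_HEARTBEAT_TIME` directly:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

func main() {
	// Heartbeat defaults to 10 seconds and can be raised (here up to 60s, as in
	// the diff above) via ARGOCD_CONTROLLER_HEARTBEAT_TIME.
	heartbeatSeconds := 10
	if v, err := strconv.Atoi(os.Getenv("ARGOCD_CONTROLLER_HEARTBEAT_TIME")); err == nil && v >= 10 && v <= 60 {
		heartbeatSeconds = v
	}
	timeout := time.Duration(3*heartbeatSeconds) * time.Second

	// Pretend the shard's heartbeat was last updated 45 seconds ago.
	lastHeartbeat := time.Now().Add(-45 * time.Second)
	stale := time.Now().After(lastHeartbeat.Add(timeout))
	fmt.Printf("heartbeat older than %s, shard can be re-acquired: %v\n", timeout, stale)
}
```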
The new sharding mechanism does not monitor the environment variable `ARGOCD_CONTROLLER_REPLICAS` but instead reads the replica count directly from the Application Controller Deployment. The controller identifies the change in the number of replicas by comparing the replica count in the Application Controller Deployment and the number of mappings in the `argocd-app-controller-shard-cm` ConfigMap.
In the scenario when the number of Application Controller replicas increases, a new entry is added to the list of mappings in the `argocd-app-controller-shard-cm` ConfigMap and the cluster distribution is triggered to re-distribute the clusters.
In the scenario when the number of Application Controller replicas decreases, the mappings in the `argocd-app-controller-shard-cm` ConfigMap are reset and every controller acquires the shard again thus triggering the re-distribution of the clusters.
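A simplified, hypothetical sketch of that reconciliation (the real logic lives in `getOrUpdateShardNumberForController` in the diff above): when the replica count grows, unclaimed entries are appended; when it shrinks, the mapping is reset so every controller re-acquires a shard on its next heartbeat.

```go
package main

import "fmt"

type shardMapping struct {
	ShardNumber    int
	ControllerName string
}

// reconcileMappings keeps the mapping list in step with the replica count.
func reconcileMappings(mappings []shardMapping, replicas int) []shardMapping {
	if len(mappings) > replicas {
		// Scale down: reset to unclaimed defaults and let controllers re-acquire shards.
		mappings = make([]shardMapping, 0, replicas)
	}
	// Scale up (or after a reset): append empty entries for the missing shard numbers.
	for shard := len(mappings); shard < replicas; shard++ {
		mappings = append(mappings, shardMapping{ShardNumber: shard})
	}
	return mappings
}

func main() {
	current := []shardMapping{{ShardNumber: 0, ControllerName: "ctrl-a"}, {ShardNumber: 1, ControllerName: "ctrl-b"}}
	fmt.Println(reconcileMappings(current, 3)) // scale up: a third, unclaimed entry is added
	fmt.Println(reconcileMappings(current, 1)) // scale down: mappings are reset and re-acquired
}
```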
[1]: https://github.com/argoproj/argoproj/blob/master/community/feature-status.md

View File

@@ -61,8 +61,7 @@ reconciliation. In this case, we advise to use the preferred resource version in
* If the controller is managing too many clusters and uses too much memory then you can shard clusters across multiple
controller replicas. To enable sharding, increase the number of replicas in `argocd-application-controller` `StatefulSet`
and repeat the number of replicas in the `ARGOCD_CONTROLLER_REPLICAS` environment variable. The strategic merge patch below
demonstrates changes required to configure two controller replicas.
and repeat the number of replicas in the `ARGOCD_CONTROLLER_REPLICAS` environment variable. The strategic merge patch below demonstrates changes required to configure two controller replicas.
* By default, the controller will update the cluster information every 10 seconds. If there is a problem with your cluster network environment that is causing the update time to take a long time, you can try modifying the environment variable `ARGO_CD_UPDATE_CLUSTER_INFO_TIMEOUT` to increase the timeout (the unit is seconds).

View File

@@ -0,0 +1,218 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app.kubernetes.io/name: argocd-application-controller
app.kubernetes.io/part-of: argocd
app.kubernetes.io/component: application-controller
name: argocd-application-controller
spec:
selector:
matchLabels:
app.kubernetes.io/name: argocd-application-controller
replicas: 1
template:
metadata:
labels:
app.kubernetes.io/name: argocd-application-controller
spec:
containers:
- args:
- /usr/local/bin/argocd-application-controller
env:
- name: ARGOCD_CONTROLLER_REPLICAS
value: "1"
- name: ARGOCD_RECONCILIATION_TIMEOUT
valueFrom:
configMapKeyRef:
name: argocd-cm
key: timeout.reconciliation
optional: true
- name: ARGOCD_HARD_RECONCILIATION_TIMEOUT
valueFrom:
configMapKeyRef:
name: argocd-cm
key: timeout.hard.reconciliation
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: repo.server
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_TIMEOUT_SECONDS
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.repo.server.timeout.seconds
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_STATUS_PROCESSORS
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.status.processors
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_OPERATION_PROCESSORS
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.operation.processors
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_LOGFORMAT
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.log.format
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_LOGLEVEL
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.log.level
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_METRICS_CACHE_EXPIRATION
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.metrics.cache.expiration
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_SELF_HEAL_TIMEOUT_SECONDS
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.self.heal.timeout.seconds
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_PLAINTEXT
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.repo.server.plaintext
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_STRICT_TLS
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.repo.server.strict.tls
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_PERSIST_RESOURCE_HEALTH
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.resource.health.persist
optional: true
- name: ARGOCD_APP_STATE_CACHE_EXPIRATION
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.app.state.cache.expiration
optional: true
- name: REDIS_SERVER
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: redis.server
optional: true
- name: REDIS_COMPRESSION
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: redis.compression
optional: true
- name: REDISDB
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: redis.db
optional: true
- name: ARGOCD_DEFAULT_CACHE_EXPIRATION
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.default.cache.expiration
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_OTLP_ADDRESS
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: otlp.address
optional: true
- name: ARGOCD_APPLICATION_NAMESPACES
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: application.namespaces
optional: true
- name: ARGOCD_CONTROLLER_SHARDING_ALGORITHM
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.sharding.algorithm
optional: true
- name: ARGOCD_APPLICATION_CONTROLLER_KUBECTL_PARALLELISM_LIMIT
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.kubectl.parallelism.limit
optional: true
- name: ARGOCD_CONTROLLER_HEARTBEAT_TIME
valueFrom:
configMapKeyRef:
name: argocd-cmd-params-cm
key: controller.heatbeat.time
optional: true
image: quay.io/argoproj/argocd:latest
imagePullPolicy: Always
name: argocd-application-controller
ports:
- containerPort: 8082
readinessProbe:
httpGet:
path: /healthz
port: 8082
initialDelaySeconds: 5
periodSeconds: 10
securityContext:
runAsNonRoot: true
readOnlyRootFilesystem: true
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
seccompProfile:
type: RuntimeDefault
workingDir: /home/argocd
volumeMounts:
- name: argocd-repo-server-tls
mountPath: /app/config/controller/tls
- name: argocd-home
mountPath: /home/argocd
serviceAccountName: argocd-application-controller
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/name: argocd-application-controller
topologyKey: kubernetes.io/hostname
- weight: 5
podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/part-of: argocd
topologyKey: kubernetes.io/hostname
volumes:
- emptyDir: {}
name: argocd-home
- name: argocd-repo-server-tls
secret:
secretName: argocd-repo-server-tls
optional: true
items:
- key: tls.crt
path: tls.crt
- key: tls.key
path: tls.key
- key: ca.crt
path: ca.crt

View File

@@ -0,0 +1,20 @@
apiVersion: v1
kind: Service
metadata:
labels:
app.kubernetes.io/component: argocd-application-controller
app.kubernetes.io/name: argocd-application-controller
app.kubernetes.io/part-of: argocd
name: argocd-application-controller
spec:
ports:
- name: application-controller
protocol: TCP
port: 8082
targetPort: 8082
- name: metrics
protocol: TCP
port: 8084
targetPort: 8084
selector:
app.kubernetes.io/name: argocd-application-controller

View File

@@ -0,0 +1,6 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- argocd-application-controller-service.yaml
- argocd-application-controller-deployment.yaml

View File

@@ -210,4 +210,4 @@ spec:
- key: tls.key
path: tls.key
- key: ca.crt
path: ca.crt
path: ca.crt

View File

@@ -0,0 +1,15 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: argocd-application-controller
spec:
replicas: 0
template:
spec:
containers:
- name: argocd-application-controller
args:
- /usr/local/bin/argocd-application-controller
env:
- name: ARGOCD_CONTROLLER_REPLICAS
value: "0"

View File

@@ -0,0 +1,6 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: argocd-cmd-params-cm
data:
redis.server: argocd-redis-ha-haproxy:6379

View File

@@ -0,0 +1,26 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: argocd-repo-server
spec:
replicas: 2
template:
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchLabels:
app.kubernetes.io/name: argocd-repo-server
topologyKey: kubernetes.io/hostname
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/name: argocd-repo-server
topologyKey: topology.kubernetes.io/zone
containers:
- name: argocd-repo-server
args:
- /usr/local/bin/argocd-repo-server

View File

@@ -0,0 +1,29 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: argocd-server
spec:
replicas: 2
template:
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchLabels:
app.kubernetes.io/name: argocd-server
topologyKey: kubernetes.io/hostname
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/name: argocd-server
topologyKey: topology.kubernetes.io/zone
containers:
- name: argocd-server
env:
- name: ARGOCD_API_SERVER_REPLICAS
value: '2'
args:
- /usr/local/bin/argocd-server

View File

@@ -0,0 +1,25 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
patches:
- path: argocd-repo-server-deployment.yaml
- path: argocd-server-deployment.yaml
- path: argocd-application-controller-statefulset.yaml
- path: argocd-cmd-params-cm.yaml
images:
- name: quay.io/argoproj/argocd
newName: quay.io/argoproj/argocd
newTag: latest
resources:
- ../../../base/application-controller
- ../../../base/application-controller-deployment
- ../../../base/applicationset-controller
- ../../../base/dex
- ../../../base/repo-server
- ../../../base/server
- ../../../base/config
- ../../../base/notification
- ../redis-ha

View File

@@ -8,4 +8,4 @@ spec:
containers:
- name: argocd-application-controller
args:
- /usr/local/bin/argocd-application-controller
- /usr/local/bin/argocd-application-controller

View File

@@ -65,7 +65,7 @@ func GivenWithNamespace(t *testing.T, namespace string) *Context {
func GivenWithSameState(t *testing.T) *Context {
// ARGOCD_E2E_DEFAULT_TIMEOUT can be used to override the default timeout
// for any context.
timeout := env.ParseNumFromEnv("ARGOCD_E2E_DEFAULT_TIMEOUT", 10, 0, 180)
timeout := env.ParseNumFromEnv("ARGOCD_E2E_DEFAULT_TIMEOUT", 20, 0, 180)
return &Context{
t: t,
destServer: v1alpha1.KubernetesInternalAPIServerAddr,

View File

@@ -2,12 +2,16 @@ package db
import (
"context"
"math"
"strings"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"github.com/argoproj/argo-cd/v2/common"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/settings"
)
@@ -83,6 +87,9 @@ type ArgoDB interface {
AddGPGPublicKey(ctx context.Context, keyData string) (map[string]*appv1.GnuPGPublicKey, []string, error)
// DeleteGPGPublicKey removes a GPG public key from the configuration
DeleteGPGPublicKey(ctx context.Context, keyID string) error
// GetApplicationControllerReplicas gets the replicas of application controller
GetApplicationControllerReplicas() int
}
type db struct {
@@ -140,3 +147,14 @@ func (db *db) unmarshalFromSecretsStr(secrets map[*SecretMaperValidation]*v1.Sec
func StripCRLFCharacter(input string) string {
return strings.TrimSpace(input)
}
// GetApplicationControllerReplicas gets the replicas of application controller
func (db *db) GetApplicationControllerReplicas() int {
	// Prefer the replica count declared on the application controller Deployment; if the Deployment
	// cannot be fetched or declares no replicas, fall back to the ARGOCD_CONTROLLER_REPLICAS environment variable.
	applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
	appControllerDeployment, err := db.kubeclientset.AppsV1().Deployments(db.settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})
	if err == nil && appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
		return int(*appControllerDeployment.Spec.Replicas)
	}
	return env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
appv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -749,3 +750,27 @@ func TestGetClusterServersByName_InClusterConfigured(t *testing.T) {
assert.NoError(t, err)
assert.ElementsMatch(t, []string{v1alpha1.KubernetesInternalAPIServerAddr}, servers)
}
func TestGetApplicationControllerReplicas(t *testing.T) {
clientset := getClientset(nil)
expectedReplicas := int32(2)
t.Setenv(common.EnvControllerReplicas, "2")
db := NewDB(testNamespace, settings.NewSettingsManager(context.Background(), clientset, testNamespace), clientset)
replicas := db.GetApplicationControllerReplicas()
assert.Equal(t, int(expectedReplicas), replicas)
expectedReplicas = int32(3)
clientset = getClientset(nil, &appv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: common.ApplicationController,
Namespace: testNamespace,
},
Spec: appv1.DeploymentSpec{
Replicas: &expectedReplicas,
},
})
t.Setenv(common.EnvControllerReplicas, "2")
db = NewDB(testNamespace, settings.NewSettingsManager(context.Background(), clientset, testNamespace), clientset)
replicas = db.GetApplicationControllerReplicas()
assert.Equal(t, int(expectedReplicas), replicas)
}

View File

@@ -1,14 +1,14 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mocks
import (
context "context"
db "github.com/argoproj/argo-cd/v2/util/db"
mock "github.com/stretchr/testify/mock"
v1alpha1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
db "github.com/argoproj/argo-cd/v2/util/db"
)
// ArgoDB is an autogenerated mock type for the ArgoDB type
@@ -21,6 +21,11 @@ func (_m *ArgoDB) AddGPGPublicKey(ctx context.Context, keyData string) (map[stri
ret := _m.Called(ctx, keyData)
var r0 map[string]*v1alpha1.GnuPGPublicKey
var r1 []string
var r2 error
if rf, ok := ret.Get(0).(func(context.Context, string) (map[string]*v1alpha1.GnuPGPublicKey, []string, error)); ok {
return rf(ctx, keyData)
}
if rf, ok := ret.Get(0).(func(context.Context, string) map[string]*v1alpha1.GnuPGPublicKey); ok {
r0 = rf(ctx, keyData)
} else {
@@ -29,7 +34,6 @@ func (_m *ArgoDB) AddGPGPublicKey(ctx context.Context, keyData string) (map[stri
}
}
var r1 []string
if rf, ok := ret.Get(1).(func(context.Context, string) []string); ok {
r1 = rf(ctx, keyData)
} else {
@@ -38,7 +42,6 @@ func (_m *ArgoDB) AddGPGPublicKey(ctx context.Context, keyData string) (map[stri
}
}
var r2 error
if rf, ok := ret.Get(2).(func(context.Context, string) error); ok {
r2 = rf(ctx, keyData)
} else {
@@ -53,6 +56,10 @@ func (_m *ArgoDB) CreateCluster(ctx context.Context, c *v1alpha1.Cluster) (*v1al
ret := _m.Called(ctx, c)
var r0 *v1alpha1.Cluster
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Cluster) (*v1alpha1.Cluster, error)); ok {
return rf(ctx, c)
}
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Cluster) *v1alpha1.Cluster); ok {
r0 = rf(ctx, c)
} else {
@@ -61,7 +68,6 @@ func (_m *ArgoDB) CreateCluster(ctx context.Context, c *v1alpha1.Cluster) (*v1al
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.Cluster) error); ok {
r1 = rf(ctx, c)
} else {
@@ -76,6 +82,10 @@ func (_m *ArgoDB) CreateRepoCertificate(ctx context.Context, certificate *v1alph
ret := _m.Called(ctx, certificate, upsert)
var r0 *v1alpha1.RepositoryCertificateList
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepositoryCertificateList, bool) (*v1alpha1.RepositoryCertificateList, error)); ok {
return rf(ctx, certificate, upsert)
}
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepositoryCertificateList, bool) *v1alpha1.RepositoryCertificateList); ok {
r0 = rf(ctx, certificate, upsert)
} else {
@@ -84,7 +94,6 @@ func (_m *ArgoDB) CreateRepoCertificate(ctx context.Context, certificate *v1alph
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.RepositoryCertificateList, bool) error); ok {
r1 = rf(ctx, certificate, upsert)
} else {
@@ -99,6 +108,10 @@ func (_m *ArgoDB) CreateRepository(ctx context.Context, r *v1alpha1.Repository)
ret := _m.Called(ctx, r)
var r0 *v1alpha1.Repository
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Repository) (*v1alpha1.Repository, error)); ok {
return rf(ctx, r)
}
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Repository) *v1alpha1.Repository); ok {
r0 = rf(ctx, r)
} else {
@@ -107,7 +120,6 @@ func (_m *ArgoDB) CreateRepository(ctx context.Context, r *v1alpha1.Repository)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.Repository) error); ok {
r1 = rf(ctx, r)
} else {
@@ -122,6 +134,10 @@ func (_m *ArgoDB) CreateRepositoryCredentials(ctx context.Context, r *v1alpha1.R
ret := _m.Called(ctx, r)
var r0 *v1alpha1.RepoCreds
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepoCreds) (*v1alpha1.RepoCreds, error)); ok {
return rf(ctx, r)
}
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepoCreds) *v1alpha1.RepoCreds); ok {
r0 = rf(ctx, r)
} else {
@@ -130,7 +146,6 @@ func (_m *ArgoDB) CreateRepositoryCredentials(ctx context.Context, r *v1alpha1.R
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.RepoCreds) error); ok {
r1 = rf(ctx, r)
} else {
@@ -201,6 +216,10 @@ func (_m *ArgoDB) GetAllHelmRepositoryCredentials(ctx context.Context) ([]*v1alp
ret := _m.Called(ctx)
var r0 []*v1alpha1.RepoCreds
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) ([]*v1alpha1.RepoCreds, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) []*v1alpha1.RepoCreds); ok {
r0 = rf(ctx)
} else {
@@ -209,7 +228,6 @@ func (_m *ArgoDB) GetAllHelmRepositoryCredentials(ctx context.Context) ([]*v1alp
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
@@ -219,11 +237,29 @@ func (_m *ArgoDB) GetAllHelmRepositoryCredentials(ctx context.Context) ([]*v1alp
return r0, r1
}
// GetApplicationControllerReplicas provides a mock function with given fields:
func (_m *ArgoDB) GetApplicationControllerReplicas() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// GetCluster provides a mock function with given fields: ctx, server
func (_m *ArgoDB) GetCluster(ctx context.Context, server string) (*v1alpha1.Cluster, error) {
ret := _m.Called(ctx, server)
var r0 *v1alpha1.Cluster
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (*v1alpha1.Cluster, error)); ok {
return rf(ctx, server)
}
if rf, ok := ret.Get(0).(func(context.Context, string) *v1alpha1.Cluster); ok {
r0 = rf(ctx, server)
} else {
@@ -232,7 +268,6 @@ func (_m *ArgoDB) GetCluster(ctx context.Context, server string) (*v1alpha1.Clus
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, server)
} else {
@@ -247,6 +282,10 @@ func (_m *ArgoDB) GetClusterServersByName(ctx context.Context, name string) ([]s
ret := _m.Called(ctx, name)
var r0 []string
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) ([]string, error)); ok {
return rf(ctx, name)
}
if rf, ok := ret.Get(0).(func(context.Context, string) []string); ok {
r0 = rf(ctx, name)
} else {
@@ -255,7 +294,6 @@ func (_m *ArgoDB) GetClusterServersByName(ctx context.Context, name string) ([]s
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, name)
} else {
@@ -270,6 +308,10 @@ func (_m *ArgoDB) GetProjectClusters(ctx context.Context, project string) ([]*v1
ret := _m.Called(ctx, project)
var r0 []*v1alpha1.Cluster
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) ([]*v1alpha1.Cluster, error)); ok {
return rf(ctx, project)
}
if rf, ok := ret.Get(0).(func(context.Context, string) []*v1alpha1.Cluster); ok {
r0 = rf(ctx, project)
} else {
@@ -278,7 +320,6 @@ func (_m *ArgoDB) GetProjectClusters(ctx context.Context, project string) ([]*v1
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, project)
} else {
@@ -293,6 +334,10 @@ func (_m *ArgoDB) GetProjectRepositories(ctx context.Context, project string) ([
ret := _m.Called(ctx, project)
var r0 []*v1alpha1.Repository
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) ([]*v1alpha1.Repository, error)); ok {
return rf(ctx, project)
}
if rf, ok := ret.Get(0).(func(context.Context, string) []*v1alpha1.Repository); ok {
r0 = rf(ctx, project)
} else {
@@ -301,7 +346,6 @@ func (_m *ArgoDB) GetProjectRepositories(ctx context.Context, project string) ([
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, project)
} else {
@@ -316,6 +360,10 @@ func (_m *ArgoDB) GetRepository(ctx context.Context, url string) (*v1alpha1.Repo
ret := _m.Called(ctx, url)
var r0 *v1alpha1.Repository
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (*v1alpha1.Repository, error)); ok {
return rf(ctx, url)
}
if rf, ok := ret.Get(0).(func(context.Context, string) *v1alpha1.Repository); ok {
r0 = rf(ctx, url)
} else {
@@ -324,7 +372,6 @@ func (_m *ArgoDB) GetRepository(ctx context.Context, url string) (*v1alpha1.Repo
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, url)
} else {
@@ -339,6 +386,10 @@ func (_m *ArgoDB) GetRepositoryCredentials(ctx context.Context, name string) (*v
ret := _m.Called(ctx, name)
var r0 *v1alpha1.RepoCreds
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (*v1alpha1.RepoCreds, error)); ok {
return rf(ctx, name)
}
if rf, ok := ret.Get(0).(func(context.Context, string) *v1alpha1.RepoCreds); ok {
r0 = rf(ctx, name)
} else {
@@ -347,7 +398,6 @@ func (_m *ArgoDB) GetRepositoryCredentials(ctx context.Context, name string) (*v
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, name)
} else {
@@ -362,6 +412,10 @@ func (_m *ArgoDB) ListClusters(ctx context.Context) (*v1alpha1.ClusterList, erro
ret := _m.Called(ctx)
var r0 *v1alpha1.ClusterList
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (*v1alpha1.ClusterList, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) *v1alpha1.ClusterList); ok {
r0 = rf(ctx)
} else {
@@ -370,7 +424,6 @@ func (_m *ArgoDB) ListClusters(ctx context.Context) (*v1alpha1.ClusterList, erro
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
@@ -385,6 +438,10 @@ func (_m *ArgoDB) ListConfiguredGPGPublicKeys(ctx context.Context) (map[string]*
ret := _m.Called(ctx)
var r0 map[string]*v1alpha1.GnuPGPublicKey
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (map[string]*v1alpha1.GnuPGPublicKey, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) map[string]*v1alpha1.GnuPGPublicKey); ok {
r0 = rf(ctx)
} else {
@@ -393,7 +450,6 @@ func (_m *ArgoDB) ListConfiguredGPGPublicKeys(ctx context.Context) (map[string]*
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
@@ -408,6 +464,10 @@ func (_m *ArgoDB) ListHelmRepositories(ctx context.Context) ([]*v1alpha1.Reposit
ret := _m.Called(ctx)
var r0 []*v1alpha1.Repository
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) ([]*v1alpha1.Repository, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) []*v1alpha1.Repository); ok {
r0 = rf(ctx)
} else {
@@ -416,7 +476,6 @@ func (_m *ArgoDB) ListHelmRepositories(ctx context.Context) ([]*v1alpha1.Reposit
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
@@ -431,6 +490,10 @@ func (_m *ArgoDB) ListRepoCertificates(ctx context.Context, selector *db.Certifi
ret := _m.Called(ctx, selector)
var r0 *v1alpha1.RepositoryCertificateList
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *db.CertificateListSelector) (*v1alpha1.RepositoryCertificateList, error)); ok {
return rf(ctx, selector)
}
if rf, ok := ret.Get(0).(func(context.Context, *db.CertificateListSelector) *v1alpha1.RepositoryCertificateList); ok {
r0 = rf(ctx, selector)
} else {
@@ -439,7 +502,6 @@ func (_m *ArgoDB) ListRepoCertificates(ctx context.Context, selector *db.Certifi
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *db.CertificateListSelector) error); ok {
r1 = rf(ctx, selector)
} else {
@@ -454,6 +516,10 @@ func (_m *ArgoDB) ListRepositories(ctx context.Context) ([]*v1alpha1.Repository,
ret := _m.Called(ctx)
var r0 []*v1alpha1.Repository
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) ([]*v1alpha1.Repository, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) []*v1alpha1.Repository); ok {
r0 = rf(ctx)
} else {
@@ -462,7 +528,6 @@ func (_m *ArgoDB) ListRepositories(ctx context.Context) ([]*v1alpha1.Repository,
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
@@ -477,6 +542,10 @@ func (_m *ArgoDB) ListRepositoryCredentials(ctx context.Context) ([]string, erro
ret := _m.Called(ctx)
var r0 []string
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) ([]string, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) []string); ok {
r0 = rf(ctx)
} else {
@@ -485,7 +554,6 @@ func (_m *ArgoDB) ListRepositoryCredentials(ctx context.Context) ([]string, erro
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
@@ -500,6 +568,10 @@ func (_m *ArgoDB) RemoveRepoCertificates(ctx context.Context, selector *db.Certi
ret := _m.Called(ctx, selector)
var r0 *v1alpha1.RepositoryCertificateList
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *db.CertificateListSelector) (*v1alpha1.RepositoryCertificateList, error)); ok {
return rf(ctx, selector)
}
if rf, ok := ret.Get(0).(func(context.Context, *db.CertificateListSelector) *v1alpha1.RepositoryCertificateList); ok {
r0 = rf(ctx, selector)
} else {
@@ -508,7 +580,6 @@ func (_m *ArgoDB) RemoveRepoCertificates(ctx context.Context, selector *db.Certi
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *db.CertificateListSelector) error); ok {
r1 = rf(ctx, selector)
} else {
@@ -523,13 +594,16 @@ func (_m *ArgoDB) RepositoryExists(ctx context.Context, repoURL string) (bool, e
ret := _m.Called(ctx, repoURL)
var r0 bool
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok {
return rf(ctx, repoURL)
}
if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok {
r0 = rf(ctx, repoURL)
} else {
r0 = ret.Get(0).(bool)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, repoURL)
} else {
@@ -544,6 +618,10 @@ func (_m *ArgoDB) UpdateCluster(ctx context.Context, c *v1alpha1.Cluster) (*v1al
ret := _m.Called(ctx, c)
var r0 *v1alpha1.Cluster
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Cluster) (*v1alpha1.Cluster, error)); ok {
return rf(ctx, c)
}
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Cluster) *v1alpha1.Cluster); ok {
r0 = rf(ctx, c)
} else {
@@ -552,7 +630,6 @@ func (_m *ArgoDB) UpdateCluster(ctx context.Context, c *v1alpha1.Cluster) (*v1al
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.Cluster) error); ok {
r1 = rf(ctx, c)
} else {
@@ -567,6 +644,10 @@ func (_m *ArgoDB) UpdateRepository(ctx context.Context, r *v1alpha1.Repository)
ret := _m.Called(ctx, r)
var r0 *v1alpha1.Repository
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Repository) (*v1alpha1.Repository, error)); ok {
return rf(ctx, r)
}
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Repository) *v1alpha1.Repository); ok {
r0 = rf(ctx, r)
} else {
@@ -575,7 +656,6 @@ func (_m *ArgoDB) UpdateRepository(ctx context.Context, r *v1alpha1.Repository)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.Repository) error); ok {
r1 = rf(ctx, r)
} else {
@@ -590,6 +670,10 @@ func (_m *ArgoDB) UpdateRepositoryCredentials(ctx context.Context, r *v1alpha1.R
ret := _m.Called(ctx, r)
var r0 *v1alpha1.RepoCreds
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepoCreds) (*v1alpha1.RepoCreds, error)); ok {
return rf(ctx, r)
}
if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepoCreds) *v1alpha1.RepoCreds); ok {
r0 = rf(ctx, r)
} else {
@@ -598,7 +682,6 @@ func (_m *ArgoDB) UpdateRepositoryCredentials(ctx context.Context, r *v1alpha1.R
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.RepoCreds) error); ok {
r1 = rf(ctx, r)
} else {
@@ -621,3 +704,17 @@ func (_m *ArgoDB) WatchClusters(ctx context.Context, handleAddEvent func(*v1alph
return r0
}
// NewArgoDB creates a new instance of ArgoDB. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewArgoDB(t interface {
mock.TestingT
Cleanup(func())
}) *ArgoDB {
mock := &ArgoDB{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}