// argo-cd/controller/sharding/cache.go

package sharding

import (
	"maps"
	"strconv"
	"sync"

	log "github.com/sirupsen/logrus"

	"github.com/argoproj/argo-cd/v3/common"
	"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
	"github.com/argoproj/argo-cd/v3/util/db"
)

type ClusterShardingCache interface {
	Init(clusters *v1alpha1.ClusterList, apps *v1alpha1.ApplicationList)
	Add(c *v1alpha1.Cluster)
	Delete(clusterServer string)
	Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster)
	AddApp(a *v1alpha1.Application)
	DeleteApp(a *v1alpha1.Application)
	UpdateApp(a *v1alpha1.Application)
	IsManagedCluster(c *v1alpha1.Cluster) bool
	GetDistribution() map[string]int
	GetAppDistribution() map[string]int
	UpdateShard(shard int) bool
}

type ClusterSharding struct {
	Shard           int
	Replicas        int
	Shards          map[string]int
	Clusters        map[string]*v1alpha1.Cluster
	Apps            map[string]*v1alpha1.Application
	lock            sync.RWMutex
	getClusterShard DistributionFunction
}

func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
	clusterSharding := &ClusterSharding{
		Shard:    shard,
		Replicas: replicas,
		Shards:   make(map[string]int),
		Clusters: make(map[string]*v1alpha1.Cluster),
		Apps:     make(map[string]*v1alpha1.Application),
	}
	distributionFunction := NoShardingDistributionFunction()
	if replicas > 1 {
		log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
		distributionFunction = GetDistributionFunction(clusterSharding.getClusterAccessor(), clusterSharding.getAppAccessor(), shardingAlgorithm, replicas)
	} else {
		log.Info("Processing all cluster shards")
	}
	clusterSharding.getClusterShard = distributionFunction
	return clusterSharding
}
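
// newShardedCacheExample is an illustrative sketch, not part of the original
// file: it shows one way a caller might construct and seed the cache. The
// shard index (0), replica count (3), and the "round-robin" algorithm name are
// assumed values; the db.ArgoDB argument is ignored by NewClusterSharding, so
// nil is passed here.
func newShardedCacheExample(clusters *v1alpha1.ClusterList, apps *v1alpha1.ApplicationList) ClusterShardingCache {
	cache := NewClusterSharding(nil, 0, 3, "round-robin")
	// Seed the cache with the current cluster and application inventory so the
	// distribution function has data to work from.
	cache.Init(clusters, apps)
	return cache
}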

// IsManagedCluster reports whether the cluster should be processed by the current shard.
func (sharding *ClusterSharding) IsManagedCluster(c *v1alpha1.Cluster) bool {
	sharding.lock.RLock()
	defer sharding.lock.RUnlock()
	if c == nil { // a nil cluster (in-cluster) is always managed by the current shard
		return true
	}
	if skipReconcile, err := strconv.ParseBool(c.Annotations[common.AnnotationKeyAppSkipReconcile]); err == nil && skipReconcile {
		log.Debugf("Cluster %s has %s annotation set, skipping", c.Server, common.AnnotationKeyAppSkipReconcile)
		return false
	}
	clusterShard := 0
	if shard, ok := sharding.Shards[c.Server]; ok {
		clusterShard = shard
	} else {
		log.Warnf("The cluster %s has no assigned shard.", c.Server)
	}
	log.Debugf("Checking if cluster %s with clusterShard %d should be processed by shard %d", c.Server, clusterShard, sharding.Shard)
	return clusterShard == sharding.Shard
}
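
// managedClustersExample is an illustrative sketch, not part of the original
// file: it shows how a shard-aware caller could use IsManagedCluster to keep
// only the clusters assigned to the current shard.
func managedClustersExample(cache ClusterShardingCache, clusters []*v1alpha1.Cluster) []*v1alpha1.Cluster {
	managed := make([]*v1alpha1.Cluster, 0, len(clusters))
	for _, c := range clusters {
		if cache.IsManagedCluster(c) {
			managed = append(managed, c)
		}
	}
	return managed
}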

func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList, apps *v1alpha1.ApplicationList) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
	for _, c := range clusters.Items {
		cluster := c
		newClusters[c.Server] = &cluster
	}
	sharding.Clusters = newClusters
	newApps := make(map[string]*v1alpha1.Application, len(apps.Items))
	for i := range apps.Items {
		app := apps.Items[i]
		newApps[app.Name] = &app
	}
	sharding.Apps = newApps
	sharding.updateDistribution()
}

func (sharding *ClusterSharding) Add(c *v1alpha1.Cluster) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	old, ok := sharding.Clusters[c.Server]
	sharding.Clusters[c.Server] = c
	if !ok || hasShardingUpdates(old, c) {
		sharding.updateDistribution()
	} else {
		log.Debugf("Skipping sharding distribution update. Cluster already added")
	}
}

func (sharding *ClusterSharding) Delete(clusterServer string) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	if _, ok := sharding.Clusters[clusterServer]; ok {
		delete(sharding.Clusters, clusterServer)
		delete(sharding.Shards, clusterServer)
		sharding.updateDistribution()
	}
}

func (sharding *ClusterSharding) Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	if _, ok := sharding.Clusters[oldCluster.Server]; ok && oldCluster.Server != newCluster.Server {
		delete(sharding.Clusters, oldCluster.Server)
		delete(sharding.Shards, oldCluster.Server)
	}
	sharding.Clusters[newCluster.Server] = newCluster
	if hasShardingUpdates(oldCluster, newCluster) {
		sharding.updateDistribution()
	} else {
		log.Debugf("Skipping sharding distribution update. No relevant changes")
	}
}

func (sharding *ClusterSharding) GetDistribution() map[string]int {
	sharding.lock.RLock()
	defer sharding.lock.RUnlock()
	shards := sharding.Shards
	distribution := make(map[string]int, len(shards))
	maps.Copy(distribution, shards)
	return distribution
}

func (sharding *ClusterSharding) updateDistribution() {
	for k, c := range sharding.Clusters {
		shard := 0
		if c.Shard != nil {
			requestedShard := int(*c.Shard)
			if requestedShard < sharding.Replicas {
				shard = requestedShard
			} else {
				log.Warnf("Specified cluster shard (%d) for cluster: %s is greater than the number of available shards (%d). Using shard 0.", requestedShard, c.Server, sharding.Replicas)
			}
		} else {
			shard = sharding.getClusterShard(c)
		}
		existingShard, ok := sharding.Shards[k]
		switch {
		case ok && existingShard != shard:
			log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard)
		case !ok:
			log.Infof("Cluster %s has been assigned to shard %d", k, shard)
		default:
			log.Debugf("Cluster %s has not changed shard", k)
		}
		sharding.Shards[k] = shard
	}
}
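
// pinnedClusterExample is an illustrative sketch, not part of the original
// file: updateDistribution honors an explicit per-cluster shard, so a cluster
// whose Shard field is set (and lower than the replica count) is pinned to
// that shard instead of going through the distribution function. The server
// URL and shard number below are hypothetical.
func pinnedClusterExample() *v1alpha1.Cluster {
	pinned := int64(2)
	return &v1alpha1.Cluster{
		Server: "https://example-cluster.internal",
		Shard:  &pinned,
	}
}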

// hasShardingUpdates returns true if the sharding distribution has explicitly changed
func hasShardingUpdates(old, newCluster *v1alpha1.Cluster) bool {
	if old == nil || newCluster == nil {
		return false
	}
	// returns true if the cluster ID has changed because some sharding algorithms depend on it.
	if old.ID != newCluster.ID {
		return true
	}
	if old.Server != newCluster.Server {
		return true
	}
	// return false if the shard field has not been modified
	if old.Shard == nil && newCluster.Shard == nil {
		return false
	}
	return old.Shard == nil || newCluster.Shard == nil || int64(*old.Shard) != int64(*newCluster.Shard)
}

// A read lock should be acquired before calling getClusterAccessor.
func (sharding *ClusterSharding) getClusterAccessor() clusterAccessor {
	return func() []*v1alpha1.Cluster {
		// no need to lock, as this is only called from the updateDistribution function
		clusters := make([]*v1alpha1.Cluster, 0, len(sharding.Clusters))
		for _, c := range sharding.Clusters {
			clusters = append(clusters, c)
		}
		return clusters
	}
}

// A read lock should be acquired before calling getAppAccessor.
func (sharding *ClusterSharding) getAppAccessor() appAccessor {
	return func() []*v1alpha1.Application {
		apps := make([]*v1alpha1.Application, 0, len(sharding.Apps))
		for _, a := range sharding.Apps {
			apps = append(apps, a)
		}
		return apps
	}
}

func (sharding *ClusterSharding) AddApp(a *v1alpha1.Application) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	_, ok := sharding.Apps[a.Name]
	sharding.Apps[a.Name] = a
	if !ok {
		sharding.updateDistribution()
	} else {
		log.Debugf("Skipping sharding distribution update. App already added")
	}
}

func (sharding *ClusterSharding) DeleteApp(a *v1alpha1.Application) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	if _, ok := sharding.Apps[a.Name]; ok {
		delete(sharding.Apps, a.Name)
		sharding.updateDistribution()
	}
}

func (sharding *ClusterSharding) UpdateApp(a *v1alpha1.Application) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	_, ok := sharding.Apps[a.Name]
	sharding.Apps[a.Name] = a
	if !ok {
		sharding.updateDistribution()
	} else {
		log.Debugf("Skipping sharding distribution update. No relevant changes")
	}
}

// GetAppDistribution should not be called from a DistributionFunction because
// it could cause a deadlock when updateDistribution is called.
func (sharding *ClusterSharding) GetAppDistribution() map[string]int {
	sharding.lock.RLock()
	clusters := sharding.Clusters
	apps := sharding.Apps
	sharding.lock.RUnlock()
	appDistribution := make(map[string]int, len(clusters))
	for _, a := range apps {
		if _, ok := appDistribution[a.Spec.Destination.Server]; !ok {
			appDistribution[a.Spec.Destination.Server] = 0
		}
		appDistribution[a.Spec.Destination.Server]++
	}
	return appDistribution
}
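
// logDistributionsExample is an illustrative sketch, not part of the original
// file: it shows how the two distribution maps can be read together, for
// example for debug logging or metrics. Both methods return freshly built
// maps, so the caller does not need to hold the cache's lock while iterating.
func logDistributionsExample(cache ClusterShardingCache) {
	appsPerCluster := cache.GetAppDistribution()
	for server, shard := range cache.GetDistribution() {
		log.Debugf("cluster %s is on shard %d and is the destination of %d apps", server, shard, appsPerCluster[server])
	}
}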

// UpdateShard will update the shard of ClusterSharding when the shard has changed.
func (sharding *ClusterSharding) UpdateShard(shard int) bool {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	if shard != sharding.Shard {
		sharding.Shard = shard
		return true
	}
	return false
}