Files
argo-cd/controller/sharding/sharding_test.go
2026-02-06 17:16:37 -05:00

1060 lines
37 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sharding
import (
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubefake "k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/yaml"
"github.com/argoproj/argo-cd/v3/common"
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
dbmocks "github.com/argoproj/argo-cd/v3/util/db/mocks"
"github.com/argoproj/argo-cd/v3/util/settings"
)
func TestGetShardByID_NotEmptyID(t *testing.T) {
db := &dbmocks.ArgoDB{}
replicasCount := 1
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Maybe()
assert.Equal(t, 0, LegacyDistributionFunction(replicasCount)(&v1alpha1.Cluster{ID: "1"}))
assert.Equal(t, 0, LegacyDistributionFunction(replicasCount)(&v1alpha1.Cluster{ID: "2"}))
assert.Equal(t, 0, LegacyDistributionFunction(replicasCount)(&v1alpha1.Cluster{ID: "3"}))
assert.Equal(t, 0, LegacyDistributionFunction(replicasCount)(&v1alpha1.Cluster{ID: "4"}))
}
func TestGetShardByID_EmptyID(t *testing.T) {
db := &dbmocks.ArgoDB{}
replicasCount := 1
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Maybe()
distributionFunction := LegacyDistributionFunction
shard := distributionFunction(replicasCount)(&v1alpha1.Cluster{})
assert.Equal(t, 0, shard)
}
func TestGetShardByID_NoReplicas(t *testing.T) {
db := &dbmocks.ArgoDB{}
db.EXPECT().GetApplicationControllerReplicas().Return(0).Maybe()
distributionFunction := LegacyDistributionFunction
shard := distributionFunction(0)(&v1alpha1.Cluster{})
assert.Equal(t, -1, shard)
}
func TestGetShardByID_NoReplicasUsingHashDistributionFunction(t *testing.T) {
db := &dbmocks.ArgoDB{}
db.EXPECT().GetApplicationControllerReplicas().Return(0).Maybe()
distributionFunction := LegacyDistributionFunction
shard := distributionFunction(0)(&v1alpha1.Cluster{})
assert.Equal(t, -1, shard)
}
func TestGetShardByID_NoReplicasUsingHashDistributionFunctionWithClusters(t *testing.T) {
clusters, db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters()
// Test with replicas set to 0
db.EXPECT().GetApplicationControllerReplicas().Return(0).Maybe()
t.Setenv(common.EnvControllerShardingAlgorithm, common.RoundRobinShardingAlgorithm)
distributionFunction := RoundRobinDistributionFunction(clusters, 0)
assert.Equal(t, -1, distributionFunction(nil))
assert.Equal(t, -1, distributionFunction(&cluster1))
assert.Equal(t, -1, distributionFunction(&cluster2))
assert.Equal(t, -1, distributionFunction(&cluster3))
assert.Equal(t, -1, distributionFunction(&cluster4))
assert.Equal(t, -1, distributionFunction(&cluster5))
}
func TestGetClusterFilterDefault(t *testing.T) {
// shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
clusterAccessor, _, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
os.Unsetenv(common.EnvControllerShardingAlgorithm)
replicasCount := 2
distributionFunction := RoundRobinDistributionFunction(clusterAccessor, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 1, distributionFunction(&cluster2))
assert.Equal(t, 0, distributionFunction(&cluster3))
assert.Equal(t, 1, distributionFunction(&cluster4))
}
func TestGetClusterFilterLegacy(t *testing.T) {
// shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
replicasCount := 2
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Maybe()
t.Setenv(common.EnvControllerShardingAlgorithm, common.LegacyShardingAlgorithm)
distributionFunction := RoundRobinDistributionFunction(clusterAccessor, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 1, distributionFunction(&cluster2))
assert.Equal(t, 0, distributionFunction(&cluster3))
assert.Equal(t, 1, distributionFunction(&cluster4))
}
func TestGetClusterFilterUnknown(t *testing.T) {
clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
appAccessor, _, _, _, _, _ := createTestApps()
// Test with replicas set to 0
t.Setenv(common.EnvControllerReplicas, "2")
os.Unsetenv(common.EnvControllerShardingAlgorithm)
t.Setenv(common.EnvControllerShardingAlgorithm, "unknown")
replicasCount := 2
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Maybe()
distributionFunction := GetDistributionFunction(clusterAccessor, appAccessor, "unknown", replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 1, distributionFunction(&cluster2))
assert.Equal(t, 0, distributionFunction(&cluster3))
assert.Equal(t, 1, distributionFunction(&cluster4))
}
func TestLegacyGetClusterFilterWithFixedShard(t *testing.T) {
// shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
t.Setenv(common.EnvControllerReplicas, "5")
clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
appAccessor, _, _, _, _, _ := createTestApps()
replicasCount := 5
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Maybe()
filter := GetDistributionFunction(clusterAccessor, appAccessor, common.DefaultShardingAlgorithm, replicasCount)
assert.Equal(t, 0, filter(nil))
assert.Equal(t, 4, filter(&cluster1))
assert.Equal(t, 1, filter(&cluster2))
assert.Equal(t, 2, filter(&cluster3))
assert.Equal(t, 2, filter(&cluster4))
var fixedShard int64 = 4
cluster5 := &v1alpha1.Cluster{ID: "5", Shard: &fixedShard}
clusterAccessor = getClusterAccessor([]v1alpha1.Cluster{cluster1, cluster2, cluster2, cluster4, *cluster5})
filter = GetDistributionFunction(clusterAccessor, appAccessor, common.DefaultShardingAlgorithm, replicasCount)
assert.Equal(t, int(fixedShard), filter(cluster5))
fixedShard = 1
cluster5.Shard = &fixedShard
clusterAccessor = getClusterAccessor([]v1alpha1.Cluster{cluster1, cluster2, cluster2, cluster4, *cluster5})
filter = GetDistributionFunction(clusterAccessor, appAccessor, common.DefaultShardingAlgorithm, replicasCount)
assert.Equal(t, int(fixedShard), filter(&v1alpha1.Cluster{ID: "4", Shard: &fixedShard}))
}
func TestRoundRobinGetClusterFilterWithFixedShard(t *testing.T) {
// shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...)
t.Setenv(common.EnvControllerReplicas, "4")
clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters()
appAccessor, _, _, _, _, _ := createTestApps()
replicasCount := 4
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Maybe()
filter := GetDistributionFunction(clusterAccessor, appAccessor, common.RoundRobinShardingAlgorithm, replicasCount)
assert.Equal(t, 0, filter(nil))
assert.Equal(t, 0, filter(&cluster1))
assert.Equal(t, 1, filter(&cluster2))
assert.Equal(t, 2, filter(&cluster3))
assert.Equal(t, 3, filter(&cluster4))
// a cluster with a fixed shard should be processed by the specified exact
// same shard unless the specified shard index is greater than the number of replicas.
var fixedShard int64 = 1
cluster5 := v1alpha1.Cluster{Name: "cluster5", ID: "5", Shard: &fixedShard}
clusters := []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}
clusterAccessor = getClusterAccessor(clusters)
filter = GetDistributionFunction(clusterAccessor, appAccessor, common.RoundRobinShardingAlgorithm, replicasCount)
assert.Equal(t, int(fixedShard), filter(&cluster5))
fixedShard = 1
cluster5 = v1alpha1.Cluster{Name: "cluster5", ID: "5", Shard: &fixedShard}
clusters = []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}
clusterAccessor = getClusterAccessor(clusters)
filter = GetDistributionFunction(clusterAccessor, appAccessor, common.RoundRobinShardingAlgorithm, replicasCount)
assert.Equal(t, int(fixedShard), filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard}))
}
func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) {
clusters, db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters()
t.Run("replicas set to 1", func(t *testing.T) {
replicasCount := 1
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Once()
distributionFunction := RoundRobinDistributionFunction(clusters, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 0, distributionFunction(&cluster2))
assert.Equal(t, 0, distributionFunction(&cluster3))
assert.Equal(t, 0, distributionFunction(&cluster4))
assert.Equal(t, 0, distributionFunction(&cluster5))
})
t.Run("replicas set to 2", func(t *testing.T) {
replicasCount := 2
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Once()
distributionFunction := RoundRobinDistributionFunction(clusters, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 1, distributionFunction(&cluster2))
assert.Equal(t, 0, distributionFunction(&cluster3))
assert.Equal(t, 1, distributionFunction(&cluster4))
assert.Equal(t, 0, distributionFunction(&cluster5))
})
t.Run("replicas set to 3", func(t *testing.T) {
replicasCount := 3
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Once()
distributionFunction := RoundRobinDistributionFunction(clusters, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 1, distributionFunction(&cluster2))
assert.Equal(t, 2, distributionFunction(&cluster3))
assert.Equal(t, 0, distributionFunction(&cluster4))
assert.Equal(t, 1, distributionFunction(&cluster5))
})
}
func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterNumberIsHigh(t *testing.T) {
// Unit test written to evaluate the cost of calling db.ListCluster on every call of distributionFunction
// Doing that allows to accept added and removed clusters on the fly.
// Initial tests where showing that under 1024 clusters, execution time was around 400ms
// and for 4096 clusters, execution time was under 9s
// The other implementation was giving almost linear time of 400ms up to 10'000 clusters
clusterPointers := []*v1alpha1.Cluster{}
for i := range 2048 {
cluster := createCluster(fmt.Sprintf("cluster-%d", i), strconv.Itoa(i))
clusterPointers = append(clusterPointers, &cluster)
}
replicasCount := 2
t.Setenv(common.EnvControllerReplicas, strconv.Itoa(replicasCount))
_, db, _, _, _, _, _ := createTestClusters()
clusterAccessor := func() []*v1alpha1.Cluster { return clusterPointers }
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Maybe()
distributionFunction := RoundRobinDistributionFunction(clusterAccessor, replicasCount)
for i, c := range clusterPointers {
assert.Equal(t, i%2, distributionFunction(c))
}
}
func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAddedAndRemoved(t *testing.T) {
db := &dbmocks.ArgoDB{}
cluster1 := createCluster("cluster1", "1")
cluster2 := createCluster("cluster2", "2")
cluster3 := createCluster("cluster3", "3")
cluster4 := createCluster("cluster4", "4")
cluster5 := createCluster("cluster5", "5")
cluster6 := createCluster("cluster6", "6")
clusters := []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}
clusterAccessor := getClusterAccessor(clusters)
clusterList := &v1alpha1.ClusterList{Items: []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}}
db.EXPECT().ListClusters(mock.Anything).Return(clusterList, nil)
// Test with replicas set to 2
replicasCount := 2
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount)
distributionFunction := RoundRobinDistributionFunction(clusterAccessor, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
assert.Equal(t, 0, distributionFunction(&cluster1))
assert.Equal(t, 1, distributionFunction(&cluster2))
assert.Equal(t, 0, distributionFunction(&cluster3))
assert.Equal(t, 1, distributionFunction(&cluster4))
assert.Equal(t, 0, distributionFunction(&cluster5))
assert.Equal(t, -1, distributionFunction(&cluster6)) // as cluster6 is not in the DB, this one should not have a shard assigned
// Now, the database knows cluster6. Shard should be assigned a proper shard
clusterList.Items = append(clusterList.Items, cluster6)
distributionFunction = RoundRobinDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
assert.Equal(t, 1, distributionFunction(&cluster6))
// Now, we remove the last added cluster, it should be unassigned as well
clusterList.Items = clusterList.Items[:len(clusterList.Items)-1]
distributionFunction = RoundRobinDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
assert.Equal(t, -1, distributionFunction(&cluster6))
}
func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) {
db := &dbmocks.ArgoDB{}
clusterCount := 133
prefix := "cluster"
clusters := []v1alpha1.Cluster{}
for i := range clusterCount {
id := fmt.Sprintf("%06d", i)
cluster := fmt.Sprintf("%s-%s", prefix, id)
clusters = append(clusters, createCluster(cluster, id))
}
clusterAccessor := getClusterAccessor(clusters)
appAccessor, _, _, _, _, _ := createTestApps()
clusterList := &v1alpha1.ClusterList{Items: clusters}
db.EXPECT().ListClusters(mock.Anything).Return(clusterList, nil)
// Test with replicas set to 3
replicasCount := 3
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount)
distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
distributionMap := map[int]int{}
assignementMap := map[string]int{}
for i := range clusterCount {
assignedShard := distributionFunction(&clusters[i])
assignementMap[clusters[i].ID] = assignedShard
distributionMap[assignedShard]++
}
// We check that the distribution does not differ for more than 20%
var sum float64
sum = 0
for shard, count := range distributionMap {
if shard != -1 {
sum = (sum + float64(count))
}
}
average := sum / float64(replicasCount)
failedTests := false
for shard, count := range distributionMap {
if shard != -1 {
if float64(count) > average*float64(1.1) || float64(count) < average*float64(0.9) {
fmt.Printf("Cluster distribution differs for more than 20%%: %d for shard %d (average: %f)\n", count, shard, average)
failedTests = true
}
if failedTests {
t.Fail()
}
}
}
// Now we will decrease the number of replicas to 2, and we should see only clusters that were attached to shard 2 to be reassigned
replicasCount = 2
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount)
removedCluster := clusterList.Items[len(clusterList.Items)-1]
for i := range clusterCount {
c := &clusters[i]
assignedShard := distributionFunction(c)
prevıouslyAssignedShard := assignementMap[clusters[i].ID]
if prevıouslyAssignedShard != 2 && prevıouslyAssignedShard != assignedShard {
fmt.Printf("Previously assigned %s cluster has moved from replica %d to %d", c.ID, prevıouslyAssignedShard, assignedShard)
t.Fail()
}
}
// Now, we remove the last added cluster, it should be unassigned
removedCluster = clusterList.Items[len(clusterList.Items)-1]
clusterList.Items = clusterList.Items[:len(clusterList.Items)-1]
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount)
assert.Equal(t, -1, distributionFunction(&removedCluster))
}
func TestConsistentHashingWhenClusterWithZeroReplicas(t *testing.T) {
db := &dbmocks.ArgoDB{}
clusters := []v1alpha1.Cluster{createCluster("cluster-01", "01")}
clusterAccessor := getClusterAccessor(clusters)
clusterList := &v1alpha1.ClusterList{Items: clusters}
db.EXPECT().ListClusters(mock.Anything).Return(clusterList, nil)
appAccessor, _, _, _, _, _ := createTestApps()
// Test with replicas set to 0
replicasCount := 0
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount)
distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount)
assert.Equal(t, -1, distributionFunction(nil))
}
func TestConsistentHashingWhenClusterWithFixedShard(t *testing.T) {
db := &dbmocks.ArgoDB{}
var fixedShard int64 = 1
cluster := &v1alpha1.Cluster{ID: "1", Shard: &fixedShard}
clusters := []v1alpha1.Cluster{*cluster}
clusterAccessor := getClusterAccessor(clusters)
clusterList := &v1alpha1.ClusterList{Items: clusters}
db.EXPECT().ListClusters(mock.Anything).Return(clusterList, nil)
// Test with replicas set to 5
replicasCount := 5
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount)
appAccessor, _, _, _, _, _ := createTestApps()
distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount)
assert.Equal(t, fixedShard, int64(distributionFunction(cluster)))
}
func TestGetShardByIndexModuloReplicasCountDistributionFunction(t *testing.T) {
clusters, db, cluster1, cluster2, _, _, _ := createTestClusters()
replicasCount := 2
db.EXPECT().GetApplicationControllerReplicas().Return(replicasCount).Maybe()
distributionFunction := RoundRobinDistributionFunction(clusters, replicasCount)
// Test that the function returns the correct shard for cluster1 and cluster2
expectedShardForCluster1 := 0
expectedShardForCluster2 := 1
shardForCluster1 := distributionFunction(&cluster1)
shardForCluster2 := distributionFunction(&cluster2)
assert.Equal(t, expectedShardForCluster1, shardForCluster1, "Expected shard for cluster1 to be %d but got %d", expectedShardForCluster1, shardForCluster1)
assert.Equal(t, expectedShardForCluster2, shardForCluster2, "Expected shard for cluster2 to be %d but got %d", expectedShardForCluster2, shardForCluster2)
}
func TestInferShard(t *testing.T) {
// Override the os.Hostname function to return a specific hostname for testing
defer func() { osHostnameFunction = os.Hostname }()
expectedShard := 3
osHostnameFunction = func() (string, error) { return "example-shard-3", nil }
actualShard, _ := InferShard()
assert.Equal(t, expectedShard, actualShard)
osHostnameError := errors.New("cannot resolve hostname")
osHostnameFunction = func() (string, error) { return "exampleshard", osHostnameError }
_, err := InferShard()
require.Error(t, err)
assert.Equal(t, err, osHostnameError)
osHostnameFunction = func() (string, error) { return "exampleshard", nil }
_, err = InferShard()
require.NoError(t, err)
osHostnameFunction = func() (string, error) { return "example-shard", nil }
_, err = InferShard()
require.NoError(t, err)
}
func createTestClusters() (clusterAccessor, *dbmocks.ArgoDB, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster) {
db := &dbmocks.ArgoDB{}
cluster1 := createCluster("cluster1", "1")
cluster2 := createCluster("cluster2", "2")
cluster3 := createCluster("cluster3", "3")
cluster4 := createCluster("cluster4", "4")
cluster5 := createCluster("cluster5", "5")
clusters := []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}
db.EXPECT().ListClusters(mock.Anything).Return(&v1alpha1.ClusterList{Items: []v1alpha1.Cluster{
cluster1, cluster2, cluster3, cluster4, cluster5,
}}, nil)
return getClusterAccessor(clusters), db, cluster1, cluster2, cluster3, cluster4, cluster5
}
func getClusterAccessor(clusters []v1alpha1.Cluster) clusterAccessor {
// Convert the array to a slice of pointers
clusterPointers := getClusterPointers(clusters)
clusterAccessor := func() []*v1alpha1.Cluster { return clusterPointers }
return clusterAccessor
}
func getClusterPointers(clusters []v1alpha1.Cluster) []*v1alpha1.Cluster {
var clusterPointers []*v1alpha1.Cluster
for i := range clusters {
clusterPointers = append(clusterPointers, &clusters[i])
}
return clusterPointers
}
func createCluster(name string, id string) v1alpha1.Cluster {
cluster := v1alpha1.Cluster{
Name: name,
ID: id,
Server: "https://kubernetes.default.svc?" + id,
}
return cluster
}
func Test_getDefaultShardMappingData(t *testing.T) {
expectedData := []shardApplicationControllerMapping{
{
ShardNumber: 0,
ControllerName: "",
}, {
ShardNumber: 1,
ControllerName: "",
},
}
shardMappingData := getDefaultShardMappingData(2)
assert.Equal(t, expectedData, shardMappingData)
}
func Test_generateDefaultShardMappingCM_NoPredefinedShard(t *testing.T) {
replicas := 2
expectedTime := metav1.Now()
defer func() { osHostnameFunction = os.Hostname }()
defer func() { heartbeatCurrentTime = metav1.Now }()
expectedMapping := []shardApplicationControllerMapping{
{
ShardNumber: 0,
ControllerName: "test-example",
HeartbeatTime: expectedTime,
}, {
ShardNumber: 1,
},
}
expectedMappingCM, err := json.Marshal(expectedMapping)
require.NoError(t, err)
expectedShadingCM := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: common.ArgoCDAppControllerShardConfigMapName,
Namespace: "test",
},
Data: map[string]string{
"shardControllerMapping": string(expectedMappingCM),
},
}
heartbeatCurrentTime = func() metav1.Time { return expectedTime }
osHostnameFunction = func() (string, error) { return "test-example", nil }
shardingCM, err := generateDefaultShardMappingCM("test", "test-example", replicas, -1)
require.NoError(t, err)
assert.Equal(t, expectedShadingCM, shardingCM)
}
func Test_generateDefaultShardMappingCM_PredefinedShard(t *testing.T) {
replicas := 2
expectedTime := metav1.Now()
defer func() { osHostnameFunction = os.Hostname }()
defer func() { heartbeatCurrentTime = metav1.Now }()
expectedMapping := []shardApplicationControllerMapping{
{
ShardNumber: 0,
}, {
ShardNumber: 1,
ControllerName: "test-example",
HeartbeatTime: expectedTime,
},
}
expectedMappingCM, err := json.Marshal(expectedMapping)
require.NoError(t, err)
expectedShadingCM := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: common.ArgoCDAppControllerShardConfigMapName,
Namespace: "test",
},
Data: map[string]string{
"shardControllerMapping": string(expectedMappingCM),
},
}
heartbeatCurrentTime = func() metav1.Time { return expectedTime }
osHostnameFunction = func() (string, error) { return "test-example", nil }
shardingCM, err := generateDefaultShardMappingCM("test", "test-example", replicas, 1)
require.NoError(t, err)
assert.Equal(t, expectedShadingCM, shardingCM)
}
func Test_getOrUpdateShardNumberForController(t *testing.T) {
expectedTime := metav1.Now()
testCases := []struct {
name string
shardApplicationControllerMapping []shardApplicationControllerMapping
hostname string
replicas int
shard int
expectedShard int
expectedShardMappingData []shardApplicationControllerMapping
}{
{
name: "length of shard mapping less than number of replicas - Existing controller",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
},
},
hostname: "test-example",
replicas: 2,
shard: -1,
expectedShard: 0,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "",
ShardNumber: 1,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
},
},
},
{
name: "length of shard mapping less than number of replicas - New controller",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
},
},
hostname: "test-example-1",
replicas: 2,
shard: -1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
},
{
name: "length of shard mapping more than number of replicas",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
hostname: "test-example",
replicas: 1,
shard: -1,
expectedShard: 0,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
},
},
},
{
name: "shard number is pre-specified and length of shard mapping less than number of replicas - Existing controller",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
}, {
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
},
},
hostname: "test-example-1",
replicas: 2,
shard: 1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
},
},
},
{
name: "shard number is pre-specified and length of shard mapping less than number of replicas - New controller",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
},
},
hostname: "test-example-1",
replicas: 2,
shard: 1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
},
{
name: "shard number is pre-specified and length of shard mapping more than number of replicas",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-2",
ShardNumber: 2,
HeartbeatTime: expectedTime,
},
},
hostname: "test-example",
replicas: 2,
shard: 1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "",
ShardNumber: 0,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
}, {
ControllerName: "test-example",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
},
{
name: "updating heartbeat",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
},
},
hostname: "test-example-1",
replicas: 2,
shard: -1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
},
{
name: "updating heartbeat - shard pre-defined",
shardApplicationControllerMapping: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC),
},
},
hostname: "test-example-1",
replicas: 2,
shard: 1,
expectedShard: 1,
expectedShardMappingData: []shardApplicationControllerMapping{
{
ControllerName: "test-example",
ShardNumber: 0,
HeartbeatTime: expectedTime,
}, {
ControllerName: "test-example-1",
ShardNumber: 1,
HeartbeatTime: expectedTime,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer func() { osHostnameFunction = os.Hostname }()
heartbeatCurrentTime = func() metav1.Time { return expectedTime }
shard, shardMappingData := getOrUpdateShardNumberForController(tc.shardApplicationControllerMapping, tc.hostname, tc.replicas, tc.shard)
assert.Equal(t, tc.expectedShard, shard)
assert.Equal(t, tc.expectedShardMappingData, shardMappingData)
})
}
}
func TestGetClusterSharding(t *testing.T) {
IntPtr := func(i int32) *int32 {
return &i
}
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: common.DefaultApplicationControllerName,
Namespace: "argocd",
},
Spec: appsv1.DeploymentSpec{
Replicas: IntPtr(1),
},
}
deploymentMultiReplicas := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "argocd-application-controller-multi-replicas",
Namespace: "argocd",
},
Spec: appsv1.DeploymentSpec{
Replicas: IntPtr(3),
},
}
objects := append([]runtime.Object{}, deployment, deploymentMultiReplicas)
kubeclientset := kubefake.NewSimpleClientset(objects...)
settingsMgr := settings.NewSettingsManager(t.Context(), kubeclientset, "argocd", settings.WithRepoOrClusterChangedHandler(func() {
}))
testCases := []struct {
name string
useDynamicSharding bool
envsSetter func(t *testing.T)
cleanup func()
expectedShard int
expectedReplicas int
expectedErr error
}{
{
name: "Default sharding with statefulset",
envsSetter: func(t *testing.T) {
t.Helper()
t.Setenv(common.EnvControllerReplicas, "1")
},
cleanup: func() {},
useDynamicSharding: false,
expectedShard: 0,
expectedReplicas: 1,
expectedErr: nil,
},
{
name: "Default sharding with deployment",
envsSetter: func(t *testing.T) {
t.Helper()
t.Setenv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
},
cleanup: func() {},
useDynamicSharding: true,
expectedShard: 0,
expectedReplicas: 1,
expectedErr: nil,
},
{
name: "Default sharding with deployment and multiple replicas",
envsSetter: func(t *testing.T) {
t.Helper()
t.Setenv(common.EnvAppControllerName, "argocd-application-controller-multi-replicas")
},
cleanup: func() {},
useDynamicSharding: true,
expectedShard: 0,
expectedReplicas: 3,
expectedErr: nil,
},
{
name: "Statefulset multiple replicas",
envsSetter: func(t *testing.T) {
t.Helper()
t.Setenv(common.EnvControllerReplicas, "3")
osHostnameFunction = func() (string, error) { return "example-shard-3", nil }
},
cleanup: func() {
osHostnameFunction = os.Hostname
},
useDynamicSharding: false,
expectedShard: 3,
expectedReplicas: 3,
expectedErr: nil,
},
{
name: "Explicit shard with statefulset and 1 replica",
envsSetter: func(t *testing.T) {
t.Helper()
t.Setenv(common.EnvControllerReplicas, "1")
t.Setenv(common.EnvControllerShard, "3")
},
cleanup: func() {},
useDynamicSharding: false,
expectedShard: 0,
expectedReplicas: 1,
expectedErr: nil,
},
{
name: "Explicit shard with statefulset and 2 replica - and to high shard",
envsSetter: func(t *testing.T) {
t.Helper()
t.Setenv(common.EnvControllerReplicas, "2")
t.Setenv(common.EnvControllerShard, "3")
},
cleanup: func() {},
useDynamicSharding: false,
expectedShard: 0,
expectedReplicas: 2,
expectedErr: nil,
},
{
name: "Explicit shard with statefulset and 2 replica",
envsSetter: func(t *testing.T) {
t.Helper()
t.Setenv(common.EnvControllerReplicas, "2")
t.Setenv(common.EnvControllerShard, "1")
},
cleanup: func() {},
useDynamicSharding: false,
expectedShard: 1,
expectedReplicas: 2,
expectedErr: nil,
},
{
name: "Explicit shard with deployment",
envsSetter: func(t *testing.T) {
t.Helper()
t.Setenv(common.EnvControllerShard, "3")
},
cleanup: func() {},
useDynamicSharding: true,
expectedShard: 0,
expectedReplicas: 1,
expectedErr: nil,
},
{
name: "Explicit shard with deployment and multiple replicas will read from configmap",
envsSetter: func(t *testing.T) {
t.Helper()
t.Setenv(common.EnvAppControllerName, "argocd-application-controller-multi-replicas")
t.Setenv(common.EnvControllerShard, "3")
},
cleanup: func() {},
useDynamicSharding: true,
expectedShard: 0,
expectedReplicas: 3,
expectedErr: nil,
},
{
name: "Dynamic sharding but missing deployment",
envsSetter: func(t *testing.T) {
t.Helper()
t.Setenv(common.EnvAppControllerName, "missing-deployment")
},
cleanup: func() {},
useDynamicSharding: true,
expectedShard: 0,
expectedReplicas: 1,
expectedErr: errors.New("(dynamic cluster distribution) failed to get app controller deployment: deployments.apps \"missing-deployment\" not found"),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tc.envsSetter(t)
defer tc.cleanup()
shardingCache, err := GetClusterSharding(kubeclientset, settingsMgr, "round-robin", tc.useDynamicSharding)
if shardingCache != nil {
clusterSharding := shardingCache.(*ClusterSharding)
assert.Equal(t, tc.expectedShard, clusterSharding.Shard)
assert.Equal(t, tc.expectedReplicas, clusterSharding.Replicas)
}
if tc.expectedErr != nil {
assert.EqualError(t, err, tc.expectedErr.Error())
} else {
require.NoError(t, err)
}
})
}
}
func TestAppAwareCache(t *testing.T) {
_, _, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters()
_, app1, app2, app3, app4, app5 := createTestApps()
clusterList := getClusterPointers([]v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5})
appList := getAppPointers([]v1alpha1.Application{app1, app2, app3, app4, app5})
getClusters := func() []*v1alpha1.Cluster { return clusterList }
getApps := func() []*v1alpha1.Application { return appList }
appDistribution := getAppDistribution(getClusters, getApps)
assert.Equal(t, int64(2), appDistribution["cluster1"])
assert.Equal(t, int64(2), appDistribution["cluster2"])
assert.Equal(t, int64(1), appDistribution["cluster3"])
app6 := createApp("app6", "cluster4")
appList = append(appList, &app6)
app1Update := createApp("app1", "cluster2")
// replace app 1
appList[0] = &app1Update
// Remove app 3
appList = append(appList[:2], appList[3:]...)
appDistribution = getAppDistribution(getClusters, getApps)
assert.Equal(t, int64(1), appDistribution["cluster1"])
assert.Equal(t, int64(2), appDistribution["cluster2"])
assert.Equal(t, int64(1), appDistribution["cluster3"])
assert.Equal(t, int64(1), appDistribution["cluster4"])
}
func createTestApps() (appAccessor, v1alpha1.Application, v1alpha1.Application, v1alpha1.Application, v1alpha1.Application, v1alpha1.Application) {
app1 := createApp("app1", "cluster1")
app2 := createApp("app2", "cluster1")
app3 := createApp("app3", "cluster2")
app4 := createApp("app4", "cluster2")
app5 := createApp("app5", "cluster3")
apps := []v1alpha1.Application{app1, app2, app3, app4, app5}
return getAppAccessor(apps), app1, app2, app3, app4, app5
}
func getAppAccessor(apps []v1alpha1.Application) appAccessor {
// Convert the array to a slice of pointers
appPointers := getAppPointers(apps)
appAccessor := func() []*v1alpha1.Application { return appPointers }
return appAccessor
}
func getAppPointers(apps []v1alpha1.Application) []*v1alpha1.Application {
var appPointers []*v1alpha1.Application
for i := range apps {
appPointers = append(appPointers, &apps[i])
}
return appPointers
}
func createApp(name string, server string) v1alpha1.Application {
testApp := `
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
name: ` + name + `
spec:
destination:
server: ` + server + `
`
var app v1alpha1.Application
err := yaml.Unmarshal([]byte(testApp), &app)
if err != nil {
panic(err)
}
return app
}