feat: improve api-server and controller performance (#3222)

* Group read comparison settings during app reconciliation
* Reduce lock contention in clusterInfo::ensureSynced(). Add getRepoObjs stats
* Remove additional source of lock contention
* Exclude the coordination.k8s.io/Lease resource
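The lock-contention changes boil down to two things: the liveStateCache and clusterInfo locks become sync.RWMutex so the read-heavy paths (getCluster, getManagedLiveObjs, GetClustersInfo) only take read locks, and ensureSynced() becomes a double-checked operation so concurrent callers do not all repeat the sync. Below is a minimal, self-contained Go sketch of that double-checked pattern; the cachedCluster type is hypothetical, not the real clusterInfo, and the commit's ensureSynced() actually performs its first synced() check before taking any lock at all.

// Sketch only: illustrates the RWMutex + double-checked sync pattern this commit applies.
package main

import (
	"sync"
	"time"
)

type cachedCluster struct {
	lock     sync.RWMutex
	syncTime *time.Time
	syncErr  error
}

// synced reports whether the last sync is still considered fresh.
func (c *cachedCluster) synced() bool {
	return c.syncTime != nil && time.Since(*c.syncTime) < time.Minute
}

// ensureSynced runs doSync at most once per freshness window, no matter how many goroutines call it.
func (c *cachedCluster) ensureSynced(doSync func() error) error {
	// fast path: only a read lock is needed to check freshness
	c.lock.RLock()
	fresh, err := c.synced(), c.syncErr
	c.lock.RUnlock()
	if fresh {
		return err
	}

	c.lock.Lock()
	defer c.lock.Unlock()
	// double-check now that we hold the write lock; another goroutine may have synced already
	if c.synced() {
		return c.syncErr
	}
	c.syncErr = doSync()
	now := time.Now()
	c.syncTime = &now
	return c.syncErr
}

func main() {
	c := &cachedCluster{}
	_ = c.ensureSynced(func() error { return nil })
}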

Co-authored-by: Alexander Matyushentsev <amatyushentsev@gmail.com>
Jesse Suen
2020-03-16 11:51:59 -07:00
committed by GitHub
parent 487d6647d5
commit 476b09cbbf
19 changed files with 338 additions and 225 deletions

Gopkg.lock (generated)
View File

@@ -72,6 +72,7 @@
"errors",
"exec",
"rand",
"stats",
"time",
]
pruneopts = ""
@@ -1930,6 +1931,7 @@
"github.com/TomOnTime/utfutil",
"github.com/argoproj/pkg/errors",
"github.com/argoproj/pkg/exec",
"github.com/argoproj/pkg/stats",
"github.com/argoproj/pkg/time",
"github.com/casbin/casbin",
"github.com/casbin/casbin/model",

View File

@@ -181,7 +181,7 @@ test:
.PHONY: test-e2e
test-e2e:
# NO_PROXY ensures all tests don't go out through a proxy if one is configured on the test system
NO_PROXY=* ./hack/test.sh -timeout 15m ./test/e2e
NO_PROXY=* ./hack/test.sh -timeout 15m -v ./test/e2e
.PHONY: start-e2e
start-e2e: cli

View File

@@ -6,6 +6,7 @@ import (
"os"
"time"
"github.com/argoproj/pkg/stats"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
@@ -25,7 +26,6 @@ import (
"github.com/argoproj/argo-cd/util/cli"
"github.com/argoproj/argo-cd/util/kube"
"github.com/argoproj/argo-cd/util/settings"
"github.com/argoproj/argo-cd/util/stats"
)
const (

View File

@@ -7,6 +7,7 @@ import (
"os"
"time"
"github.com/argoproj/pkg/stats"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@@ -16,7 +17,6 @@ import (
reposervercache "github.com/argoproj/argo-cd/reposerver/cache"
"github.com/argoproj/argo-cd/reposerver/metrics"
"github.com/argoproj/argo-cd/util/cli"
"github.com/argoproj/argo-cd/util/stats"
"github.com/argoproj/argo-cd/util/tls"
)

View File

@@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/argoproj/pkg/stats"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
@@ -15,7 +16,6 @@ import (
"github.com/argoproj/argo-cd/server"
servercache "github.com/argoproj/argo-cd/server/cache"
"github.com/argoproj/argo-cd/util/cli"
"github.com/argoproj/argo-cd/util/stats"
"github.com/argoproj/argo-cd/util/tls"
)

View File

@@ -821,22 +821,20 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
return
}
app := origApp.DeepCopy()
logCtx := log.WithFields(log.Fields{"application": app.Name})
startTime := time.Now()
defer func() {
reconcileDuration := time.Since(startTime)
ctrl.metricsServer.IncReconcile(origApp, reconcileDuration)
logCtx := log.WithFields(log.Fields{
"application": origApp.Name,
"time_ms": reconcileDuration.Seconds() * 1e3,
logCtx.WithFields(log.Fields{
"time_ms": reconcileDuration.Milliseconds(),
"level": comparisonLevel,
"dest-server": origApp.Spec.Destination.Server,
"dest-namespace": origApp.Spec.Destination.Namespace,
})
logCtx.Info("Reconciliation completed")
}).Info("Reconciliation completed")
}()
app := origApp.DeepCopy()
logCtx := log.WithFields(log.Fields{"application": app.Name})
if comparisonLevel == ComparisonWithNothing {
managedResources := make([]*appv1.ResourceDiff, 0)
if err := ctrl.cache.GetAppManagedResources(app.Name, &managedResources); err != nil {
@@ -888,6 +886,9 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
observedAt := metav1.Now()
compareResult := ctrl.appStateManager.CompareAppState(app, project, revision, app.Spec.Source, refreshType == appv1.RefreshTypeHard, localManifests)
for k, v := range compareResult.timings {
logCtx = logCtx.WithField(k, v.Milliseconds())
}
ctrl.normalizeApplication(origApp, app)
@@ -912,7 +913,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
)
}
} else {
logCtx.Infof("Sync prevented by sync window")
logCtx.Info("Sync prevented by sync window")
}
if app.Status.ReconciledAt == nil || comparisonLevel == CompareWithLatest {

View File

@@ -70,7 +70,7 @@ func NewLiveStateCache(
appInformer: appInformer,
db: db,
clusters: make(map[string]*clusterInfo),
lock: &sync.Mutex{},
lock: &sync.RWMutex{},
onObjectUpdated: onObjectUpdated,
kubectl: kubectl,
settingsMgr: settingsMgr,
@@ -82,7 +82,7 @@ func NewLiveStateCache(
type liveStateCache struct {
db db.ArgoDB
clusters map[string]*clusterInfo
lock *sync.Mutex
lock *sync.RWMutex
appInformer cache.SharedIndexInformer
onObjectUpdated ObjectUpdatedHandler
kubectl kube.Kubectl
@@ -109,31 +109,35 @@ func (c *liveStateCache) loadCacheSettings() (*cacheSettings, error) {
}
func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) {
c.lock.Lock()
defer c.lock.Unlock()
c.lock.RLock()
info, ok := c.clusters[server]
c.lock.RUnlock()
if !ok {
logCtx := log.WithField("server", server)
logCtx.Info("initializing cluster")
cluster, err := c.db.GetCluster(context.Background(), server)
if err != nil {
return nil, err
}
info = &clusterInfo{
apisMeta: make(map[schema.GroupKind]*apiMeta),
lock: &sync.Mutex{},
lock: &sync.RWMutex{},
nodes: make(map[kube.ResourceKey]*node),
nsIndex: make(map[string]map[kube.ResourceKey]*node),
onObjectUpdated: c.onObjectUpdated,
kubectl: c.kubectl,
cluster: cluster,
syncTime: nil,
log: log.WithField("server", cluster.Server),
log: logCtx,
cacheSettingsSrc: c.getCacheSettings,
onEventReceived: func(event watch.EventType, un *unstructured.Unstructured) {
c.metricsServer.IncClusterEventsCount(cluster.Server)
gvk := un.GroupVersionKind()
c.metricsServer.IncClusterEventsCount(cluster.Server, gvk.Group, gvk.Kind)
},
}
c.lock.Lock()
c.clusters[cluster.Server] = info
c.lock.Unlock()
}
return info, nil
}
@@ -152,8 +156,8 @@ func (c *liveStateCache) getSyncedCluster(server string) (*clusterInfo, error) {
func (c *liveStateCache) Invalidate() {
log.Info("invalidating live state cache")
c.lock.Lock()
defer c.lock.Unlock()
c.lock.RLock()
defer c.lock.RUnlock()
for _, clust := range c.clusters {
clust.invalidate()
}
@@ -210,8 +214,6 @@ func isClusterHasApps(apps []interface{}, cluster *appv1.Cluster) bool {
}
func (c *liveStateCache) getCacheSettings() *cacheSettings {
c.cacheSettingsLock.Lock()
defer c.cacheSettingsLock.Unlock()
return c.cacheSettings
}
@@ -261,8 +263,9 @@ func (c *liveStateCache) Run(ctx context.Context) error {
util.RetryUntilSucceed(func() error {
clusterEventCallback := func(event *db.ClusterEvent) {
c.lock.Lock()
defer c.lock.Unlock()
if cluster, ok := c.clusters[event.Cluster.Server]; ok {
cluster, ok := c.clusters[event.Cluster.Server]
if ok {
defer c.lock.Unlock()
if event.Type == watch.Deleted {
cluster.invalidate()
delete(c.clusters, event.Cluster.Server)
@@ -270,11 +273,14 @@ func (c *liveStateCache) Run(ctx context.Context) error {
cluster.cluster = event.Cluster
cluster.invalidate()
}
} else if event.Type == watch.Added && isClusterHasApps(c.appInformer.GetStore().List(), event.Cluster) {
go func() {
// warm up cache for cluster with apps
_, _ = c.getSyncedCluster(event.Cluster.Server)
}()
} else {
c.lock.Unlock()
if event.Type == watch.Added && isClusterHasApps(c.appInformer.GetStore().List(), event.Cluster) {
go func() {
// warm up cache for cluster with apps
_, _ = c.getSyncedCluster(event.Cluster.Server)
}()
}
}
}
@@ -287,8 +293,8 @@ func (c *liveStateCache) Run(ctx context.Context) error {
}
func (c *liveStateCache) GetClustersInfo() []metrics.ClusterInfo {
c.lock.Lock()
defer c.lock.Unlock()
c.lock.RLock()
defer c.lock.RUnlock()
res := make([]metrics.ClusterInfo, 0)
for _, info := range c.clusters {
res = append(res, info.getClusterInfo())

View File

@@ -11,11 +11,11 @@ import (
func TestGetServerVersion(t *testing.T) {
now := time.Now()
cache := &liveStateCache{
lock: &sync.Mutex{},
lock: &sync.RWMutex{},
clusters: map[string]*clusterInfo{
"http://localhost": {
syncTime: &now,
lock: &sync.Mutex{},
lock: &sync.RWMutex{},
serverVersion: "123",
},
}}

View File

@@ -9,19 +9,16 @@ import (
"sync"
"time"
"k8s.io/client-go/dynamic"
"k8s.io/apimachinery/pkg/types"
"github.com/argoproj/argo-cd/controller/metrics"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"github.com/argoproj/argo-cd/controller/metrics"
appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/util"
"github.com/argoproj/argo-cd/util/health"
@@ -45,8 +42,11 @@ type clusterInfo struct {
syncError error
apisMeta map[schema.GroupKind]*apiMeta
serverVersion string
// namespacedResources is a simple map which indicates a groupKind is namespaced
namespacedResources map[schema.GroupKind]bool
lock *sync.Mutex
// lock is a rw lock which protects the fields of clusterInfo
lock *sync.RWMutex
nodes map[kube.ResourceKey]*node
nsIndex map[string]map[kube.ResourceKey]*node
@@ -176,16 +176,19 @@ func (c *clusterInfo) invalidate() {
c.apisMeta[i].watchCancel()
}
c.apisMeta = nil
c.namespacedResources = nil
c.log.Warnf("invalidated cluster")
}
func (c *clusterInfo) synced() bool {
if c.syncTime == nil {
syncTime := c.syncTime
if syncTime == nil {
return false
}
if c.syncError != nil {
return time.Now().Before(c.syncTime.Add(clusterRetryTimeout))
return time.Now().Before(syncTime.Add(clusterRetryTimeout))
}
return time.Now().Before(c.syncTime.Add(clusterSyncTimeout))
return time.Now().Before(syncTime.Add(clusterSyncTimeout))
}
func (c *clusterInfo) stopWatching(gk schema.GroupKind, ns string) {
@@ -195,7 +198,7 @@ func (c *clusterInfo) stopWatching(gk schema.GroupKind, ns string) {
info.watchCancel()
delete(c.apisMeta, gk)
c.replaceResourceCache(gk, "", []unstructured.Unstructured{}, ns)
log.Warnf("Stop watching %s not found on %s.", gk, c.cluster.Server)
c.log.Warnf("Stop watching: %s not found", gk)
}
}
@@ -211,9 +214,10 @@ func (c *clusterInfo) startMissingWatches() error {
if err != nil {
return err
}
namespacedResources := make(map[schema.GroupKind]bool)
for i := range apis {
api := apis[i]
namespacedResources[api.GroupKind] = api.Meta.Namespaced
if _, ok := c.apisMeta[api.GroupKind]; !ok {
ctx, cancel := context.WithCancel(context.Background())
info := &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}
@@ -228,10 +232,11 @@ func (c *clusterInfo) startMissingWatches() error {
}
}
}
c.namespacedResources = namespacedResources
return nil
}
func runSynced(lock *sync.Mutex, action func() error) error {
func runSynced(lock sync.Locker, action func() error) error {
lock.Lock()
defer lock.Unlock()
return action()
@@ -265,15 +270,10 @@ func (c *clusterInfo) watchEvents(ctx context.Context, api kube.APIResourceInfo,
c.stopWatching(api.GroupKind, ns)
return nil
}
err = runSynced(c.lock, func() error {
if errors.IsGone(err) {
info.resourceVersion = ""
log.Warnf("Resource version of %s on %s is too old.", api.GroupKind, c.cluster.Server)
}
return err
})
if errors.IsGone(err) {
info.resourceVersion = ""
c.log.Warnf("Resource version of %s is too old", api.GroupKind)
}
if err != nil {
return err
}
@@ -304,7 +304,7 @@ func (c *clusterInfo) watchEvents(ctx context.Context, api kube.APIResourceInfo,
}
}
if err != nil {
log.Warnf("Failed to start missing watch: %v", err)
c.log.Warnf("Failed to start missing watch: %v", err)
}
} else {
return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.cluster.Server)
@@ -388,12 +388,17 @@ func (c *clusterInfo) sync() (err error) {
}
func (c *clusterInfo) ensureSynced() error {
c.lock.Lock()
defer c.lock.Unlock()
// first check if cluster is synced *without lock*
if c.synced() {
return c.syncError
}
c.lock.Lock()
defer c.lock.Unlock()
// before doing any work, check once again now that we have the lock, to see if it got
// synced between the first check and now
if c.synced() {
return c.syncError
}
err := c.sync()
syncTime := time.Now()
c.syncTime = &syncTime
@@ -402,8 +407,8 @@ func (c *clusterInfo) ensureSynced() error {
}
func (c *clusterInfo) getNamespaceTopLevelResources(namespace string) map[kube.ResourceKey]appv1.ResourceNode {
c.lock.Lock()
defer c.lock.Unlock()
c.lock.RLock()
defer c.lock.RUnlock()
nodes := make(map[kube.ResourceKey]appv1.ResourceNode)
for _, node := range c.nsIndex[namespace] {
if len(node.ownerRefs) == 0 {
@@ -444,15 +449,17 @@ func (c *clusterInfo) iterateHierarchy(key kube.ResourceKey, action func(child a
}
func (c *clusterInfo) isNamespaced(gk schema.GroupKind) bool {
if api, ok := c.apisMeta[gk]; ok && !api.namespaced {
return false
// this is safe to access without a lock since we always replace the entire map instead of mutating keys
if isNamespaced, ok := c.namespacedResources[gk]; ok {
return isNamespaced
}
log.Warnf("group/kind %s scope is unknown (known objects: %d). assuming namespaced object", gk, len(c.namespacedResources))
return true
}
func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*unstructured.Unstructured, metricsServer *metrics.MetricsServer) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
c.lock.Lock()
defer c.lock.Unlock()
c.lock.RLock()
defer c.lock.RUnlock()
managedObjs := make(map[kube.ResourceKey]*unstructured.Unstructured)
// iterate all objects in live state cache to find ones associated with app
@@ -462,7 +469,7 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns
}
}
config := metrics.AddMetricsTransportWrapper(metricsServer, a, c.cluster.RESTConfig())
// iterate target objects and identify ones that already exist in the cluster,\
// iterate target objects and identify ones that already exist in the cluster,
// but are simply missing our label
lock := &sync.Mutex{}
err := util.RunAllAsync(len(targetObjs), func(i int) error {
@@ -586,8 +593,8 @@ var (
)
func (c *clusterInfo) getClusterInfo() metrics.ClusterInfo {
c.lock.Lock()
defer c.lock.Unlock()
c.lock.RLock()
defer c.lock.RUnlock()
return metrics.ClusterInfo{
APIsCount: len(c.apisMeta),
K8SVersion: c.serverVersion,

View File

@@ -153,7 +153,7 @@ func newCluster(objs ...*unstructured.Unstructured) *clusterInfo {
func newClusterExt(kubectl kube.Kubectl) *clusterInfo {
return &clusterInfo{
lock: &sync.Mutex{},
lock: &sync.RWMutex{},
nodes: make(map[kube.ResourceKey]*node),
onObjectUpdated: func(managedByApp map[string]bool, reference corev1.ObjectReference) {},
kubectl: kubectl,

View File

@@ -119,7 +119,7 @@ func NewMetricsServer(addr string, appLister applister.ApplicationLister, health
clusterEventsCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "argocd_cluster_events_total",
Help: "Number of processed k8s resource events.",
}, descClusterDefaultLabels)
}, append(descClusterDefaultLabels, "group", "kind"))
registry.MustRegister(clusterEventsCounter)
return &MetricsServer{
@@ -164,8 +164,8 @@ func (m *MetricsServer) DecKubectlExecPending(command string) {
}
// IncClusterEventsCount increments the number of cluster events
func (m *MetricsServer) IncClusterEventsCount(server string) {
m.clusterEventsCounter.WithLabelValues(server).Inc()
func (m *MetricsServer) IncClusterEventsCount(server, group, kind string) {
m.clusterEventsCounter.WithLabelValues(server, group, kind).Inc()
}
// IncKubernetesRequest increments the kubernetes requests counter for an application

View File

@@ -31,6 +31,7 @@ import (
"github.com/argoproj/argo-cd/util/resource"
"github.com/argoproj/argo-cd/util/resource/ignore"
"github.com/argoproj/argo-cd/util/settings"
"github.com/argoproj/argo-cd/util/stats"
)
type managedResource struct {
@@ -71,6 +72,8 @@ type comparisonResult struct {
hooks []*unstructured.Unstructured
diffNormalizer diff.Normalizer
appSourceType v1alpha1.ApplicationSourceType
// timings maps phases of comparison to the duration it took to complete (for statistical purposes)
timings map[string]time.Duration
}
func (cr *comparisonResult) targetObjs() []*unstructured.Unstructured {
@@ -97,14 +100,17 @@ type appStateManager struct {
}
func (m *appStateManager) getRepoObjs(app *v1alpha1.Application, source v1alpha1.ApplicationSource, appLabelKey, revision string, noCache bool) ([]*unstructured.Unstructured, []*unstructured.Unstructured, *apiclient.ManifestResponse, error) {
ts := stats.NewTimingStats()
helmRepos, err := m.db.ListHelmRepositories(context.Background())
if err != nil {
return nil, nil, nil, err
}
ts.AddCheckpoint("helm_ms")
repo, err := m.db.GetRepository(context.Background(), source.RepoURL)
if err != nil {
return nil, nil, nil, err
}
ts.AddCheckpoint("repo_ms")
conn, repoClient, err := m.repoClientset.NewRepoServerClient()
if err != nil {
return nil, nil, nil, err
@@ -119,7 +125,7 @@ func (m *appStateManager) getRepoObjs(app *v1alpha1.Application, source v1alpha1
if err != nil {
return nil, nil, nil, err
}
ts.AddCheckpoint("plugins_ms")
tools := make([]*appv1.ConfigManagementPlugin, len(plugins))
for i := range plugins {
tools[i] = &plugins[i]
@@ -129,10 +135,12 @@ func (m *appStateManager) getRepoObjs(app *v1alpha1.Application, source v1alpha1
if err != nil {
return nil, nil, nil, err
}
ts.AddCheckpoint("build_options_ms")
serverVersion, err := m.liveStateCache.GetServerVersion(app.Spec.Destination.Server)
if err != nil {
return nil, nil, nil, err
}
ts.AddCheckpoint("version_ms")
manifestInfo, err := repoClient.GenerateManifest(context.Background(), &apiclient.ManifestRequest{
Repo: repo,
Repos: helmRepos,
@@ -151,10 +159,18 @@ func (m *appStateManager) getRepoObjs(app *v1alpha1.Application, source v1alpha1
if err != nil {
return nil, nil, nil, err
}
ts.AddCheckpoint("manifests_ms")
targetObjs, hooks, err := unmarshalManifests(manifestInfo.Manifests)
if err != nil {
return nil, nil, nil, err
}
ts.AddCheckpoint("unmarshal_ms")
logCtx := log.WithField("application", app.Name)
for k, v := range ts.Timings() {
logCtx = logCtx.WithField(k, v.Milliseconds())
}
logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
logCtx.Info("getRepoObjs stats")
return targetObjs, hooks, manifestInfo, nil
}
@@ -253,27 +269,33 @@ func dedupLiveResources(targetObjs []*unstructured.Unstructured, liveObjsByKey m
}
}
func (m *appStateManager) getComparisonSettings(app *appv1.Application) (string, map[string]v1alpha1.ResourceOverride, diff.Normalizer, error) {
func (m *appStateManager) getComparisonSettings(app *appv1.Application) (string, map[string]v1alpha1.ResourceOverride, diff.Normalizer, *settings.ResourcesFilter, error) {
resourceOverrides, err := m.settingsMgr.GetResourceOverrides()
if err != nil {
return "", nil, nil, err
return "", nil, nil, nil, err
}
appLabelKey, err := m.settingsMgr.GetAppInstanceLabelKey()
if err != nil {
return "", nil, nil, err
return "", nil, nil, nil, err
}
diffNormalizer, err := argo.NewDiffNormalizer(app.Spec.IgnoreDifferences, resourceOverrides)
if err != nil {
return "", nil, nil, err
return "", nil, nil, nil, err
}
return appLabelKey, resourceOverrides, diffNormalizer, nil
resFilter, err := m.settingsMgr.GetResourcesFilter()
if err != nil {
return "", nil, nil, nil, err
}
return appLabelKey, resourceOverrides, diffNormalizer, resFilter, nil
}
// CompareAppState compares application git state to the live app state, using the specified
// revision and supplied source. If revision or overrides are empty, then compares against
// revision and overrides in the app spec.
func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *appv1.AppProject, revision string, source v1alpha1.ApplicationSource, noCache bool, localManifests []string) *comparisonResult {
appLabelKey, resourceOverrides, diffNormalizer, err := m.getComparisonSettings(app)
ts := stats.NewTimingStats()
appLabelKey, resourceOverrides, diffNormalizer, resFilter, err := m.getComparisonSettings(app)
ts.AddCheckpoint("settings_ms")
// return unknown comparison result if basic comparison settings cannot be loaded
if err != nil {
@@ -314,32 +336,27 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap
}
manifestInfo = nil
}
ts.AddCheckpoint("git_ms")
targetObjs, dedupConditions, err := DeduplicateTargetObjects(app.Spec.Destination.Server, app.Spec.Destination.Namespace, targetObjs, m.liveStateCache)
if err != nil {
conditions = append(conditions, v1alpha1.ApplicationCondition{Type: v1alpha1.ApplicationConditionComparisonError, Message: err.Error(), LastTransitionTime: &now})
}
conditions = append(conditions, dedupConditions...)
resFilter, err := m.settingsMgr.GetResourcesFilter()
if err != nil {
conditions = append(conditions, v1alpha1.ApplicationCondition{Type: v1alpha1.ApplicationConditionComparisonError, Message: err.Error(), LastTransitionTime: &now})
} else {
for i := len(targetObjs) - 1; i >= 0; i-- {
targetObj := targetObjs[i]
gvk := targetObj.GroupVersionKind()
if resFilter.IsExcludedResource(gvk.Group, gvk.Kind, app.Spec.Destination.Server) {
targetObjs = append(targetObjs[:i], targetObjs[i+1:]...)
conditions = append(conditions, v1alpha1.ApplicationCondition{
Type: v1alpha1.ApplicationConditionExcludedResourceWarning,
Message: fmt.Sprintf("Resource %s/%s %s is excluded in the settings", gvk.Group, gvk.Kind, targetObj.GetName()),
LastTransitionTime: &now,
})
}
for i := len(targetObjs) - 1; i >= 0; i-- {
targetObj := targetObjs[i]
gvk := targetObj.GroupVersionKind()
if resFilter.IsExcludedResource(gvk.Group, gvk.Kind, app.Spec.Destination.Server) {
targetObjs = append(targetObjs[:i], targetObjs[i+1:]...)
conditions = append(conditions, v1alpha1.ApplicationCondition{
Type: v1alpha1.ApplicationConditionExcludedResourceWarning,
Message: fmt.Sprintf("Resource %s/%s %s is excluded in the settings", gvk.Group, gvk.Kind, targetObj.GetName()),
LastTransitionTime: &now,
})
}
}
ts.AddCheckpoint("dedup_ms")
logCtx.Debugf("Generated config manifests")
liveObjByKey, err := m.liveStateCache.GetManagedLiveObjs(app, targetObjs)
if err != nil {
liveObjByKey = make(map[kubeutil.ResourceKey]*unstructured.Unstructured)
@@ -354,7 +371,6 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap
}
}
logCtx.Debugf("Retrieved lived manifests")
for _, liveObj := range liveObjByKey {
if liveObj != nil {
appInstanceName := kubeutil.GetAppInstanceLabel(liveObj, appLabelKey)
@@ -383,7 +399,8 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap
managedLiveObj[i] = nil
}
}
logCtx.Debugf("built managed objects list")
ts.AddCheckpoint("live_ms")
// Everything remaining in liveObjByKey are "extra" resources that aren't tracked in git.
// The following adds all the extras to the managedLiveObj list and backfills the targetObj
// list with nils, so that the lists are of equal lengths for comparison purposes.
@@ -399,6 +416,7 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap
failedToLoadObjs = true
conditions = append(conditions, v1alpha1.ApplicationCondition{Type: v1alpha1.ApplicationConditionComparisonError, Message: err.Error(), LastTransitionTime: &now})
}
ts.AddCheckpoint("diff_ms")
syncCode := v1alpha1.SyncStatusCodeSynced
managedResources := make([]managedResource, len(targetObjs))
@@ -488,6 +506,7 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap
if manifestInfo != nil {
syncStatus.Revision = manifestInfo.Revision
}
ts.AddCheckpoint("sync_ms")
healthStatus, err := health.SetApplicationHealth(resourceSummaries, GetLiveObjs(managedResources), resourceOverrides, func(obj *unstructured.Unstructured) bool {
return !isSelfReferencedApp(app, kubeutil.GetObjectRef(obj))
@@ -514,6 +533,8 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap
appv1.ApplicationConditionRepeatedResourceWarning: true,
appv1.ApplicationConditionExcludedResourceWarning: true,
})
ts.AddCheckpoint("health_ms")
compRes.timings = ts.Timings()
return &compRes
}

View File

@@ -20,4 +20,4 @@ report() {
trap 'report' EXIT
go test -v $* 2>&1 | tee $TEST_RESULTS/test.out
go test -failfast $* 2>&1 | tee $TEST_RESULTS/test.out

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"time"
@@ -20,6 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
@@ -30,6 +32,7 @@ import (
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
applisters "github.com/argoproj/argo-cd/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/reposerver/apiclient"
servercache "github.com/argoproj/argo-cd/server/cache"
"github.com/argoproj/argo-cd/server/rbacpolicy"
@@ -52,6 +55,7 @@ type Server struct {
ns string
kubeclientset kubernetes.Interface
appclientset appclientset.Interface
appLister applisters.ApplicationNamespaceLister
repoClientset apiclient.Clientset
kubectl kube.Kubectl
db db.ArgoDB
@@ -67,6 +71,7 @@ func NewServer(
namespace string,
kubeclientset kubernetes.Interface,
appclientset appclientset.Interface,
appLister applisters.ApplicationNamespaceLister,
repoClientset apiclient.Clientset,
cache *servercache.Cache,
kubectl kube.Kubectl,
@@ -79,6 +84,7 @@ func NewServer(
return &Server{
ns: namespace,
appclientset: appclientset,
appLister: appLister,
kubeclientset: kubeclientset,
cache: cache,
db: db,
@@ -98,23 +104,25 @@ func appRBACName(app appv1.Application) string {
// List returns list of applications
func (s *Server) List(ctx context.Context, q *application.ApplicationQuery) (*appv1.ApplicationList, error) {
appList, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).List(metav1.ListOptions{LabelSelector: q.Selector})
labelsMap, err := labels.ConvertSelectorToLabelsMap(q.Selector)
if err != nil {
return nil, err
}
apps, err := s.appLister.List(labelsMap.AsSelector())
if err != nil {
return nil, err
}
newItems := make([]appv1.Application, 0)
for _, a := range appList.Items {
if s.enf.Enforce(ctx.Value("claims"), rbacpolicy.ResourceApplications, rbacpolicy.ActionGet, appRBACName(a)) {
newItems = append(newItems, a)
for _, a := range apps {
if s.enf.Enforce(ctx.Value("claims"), rbacpolicy.ResourceApplications, rbacpolicy.ActionGet, appRBACName(*a)) {
newItems = append(newItems, *a)
}
}
newItems = argoutil.FilterByProjects(newItems, q.Projects)
for i := range newItems {
app := newItems[i]
newItems[i] = app
appList := appv1.ApplicationList{
Items: newItems,
}
appList.Items = newItems
return appList, nil
return &appList, nil
}
// Create creates an application
@@ -131,39 +139,44 @@ func (s *Server) Create(ctx context.Context, q *application.ApplicationCreateReq
if err != nil {
return nil, err
}
out, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Create(&a)
if apierr.IsAlreadyExists(err) {
// act idempotent if existing spec matches new spec
existing, getErr := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(a.Name, metav1.GetOptions{})
if getErr != nil {
return nil, status.Errorf(codes.Internal, "unable to check existing application details: %v", getErr)
}
if q.Upsert != nil && *q.Upsert {
if err := s.enf.EnforceErr(ctx.Value("claims"), rbacpolicy.ResourceApplications, rbacpolicy.ActionUpdate, appRBACName(a)); err != nil {
return nil, err
}
out, err = s.updateApp(existing, &a, ctx, true)
} else {
if !reflect.DeepEqual(existing.Spec, a.Spec) ||
!reflect.DeepEqual(existing.Labels, a.Labels) ||
!reflect.DeepEqual(existing.Annotations, a.Annotations) ||
!reflect.DeepEqual(existing.Finalizers, a.Finalizers) {
return nil, status.Errorf(codes.InvalidArgument, "existing application spec is different, use upsert flag to force update")
}
return existing, nil
}
}
created, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Create(&a)
if err == nil {
s.logAppEvent(out, ctx, argo.EventReasonResourceCreated, "created application")
s.logAppEvent(created, ctx, argo.EventReasonResourceCreated, "created application")
s.waitSync(created)
return created, nil
}
return out, err
if !apierr.IsAlreadyExists(err) {
return nil, err
}
// act idempotent if existing spec matches new spec
existing, err := s.appLister.Get(a.Name)
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to check existing application details: %v", err)
}
equalSpecs := reflect.DeepEqual(existing.Spec, a.Spec) &&
reflect.DeepEqual(existing.Labels, a.Labels) &&
reflect.DeepEqual(existing.Annotations, a.Annotations) &&
reflect.DeepEqual(existing.Finalizers, a.Finalizers)
if equalSpecs {
return existing, nil
}
if q.Upsert == nil || !*q.Upsert {
return nil, status.Errorf(codes.InvalidArgument, "existing application spec is different, use upsert flag to force update")
}
if err := s.enf.EnforceErr(ctx.Value("claims"), rbacpolicy.ResourceApplications, rbacpolicy.ActionUpdate, appRBACName(a)); err != nil {
return nil, err
}
updated, err := s.updateApp(existing, &a, ctx, true)
if err != nil {
return nil, err
}
return updated, nil
}
// GetManifests returns application manifests
func (s *Server) GetManifests(ctx context.Context, q *application.ApplicationManifestQuery) (*apiclient.ManifestResponse, error) {
a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.Name, metav1.GetOptions{})
a, err := s.appLister.Get(*q.Name)
if err != nil {
return nil, err
}
@@ -251,8 +264,10 @@ func (s *Server) GetManifests(ctx context.Context, q *application.ApplicationMan
// Get returns an application by name
func (s *Server) Get(ctx context.Context, q *application.ApplicationQuery) (*appv1.Application, error) {
appIf := s.appclientset.ArgoprojV1alpha1().Applications(s.ns)
a, err := appIf.Get(*q.Name, metav1.GetOptions{})
// We must use a client Get instead of an informer Get, because it's common to call Get immediately
// following a Watch (which is not yet powered by an informer), and the Get must reflect what was
// previously seen by the client.
a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
@@ -264,6 +279,7 @@ func (s *Server) Get(ctx context.Context, q *application.ApplicationQuery) (*app
if *q.Refresh == string(appv1.RefreshTypeHard) {
refreshType = appv1.RefreshTypeHard
}
appIf := s.appclientset.ArgoprojV1alpha1().Applications(s.ns)
_, err = argoutil.RefreshApp(appIf, *q.Name, refreshType)
if err != nil {
return nil, err
@@ -278,7 +294,7 @@ func (s *Server) Get(ctx context.Context, q *application.ApplicationQuery) (*app
// ListResourceEvents returns a list of event resources
func (s *Server) ListResourceEvents(ctx context.Context, q *application.ApplicationResourceEventsQuery) (*v1.EventList, error) {
a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.Name, metav1.GetOptions{})
a, err := s.appLister.Get(*q.Name)
if err != nil {
return nil, err
}
@@ -354,6 +370,39 @@ func mergeStringMaps(items ...map[string]string) map[string]string {
return res
}
var informerSyncTimeout = 2 * time.Second
// waitSync is a helper to wait until the application informer cache is synced after a create/update.
// It waits until the app in the informer has a resource version greater than or equal to the version
// in the supplied app, or until 2 seconds have elapsed, whichever comes first.
// We use an informer cache for read operations (Get, List). Since the cache is only
// eventually consistent, it is possible that it doesn't reflect an application change immediately
// after a mutating API call (create/update). This function should be called after a create or
// update to give a probable (but not guaranteed) chance of the cache being up-to-date afterwards.
func (s *Server) waitSync(app *appv1.Application) {
logCtx := log.WithField("application", app.Name)
deadline := time.Now().Add(informerSyncTimeout)
minVersion, err := strconv.Atoi(app.ResourceVersion)
if err != nil {
logCtx.Warnf("waitSync failed: could not parse resource version %s", app.ResourceVersion)
time.Sleep(50 * time.Millisecond) // sleep anyway
return
}
for {
if currApp, err := s.appLister.Get(app.Name); err == nil {
currVersion, err := strconv.Atoi(currApp.ResourceVersion)
if err == nil && currVersion >= minVersion {
return
}
}
if time.Now().After(deadline) {
break
}
time.Sleep(20 * time.Millisecond)
}
logCtx.Warnf("waitSync failed: timed out")
}
func (s *Server) updateApp(app *appv1.Application, newApp *appv1.Application, ctx context.Context, merge bool) (*appv1.Application, error) {
for i := 0; i < 10; i++ {
app.Spec = newApp.Spec
@@ -370,6 +419,7 @@ func (s *Server) updateApp(app *appv1.Application, newApp *appv1.Application, ct
res, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Update(app)
if err == nil {
s.logAppEvent(app, ctx, argo.EventReasonResourceUpdated, "updated application spec")
s.waitSync(res)
return res, nil
}
if !apierr.IsConflict(err) {
@@ -504,7 +554,6 @@ func (s *Server) Delete(ctx context.Context, q *application.ApplicationDeleteReq
if err != nil && !apierr.IsNotFound(err) {
return nil, err
}
s.logAppEvent(a, ctx, argo.EventReasonResourceDeleted, "deleted application")
return &application.ApplicationResponse{}, nil
}
@@ -688,7 +737,7 @@ func (s *Server) getAppResources(ctx context.Context, a *appv1.Application) (*ap
}
func (s *Server) getAppResource(ctx context.Context, action string, q *application.ApplicationResourceRequest) (*appv1.ResourceNode, *rest.Config, *appv1.Application, error) {
a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.Name, metav1.GetOptions{})
a, err := s.appLister.Get(*q.Name)
if err != nil {
return nil, nil, nil, err
}
@@ -810,7 +859,7 @@ func (s *Server) DeleteResource(ctx context.Context, q *application.ApplicationR
}
func (s *Server) ResourceTree(ctx context.Context, q *application.ResourcesQuery) (*appv1.ApplicationTree, error) {
a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.ApplicationName, metav1.GetOptions{})
a, err := s.appLister.Get(*q.ApplicationName)
if err != nil {
return nil, err
}
@@ -821,7 +870,7 @@ func (s *Server) ResourceTree(ctx context.Context, q *application.ResourcesQuery
}
func (s *Server) RevisionMetadata(ctx context.Context, q *application.RevisionMetadataQuery) (*v1alpha1.RevisionMetadata, error) {
a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(q.GetName(), metav1.GetOptions{})
a, err := s.appLister.Get(q.GetName())
if err != nil {
return nil, err
}
@@ -848,7 +897,7 @@ func isMatchingResource(q *application.ResourcesQuery, key kube.ResourceKey) boo
}
func (s *Server) ManagedResources(ctx context.Context, q *application.ResourcesQuery) (*application.ManagedResourcesResponse, error) {
a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.ApplicationName, metav1.GetOptions{})
a, err := s.appLister.Get(*q.ApplicationName)
if err != nil {
return nil, err
}
@@ -1156,8 +1205,10 @@ func (s *Server) TerminateOperation(ctx context.Context, termOpReq *application.
return nil, status.Errorf(codes.InvalidArgument, "Unable to terminate operation. No operation is in progress")
}
a.Status.OperationState.Phase = appv1.OperationTerminating
_, err = s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Update(a)
updated, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Update(a)
if err == nil {
s.waitSync(updated)
s.logAppEvent(a, ctx, argo.EventReasonResourceUpdated, "terminated running operation")
return &application.OperationTerminateResponse{}, nil
}
if !apierr.IsConflict(err) {
@@ -1169,7 +1220,6 @@ func (s *Server) TerminateOperation(ctx context.Context, termOpReq *application.
if err != nil {
return nil, err
}
s.logAppEvent(a, ctx, argo.EventReasonResourceUpdated, "terminated running operation")
}
return nil, status.Errorf(codes.Internal, "Failed to terminate app. Too many conflicts")
}

View File

@@ -18,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
kubetesting "k8s.io/client-go/testing"
k8scache "k8s.io/client-go/tools/cache"
"github.com/argoproj/argo-cd/common"
"github.com/argoproj/argo-cd/errors"
@@ -92,8 +93,8 @@ func newTestAppServer(objects ...runtime.Object) *Server {
"server.secretkey": []byte("test"),
},
})
db := db.NewDB(testNamespace, settings.NewSettingsManager(context.Background(), kubeclientset, testNamespace), kubeclientset)
ctx := context.Background()
db := db.NewDB(testNamespace, settings.NewSettingsManager(ctx, kubeclientset, testNamespace), kubeclientset)
_, err := db.CreateRepository(ctx, fakeRepo())
errors.CheckError(err)
_, err = db.CreateCluster(ctx, fakeCluster())
@@ -148,12 +149,22 @@ func newTestAppServer(objects ...runtime.Object) *Server {
enforcer.SetDefaultRole("role:admin")
enforcer.SetClaimsEnforcerFunc(rbacpolicy.NewRBACPolicyEnforcer(enforcer, fakeProjLister).EnforceClaims)
settingsMgr := settings.NewSettingsManager(context.Background(), kubeclientset, testNamespace)
settingsMgr := settings.NewSettingsManager(ctx, kubeclientset, testNamespace)
// populate the app informer with the fake objects
appInformer := factory.Argoproj().V1alpha1().Applications().Informer()
// TODO(jessesuen): probably should return cancel function so tests can stop background informer
//ctx, cancel := context.WithCancel(context.Background())
go appInformer.Run(ctx.Done())
if !k8scache.WaitForCacheSync(ctx.Done(), appInformer.HasSynced) {
panic("Timed out waiting for caches to sync")
}
server := NewServer(
testNamespace,
kubeclientset,
fakeAppsClientset,
factory.Argoproj().V1alpha1().Applications().Lister().Applications(testNamespace),
mockRepoClient,
nil,
&kubetest.MockKubectlCmd{},

View File

@@ -15,9 +15,6 @@ import (
"strings"
"time"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
"github.com/dgrijalva/jwt-go"
golang_proto "github.com/golang/protobuf/proto"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -36,6 +33,8 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
@@ -58,6 +57,7 @@ import (
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
appinformer "github.com/argoproj/argo-cd/pkg/client/informers/externalversions"
applisters "github.com/argoproj/argo-cd/pkg/client/listers/application/v1alpha1"
repoapiclient "github.com/argoproj/argo-cd/reposerver/apiclient"
"github.com/argoproj/argo-cd/server/account"
"github.com/argoproj/argo-cd/server/application"
@@ -128,6 +128,8 @@ type ArgoCDServer struct {
enf *rbac.Enforcer
projInformer cache.SharedIndexInformer
policyEnforcer *rbacpolicy.RBACPolicyEnforcer
appInformer cache.SharedIndexInformer
appLister applisters.ApplicationNamespaceLister
// stopCh is the channel which when closed, will shutdown the Argo CD server
stopCh chan struct{}
@@ -182,6 +184,9 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts) *ArgoCDServer {
projInformer := factory.Argoproj().V1alpha1().AppProjects().Informer()
projLister := factory.Argoproj().V1alpha1().AppProjects().Lister().AppProjects(opts.Namespace)
appInformer := factory.Argoproj().V1alpha1().Applications().Informer()
appLister := factory.Argoproj().V1alpha1().Applications().Lister().Applications(opts.Namespace)
enf := rbac.NewEnforcer(opts.KubeClientset, opts.Namespace, common.ArgoCDRBACConfigMapName, nil)
enf.EnableEnforce(!opts.DisableAuth)
err = enf.SetBuiltinPolicy(assets.BuiltinPolicyCSV)
@@ -199,6 +204,8 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts) *ArgoCDServer {
settingsMgr: settingsMgr,
enf: enf,
projInformer: projInformer,
appInformer: appInformer,
appLister: appLister,
policyEnforcer: policyEnf,
}
}
@@ -267,6 +274,7 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {
common.GetVersion(), port, a.settings.URL, a.useTLS(), a.Namespace, a.settings.IsSSOConfigured())
go a.projInformer.Run(ctx.Done())
go a.appInformer.Run(ctx.Done())
go func() { a.checkServeErr("grpcS", grpcS.Serve(grpcL)) }()
go func() { a.checkServeErr("httpS", httpS.Serve(httpL)) }()
if a.useTLS() {
@@ -277,7 +285,7 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {
go a.rbacPolicyLoader(ctx)
go func() { a.checkServeErr("tcpm", tcpm.Serve()) }()
go func() { a.checkServeErr("metrics", metricsServ.ListenAndServe()) }()
if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced) {
if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced, a.appInformer.HasSynced) {
log.Fatal("Timed out waiting for project cache to sync")
}
@@ -464,7 +472,7 @@ func (a *ArgoCDServer) newGRPCServer() *grpc.Server {
repoCredsService := repocreds.NewServer(a.RepoClientset, db, a.enf, a.settingsMgr)
sessionService := session.NewServer(a.sessionMgr, a)
projectLock := util.NewKeyLock()
applicationService := application.NewServer(a.Namespace, a.KubeClientset, a.AppClientset, a.RepoClientset, a.Cache, kubectl, db, a.enf, projectLock, a.settingsMgr)
applicationService := application.NewServer(a.Namespace, a.KubeClientset, a.AppClientset, a.appLister, a.RepoClientset, a.Cache, kubectl, db, a.enf, projectLock, a.settingsMgr)
projectService := project.NewServer(a.Namespace, a.KubeClientset, a.AppClientset, a.enf, projectLock, a.sessionMgr)
settingsService := settings.NewServer(a.settingsMgr, a.DisableAdmin)
accountService := account.NewServer(a.sessionMgr, a.settingsMgr, a.enf)

View File

@@ -1,5 +1,15 @@
package settings
// The core exclusion list consists of K8s resources that we assume will never be managed by operators,
// and are never child objects of managed resources that need to be presented in the resource tree.
// This list contains high volume and high churn metadata objects which we exclude for performance
// reasons, reducing connections and load to the K8s API servers of managed clusters.
var coreExcludedResources = []FilteredResource{
{APIGroups: []string{"events.k8s.io", "metrics.k8s.io"}},
{APIGroups: []string{""}, Kinds: []string{"Event", "Node"}},
{APIGroups: []string{"coordination.k8s.io"}, Kinds: []string{"Lease"}},
}
type ResourcesFilter struct {
// ResourceExclusions holds the api groups, kinds per cluster to exclude from Argo CD's watch
ResourceExclusions []FilteredResource
@@ -8,10 +18,6 @@ type ResourcesFilter struct {
}
func (rf *ResourcesFilter) getExcludedResources() []FilteredResource {
coreExcludedResources := []FilteredResource{
{APIGroups: []string{"events.k8s.io", "metrics.k8s.io"}},
{APIGroups: []string{""}, Kinds: []string{"Event"}},
}
return append(coreExcludedResources, rf.ResourceExclusions...)
}
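For illustration, a short sketch of how the filter carrying these exclusions is consulted, using the same IsExcludedResource call that CompareAppState makes above. The server URL and the expected results noted in the comments are assumptions based on the core exclusion list, not output captured from the code.

// Sketch only: consulting the resources filter that now excludes coordination.k8s.io/Lease.
package main

import (
	"fmt"

	"github.com/argoproj/argo-cd/util/settings"
)

func main() {
	// a filter with no user-configured exclusions/inclusions still carries the core exclusions
	rf := &settings.ResourcesFilter{}
	// expected to be excluded after this commit (part of the core exclusion list)
	fmt.Println(rf.IsExcludedResource("coordination.k8s.io", "Lease", "https://kubernetes.default.svc"))
	// ordinary workload kinds are expected to remain included
	fmt.Println(rf.IsExcludedResource("apps", "Deployment", "https://kubernetes.default.svc"))
}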

View File

@@ -1,82 +1,52 @@
package stats
import (
"os"
"os/signal"
"runtime"
"runtime/pprof"
"syscall"
"time"
log "github.com/sirupsen/logrus"
)
// StartStatsTicker starts a goroutine which dumps stats at a specified interval
func StartStatsTicker(d time.Duration) {
ticker := time.NewTicker(d)
go func() {
for {
<-ticker.C
LogStats()
}
}()
// mock out time.Now() for unit tests
var now = time.Now
// TimingStats is a helper to breakdown the timing of an expensive function call
// Usage:
// ts := NewTimingStats()
// ts.AddCheckpoint("checkpoint-1")
// ...
// ts.AddCheckpoint("checkpoint-2")
// ...
// ts.AddCheckpoint("checkpoint-3")
// ts.Timings()
type TimingStats struct {
StartTime time.Time
checkpoints []tsCheckpoint
}
// RegisterStackDumper spawns a goroutine which dumps stack trace upon a SIGUSR1
func RegisterStackDumper() {
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGUSR1)
for {
<-sigs
LogStack()
}
}()
type tsCheckpoint struct {
name string
time time.Time
}
// RegisterHeapDumper spawns a goroutine which dumps heap profile upon a SIGUSR2
func RegisterHeapDumper(filePath string) {
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGUSR2)
for {
<-sigs
runtime.GC()
if _, err := os.Stat(filePath); err == nil {
err = os.Remove(filePath)
if err != nil {
log.Warnf("could not delete heap profile file: %v", err)
return
}
}
f, err := os.Create(filePath)
if err != nil {
log.Warnf("could not create heap profile file: %v", err)
return
}
if err := pprof.WriteHeapProfile(f); err != nil {
log.Warnf("could not write heap profile: %v", err)
return
} else {
log.Infof("dumped heap profile to %s", filePath)
}
}
}()
func NewTimingStats() *TimingStats {
return &TimingStats{
StartTime: now(),
}
}
// LogStats logs runtime statistics
func LogStats() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Infof("Alloc=%v TotalAlloc=%v Sys=%v NumGC=%v Goroutines=%d",
m.Alloc/1024, m.TotalAlloc/1024, m.Sys/1024, m.NumGC, runtime.NumGoroutine())
func (t *TimingStats) AddCheckpoint(name string) {
cp := tsCheckpoint{
name: name,
time: now(),
}
t.checkpoints = append(t.checkpoints, cp)
}
// LogStack will log the current stack
func LogStack() {
buf := make([]byte, 1<<20)
stacklen := runtime.Stack(buf, true)
log.Infof("*** goroutine dump...\n%s\n*** end\n", buf[:stacklen])
func (t *TimingStats) Timings() map[string]time.Duration {
timings := make(map[string]time.Duration)
prev := t.StartTime
for _, cp := range t.checkpoints {
timings[cp.name] = cp.time.Sub(prev)
prev = cp.time
}
return timings
}
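A usage sketch of the new TimingStats helper, mirroring how getRepoObjs in the controller instruments its phases; fetchRepos, renderManifests, and the application name are placeholders standing in for the real work.

// Sketch only: per-phase timing with TimingStats, logged the same way the controller does.
package main

import (
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/argoproj/argo-cd/util/stats"
)

func fetchRepos() {
	time.Sleep(10 * time.Millisecond) // placeholder for a repo lookup
}

func renderManifests() {
	time.Sleep(20 * time.Millisecond) // placeholder for manifest generation
}

func main() {
	ts := stats.NewTimingStats()
	fetchRepos()
	ts.AddCheckpoint("repo_ms")
	renderManifests()
	ts.AddCheckpoint("manifests_ms")

	logCtx := log.WithField("application", "my-app")
	for k, v := range ts.Timings() {
		logCtx = logCtx.WithField(k, v.Milliseconds())
	}
	logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
	logCtx.Info("getRepoObjs stats")
}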

util/stats/stats_test.go (new file)
View File

@@ -0,0 +1,31 @@
package stats
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestTimingStats(t *testing.T) {
start := time.Now()
now = func() time.Time {
return start
}
defer func() {
now = time.Now
}()
ts := NewTimingStats()
now = func() time.Time {
return start.Add(100 * time.Millisecond)
}
ts.AddCheckpoint("checkpoint-1")
now = func() time.Time {
return start.Add(300 * time.Millisecond)
}
ts.AddCheckpoint("checkpoint-2")
timings := ts.Timings()
assert.Len(t, timings, 2)
assert.Equal(t, 100*time.Millisecond, timings["checkpoint-1"])
assert.Equal(t, 200*time.Millisecond, timings["checkpoint-2"])
}