chore: Add appkey and error fields in appcontroller (#24668) (#24669)

Signed-off-by: Andrii Korotkov <myolymp@gmail.com>
This commit is contained in:
Andrii Korotkov
2025-09-27 14:47:21 -07:00
committed by GitHub
parent ac46a18b55
commit af7ae18189
2 changed files with 56 additions and 50 deletions

View File

@@ -466,7 +466,7 @@ func (ctrl *ApplicationController) handleObjectUpdated(managedByApp map[string]b
// Enforce application's permission for the source namespace
_, err = ctrl.getAppProj(app)
if err != nil {
logCtx.Errorf("Unable to determine project for app '%s': %v", app.QualifiedName(), err)
logCtx.WithError(err).Errorf("Unable to determine project for app")
continue
}
@@ -902,12 +902,12 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int
clusters, err := ctrl.db.ListClusters(ctx)
if err != nil {
log.Warnf("Cannot init sharding. Error while querying clusters list from database: %v", err)
log.WithError(err).Warn("Cannot init sharding. Error while querying clusters list from database")
} else {
appItems, err := ctrl.getAppList(metav1.ListOptions{})
if err != nil {
log.Warnf("Cannot init sharding. Error while querying application list from database: %v", err)
log.WithError(err).Warn("Cannot init sharding. Error while querying application list from database")
} else {
ctrl.clusterSharding.Init(clusters, appItems)
}
@@ -1005,14 +1005,14 @@ func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext b
processNext = true
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
log.WithField("appkey", appKey).Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
ctrl.appOperationQueue.Done(appKey)
}()
obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey)
if err != nil {
log.Errorf("Failed to get application '%s' from informer index: %+v", appKey, err)
log.WithField("appkey", appKey).WithError(err).Error("Failed to get application from informer index")
return processNext
}
if !exists {
@@ -1021,7 +1021,7 @@ func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext b
}
origApp, ok := obj.(*appv1.Application)
if !ok {
log.Warnf("Key '%s' in index is not an application", appKey)
log.WithField("appkey", appKey).Warn("Key in index is not an application")
return processNext
}
app := origApp.DeepCopy()
@@ -1041,7 +1041,7 @@ func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext b
// We cannot rely on informer since applications might be updated by both application controller and api server.
freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace).Get(context.Background(), app.Name, metav1.GetOptions{})
if err != nil {
logCtx.Errorf("Failed to retrieve latest application state: %v", err)
logCtx.WithError(err).Error("Failed to retrieve latest application state")
return processNext
}
app = freshApp
@@ -1073,7 +1073,7 @@ func (ctrl *ApplicationController) processAppComparisonTypeQueueItem() (processN
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
log.WithField("appkey", key).Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
ctrl.appComparisonTypeRefreshQueue.Done(key)
}()
@@ -1083,11 +1083,11 @@ func (ctrl *ApplicationController) processAppComparisonTypeQueueItem() (processN
}
if parts := strings.Split(key, "/"); len(parts) != 3 {
log.Warnf("Unexpected key format in appComparisonTypeRefreshTypeQueue. Key should consists of namespace/name/comparisonType but got: %s", key)
log.WithField("appkey", key).Warn("Unexpected key format in appComparisonTypeRefreshTypeQueue. Key should consist of namespace/name/comparisonType")
} else {
compareWith, err := strconv.Atoi(parts[2])
if err != nil {
log.Warnf("Unable to parse comparison type: %v", err)
log.WithField("appkey", key).WithError(err).Warn("Unable to parse comparison type")
return processNext
}
ctrl.requestAppRefresh(ctrl.toAppQualifiedName(parts[1], parts[0]), CompareWith(compareWith).Pointer(), nil)
@@ -1101,7 +1101,7 @@ func (ctrl *ApplicationController) processProjectQueueItem() (processNext bool)
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
log.WithField("key", key).Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
ctrl.projectRefreshQueue.Done(key)
}()
@@ -1111,7 +1111,7 @@ func (ctrl *ApplicationController) processProjectQueueItem() (processNext bool)
}
obj, exists, err := ctrl.projInformer.GetIndexer().GetByKey(key)
if err != nil {
log.Errorf("Failed to get project '%s' from informer index: %+v", key, err)
log.WithField("key", key).WithError(err).Error("Failed to get project from informer index")
return processNext
}
if !exists {
@@ -1120,13 +1120,13 @@ func (ctrl *ApplicationController) processProjectQueueItem() (processNext bool)
}
origProj, ok := obj.(*appv1.AppProject)
if !ok {
log.Warnf("Key '%s' in index is not an appproject", key)
log.WithField("key", key).Warnf("Key in index is not an appproject")
return processNext
}
if origProj.DeletionTimestamp != nil && origProj.HasFinalizer() {
if err := ctrl.finalizeProjectDeletion(origProj.DeepCopy()); err != nil {
log.Warnf("Failed to finalize project deletion: %v", err)
log.WithError(err).Warn("Failed to finalize project deletion")
}
}
return processNext
@@ -1194,7 +1194,7 @@ func (ctrl *ApplicationController) finalizeApplicationDeletion(app *appv1.Applic
app, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Get(context.Background(), app.Name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
logCtx.Errorf("Unable to get refreshed application info prior deleting resources: %v", err)
logCtx.WithError(err).Error("Unable to get refreshed application info prior deleting resources")
}
return nil
}
@@ -1204,7 +1204,7 @@ func (ctrl *ApplicationController) finalizeApplicationDeletion(app *appv1.Applic
}
destCluster, err := argo.GetDestinationCluster(context.Background(), app.Spec.Destination, ctrl.db)
if err != nil {
logCtx.Warnf("Unable to get destination cluster: %v", err)
logCtx.WithError(err).Warn("Unable to get destination cluster")
app.UnSetCascadedDeletion()
app.UnSetPostDeleteFinalizerAll()
if err := ctrl.updateFinalizers(app); err != nil {
@@ -1367,7 +1367,7 @@ func (ctrl *ApplicationController) setAppCondition(app *appv1.Application, condi
_, err = ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Patch(context.Background(), app.Name, types.MergePatchType, patch, metav1.PatchOptions{})
}
if err != nil {
logCtx.Errorf("Unable to set application condition: %v", err)
logCtx.WithError(err).Error("Unable to set application condition")
}
}
@@ -1511,7 +1511,7 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
// force app refresh with using CompareWithLatest comparison type and trigger app reconciliation loop
ctrl.requestAppRefresh(app.QualifiedName(), CompareWithLatestForceResolve.Pointer(), nil)
} else {
logCtx.Warnf("Fails to requeue application: %v", err)
logCtx.WithError(err).Warn("Fails to requeue application")
}
}
ts.AddCheckpoint("request_app_refresh_ms")
@@ -1543,13 +1543,13 @@ func (ctrl *ApplicationController) setOperationState(app *appv1.Application, sta
}
patchJSON, err := json.Marshal(patch)
if err != nil {
logCtx.Errorf("error marshaling json: %v", err)
logCtx.WithError(err).Error("error marshaling json")
return
}
if app.Status.OperationState != nil && app.Status.OperationState.FinishedAt != nil && state.FinishedAt == nil {
patchJSON, err = jsonpatch.MergeMergePatches(patchJSON, []byte(`{"status": {"operationState": {"finishedAt": null}}}`))
if err != nil {
logCtx.Errorf("error merging operation state patch: %v", err)
logCtx.WithError(err).Error("error merging operation state patch")
return
}
}
@@ -1563,7 +1563,7 @@ func (ctrl *ApplicationController) setOperationState(app *appv1.Application, sta
}
// kube.RetryUntilSucceed logs failed attempts at "debug" level, but we want to know if this fails. Log a
// warning.
logCtx.Warnf("error patching application with operation state: %v", err)
logCtx.WithError(err).Warn("error patching application with operation state")
return fmt.Errorf("error patching application with operation state: %w", err)
}
return nil
@@ -1592,7 +1592,7 @@ func (ctrl *ApplicationController) setOperationState(app *appv1.Application, sta
destCluster, err := argo.GetDestinationCluster(context.Background(), app.Spec.Destination, ctrl.db)
if err != nil {
logCtx.Warnf("Unable to get destination cluster, setting dest_server label to empty string in sync metric: %v", err)
logCtx.WithError(err).Warn("Unable to get destination cluster, setting dest_server label to empty string in sync metric")
}
destServer := ""
if destCluster != nil {
@@ -1609,7 +1609,7 @@ func (ctrl *ApplicationController) writeBackToInformer(app *appv1.Application) {
logCtx := log.WithFields(applog.GetAppLogFields(app)).WithField("informer-writeBack", true)
err := ctrl.appInformer.GetStore().Update(app)
if err != nil {
logCtx.Errorf("failed to update informer store: %v", err)
logCtx.WithError(err).Error("failed to update informer store")
return
}
}
@@ -1635,7 +1635,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
processNext = true
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
log.WithField("appkey", appKey).Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
// We want to have app operation update happen after the sync, so there's no race condition
// and app updates not proceeding. See https://github.com/argoproj/argo-cd/issues/18500.
@@ -1644,7 +1644,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
}()
obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey)
if err != nil {
log.Errorf("Failed to get application '%s' from informer index: %+v", appKey, err)
log.WithField("appkey", appKey).WithError(err).Error("Failed to get application from informer index")
return processNext
}
if !exists {
@@ -1653,7 +1653,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
}
origApp, ok := obj.(*appv1.Application)
if !ok {
log.Warnf("Key '%s' in index is not an application", appKey)
log.WithField("appkey", appKey).Warn("Key in index is not an application")
return processNext
}
origApp = origApp.DeepCopy()
@@ -1702,7 +1702,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
if tree, err = ctrl.getResourceTree(destCluster, app, managedResources); err == nil {
app.Status.Summary = tree.GetSummary(app)
if err := ctrl.cache.SetAppResourcesTree(app.InstanceName(ctrl.namespace), tree); err != nil {
logCtx.Errorf("Failed to cache resources tree: %v", err)
logCtx.WithError(err).Error("Failed to cache resources tree")
return processNext
}
}
@@ -1724,10 +1724,10 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
patchDuration = ctrl.persistAppStatus(origApp, &app.Status)
if err := ctrl.cache.SetAppResourcesTree(app.InstanceName(ctrl.namespace), &appv1.ApplicationTree{}); err != nil {
logCtx.Warnf("failed to set app resource tree: %v", err)
logCtx.WithError(err).Warn("failed to set app resource tree")
}
if err := ctrl.cache.SetAppManagedResources(app.InstanceName(ctrl.namespace), nil); err != nil {
logCtx.Warnf("failed to set app managed resources tree: %v", err)
logCtx.WithError(err).Warn("failed to set app managed resources tree")
}
ts.AddCheckpoint("process_refresh_app_conditions_errors_ms")
return processNext
@@ -1735,7 +1735,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
destCluster, err = argo.GetDestinationCluster(context.Background(), app.Spec.Destination, ctrl.db)
if err != nil {
logCtx.Errorf("Failed to get destination cluster: %v", err)
logCtx.WithError(err).Error("Failed to get destination cluster")
// exit the reconciliation. ctrl.refreshAppConditions should have caught the error
return processNext
}
@@ -1777,7 +1777,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
ts.AddCheckpoint("compare_app_state_ms")
if stderrors.Is(err, ErrCompareStateRepo) {
logCtx.Warnf("Ignoring temporary failed attempt to compare app state against repo: %v", err)
logCtx.WithError(err).Warn("Ignoring temporary failed attempt to compare app state against repo")
return processNext // short circuit if git error is encountered
}
@@ -1791,7 +1791,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
tree, err := ctrl.setAppManagedResources(destCluster, app, compareResult)
ts.AddCheckpoint("set_app_managed_resources_ms")
if err != nil {
logCtx.Errorf("Failed to cache app resources: %v", err)
logCtx.WithError(err).Error("Failed to cache app resources")
} else {
app.Status.Summary = tree.GetSummary(app)
}
@@ -1843,7 +1843,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
}
if err := ctrl.updateFinalizers(app); err != nil {
logCtx.Errorf("Failed to update finalizers: %v", err)
logCtx.WithError(err).Error("Failed to update finalizers")
}
}
ts.AddCheckpoint("process_finalizers_ms")
@@ -1859,13 +1859,13 @@ func (ctrl *ApplicationController) processAppHydrateQueueItem() (processNext boo
processNext = true
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
log.WithField("appkey", appKey).Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
ctrl.appHydrateQueue.Done(appKey)
}()
obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey)
if err != nil {
log.Errorf("Failed to get application '%s' from informer index: %+v", appKey, err)
log.WithField("appkey", appKey).WithError(err).Error("Failed to get application from informer index")
return processNext
}
if !exists {
@@ -1874,7 +1874,7 @@ func (ctrl *ApplicationController) processAppHydrateQueueItem() (processNext boo
}
origApp, ok := obj.(*appv1.Application)
if !ok {
log.Warnf("Key '%s' in index is not an application", appKey)
log.WithField("appkey", appKey).Warn("Key in index is not an application")
return processNext
}
@@ -1891,12 +1891,6 @@ func (ctrl *ApplicationController) processHydrationQueueItem() (processNext bool
return processNext
}
processNext = true
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
ctrl.hydrationQueue.Done(hydrationKey)
}()
logCtx := log.WithFields(log.Fields{
"sourceRepoURL": hydrationKey.SourceRepoURL,
@@ -1904,6 +1898,13 @@ func (ctrl *ApplicationController) processHydrationQueueItem() (processNext bool
"destinationBranch": hydrationKey.DestinationBranch,
})
defer func() {
if r := recover(); r != nil {
logCtx.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
ctrl.hydrationQueue.Done(hydrationKey)
}()
logCtx.Debug("Processing hydration queue item")
ctrl.hydrator.ProcessHydrationQueueItem(hydrationKey)
@@ -2012,11 +2013,11 @@ func (ctrl *ApplicationController) normalizeApplication(orig, app *appv1.Applica
patch, modified, err := diff.CreateTwoWayMergePatch(orig, app, appv1.Application{})
if err != nil {
logCtx.Errorf("error constructing app spec patch: %v", err)
logCtx.WithError(err).Error("error constructing app spec patch")
} else if modified {
_, err := ctrl.PatchAppWithWriteBack(context.Background(), app.Name, app.Namespace, types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
logCtx.Errorf("Error persisting normalized application spec: %v", err)
logCtx.WithError(err).Error("Error persisting normalized application spec")
} else {
logCtx.Infof("Normalized app spec: %s", string(patch))
}
@@ -2071,7 +2072,7 @@ func (ctrl *ApplicationController) persistAppStatus(orig *appv1.Application, new
&appv1.Application{ObjectMeta: metav1.ObjectMeta{Annotations: orig.GetAnnotations()}, Status: orig.Status},
&appv1.Application{ObjectMeta: metav1.ObjectMeta{Annotations: newAnnotations}, Status: *newStatus})
if err != nil {
logCtx.Errorf("Error constructing app status patch: %v", err)
logCtx.WithError(err).Error("Error constructing app status patch")
return patchDuration
}
if !modified {
@@ -2085,7 +2086,7 @@ func (ctrl *ApplicationController) persistAppStatus(orig *appv1.Application, new
}()
_, err = ctrl.PatchAppWithWriteBack(context.Background(), orig.Name, orig.Namespace, types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
logCtx.Warnf("Error updating application: %v", err)
logCtx.WithError(err).Warn("Error updating application")
} else {
logCtx.Infof("Update successful")
}
@@ -2230,11 +2231,11 @@ func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *
if stderrors.Is(err, argo.ErrAnotherOperationInProgress) {
// skipping auto-sync because another operation is in progress and was not noticed due to stale data in informer
// it is safe to skip auto-sync because it is already running
logCtx.Warnf("Failed to initiate auto-sync to %s: %v", desiredRevisions, err)
logCtx.WithError(err).Warnf("Failed to initiate auto-sync to %s", desiredRevisions)
return nil, 0
}
logCtx.Errorf("Failed to initiate auto-sync to %s: %v", desiredRevisions, err)
logCtx.WithError(err).Errorf("Failed to initiate auto-sync to %s", desiredRevisions)
return &appv1.ApplicationCondition{Type: appv1.ApplicationConditionSyncError, Message: err.Error()}, setOpTime
}
ctrl.writeBackToInformer(updatedApp)
@@ -2360,7 +2361,7 @@ func (ctrl *ApplicationController) canProcessApp(obj any) bool {
return false
}
} else {
logCtx.Debugf("Unable to determine if Application should skip reconcile based on annotation %s: %v", common.AnnotationKeyAppSkipReconcile, err)
logCtx.WithError(err).Debugf("Unable to determine if Application should skip reconcile based on annotation %s", common.AnnotationKeyAppSkipReconcile)
}
}
}

View File

@@ -1438,7 +1438,12 @@ func TestSetOperationStateLogRetries(t *testing.T) {
})
ctrl.setOperationState(newFakeApp(), &v1alpha1.OperationState{Phase: synccommon.OperationSucceeded})
assert.True(t, patched)
assert.Contains(t, hook.Entries[0].Message, "fake error")
require.GreaterOrEqual(t, len(hook.Entries), 1)
entry := hook.Entries[0]
require.Contains(t, entry.Data, "error")
errorVal, ok := entry.Data["error"].(error)
require.True(t, ok, "error field should be of type error")
assert.Contains(t, errorVal.Error(), "fake error")
}
func TestNeedRefreshAppStatus(t *testing.T) {