fix(hooks): always remove finalizers (#23226) (#25916)

Signed-off-by: Alexandre Gaudreault <alexandre_gaudreault@intuit.com>
Co-authored-by: Leonardo Luz Almeida <leoluz@users.noreply.github.com>
This commit is contained in:
Alexandre Gaudreault
2026-01-19 11:42:03 -05:00
committed by GitHub
parent a66fe2af24
commit 228378474a
8 changed files with 1119 additions and 347 deletions

View File

@@ -32,8 +32,9 @@ func getResourceResult(resources []synccommon.ResourceSyncResult, resourceKey ku
return nil
}
func newHook(hookType synccommon.HookType, deletePolicy synccommon.HookDeletePolicy) *unstructured.Unstructured {
func newHook(name string, hookType synccommon.HookType, deletePolicy synccommon.HookDeletePolicy) *unstructured.Unstructured {
obj := testingutils.NewPod()
obj.SetName(name)
obj.SetNamespace(testingutils.FakeArgoCDNamespace)
testingutils.Annotate(obj, synccommon.AnnotationKeyHook, string(hookType))
testingutils.Annotate(obj, synccommon.AnnotationKeyHookDeletePolicy, string(deletePolicy))

View File

@@ -430,7 +430,7 @@ func (sc *syncContext) setRunningPhase(tasks syncTasks, isPendingDeletion bool)
sc.setOperationPhase(common.OperationRunning, message)
}
// sync has performs the actual apply or hook based sync
// Sync executes next synchronization step and updates operation status.
func (sc *syncContext) Sync() {
sc.log.WithValues("skipHooks", sc.skipHooks, "started", sc.started()).Info("Syncing")
tasks, ok := sc.getSyncTasks()
@@ -552,9 +552,10 @@ func (sc *syncContext) Sync() {
// syncFailTasks only run during failure, so separate them from regular tasks
syncFailTasks, tasks := tasks.Split(func(t *syncTask) bool { return t.phase == common.SyncPhaseSyncFail })
syncFailedTasks, _ := tasks.Split(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
syncFailedTasks := tasks.Filter(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
// if there are any completed but unsuccessful tasks, sync is a failure.
// we already know tasks do not contain running tasks
if tasks.Any(func(t *syncTask) bool { return t.completed() && !t.successful() }) {
sc.deleteHooks(hooksPendingDeletionFailed)
sc.executeSyncFailPhase(syncFailTasks, syncFailedTasks, "one or more synchronization tasks completed unsuccessfully")
@@ -578,18 +579,12 @@ func (sc *syncContext) Sync() {
return
}
// remove any tasks not in this wave
phase := tasks.phase()
wave := tasks.wave()
finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave()
// if it is the last phase/wave and the only remaining tasks are non-hooks, then we are successful
// EVEN if those objects subsequently degraded
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
tasks, remainingTasks := tasks.Split(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")
@@ -599,8 +594,10 @@ func (sc *syncContext) Sync() {
if sc.syncWaveHook != nil && runState != failed {
err := sc.syncWaveHook(phase, wave, finalWave)
if err != nil {
sc.deleteHooks(hooksPendingDeletionFailed)
sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err))
// Since this is an unexpected error and is not related to a specific task, terminate the sync with error
// without triggering the syncFailTasks
sc.terminateHooksPreemptively(tasks.Filter(func(task *syncTask) bool { return task.isHook() }))
sc.setOperationPhase(common.OperationError, fmt.Sprintf("SyncWaveHook failed: %v", err))
sc.log.Error(err, "SyncWaveHook failed")
return
}
@@ -608,24 +605,81 @@ func (sc *syncContext) Sync() {
switch runState {
case failed:
syncFailedTasks, _ := tasks.Split(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
sc.deleteHooks(hooksPendingDeletionFailed)
sc.executeSyncFailPhase(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")
// If we failed to apply at least one resource, we need to start the syncFailTasks and wait
// for the completion of any running hooks. In this case, the operation should be running.
syncFailedTasks := tasks.Filter(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
runningHooks := tasks.Filter(func(t *syncTask) bool { return t.running() })
if len(runningHooks) > 0 {
if len(syncFailTasks) > 0 {
completed := sc.executeSyncFailPhase(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")
if !completed {
runningHooks = append(runningHooks, syncFailTasks...)
}
}
sc.setRunningPhase(runningHooks, false)
} else {
completed := sc.executeSyncFailPhase(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")
if completed {
sc.deleteHooks(hooksPendingDeletionFailed)
}
}
case successful:
if remainingTasks.Len() == 0 {
if remainingTasks.Len() == 0 && !tasks.Any(func(task *syncTask) bool { return task.isHook() }) {
// if it is the last phase/wave and the only running tasks are non-hooks, then we are successful
// EVEN if those objects subsequently degrade
// This handles the common case where neither hooks or waves are used and a sync equates to simply
// an (asynchronous) kubectl apply of manifests, which succeeds immediately.
// delete all completed hooks which have appropriate delete policy
sc.deleteHooks(hooksPendingDeletionSuccessful)
sc.setOperationPhase(common.OperationSucceeded, "successfully synced (all tasks run)")
} else {
sc.setRunningPhase(remainingTasks, false)
sc.setRunningPhase(tasks, false)
}
default:
sc.setRunningPhase(tasks.Filter(func(task *syncTask) bool {
return task.deleteOnPhaseCompletion()
return task.deleteBeforeCreation() || (task.isPrune() && task.pending())
}), true)
}
}
// Terminate terminates the sync operation. The method is asynchronous: it starts deletion of related K8S resources
// such as in-flight resource hooks, updates the operation status, and exits without waiting for resource completion.
func (sc *syncContext) Terminate() {
	sc.log.V(1).Info("terminating")
	tasks, _ := sc.getSyncTasks()
	// Remove finalizers from hooks that have already completed, so the aborted
	// operation does not leave objects that can never be deleted.
	hooksCompleted := tasks.Filter(func(task *syncTask) bool {
		return task.isHook() && task.completed()
	})
	for _, task := range hooksCompleted {
		if err := sc.removeHookFinalizer(task); err != nil {
			sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to remove hook finalizer: %v", err))
		}
	}
	// Preemptively terminate hooks that are still running (removes their finalizers and deletes them).
	terminateSuccessful := sc.terminateHooksPreemptively(tasks.Filter(func(task *syncTask) bool { return task.isHook() }))
	if terminateSuccessful {
		sc.setOperationPhase(common.OperationFailed, "Operation terminated")
	} else {
		// at least one hook could not be fetched, un-finalized, or deleted
		sc.setOperationPhase(common.OperationError, "Operation termination had errors")
	}
}
// GetState returns the current operation phase, its message, and the results of
// all resources synchronized so far, sorted by their sync order.
func (sc *syncContext) GetState() (common.OperationPhase, string, []common.ResourceSyncResult) {
	var results []common.ResourceSyncResult
	for _, res := range sc.syncRes {
		results = append(results, res)
	}
	// present results in the order the resources were synced
	sort.Slice(results, func(a, b int) bool { return results[a].Order < results[b].Order })
	return sc.phase, sc.message, results
}
// filter out out-of-sync tasks
func (sc *syncContext) filterOutOfSyncTasks(tasks syncTasks) syncTasks {
return tasks.Filter(func(t *syncTask) bool {
@@ -653,6 +707,68 @@ func (sc *syncContext) getNamespaceCreationTask(tasks syncTasks) *syncTask {
return nil
}
// terminateHooksPreemptively terminates ongoing hook tasks. Usually, when a hook is running,
// the gitops engine controller waits for its completion. However, when terminating a sync operation,
// or when we encounter an unexpected error, we need to preemptively terminate any running hooks
// by removing their finalizers and deleting them. It returns false if any hook could not be
// fetched, un-finalized, or deleted.
func (sc *syncContext) terminateHooksPreemptively(tasks syncTasks) bool {
	terminateSuccessful := true
	for _, task := range tasks {
		// only hooks that are still running need preemptive termination
		if !task.isHook() || !task.running() {
			continue
		}
		if task.liveObj == nil {
			// if we terminate preemptively right after the task was run, it will not contain
			// the live object yet, so fetch it from the cluster
			liveObj, err := sc.getResource(task)
			if err != nil && !apierrors.IsNotFound(err) {
				sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to get live resource: %v", err))
				terminateSuccessful = false
				continue
			}
			task.liveObj = liveObj
		}
		if task.liveObj == nil {
			// the hook no longer exists in the cluster: nothing to clean up, consider it terminated
			sc.setResourceResult(task, task.syncStatus, common.OperationSucceeded, "Terminated")
			continue
		}
		// get the latest status of the running hook. Perhaps it already completed
		phase, msg, statusErr := sc.getOperationPhase(task.liveObj)
		if statusErr != nil {
			sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to get hook health: %v", statusErr))
		}
		// Now that we have the latest status, we can remove the finalizer.
		if err := sc.removeHookFinalizer(task); err != nil {
			sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to remove hook finalizer: %v", err))
			terminateSuccessful = false
			continue
		}
		// delete the hook if it is running, if we don't know whether it is running (statusErr),
		// or if it has just completed and is meant to be deleted on sync failed
		if statusErr != nil || phase.Running() || task.deleteOnPhaseFailed() {
			err := sc.deleteResource(task)
			if err != nil && !apierrors.IsNotFound(err) {
				sc.setResourceResult(task, task.syncStatus, common.OperationFailed, fmt.Sprintf("Failed to delete: %v", err))
				terminateSuccessful = false
				continue
			}
		}
		if phase.Completed() {
			// If the hook has completed, we can update it to its real status
			sc.setResourceResult(task, task.syncStatus, phase, msg)
		} else if task.operationState != common.OperationError {
			// update the status if the resource is not in error to preserve the error message
			sc.setResourceResult(task, task.syncStatus, common.OperationFailed, "Terminated")
		}
	}
	return terminateSuccessful
}
func (sc *syncContext) removeHookFinalizer(task *syncTask) error {
if task.liveObj == nil {
return nil
@@ -701,32 +817,6 @@ func (sc *syncContext) removeHookFinalizer(task *syncTask) error {
})
}
// getResource fetches the live object backing the given task from the cluster.
func (sc *syncContext) getResource(task *syncTask) (*unstructured.Unstructured, error) {
	sc.log.WithValues("task", task).V(1).Info("Getting resource")
	client, err := sc.getResourceIf(task, "get")
	if err != nil {
		return nil, err
	}
	obj, getErr := client.Get(context.TODO(), task.name(), metav1.GetOptions{})
	if getErr != nil {
		return nil, fmt.Errorf("failed to get resource: %w", getErr)
	}
	return obj, nil
}
// updateResource persists the task's live object back to the cluster.
func (sc *syncContext) updateResource(task *syncTask) error {
	sc.log.WithValues("task", task).V(1).Info("Updating resource")
	client, err := sc.getResourceIf(task, "update")
	if err != nil {
		return err
	}
	if _, updateErr := client.Update(context.TODO(), task.liveObj, metav1.UpdateOptions{}); updateErr != nil {
		return fmt.Errorf("failed to update resource: %w", updateErr)
	}
	return nil
}
func (sc *syncContext) deleteHooks(hooksPendingDeletion syncTasks) {
for _, task := range hooksPendingDeletion {
err := sc.deleteResource(task)
@@ -736,18 +826,7 @@ func (sc *syncContext) deleteHooks(hooksPendingDeletion syncTasks) {
}
}
// GetState returns the current sync operation phase, its message, and the per-resource
// sync results collected so far, sorted by sync order.
func (sc *syncContext) GetState() (common.OperationPhase, string, []common.ResourceSyncResult) {
	var resourceRes []common.ResourceSyncResult
	for _, v := range sc.syncRes {
		resourceRes = append(resourceRes, v)
	}
	// sort by the order in which resources were synced for stable output
	sort.Slice(resourceRes, func(i, j int) bool {
		return resourceRes[i].Order < resourceRes[j].Order
	})
	return sc.phase, sc.message, resourceRes
}
func (sc *syncContext) executeSyncFailPhase(syncFailTasks, syncFailedTasks syncTasks, message string) {
func (sc *syncContext) executeSyncFailPhase(syncFailTasks, syncFailedTasks syncTasks, message string) (completed bool) {
errorMessageFactory := func(tasks syncTasks, message string) string {
messages := tasks.Map(func(task *syncTask) string {
return task.message
@@ -760,23 +839,30 @@ func (sc *syncContext) executeSyncFailPhase(syncFailTasks, syncFailedTasks syncT
errorMessage := errorMessageFactory(syncFailedTasks, message)
if len(syncFailTasks) > 0 {
// if all the failure hooks are completed, don't run them again, and mark the sync as failed
if syncFailTasks.All(func(task *syncTask) bool { return task.completed() }) {
sc.setOperationPhase(common.OperationFailed, errorMessage)
return
}
// otherwise, we need to start the failure hooks, and then return without setting
// the phase, so we make sure we have at least one more sync
sc.log.WithValues("syncFailTasks", syncFailTasks).V(1).Info("Running sync fail tasks")
if sc.runTasks(syncFailTasks, false) == failed {
failedSyncFailTasks := syncFailTasks.Filter(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
syncFailTasksMessage := errorMessageFactory(failedSyncFailTasks, "one or more SyncFail hooks failed")
sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("%s\n%s", errorMessage, syncFailTasksMessage))
}
} else {
// if there is no failure hook, there is nothing more to do and we can fail
if len(syncFailTasks) == 0 {
sc.setOperationPhase(common.OperationFailed, errorMessage)
return true
}
// if all the failure hooks are completed, mark the sync as failed
if syncFailTasks.All(func(task *syncTask) bool { return task.completed() }) {
failedSyncFailTasks := syncFailTasks.Filter(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
if len(failedSyncFailTasks) > 0 {
syncFailTasksMessage := errorMessageFactory(failedSyncFailTasks, "one or more SyncFail hooks failed")
errorMessage = fmt.Sprintf("%s\n%s", errorMessage, syncFailTasksMessage)
}
sc.setOperationPhase(common.OperationFailed, errorMessage)
return true
}
// otherwise, we need to start the pending failure hooks, and then return WITHOUT setting
// the phase to failed, since we want the failure hooks to complete their running state before failing
pendingSyncFailTasks := syncFailTasks.Filter(func(task *syncTask) bool { return !task.completed() && !task.running() })
sc.log.WithValues("syncFailTasks", pendingSyncFailTasks).V(1).Info("Running sync fail tasks")
sc.runTasks(pendingSyncFailTasks, false)
sc.setRunningPhase(pendingSyncFailTasks, false)
return false
}
func (sc *syncContext) started() bool {
@@ -1301,42 +1387,30 @@ func (sc *syncContext) hasCRDOfGroupKind(group string, kind string) bool {
return false
}
// terminate looks for any running jobs/workflow hooks and deletes the resource
func (sc *syncContext) Terminate() {
terminateSuccessful := true
sc.log.V(1).Info("terminating")
tasks, _ := sc.getSyncTasks()
for _, task := range tasks {
if !task.isHook() || task.liveObj == nil {
continue
}
if err := sc.removeHookFinalizer(task); err != nil {
sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to remove hook finalizer: %v", err))
terminateSuccessful = false
continue
}
phase, msg, err := sc.getOperationPhase(task.liveObj)
if err != nil {
sc.setOperationPhase(common.OperationError, fmt.Sprintf("Failed to get hook health: %v", err))
return
}
if phase == common.OperationRunning {
err := sc.deleteResource(task)
if err != nil && !apierrors.IsNotFound(err) {
sc.setResourceResult(task, task.syncStatus, common.OperationFailed, fmt.Sprintf("Failed to delete: %v", err))
terminateSuccessful = false
} else {
sc.setResourceResult(task, task.syncStatus, common.OperationSucceeded, "Deleted")
}
} else {
sc.setResourceResult(task, task.syncStatus, phase, msg)
}
func (sc *syncContext) getResource(task *syncTask) (*unstructured.Unstructured, error) {
sc.log.WithValues("task", task).V(1).Info("Getting resource")
resIf, err := sc.getResourceIf(task, "get")
if err != nil {
return nil, err
}
if terminateSuccessful {
sc.setOperationPhase(common.OperationFailed, "Operation terminated")
} else {
sc.setOperationPhase(common.OperationError, "Operation termination had errors")
liveObj, err := resIf.Get(context.TODO(), task.name(), metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get resource: %w", err)
}
return liveObj, nil
}
func (sc *syncContext) updateResource(task *syncTask) error {
sc.log.WithValues("task", task).V(1).Info("Updating resource")
resIf, err := sc.getResourceIf(task, "update")
if err != nil {
return err
}
_, err = resIf.Update(context.TODO(), task.liveObj, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update resource: %w", err)
}
return nil
}
func (sc *syncContext) deleteResource(task *syncTask) error {
@@ -1384,16 +1458,7 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
sc.log.WithValues("numTasks", len(tasks), "dryRun", dryRun).V(1).Info("Running tasks")
state := successful
var createTasks syncTasks
var pruneTasks syncTasks
for _, task := range tasks {
if task.isPrune() {
pruneTasks = append(pruneTasks, task)
} else {
createTasks = append(createTasks, task)
}
}
pruneTasks, createTasks := tasks.Split(func(task *syncTask) bool { return task.isPrune() })
// remove finalizers from previous sync on existing hooks to make sure the operation is idempotent
{
@@ -1458,7 +1523,6 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
}
state = ss.Wait()
}
if state != successful {
return state
}
@@ -1476,7 +1540,7 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
err := sc.deleteResource(t)
if err != nil {
// it is possible to get a race condition here, such that the resource does not exist when
// delete is requested, we treat this as a nopand remove the liveObj
// delete is requested, we treat this as a no-op and remove the liveObj
if !apierrors.IsNotFound(err) {
state = failed
sc.setResourceResult(t, t.syncStatus, common.OperationError, fmt.Sprintf("failed to delete resource: %v", err))
@@ -1495,7 +1559,6 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
}
state = ss.Wait()
}
if state != successful {
return state
}

File diff suppressed because it is too large Load Diff

View File

@@ -21,9 +21,9 @@ func Test_syncTask_hookType(t *testing.T) {
want common.HookType
}{
{"Empty", fields{common.SyncPhaseSync, testingutils.NewPod()}, ""},
{"PreSyncHook", fields{common.SyncPhasePreSync, newHook(common.HookTypePreSync, common.HookDeletePolicyBeforeHookCreation)}, common.HookTypePreSync},
{"SyncHook", fields{common.SyncPhaseSync, newHook(common.HookTypeSync, common.HookDeletePolicyBeforeHookCreation)}, common.HookTypeSync},
{"PostSyncHook", fields{common.SyncPhasePostSync, newHook(common.HookTypePostSync, common.HookDeletePolicyBeforeHookCreation)}, common.HookTypePostSync},
{"PreSyncHook", fields{common.SyncPhasePreSync, newHook("hook", common.HookTypePreSync, common.HookDeletePolicyBeforeHookCreation)}, common.HookTypePreSync},
{"SyncHook", fields{common.SyncPhaseSync, newHook("hook", common.HookTypeSync, common.HookDeletePolicyBeforeHookCreation)}, common.HookTypeSync},
{"PostSyncHook", fields{common.SyncPhasePostSync, newHook("hook", common.HookTypePostSync, common.HookDeletePolicyBeforeHookCreation)}, common.HookTypePostSync},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

View File

@@ -1,13 +1,17 @@
package app
import (
"fmt"
"strings"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/stretchr/testify/require"
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v3/test"
"github.com/argoproj/argo-cd/v3/test/e2e/fixture"
"github.com/argoproj/argo-cd/v3/test/e2e/fixture/certs"
"github.com/argoproj/argo-cd/v3/test/e2e/fixture/gpgkeys"
@@ -481,3 +485,16 @@ func (c *Context) RegisterKustomizeVersion(version, path string) *Context {
require.NoError(c.T(), fixture.RegisterKustomizeVersion(version, path))
return c
}
func (c *Context) Resource(content string) *Context {
c.T().Helper()
u := test.YamlToUnstructured(content)
mapping, err := fixture.Mapper.RESTMapping(u.GroupVersionKind().GroupKind(), u.GroupVersionKind().Version)
require.NoError(c.T(), err)
if mapping == nil {
require.NoError(c.T(), fmt.Errorf("cannot find mapping for %s", u.GroupVersionKind().String()))
}
_, err = fixture.DynamicClientset.Resource(mapping.Resource).Namespace(c.DeploymentNamespace()).Apply(c.T().Context(), u.GetName(), u, metav1.ApplyOptions{FieldManager: "e2e-given-step"})
require.NoError(c.T(), err)
return c
}

View File

@@ -28,6 +28,26 @@ const (
type Expectation func(c *Consequences) (state state, message string)
func Or(e1 Expectation, e2 Expectation) Expectation {
return func(c *Consequences) (state, string) {
s1, m1 := e1(c)
if s1 == succeeded {
return s1, m1
}
s2, m2 := e2(c)
if s2 == succeeded {
return s2, m2
}
if s1 == pending {
return s1, m1
}
if s2 == pending {
return s2, m2
}
return failed, fmt.Sprintf("expectations unsuccessful: %s and %s", m1, m2)
}
}
func OperationPhaseIs(expected common.OperationPhase) Expectation {
return func(c *Consequences) (state, string) {
operationState := c.app().Status.OperationState
@@ -199,6 +219,9 @@ func ResourceHealthWithNamespaceIs(kind, resource, namespace string, expected he
func ResourceResultNumbering(num int) Expectation {
return func(c *Consequences) (state, string) {
if c.app().Status.OperationState == nil || c.app().Status.OperationState.SyncResult == nil {
return pending, "no sync result yet"
}
actualNum := len(c.app().Status.OperationState.SyncResult.Resources)
if actualNum < num {
return pending, fmt.Sprintf("not enough results yet, want %d, got %d", num, actualNum)
@@ -211,6 +234,9 @@ func ResourceResultNumbering(num int) Expectation {
func ResourceResultIs(result v1alpha1.ResourceResult) Expectation {
return func(c *Consequences) (state, string) {
if c.app().Status.OperationState == nil || c.app().Status.OperationState.SyncResult == nil {
return pending, "no sync result yet"
}
results := c.app().Status.OperationState.SyncResult.Resources
for _, res := range results {
if reflect.DeepEqual(*res, result) {
@@ -233,6 +259,9 @@ func sameResourceResult(res1, res2 v1alpha1.ResourceResult) bool {
func ResourceResultMatches(result v1alpha1.ResourceResult) Expectation {
return func(c *Consequences) (state, string) {
if c.app().Status.OperationState == nil || c.app().Status.OperationState.SyncResult == nil {
return pending, "no sync result yet"
}
results := c.app().Status.OperationState.SyncResult.Resources
for _, res := range results {
if sameResourceResult(*res, result) {

View File

@@ -20,11 +20,14 @@ import (
jsonpatch "github.com/evanphx/json-patch"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/yaml"
@@ -91,6 +94,7 @@ var (
DynamicClientset dynamic.Interface
AppClientset appclientset.Interface
ArgoCDClientset apiclient.Client
Mapper meta.RESTMapper
adminUsername string
AdminPassword string
apiServerAddress string
@@ -195,6 +199,7 @@ func init() {
AppClientset = appclientset.NewForConfigOrDie(config)
KubeClientset = kubernetes.NewForConfigOrDie(config)
DynamicClientset = dynamic.NewForConfigOrDie(config)
Mapper = restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(KubeClientset.Discovery()))
KubeConfig = config
apiServerAddress = GetEnvWithDefault(apiclient.EnvArgoCDServer, defaultAPIServer)

View File

@@ -10,9 +10,11 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"github.com/argoproj/gitops-engine/pkg/health"
. "github.com/argoproj/gitops-engine/pkg/sync/common"
"github.com/argoproj/gitops-engine/pkg/sync/hook"
. "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
. "github.com/argoproj/argo-cd/v3/test/e2e/fixture"
@@ -50,7 +52,13 @@ func testHookSuccessful(t *testing.T, hookType HookType) {
Expect(ResourceSyncStatusIs("Pod", "pod", SyncStatusCodeSynced)).
Expect(ResourceHealthIs("Pod", "pod", health.HealthStatusHealthy)).
Expect(ResourceResultNumbering(2)).
Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "hook", Message: "pod/hook created", HookType: hookType, Status: ResultCodeSynced, HookPhase: OperationSucceeded, SyncPhase: SyncPhase(hookType)}))
Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "hook", Status: ResultCodeSynced, Message: "pod/hook created", HookType: hookType, HookPhase: OperationSucceeded, SyncPhase: SyncPhase(hookType)})).
Expect(Pod(func(p corev1.Pod) bool {
// Completed hooks should not have a finalizer
_, isHook := p.GetAnnotations()[AnnotationKeyHook]
hasFinalizer := controllerutil.ContainsFinalizer(&p, hook.HookFinalizer)
return isHook && !hasFinalizer
}))
}
func TestPreDeleteHook(t *testing.T) {
@@ -133,8 +141,8 @@ func TestHookDiff(t *testing.T) {
// make sure that if pre-sync fails, we fail the app and we do not create the pod
func TestPreSyncHookFailure(t *testing.T) {
Given(t).
Path("hook").
ctx := Given(t)
ctx.Path("hook").
When().
PatchFile("hook.yaml", `[{"op": "replace", "path": "/metadata/annotations", "value": {"argocd.argoproj.io/hook": "PreSync"}}]`).
// make hook fail
@@ -143,14 +151,19 @@ func TestPreSyncHookFailure(t *testing.T) {
IgnoreErrors().
Sync().
Then().
Expect(Error("hook Failed Synced PreSync container \"main\" failed", "")).
// make sure resource are also printed
Expect(Error("pod OutOfSync Missing", "")).
Expect(OperationPhaseIs(OperationFailed)).
// if a pre-sync hook fails, we should not start the main sync
Expect(SyncStatusIs(SyncStatusCodeOutOfSync)).
Expect(OperationPhaseIs(OperationFailed)).
Expect(ResourceResultNumbering(1)).
Expect(ResourceSyncStatusIs("Pod", "pod", SyncStatusCodeOutOfSync))
Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "hook", Status: ResultCodeSynced, Message: `container "main" failed with exit code 1`, HookType: HookTypePreSync, HookPhase: OperationFailed, SyncPhase: SyncPhase(HookTypePreSync)})).
Expect(ResourceHealthIs("Pod", "pod", health.HealthStatusMissing)).
Expect(ResourceSyncStatusIs("Pod", "pod", SyncStatusCodeOutOfSync)).
Expect(Pod(func(p corev1.Pod) bool {
// Completed hooks should not have a finalizer
_, isHook := p.GetAnnotations()[AnnotationKeyHook]
hasFinalizer := controllerutil.ContainsFinalizer(&p, hook.HookFinalizer)
return isHook && !hasFinalizer
}))
}
// make sure that if sync fails, we fail the app and we did create the pod
@@ -168,7 +181,13 @@ func TestSyncHookFailure(t *testing.T) {
// even thought the hook failed, we expect the pod to be in sync
Expect(SyncStatusIs(SyncStatusCodeSynced)).
Expect(ResourceResultNumbering(2)).
Expect(ResourceSyncStatusIs("Pod", "pod", SyncStatusCodeSynced))
Expect(ResourceSyncStatusIs("Pod", "pod", SyncStatusCodeSynced)).
Expect(Pod(func(p corev1.Pod) bool {
// Completed hooks should not have a finalizer
_, isHook := p.GetAnnotations()[AnnotationKeyHook]
hasFinalizer := controllerutil.ContainsFinalizer(&p, hook.HookFinalizer)
return isHook && !hasFinalizer
}))
}
// make sure that if the deployments fails, we still get success and synced
@@ -184,7 +203,7 @@ func TestSyncHookResourceFailure(t *testing.T) {
Expect(HealthIs(health.HealthStatusProgressing))
}
// make sure that if post-sync fails, we fail the app and we did not create the pod
// make sure that if post-sync fails, we fail the app and we did create the pod
func TestPostSyncHookFailure(t *testing.T) {
Given(t).
Path("hook").
@@ -199,7 +218,13 @@ func TestPostSyncHookFailure(t *testing.T) {
Expect(OperationPhaseIs(OperationFailed)).
Expect(SyncStatusIs(SyncStatusCodeSynced)).
Expect(ResourceResultNumbering(2)).
Expect(ResourceSyncStatusIs("Pod", "pod", SyncStatusCodeSynced))
Expect(ResourceSyncStatusIs("Pod", "pod", SyncStatusCodeSynced)).
Expect(Pod(func(p corev1.Pod) bool {
// Completed hooks should not have a finalizer
_, isHook := p.GetAnnotations()[AnnotationKeyHook]
hasFinalizer := controllerutil.ContainsFinalizer(&p, hook.HookFinalizer)
return isHook && !hasFinalizer
}))
}
// make sure that if the pod fails, we do not run the post-sync hook
@@ -298,9 +323,119 @@ spec:
CreateApp().
Sync().
Then().
Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Name: "successful-sync-fail-hook", Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Message: "pod/successful-sync-fail-hook created", HookType: HookTypeSyncFail, Status: ResultCodeSynced, HookPhase: OperationSucceeded, SyncPhase: SyncPhaseSyncFail})).
Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Name: "failed-sync-fail-hook", Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Message: `container "main" failed with exit code 1`, HookType: HookTypeSyncFail, Status: ResultCodeSynced, HookPhase: OperationFailed, SyncPhase: SyncPhaseSyncFail})).
Expect(OperationPhaseIs(OperationFailed))
Expect(ResourceResultNumbering(4)).
Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Name: "successful-sync-fail-hook", Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Status: ResultCodeSynced, Message: "pod/successful-sync-fail-hook created", HookType: HookTypeSyncFail, HookPhase: OperationSucceeded, SyncPhase: SyncPhaseSyncFail})).
Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Name: "failed-sync-fail-hook", Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Status: ResultCodeSynced, Message: `container "main" failed with exit code 1`, HookType: HookTypeSyncFail, HookPhase: OperationFailed, SyncPhase: SyncPhaseSyncFail})).
Expect(OperationPhaseIs(OperationFailed)).
Expect(Pod(func(p corev1.Pod) bool {
// Completed hooks should not have a finalizer
_, isHook := p.GetAnnotations()[AnnotationKeyHook]
hasFinalizer := controllerutil.ContainsFinalizer(&p, hook.HookFinalizer)
return p.GetName() == "failed-sync-fail-hook" && isHook && !hasFinalizer
}))
}
// TestInvalidHookWaitsForOtherHooksToComplete makes sure that if a hook is
// invalid at apply time (it must still pass the dry-run client), it fails
// without affecting other hooks in the same phase, and that completed hooks
// have their finalizer removed even when the operation fails.
//
// Fixes typo in the original test name ("Invalidl").
func TestInvalidHookWaitsForOtherHooksToComplete(t *testing.T) {
	existingHook := `
apiVersion: v1
kind: Pod
metadata:
  annotations:
    argocd.argoproj.io/hook: Sync
    argocd.argoproj.io/hook-delete-policy: HookFailed # To preserve existence before sync
  name: invalid-hook
spec:
  containers:
    - command:
        - "true"
      image: "quay.io/argoprojlabs/argocd-e2e-container:0.1"
      imagePullPolicy: IfNotPresent
      name: main
  restartPolicy: Never`
	ctx := Given(t)
	ctx.Path("hook").
		// Pre-create the hook in the cluster so the apply below hits an
		// immutable-field conflict instead of creating a fresh Pod.
		Resource(existingHook).
		When().
		AddFile("invalid-hook.yaml", existingHook).
		// The invalid hook needs to be valid in dry-run, but fail at apply time.
		// We change an immutable field to make it happen; the hook already exists
		// in the cluster because it was pre-created above.
		PatchFile("invalid-hook.yaml", `[{"op": "replace", "path": "/spec/containers/0/name", "value": "immutable" }]`).
		CreateApp().
		IgnoreErrors().
		Sync().
		Then().
		Expect(ResourceResultNumbering(3)).
		// The invalid hook fails at apply time...
		Expect(ResourceResultMatches(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "invalid-hook", Status: ResultCodeSyncFailed, Message: `Pod "invalid-hook" is invalid`, HookType: HookTypeSync, HookPhase: OperationFailed, SyncPhase: SyncPhase(HookTypeSync)})).
		// ...but the valid hook still runs to completion.
		Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "hook", Status: ResultCodeSynced, Message: "pod/hook created", HookType: HookTypeSync, HookPhase: OperationSucceeded, SyncPhase: SyncPhase(HookTypeSync)})).
		Expect(OperationPhaseIs(OperationFailed)).
		Expect(Pod(func(p corev1.Pod) bool {
			// Completed hooks should not have a finalizer
			_, isHook := p.GetAnnotations()[AnnotationKeyHook]
			hasFinalizer := controllerutil.ContainsFinalizer(&p, hook.HookFinalizer)
			return p.GetName() == "hook" && isHook && !hasFinalizer
		}))
}
// TestInvalidSyncFailureHookWaitsForOtherHooksToComplete verifies that an
// invalid SyncFail hook (valid in dry-run, fails at apply time) does not
// prevent other SyncFail hooks from running, and that completed SyncFail
// hooks have their finalizer removed.
func TestInvalidSyncFailureHookWaitsForOtherHooksToComplete(t *testing.T) {
	existingHook := `
apiVersion: v1
kind: Pod
metadata:
  annotations:
    argocd.argoproj.io/hook: SyncFail
    argocd.argoproj.io/hook-delete-policy: HookSucceeded # To preserve existence before sync
  name: invalid-sync-fail-hook
spec:
  containers:
    - command:
        - "true"
      image: "quay.io/argoprojlabs/argocd-e2e-container:0.1"
      imagePullPolicy: IfNotPresent
      name: main
  restartPolicy: Never`
	ctx := Given(t)
	ctx.Path("hook").
		Resource(existingHook).
		When().
		AddFile("successful-sync-fail-hook.yaml", `
apiVersion: v1
kind: Pod
metadata:
  annotations:
    argocd.argoproj.io/hook: SyncFail
  name: successful-sync-fail-hook
spec:
  containers:
    - command:
        - "true"
      image: "quay.io/argoprojlabs/argocd-e2e-container:0.1"
      imagePullPolicy: IfNotPresent
      name: main
  restartPolicy: Never
`).
		AddFile("invalid-sync-fail-hook.yaml", existingHook).
		// The invalid hook needs to be valid in dry-run, but fail at apply time.
		// We change an immutable field to make it happen; the hook already exists
		// in the cluster because it was pre-created via Resource() above (its
		// delete policy is HookSucceeded, not HookFailed as this comment used to say).
		PatchFile("invalid-sync-fail-hook.yaml", `[{"op": "replace", "path": "/spec/containers/0/name", "value": "immutable" }]`).
		// Make the sync fail
		PatchFile("hook.yaml", `[{"op": "replace", "path": "/spec/containers/0/command/0", "value": "false"}]`).
		CreateApp().
		IgnoreErrors().
		Sync().
		Then().
		Expect(ResourceResultNumbering(4)).
		// The invalid SyncFail hook fails at apply time...
		Expect(ResourceResultMatches(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "invalid-sync-fail-hook", Status: ResultCodeSyncFailed, Message: `Pod "invalid-sync-fail-hook" is invalid`, HookType: HookTypeSyncFail, HookPhase: OperationFailed, SyncPhase: SyncPhase(HookTypeSyncFail)})).
		// ...but the valid SyncFail hook still runs to completion.
		Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "successful-sync-fail-hook", Status: ResultCodeSynced, Message: "pod/successful-sync-fail-hook created", HookType: HookTypeSyncFail, HookPhase: OperationSucceeded, SyncPhase: SyncPhase(HookTypeSyncFail)})).
		Expect(OperationPhaseIs(OperationFailed)).
		Expect(Pod(func(p corev1.Pod) bool {
			// Completed hooks should not have a finalizer
			_, isHook := p.GetAnnotations()[AnnotationKeyHook]
			hasFinalizer := controllerutil.ContainsFinalizer(&p, hook.HookFinalizer)
			return p.GetName() == "successful-sync-fail-hook" && isHook && !hasFinalizer
		}))
}
// make sure that we delete the hook on success
@@ -491,6 +626,8 @@ func TestHookFinalizerPostSync(t *testing.T) {
}
func testHookFinalizer(t *testing.T, hookType HookType) {
// test that the finalizer prevents hooks from being deleted by Kubernetes without observing
// its health to evaluate completion first.
t.Helper()
ctx := Given(t)
ctx.
@@ -531,3 +668,79 @@ func testHookFinalizer(t *testing.T, hookType HookType) {
Expect(ResourceResultNumbering(2)).
Expect(ResourceResultIs(ResourceResult{Group: "batch", Version: "v1", Kind: "Job", Namespace: ctx.DeploymentNamespace(), Name: "hook", Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Message: "Resource has finalizer", HookType: hookType, Status: ResultCodeSynced, HookPhase: OperationSucceeded, SyncPhase: SyncPhase(hookType)}))
}
// TestTerminateWithRunningHooks verifies that terminating an operation stops
// hooks that are still running, and that the hook finalizer is removed from
// every hook — terminated or already completed — regardless of delete policy.
func TestTerminateWithRunningHooks(t *testing.T) {
	// hookManifest renders a PreSync Pod hook with the given delete policy and command.
	hookManifest := func(name string, deletePolicy HookDeletePolicy, cmd string) string {
		return fmt.Sprintf(`
apiVersion: v1
kind: Pod
metadata:
  annotations:
    argocd.argoproj.io/hook: PreSync
    argocd.argoproj.io/hook-delete-policy: %s
  name: %s
spec:
  containers:
    - command: [ "/bin/sh", "-c", "--" ]
      args: [ "%s" ]
      image: "quay.io/argoprojlabs/argocd-e2e-container:0.1"
      imagePullPolicy: IfNotPresent
      name: main
  restartPolicy: Never`, deletePolicy, name, cmd)
	}
	// hookPodWithoutFinalizer reports whether p is the named hook pod and its
	// finalizer has been removed.
	hookPodWithoutFinalizer := func(p corev1.Pod, name string) bool {
		_, annotated := p.GetAnnotations()[AnnotationKeyHook]
		return p.GetName() == name && annotated && !controllerutil.ContainsFinalizer(&p, hook.HookFinalizer)
	}
	// deletedOrTerminating matches a hook pod that is either already gone, or
	// being deleted with its finalizer removed.
	deletedOrTerminating := func(name string) Expectation {
		return Or(
			NotPod(func(p corev1.Pod) bool {
				return p.GetName() == name
			}),
			Pod(func(p corev1.Pod) bool {
				return hookPodWithoutFinalizer(p, name) && p.GetDeletionTimestamp() != nil
			}))
	}
	// finalizerRemoved matches a hook pod that still exists but has no finalizer.
	finalizerRemoved := func(name string) Expectation {
		return Pod(func(p corev1.Pod) bool {
			return hookPodWithoutFinalizer(p, name)
		})
	}
	ctx := Given(t)
	ctx.Path("hook").
		Async(true).
		When().
		AddFile("running-delete-on-success.yaml", hookManifest("running-delete-on-success", HookDeletePolicyHookSucceeded, "sleep 300")).
		AddFile("running-delete-on-create.yaml", hookManifest("running-delete-on-create", HookDeletePolicyBeforeHookCreation, "sleep 300")).
		AddFile("running-delete-on-failed.yaml", hookManifest("running-delete-on-failed", HookDeletePolicyHookFailed, "sleep 300")).
		AddFile("complete-delete-on-success.yaml", hookManifest("complete-delete-on-success", HookDeletePolicyHookSucceeded, "true")).
		AddFile("complete-delete-on-create.yaml", hookManifest("complete-delete-on-create", HookDeletePolicyBeforeHookCreation, "true")).
		AddFile("complete-delete-on-failed.yaml", hookManifest("complete-delete-on-failed", HookDeletePolicyHookFailed, "true")).
		CreateApp().
		Sync().
		Then().
		Expect(ResourceResultNumbering(6)).
		Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "complete-delete-on-success", Status: ResultCodeSynced, Message: "pod/complete-delete-on-success created", HookType: HookTypePreSync, HookPhase: OperationSucceeded, SyncPhase: SyncPhase(HookTypePreSync)})).
		Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "complete-delete-on-create", Status: ResultCodeSynced, Message: "pod/complete-delete-on-create created", HookType: HookTypePreSync, HookPhase: OperationSucceeded, SyncPhase: SyncPhase(HookTypePreSync)})).
		Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "complete-delete-on-failed", Status: ResultCodeSynced, Message: "pod/complete-delete-on-failed created", HookType: HookTypePreSync, HookPhase: OperationSucceeded, SyncPhase: SyncPhase(HookTypePreSync)})).
		Expect(OperationPhaseIs(OperationRunning)).
		When().
		TerminateOp().
		Then().
		Expect(OperationPhaseIs(OperationFailed)).
		// Running hooks are terminated
		Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "running-delete-on-success", Status: ResultCodeSynced, Message: "Terminated", HookType: HookTypePreSync, HookPhase: OperationFailed, SyncPhase: SyncPhase(HookTypePreSync)})).
		Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "running-delete-on-create", Status: ResultCodeSynced, Message: "Terminated", HookType: HookTypePreSync, HookPhase: OperationFailed, SyncPhase: SyncPhase(HookTypePreSync)})).
		Expect(ResourceResultIs(ResourceResult{Version: "v1", Kind: "Pod", Namespace: ctx.DeploymentNamespace(), Images: []string{"quay.io/argoprojlabs/argocd-e2e-container:0.1"}, Name: "running-delete-on-failed", Status: ResultCodeSynced, Message: "Terminated", HookType: HookTypePreSync, HookPhase: OperationFailed, SyncPhase: SyncPhase(HookTypePreSync)})).
		// terminated hooks finalizer is removed and are deleted successfully
		Expect(deletedOrTerminating("running-delete-on-success")).
		Expect(deletedOrTerminating("running-delete-on-create")).
		Expect(deletedOrTerminating("running-delete-on-failed")).
		Expect(finalizerRemoved("complete-delete-on-success")).
		Expect(finalizerRemoved("complete-delete-on-create")).
		Expect(finalizerRemoved("complete-delete-on-failed"))
}