Compare commits

...

1 Commits

Author SHA1 Message Date
Jesse Suen
cf3776903d Retry argocd app wait connection errors from EOF watch. Show detailed state changes 2018-06-06 10:45:59 -07:00
3 changed files with 141 additions and 52 deletions

5
Gopkg.lock generated
View File

@@ -747,6 +747,8 @@
"discovery/fake",
"dynamic",
"dynamic/fake",
"informers/core/v1",
"informers/internalinterfaces",
"kubernetes",
"kubernetes/fake",
"kubernetes/scheme",
@@ -806,6 +808,7 @@
"kubernetes/typed/storage/v1alpha1/fake",
"kubernetes/typed/storage/v1beta1",
"kubernetes/typed/storage/v1beta1/fake",
"listers/core/v1",
"pkg/version",
"plugin/pkg/client/auth/gcp",
"plugin/pkg/client/auth/oidc",
@@ -876,6 +879,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "736e8116bcf49bf0889c2caf8e832a78a43fd5af65c5d10c86a443993de63d3c"
inputs-digest = "e336c1acaf22142bbb031deed1513923d09b6fda70d228e48a9556f9d40cb785"
solver-name = "gps-cdcl"
solver-version = 1

View File

@@ -1 +1 @@
0.4.5
0.4.6

View File

@@ -3,11 +3,13 @@ package commands
import (
"context"
"fmt"
"io"
"net/url"
"os"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/argoproj/argo-cd/errors"
argocdclient "github.com/argoproj/argo-cd/pkg/apiclient"
@@ -21,7 +23,9 @@ import (
"github.com/spf13/pflag"
"github.com/yudai/gojsondiff/formatter"
"golang.org/x/crypto/ssh/terminal"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -156,15 +160,7 @@ func NewApplicationGetCommand(clientOpts *argocdclient.ClientOptions) *cobra.Com
fmt.Println()
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintf(w, "KIND\tNAME\tSTATUS\tHEALTH\n")
for _, res := range app.Status.ComparisonResult.Resources {
obj, err := argoappv1.UnmarshalToUnstructured(res.TargetState)
errors.CheckError(err)
if obj == nil {
obj, err = argoappv1.UnmarshalToUnstructured(res.LiveState)
errors.CheckError(err)
}
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", obj.GetKind(), obj.GetName(), res.Status, res.Health.Status)
}
printAppResources(w, app)
_ = w.Flush()
}
},
@@ -470,51 +466,43 @@ func NewApplicationWaitCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co
if syncOnly && healthOnly {
log.Fatalln("Please specify at most one of --sync-only or --health-only.")
}
appName := args[0]
conn, appIf := argocdclient.NewClientOrDie(clientOpts).NewApplicationClientOrDie()
defer util.Close(conn)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
appName := args[0]
wc, err := appIf.Watch(context.Background(), &application.ApplicationQuery{
Name: &appName,
})
errors.CheckError(err)
success := util.Wait(timeout, func(done chan<- bool) {
for {
appEvent, err := wc.Recv()
errors.CheckError(err)
app := appEvent.Application
healthStatus := app.Status.Health.Status
syncStatus := app.Status.ComparisonResult.Status
log.Printf("App %q has sync status %q and health status %q", appName, syncStatus, healthStatus)
synced := (syncStatus == argoappv1.ComparisonStatusSynced)
healthy := (healthStatus == argoappv1.HealthStatusHealthy)
if (synced && healthy) || (synced && syncOnly) || (healthy && healthOnly) {
done <- true
}
}
})
if success {
log.Printf("App %q matches desired state", appName)
} else {
app, err := appIf.Get(context.Background(), &application.ApplicationQuery{Name: &appName})
errors.CheckError(err)
if len(app.Status.ComparisonResult.Resources) > 0 {
for _, res := range app.Status.ComparisonResult.Resources {
targetObj, err := argoappv1.UnmarshalToUnstructured(res.TargetState)
errors.CheckError(err)
if res.Status != argoappv1.ComparisonStatusSynced || res.Health.Status != argoappv1.HealthStatusHealthy {
log.Warnf("%s %q has sync status %q and health status %q: %s", targetObj.GetKind(), targetObj.GetName(), res.Status, res.Health.Status, res.Health.StatusDetails)
}
}
}
log.Fatalf("Timed out before seeing app %q match desired state", appName)
if timeout != 0 {
time.AfterFunc(time.Duration(timeout)*time.Second, func() {
cancel()
})
}
// print the initial components to format the tabwriter columns
app, err := appIf.Get(ctx, &application.ApplicationQuery{Name: &appName})
errors.CheckError(err)
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintf(w, "KIND\tNAME\tSTATUS\tHEALTH\n")
printAppResources(w, app)
_ = w.Flush()
prevCompRes := &app.Status.ComparisonResult
appEventCh := watchApp(ctx, appIf, appName)
for appEvent := range appEventCh {
app := appEvent.Application
printAppStateChange(w, prevCompRes, &app)
_ = w.Flush()
prevCompRes = &app.Status.ComparisonResult
synced := (app.Status.ComparisonResult.Status == argoappv1.ComparisonStatusSynced)
healthy := (app.Status.Health.Status == argoappv1.HealthStatusHealthy)
if (synced && healthy) || (synced && syncOnly) || (healthy && healthOnly) {
log.Printf("App %q matches desired state", appName)
return
}
}
log.Fatalf("Timed out (%ds) waiting for app %q match desired state", timeout, appName)
},
}
command.Flags().BoolVar(&syncOnly, "sync-only", false, "Wait only for sync")
@@ -523,6 +511,104 @@ func NewApplicationWaitCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co
return command
}
func isCanceledContextErr(err error) bool {
if err == context.Canceled {
return true
}
if stat, ok := status.FromError(err); ok {
if stat.Code() == codes.Canceled {
return true
}
}
return false
}
// watchApp returns a channel of watch events for an app, retrying the watch upon errors. Closes
// the returned channel when the context is discovered to be canceled.
func watchApp(ctx context.Context, appIf application.ApplicationServiceClient, appName string) chan *argoappv1.ApplicationWatchEvent {
appEventsCh := make(chan *argoappv1.ApplicationWatchEvent)
go func() {
defer close(appEventsCh)
for {
wc, err := appIf.Watch(ctx, &application.ApplicationQuery{
Name: &appName,
})
if err != nil {
if isCanceledContextErr(err) {
return
}
if err != io.EOF {
log.Warnf("watch err: %v", err)
}
time.Sleep(1 * time.Second)
continue
}
for {
appEvent, err := wc.Recv()
if err != nil {
if isCanceledContextErr(err) {
return
}
if err != io.EOF {
log.Warnf("recv err: %v", err)
}
time.Sleep(1 * time.Second)
break
} else {
appEventsCh <- appEvent
}
}
}
}()
return appEventsCh
}
// printAppResources prints the resources of an application in a tabwriter table
func printAppResources(w io.Writer, app *argoappv1.Application) {
for _, res := range app.Status.ComparisonResult.Resources {
obj, err := argoappv1.UnmarshalToUnstructured(res.TargetState)
errors.CheckError(err)
if obj == nil {
obj, err = argoappv1.UnmarshalToUnstructured(res.LiveState)
errors.CheckError(err)
}
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", obj.GetKind(), obj.GetName(), res.Status, res.Health.Status)
}
}
// printAppStateChange prints a component state change if it was different from the last time we saw it
func printAppStateChange(w io.Writer, prevComp *argoappv1.ComparisonResult, app *argoappv1.Application) {
getPrevResState := func(kind, name string) (argoappv1.ComparisonStatus, argoappv1.HealthStatusCode) {
for _, res := range prevComp.Resources {
obj, err := argoappv1.UnmarshalToUnstructured(res.TargetState)
errors.CheckError(err)
if obj == nil {
obj, err = argoappv1.UnmarshalToUnstructured(res.LiveState)
errors.CheckError(err)
}
if obj.GetKind() == kind && obj.GetName() == name {
return res.Status, res.Health.Status
}
}
return "", ""
}
if len(app.Status.ComparisonResult.Resources) > 0 {
for _, res := range app.Status.ComparisonResult.Resources {
obj, err := argoappv1.UnmarshalToUnstructured(res.TargetState)
errors.CheckError(err)
if obj == nil {
obj, err = argoappv1.UnmarshalToUnstructured(res.LiveState)
errors.CheckError(err)
}
prevSync, prevHealth := getPrevResState(obj.GetKind(), obj.GetName())
if prevSync != res.Status || prevHealth != res.Health.Status {
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", obj.GetKind(), obj.GetName(), res.Status, res.Health.Status)
}
}
}
}
// NewApplicationSyncCommand returns a new instance of an `argocd app sync` command
func NewApplicationSyncCommand(clientOpts *argocdclient.ClientOptions) *cobra.Command {
var (