Compare commits

...

12 Commits

Author SHA1 Message Date
Alexander Matyushentsev
22fe538645 Update manifests to v0.12.0-rc4 2019-03-12 14:27:35 -07:00
Alexander Matyushentsev
24d73e56f1 Issue #1252 - Application controller incorrectly build application objects tree (#1253) 2019-03-12 12:21:50 -07:00
Alexander Matyushentsev
8f93bdc2a5 Issue #1247 - Fix CRD creation/deletion handling (#1249) 2019-03-12 12:21:45 -07:00
Alex Collins
6d982ca397 Migrates from gometalinter to golangci-lint. Closes #1225 (#1226) 2019-03-12 12:21:34 -07:00
Jesse Suen
eff67bffad Replace git fetch implementation with git CLI (from go-git) (#1244) 2019-03-08 14:09:22 -08:00
Jesse Suen
2b781eea49 Bump version and manifests to v0.12.0-rc3 2019-03-06 16:53:14 -08:00
Jesse Suen
2f0dc20235 Fix nil pointer dereference in CompareAppState (#1234) 2019-03-06 13:46:52 -08:00
Jesse Suen
36dc50c121 Bump version and manifests to v0.12.0-rc2 2019-03-06 02:01:07 -08:00
Alexander Matyushentsev
f8f974e871 Issue #1231 - Deprecated resource kinds from 'extensions' groups are not reconciled correctly (#1232) 2019-03-06 01:59:33 -08:00
Alexander Matyushentsev
c4b474ae98 Issue #1229 - App creation failed for public repository (#1230) 2019-03-06 01:59:26 -08:00
Jesse Suen
8d98d6e058 Sort kustomize params in GetAppDetails 2019-03-05 15:45:12 -08:00
Jesse Suen
233708ecdd Set release to v0.12.0-rc1 2019-03-05 15:23:34 -08:00
47 changed files with 489 additions and 785 deletions

View File

@@ -44,7 +44,7 @@ spec:
repo: "{{workflow.parameters.repo}}"
revision: "{{workflow.parameters.revision}}"
container:
image: argoproj/argo-cd-ci-builder:v0.12.0
image: argoproj/argo-cd-ci-builder:v0.13.0
imagePullPolicy: Always
command: [bash, -c]
args: ["{{inputs.parameters.cmd}}"]
@@ -73,7 +73,7 @@ spec:
repo: "{{workflow.parameters.repo}}"
revision: "{{workflow.parameters.revision}}"
container:
image: argoproj/argo-cd-ci-builder:v0.12.0
image: argoproj/argo-cd-ci-builder:v0.13.0
imagePullPolicy: Always
command: [sh, -c]
args: ["until docker ps; do sleep 3; done && {{inputs.parameters.cmd}}"]

22
.golangci.yml Normal file
View File

@@ -0,0 +1,22 @@
run:
deadline: 8m
skip-files:
- ".*\\.pb\\.go"
skip-dirs:
- pkg/client
- vendor
linter-settings:
goimports:
local-prefixes: github.com/argoproj/argo-cd
linters:
enable:
- vet
- gofmt
- goimports
- deadcode
- varcheck
- structcheck
- ineffassign
- unconvert
- misspell
disable-all: true

View File

@@ -40,10 +40,8 @@ go get -u github.com/golang/protobuf/protoc-gen-go
go get -u github.com/go-swagger/go-swagger/cmd/swagger
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go get -u gopkg.in/alecthomas/gometalinter.v2
go get -u github.com/golangci/golangci-lint/cmd/golangci-lint
go get -u github.com/mattn/goreman
gometalinter.v2 --install
```
## Building

View File

@@ -79,6 +79,17 @@ ENV AWS_IAM_AUTHENTICATOR_VERSION=0.4.0-alpha.1
RUN curl -L -o /usr/local/bin/aws-iam-authenticator https://github.com/kubernetes-sigs/aws-iam-authenticator/releases/download/${AWS_IAM_AUTHENTICATOR_VERSION}/aws-iam-authenticator_${AWS_IAM_AUTHENTICATOR_VERSION}_linux_amd64 && \
chmod +x /usr/local/bin/aws-iam-authenticator
# Install golangci-lint
RUN wget https://install.goreleaser.com/github.com/golangci/golangci-lint.sh && \
chmod +x ./golangci-lint.sh && \
./golangci-lint.sh -b $GOPATH/bin && \
golangci-lint linters
COPY .golangci.yml ${GOPATH}/src/dummy/.golangci.yml
RUN cd ${GOPATH}/src/dummy && \
touch dummy.go \
golangci-lint run
####################################################################################################
# Argo CD Base - used as the base for both the release and dev argocd images
@@ -94,6 +105,7 @@ RUN groupadd -g 999 argocd && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
COPY hack/git-ask-pass.sh /usr/local/bin/git-ask-pass.sh
COPY --from=builder /usr/local/bin/ks /usr/local/bin/ks
COPY --from=builder /usr/local/bin/helm /usr/local/bin/helm
COPY --from=builder /usr/local/bin/kubectl /usr/local/bin/kubectl

33
Gopkg.lock generated
View File

@@ -48,25 +48,6 @@
pruneopts = ""
revision = "09c41003ee1d5015b75f331e52215512e7145b8d"
[[projects]]
branch = "master"
digest = "1:a74730e052a45a3fab1d310fdef2ec17ae3d6af16228421e238320846f2aaec8"
name = "github.com/alecthomas/template"
packages = [
".",
"parse",
]
pruneopts = ""
revision = "a0175ee3bccc567396460bf5acd36800cb10c49c"
[[projects]]
branch = "master"
digest = "1:8483994d21404c8a1d489f6be756e25bfccd3b45d65821f25695577791a08e68"
name = "github.com/alecthomas/units"
packages = ["."]
pruneopts = ""
revision = "2efee857e7cfd4f3d0138cc3cbb1b4966962b93a"
[[projects]]
branch = "master"
digest = "1:0caf9208419fa5db5a0ca7112affaa9550c54291dda8e2abac0c0e76181c959e"
@@ -760,7 +741,6 @@
packages = [
"expfmt",
"internal/bitbucket.org/ww/goautoneg",
"log",
"model",
]
pruneopts = ""
@@ -992,8 +972,6 @@
packages = [
"unix",
"windows",
"windows/registry",
"windows/svc/eventlog",
]
pruneopts = ""
revision = "d0be0721c37eeb5299f245a996a483160fc36940"
@@ -1115,14 +1093,6 @@
revision = "8dea3dc473e90c8179e519d91302d0597c0ca1d1"
version = "v1.15.0"
[[projects]]
digest = "1:15d017551627c8bb091bde628215b2861bed128855343fdd570c62d08871f6e1"
name = "gopkg.in/alecthomas/kingpin.v2"
packages = ["."]
pruneopts = ""
revision = "947dcec5ba9c011838740e680966fd7087a71d0d"
version = "v2.2.6"
[[projects]]
digest = "1:bf7444e1e6a36e633f4f1624a67b9e4734cfb879c27ac0a2082ac16aff8462ac"
name = "gopkg.in/go-playground/webhooks.v3"
@@ -1378,6 +1348,7 @@
"discovery",
"discovery/fake",
"dynamic",
"dynamic/fake",
"informers/core/v1",
"informers/internalinterfaces",
"kubernetes",
@@ -1610,7 +1581,6 @@
"github.com/pkg/errors",
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promhttp",
"github.com/prometheus/common/log",
"github.com/sirupsen/logrus",
"github.com/skratchdot/open-golang/open",
"github.com/soheilhy/cmux",
@@ -1673,6 +1643,7 @@
"k8s.io/client-go/discovery",
"k8s.io/client-go/discovery/fake",
"k8s.io/client-go/dynamic",
"k8s.io/client-go/dynamic/fake",
"k8s.io/client-go/informers/core/v1",
"k8s.io/client-go/kubernetes",
"k8s.io/client-go/kubernetes/fake",

View File

@@ -119,10 +119,11 @@ endif
.PHONY: builder-image
builder-image:
docker build -t $(IMAGE_PREFIX)argo-cd-ci-builder:$(IMAGE_TAG) --target builder .
docker push $(IMAGE_PREFIX)argo-cd-ci-builder:$(IMAGE_TAG)
.PHONY: dep-ensure
dep-ensure:
dep ensure
dep ensure -no-vendor
.PHONY: format-code
format-code:
@@ -130,7 +131,7 @@ format-code:
.PHONY: lint
lint:
gometalinter.v2 --config gometalinter.json ./...
golangci-lint run
.PHONY: test
test:
@@ -159,4 +160,4 @@ release-precheck: manifests
@if [ "$(GIT_TAG)" != "v`cat VERSION`" ]; then echo 'VERSION does not match git tag'; exit 1; fi
.PHONY: release
release: release-precheck precheckin image release-cli
release: release-precheck pre-commit image release-cli

View File

@@ -1 +1 @@
0.12.0
0.12.0-rc4

View File

@@ -9,7 +9,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/argoproj/argo-cd"
argocd "github.com/argoproj/argo-cd"
"github.com/argoproj/argo-cd/errors"
"github.com/argoproj/argo-cd/reposerver"
"github.com/argoproj/argo-cd/util/cache"

View File

@@ -15,7 +15,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/argoproj/argo-cd/errors"
argocdclient "github.com/argoproj/argo-cd/pkg/apiclient"

View File

@@ -659,7 +659,7 @@ func (ctrl *ApplicationController) refreshAppConditions(app *appv1.Application)
})
}
} else {
specConditions, err := argo.GetSpecErrors(context.Background(), &app.Spec, proj, ctrl.repoClientset, ctrl.db)
specConditions, _, err := argo.GetSpecErrors(context.Background(), &app.Spec, proj, ctrl.repoClientset, ctrl.db)
if err != nil {
conditions = append(conditions, appv1.ApplicationCondition{
Type: appv1.ApplicationConditionUnknownError,

View File

@@ -2,7 +2,6 @@ package cache
import (
"context"
"fmt"
"sync"
log "github.com/sirupsen/logrus"
@@ -19,7 +18,7 @@ import (
)
type LiveStateCache interface {
IsNamespaced(server string, gvk schema.GroupVersionKind) (bool, error)
IsNamespaced(server string, obj *unstructured.Unstructured) (bool, error)
// Returns child nodes for a given k8s resource
GetChildren(server string, obj *unstructured.Unstructured) ([]appv1.ResourceNode, error)
// Returns state of live nodes which correspond for target nodes of specified application.
@@ -73,13 +72,6 @@ func (c *liveStateCache) processEvent(event watch.EventType, obj *unstructured.U
return info.processEvent(event, obj)
}
func (c *liveStateCache) removeCluster(server string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.clusters, server)
log.Infof("Dropped cluster %s cache", server)
}
func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) {
c.lock.Lock()
defer c.lock.Unlock()
@@ -90,7 +82,7 @@ func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) {
return nil, err
}
info = &clusterInfo{
apis: make(map[schema.GroupKind]*gkInfo),
apisMeta: make(map[schema.GroupKind]*apiMeta),
lock: &sync.Mutex{},
nodes: make(map[kube.ResourceKey]*node),
nsIndex: make(map[string]map[kube.ResourceKey]*node),
@@ -140,12 +132,12 @@ func (c *liveStateCache) Delete(server string, obj *unstructured.Unstructured) e
return clusterInfo.delete(obj)
}
func (c *liveStateCache) IsNamespaced(server string, gvk schema.GroupVersionKind) (bool, error) {
func (c *liveStateCache) IsNamespaced(server string, obj *unstructured.Unstructured) (bool, error) {
clusterInfo, err := c.getSyncedCluster(server)
if err != nil {
return false, err
}
return clusterInfo.isNamespaced(gvk.GroupKind()), nil
return clusterInfo.isNamespaced(obj), nil
}
func (c *liveStateCache) GetChildren(server string, obj *unstructured.Unstructured) ([]appv1.ResourceNode, error) {
@@ -175,135 +167,29 @@ func isClusterHasApps(apps []interface{}, cluster *appv1.Cluster) bool {
// Run watches for resource changes annotated with application label on all registered clusters and schedule corresponding app refresh.
func (c *liveStateCache) Run(ctx context.Context) {
watchingClustersLock := sync.Mutex{}
watchingClusters := make(map[string]struct {
cancel context.CancelFunc
cluster *appv1.Cluster
})
util.RetryUntilSucceed(func() error {
clusterEventCallback := func(event *db.ClusterEvent) {
info, ok := watchingClusters[event.Cluster.Server]
hasApps := isClusterHasApps(c.appInformer.GetStore().List(), event.Cluster)
// cluster resources must be watched only if cluster has at least one app
if (event.Type == watch.Deleted || !hasApps) && ok {
info.cancel()
watchingClustersLock.Lock()
delete(watchingClusters, event.Cluster.Server)
watchingClustersLock.Unlock()
} else if event.Type != watch.Deleted && !ok && hasApps {
ctx, cancel := context.WithCancel(ctx)
watchingClustersLock.Lock()
watchingClusters[event.Cluster.Server] = struct {
cancel context.CancelFunc
cluster *appv1.Cluster
}{
cancel: func() {
c.removeCluster(event.Cluster.Server)
cancel()
},
cluster: event.Cluster,
}
watchingClustersLock.Unlock()
go c.watchClusterResources(ctx, *event.Cluster)
}
}
onAppModified := func(obj interface{}) {
if app, ok := obj.(*appv1.Application); ok {
var cluster *appv1.Cluster
info, infoOk := watchingClusters[app.Spec.Destination.Server]
if infoOk {
cluster = info.cluster
} else {
cluster, _ = c.db.GetCluster(ctx, app.Spec.Destination.Server)
}
if cluster != nil {
// trigger cluster event every time when app created/deleted to either start or stop watching resources
clusterEventCallback(&db.ClusterEvent{Cluster: cluster, Type: watch.Modified})
c.lock.Lock()
defer c.lock.Unlock()
if cluster, ok := c.clusters[event.Cluster.Server]; ok {
if event.Type == watch.Deleted {
cluster.invalidate()
delete(c.clusters, event.Cluster.Server)
} else if event.Type == watch.Modified {
cluster.cluster = event.Cluster
cluster.invalidate()
} else if event.Type == watch.Added && isClusterHasApps(c.appInformer.GetStore().List(), event.Cluster) {
go func() {
// warm up cache for cluster with apps
_ = cluster.ensureSynced()
}()
}
}
}
c.appInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAppModified,
UpdateFunc: func(oldObj, newObj interface{}) {
oldApp, oldOk := oldObj.(*appv1.Application)
newApp, newOk := newObj.(*appv1.Application)
if oldOk && newOk {
if oldApp.Spec.Destination.Server != newApp.Spec.Destination.Server {
onAppModified(oldObj)
onAppModified(newApp)
}
}
},
DeleteFunc: onAppModified,
})
return c.db.WatchClusters(ctx, clusterEventCallback)
}, "watch clusters", ctx, clusterRetryTimeout)
<-ctx.Done()
}
// watchClusterResources watches for resource changes annotated with application label on specified cluster and schedule corresponding app refresh.
func (c *liveStateCache) watchClusterResources(ctx context.Context, item appv1.Cluster) {
util.RetryUntilSucceed(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Recovered from panic: %v\n", r)
}
}()
config := item.RESTConfig()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ch, err := c.kubectl.WatchResources(ctx, config, c.settings, func(gk schema.GroupKind) (s string, e error) {
clusterInfo, err := c.getSyncedCluster(item.Server)
if err != nil {
return "", err
}
return clusterInfo.getResourceVersion(gk), nil
})
if err != nil {
return err
}
for event := range ch {
if event.WatchEvent != nil {
eventObj := event.WatchEvent.Object.(*unstructured.Unstructured)
if kube.IsCRD(eventObj) {
// restart if new CRD has been created after watch started
if event.WatchEvent.Type == watch.Added {
c.removeCluster(item.Server)
return fmt.Errorf("Restarting the watch because a new CRD %s was added", eventObj.GetName())
} else if event.WatchEvent.Type == watch.Deleted {
c.removeCluster(item.Server)
return fmt.Errorf("Restarting the watch because CRD %s was deleted", eventObj.GetName())
}
}
err = c.processEvent(event.WatchEvent.Type, eventObj, item.Server)
if err != nil {
log.Warnf("Failed to process event %s for obj %v: %v", event.WatchEvent.Type, event.WatchEvent.Object, err)
}
} else {
err = c.updateCache(item.Server, event.CacheRefresh.GVK.GroupKind(), event.CacheRefresh.ResourceVersion, event.CacheRefresh.Objects)
if err != nil {
log.Warnf("Failed to process event %s for obj %v: %v", event.WatchEvent.Type, event.WatchEvent.Object, err)
}
}
}
return fmt.Errorf("resource updates channel has closed")
}, fmt.Sprintf("watch app resources on %s", item.Server), ctx, clusterRetryTimeout)
}
func (c *liveStateCache) updateCache(server string, gk schema.GroupKind, resourceVersion string, objs []unstructured.Unstructured) error {
clusterInfo, err := c.getSyncedCluster(server)
if err != nil {
return err
}
clusterInfo.updateCache(gk, resourceVersion, objs)
return nil
}

View File

@@ -1,129 +0,0 @@
package cache
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/util/kube"
"github.com/argoproj/argo-cd/util/kube/kubetest"
)
const (
pollInterval = 500 * time.Millisecond
)
func TestWatchClusterResourcesHandlesResourceEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
events := make(chan kube.WatchEvent)
defer func() {
cancel()
close(events)
}()
pod := testPod.DeepCopy()
kubeMock := &kubetest.MockKubectlCmd{
Resources: []kube.ResourcesBatch{{
GVK: pod.GroupVersionKind(),
Objects: make([]unstructured.Unstructured, 0),
}},
Events: events,
}
server := "https://test"
clusterCache := newClusterExt(kubeMock)
cache := &liveStateCache{
clusters: map[string]*clusterInfo{server: clusterCache},
lock: &sync.Mutex{},
kubectl: kubeMock,
}
go cache.watchClusterResources(ctx, v1alpha1.Cluster{Server: server})
assert.False(t, clusterCache.synced())
events <- kube.WatchEvent{WatchEvent: &watch.Event{Object: pod, Type: watch.Added}}
err := wait.Poll(pollInterval, wait.ForeverTestTimeout, func() (bool, error) {
_, hasPod := clusterCache.nodes[kube.GetResourceKey(pod)]
return hasPod, nil
})
assert.Nil(t, err)
pod.SetResourceVersion("updated-resource-version")
events <- kube.WatchEvent{WatchEvent: &watch.Event{Object: pod, Type: watch.Modified}}
err = wait.Poll(pollInterval, wait.ForeverTestTimeout, func() (bool, error) {
updatedPodInfo, hasPod := clusterCache.nodes[kube.GetResourceKey(pod)]
return hasPod && updatedPodInfo.resourceVersion == "updated-resource-version", nil
})
assert.Nil(t, err)
events <- kube.WatchEvent{WatchEvent: &watch.Event{Object: pod, Type: watch.Deleted}}
err = wait.Poll(pollInterval, wait.ForeverTestTimeout, func() (bool, error) {
_, hasPod := clusterCache.nodes[kube.GetResourceKey(pod)]
return !hasPod, nil
})
assert.Nil(t, err)
}
func TestClusterCacheDroppedOnCreatedDeletedCRD(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
events := make(chan kube.WatchEvent)
defer func() {
cancel()
close(events)
}()
kubeMock := &kubetest.MockKubectlCmd{
Resources: []kube.ResourcesBatch{{
GVK: testCRD.GroupVersionKind(),
Objects: make([]unstructured.Unstructured, 0),
}},
Events: events,
}
server := "https://test"
clusterCache := newClusterExt(kubeMock)
cache := &liveStateCache{
clusters: map[string]*clusterInfo{server: clusterCache},
lock: &sync.Mutex{},
kubectl: kubeMock,
}
go cache.watchClusterResources(ctx, v1alpha1.Cluster{Server: server})
err := clusterCache.ensureSynced()
assert.Nil(t, err)
events <- kube.WatchEvent{WatchEvent: &watch.Event{Object: testCRD, Type: watch.Added}}
err = wait.Poll(pollInterval, wait.ForeverTestTimeout, func() (bool, error) {
cache.lock.Lock()
defer cache.lock.Unlock()
_, hasCache := cache.clusters[server]
return !hasCache, nil
})
assert.Nil(t, err)
cache.clusters[server] = clusterCache
events <- kube.WatchEvent{WatchEvent: &watch.Event{Object: testCRD, Type: watch.Deleted}}
err = wait.Poll(pollInterval, wait.ForeverTestTimeout, func() (bool, error) {
cache.lock.Lock()
defer cache.lock.Unlock()
_, hasCache := cache.clusters[server]
return !hasCache, nil
})
assert.Nil(t, err)
}

View File

@@ -1,6 +1,7 @@
package cache
import (
"context"
"fmt"
"runtime/debug"
"sync"
@@ -21,44 +22,38 @@ import (
)
const (
clusterSyncTimeout = 24 * time.Hour
clusterRetryTimeout = 10 * time.Second
clusterSyncTimeout = 24 * time.Hour
clusterRetryTimeout = 10 * time.Second
watchResourcesRetryTimeout = 1 * time.Second
)
type gkInfo struct {
resource metav1.APIResource
type apiMeta struct {
namespaced bool
resourceVersion string
watchCancel context.CancelFunc
}
type clusterInfo struct {
apis map[schema.GroupKind]*gkInfo
nodes map[kube.ResourceKey]*node
nsIndex map[string]map[kube.ResourceKey]*node
lock *sync.Mutex
syncLock *sync.Mutex
syncTime *time.Time
syncError error
apisMeta map[schema.GroupKind]*apiMeta
lock *sync.Mutex
nodes map[kube.ResourceKey]*node
nsIndex map[string]map[kube.ResourceKey]*node
onAppUpdated func(appName string)
kubectl kube.Kubectl
cluster *appv1.Cluster
syncLock *sync.Mutex
syncTime *time.Time
syncError error
log *log.Entry
settings *settings.ArgoCDSettings
}
func (c *clusterInfo) getResourceVersion(gk schema.GroupKind) string {
func (c *clusterInfo) replaceResourceCache(gk schema.GroupKind, resourceVersion string, objs []unstructured.Unstructured) {
c.lock.Lock()
defer c.lock.Unlock()
info, ok := c.apis[gk]
if ok {
return info.resourceVersion
}
return ""
}
func (c *clusterInfo) updateCache(gk schema.GroupKind, resourceVersion string, objs []unstructured.Unstructured) {
c.lock.Lock()
defer c.lock.Unlock()
info, ok := c.apis[gk]
info, ok := c.apisMeta[gk]
if ok {
objByKind := make(map[kube.ResourceKey]*unstructured.Unstructured)
for i := range objs {
@@ -136,7 +131,13 @@ func (c *clusterInfo) removeNode(key kube.ResourceKey) {
}
func (c *clusterInfo) invalidate() {
c.syncLock.Lock()
defer c.syncLock.Unlock()
c.syncTime = nil
for i := range c.apisMeta {
c.apisMeta[i].watchCancel()
}
c.apisMeta = nil
}
func (c *clusterInfo) synced() bool {
@@ -149,38 +150,160 @@ func (c *clusterInfo) synced() bool {
return time.Now().Before(c.syncTime.Add(clusterSyncTimeout))
}
func (c *clusterInfo) sync() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
}()
func (c *clusterInfo) stopWatching(gk schema.GroupKind) {
c.syncLock.Lock()
defer c.syncLock.Unlock()
if info, ok := c.apisMeta[gk]; ok {
info.watchCancel()
delete(c.apisMeta, gk)
c.replaceResourceCache(gk, "", []unstructured.Unstructured{})
log.Warnf("Stop watching %s not found on %s.", gk, c.cluster.Server)
}
}
c.log.Info("Start syncing cluster")
// startMissingWatches lists supported cluster resources and start watching for changes unless watch is already running
func (c *clusterInfo) startMissingWatches() error {
c.syncLock.Lock()
defer c.syncLock.Unlock()
c.apis = make(map[schema.GroupKind]*gkInfo)
c.nodes = make(map[kube.ResourceKey]*node)
resources, err := c.kubectl.GetResources(c.cluster.RESTConfig(), c.settings, "")
apis, err := c.kubectl.GetAPIResources(c.cluster.RESTConfig(), c.settings)
if err != nil {
log.Errorf("Failed to sync cluster %s: %v", c.cluster.Server, err)
return err
}
appLabelKey := c.settings.GetAppInstanceLabelKey()
for res := range resources {
if res.Error != nil {
return res.Error
for i := range apis {
api := apis[i]
if _, ok := c.apisMeta[api.GroupKind]; !ok {
ctx, cancel := context.WithCancel(context.Background())
info := &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}
c.apisMeta[api.GroupKind] = info
go c.watchEvents(ctx, api, info)
}
if _, ok := c.apis[res.GVK.GroupKind()]; !ok {
c.apis[res.GVK.GroupKind()] = &gkInfo{
resourceVersion: res.ListResourceVersion,
resource: res.ResourceInfo,
}
return nil
}
func runSynced(lock *sync.Mutex, action func() error) error {
lock.Lock()
defer lock.Unlock()
return action()
}
func (c *clusterInfo) watchEvents(ctx context.Context, api kube.APIResourceInfo, info *apiMeta) {
util.RetryUntilSucceed(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
}()
err = runSynced(c.syncLock, func() error {
if info.resourceVersion == "" {
list, err := api.Interface.List(metav1.ListOptions{})
if err != nil {
return err
}
c.replaceResourceCache(api.GroupKind, list.GetResourceVersion(), list.Items)
}
return nil
})
if err != nil {
return err
}
w, err := api.Interface.Watch(metav1.ListOptions{ResourceVersion: info.resourceVersion})
if errors.IsNotFound(err) {
c.stopWatching(api.GroupKind)
return nil
}
err = runSynced(c.syncLock, func() error {
if errors.IsGone(err) {
info.resourceVersion = ""
log.Warnf("Resource version of %s on %s is too old.", api.GroupKind, c.cluster.Server)
}
return err
})
if err != nil {
return err
}
defer w.Stop()
for {
select {
case <-ctx.Done():
return nil
case event, ok := <-w.ResultChan():
if ok {
obj := event.Object.(*unstructured.Unstructured)
info.resourceVersion = obj.GetResourceVersion()
err = c.processEvent(event.Type, obj)
if err != nil {
log.Warnf("Failed to process event %s %s/%s/%s: %v", event.Type, obj.GroupVersionKind(), obj.GetNamespace(), obj.GetName(), err)
continue
}
if kube.IsCRD(obj) {
if event.Type == watch.Deleted {
group, groupOk, groupErr := unstructured.NestedString(obj.Object, "spec", "group")
kind, kindOk, kindErr := unstructured.NestedString(obj.Object, "spec", "names", "kind")
if groupOk && groupErr == nil && kindOk && kindErr == nil {
gk := schema.GroupKind{Group: group, Kind: kind}
c.stopWatching(gk)
}
} else {
err = c.startMissingWatches()
}
}
if err != nil {
log.Warnf("Failed to start missing watch: %v", err)
}
} else {
return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.cluster.Server)
}
}
}
for i := range res.Objects {
c.setNode(createObjInfo(&res.Objects[i], appLabelKey))
}, fmt.Sprintf("watch %s on %s", api.GroupKind, c.cluster.Server), ctx, watchResourcesRetryTimeout)
}
func (c *clusterInfo) sync() (err error) {
c.log.Info("Start syncing cluster")
c.apisMeta = make(map[schema.GroupKind]*apiMeta)
c.nodes = make(map[kube.ResourceKey]*node)
apis, err := c.kubectl.GetAPIResources(c.cluster.RESTConfig(), c.settings)
if err != nil {
return err
}
lock := sync.Mutex{}
err = util.RunAllAsync(len(apis), func(i int) error {
api := apis[i]
list, err := api.Interface.List(metav1.ListOptions{})
if err != nil {
return err
}
lock.Lock()
for i := range list.Items {
c.setNode(createObjInfo(&list.Items[i], c.settings.GetAppInstanceLabelKey()))
}
ctx, cancel := context.WithCancel(context.Background())
info := &apiMeta{namespaced: api.Meta.Namespaced, resourceVersion: list.GetResourceVersion(), watchCancel: cancel}
c.apisMeta[api.GroupKind] = info
lock.Unlock()
go c.watchEvents(ctx, api, info)
return nil
})
if err != nil {
log.Errorf("Failed to sync cluster %s: %v", c.cluster.Server, err)
return err
}
c.log.Info("Cluster successfully synced")
@@ -219,8 +342,8 @@ func (c *clusterInfo) getChildren(obj *unstructured.Unstructured) []appv1.Resour
return children
}
func (c *clusterInfo) isNamespaced(gk schema.GroupKind) bool {
if api, ok := c.apis[gk]; ok && !api.resource.Namespaced {
func (c *clusterInfo) isNamespaced(obj *unstructured.Unstructured) bool {
if api, ok := c.apisMeta[kube.GetResourceKey(obj).GroupKind()]; ok && !api.namespaced {
return false
}
return true
@@ -242,7 +365,7 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns
lock := &sync.Mutex{}
err := util.RunAllAsync(len(targetObjs), func(i int) error {
targetObj := targetObjs[i]
key := GetTargetObjKey(a, targetObj, c.isNamespaced(targetObj.GroupVersionKind().GroupKind()))
key := GetTargetObjKey(a, targetObj, c.isNamespaced(targetObj))
lock.Lock()
managedObj := managedObjs[key]
lock.Unlock()
@@ -311,9 +434,6 @@ func (c *clusterInfo) processEvent(event watch.EventType, un *unstructured.Unstr
c.lock.Lock()
defer c.lock.Unlock()
key := kube.GetResourceKey(un)
if info, ok := c.apis[schema.GroupKind{Group: key.Group, Kind: key.Kind}]; ok {
info.resourceVersion = un.GetResourceVersion()
}
existingNode, exists := c.nodes[key]
if event == watch.Deleted {
if exists {

View File

@@ -6,6 +6,9 @@ import (
"sync"
"testing"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic/fake"
"github.com/ghodss/yaml"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@@ -49,8 +52,7 @@ var (
resourceVersion: "123"`)
testRS = strToUnstructured(`
apiVersion: v1
apiVersion: extensions/v1beta1
apiVersion: apps/v1
kind: ReplicaSet
metadata:
name: helm-guestbook-rs
@@ -62,39 +64,39 @@ var (
resourceVersion: "123"`)
testDeploy = strToUnstructured(`
apiVersion: extensions/v1beta1
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app.kubernetes.io/instance: helm-guestbook
name: helm-guestbook
namespace: default
resourceVersion: "123"`)
testCRD = strToUnstructured(`
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: my-custom-resource-definition
resourceVersion: "123"`)
)
func newCluster(objs ...*unstructured.Unstructured) *clusterInfo {
resByGVK := make(map[schema.GroupVersionKind][]unstructured.Unstructured)
runtimeObjs := make([]runtime.Object, len(objs))
for i := range objs {
resByGVK[objs[i].GroupVersionKind()] = append(resByGVK[objs[i].GroupVersionKind()], *objs[i])
runtimeObjs[i] = objs[i]
}
resources := make([]kube.ResourcesBatch, 0)
for gvk, objects := range resByGVK {
resources = append(resources, kube.ResourcesBatch{
ListResourceVersion: "1",
GVK: gvk,
Objects: objects,
})
}
return newClusterExt(kubetest.MockKubectlCmd{
Resources: resources,
})
scheme := runtime.NewScheme()
client := fake.NewSimpleDynamicClient(scheme, runtimeObjs...)
apiResources := []kube.APIResourceInfo{{
GroupKind: schema.GroupKind{Group: "", Kind: "Pod"},
Interface: client.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}),
Meta: metav1.APIResource{Namespaced: true},
}, {
GroupKind: schema.GroupKind{Group: "apps", Kind: "ReplicaSet"},
Interface: client.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}),
Meta: metav1.APIResource{Namespaced: true},
}, {
GroupKind: schema.GroupKind{Group: "apps", Kind: "Deployment"},
Interface: client.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}),
Meta: metav1.APIResource{Namespaced: true},
}}
return newClusterExt(kubetest.MockKubectlCmd{APIResources: apiResources})
}
func newClusterExt(kubectl kube.Kubectl) *clusterInfo {
@@ -107,7 +109,7 @@ func newClusterExt(kubectl kube.Kubectl) *clusterInfo {
cluster: &appv1.Cluster{},
syncTime: nil,
syncLock: &sync.Mutex{},
apis: make(map[schema.GroupKind]*gkInfo),
apisMeta: make(map[schema.GroupKind]*apiMeta),
log: log.WithField("cluster", "test"),
settings: &settings.ArgoCDSettings{},
}
@@ -135,8 +137,8 @@ func TestGetChildren(t *testing.T) {
Kind: "ReplicaSet",
Namespace: "default",
Name: "helm-guestbook-rs",
Group: "extensions",
Version: "v1beta1",
Group: "apps",
Version: "v1",
ResourceVersion: "123",
Children: rsChildren,
Info: []appv1.InfoItem{},
@@ -149,7 +151,7 @@ func TestGetManagedLiveObjs(t *testing.T) {
assert.Nil(t, err)
targetDeploy := strToUnstructured(`
apiVersion: extensions/v1beta1
apiVersion: apps/v1
kind: Deployment
metadata:
name: helm-guestbook
@@ -279,7 +281,7 @@ func TestUpdateAppResource(t *testing.T) {
err = cluster.processEvent(watch.Modified, mustToUnstructured(testPod))
assert.Nil(t, err)
assert.Equal(t, []string{"helm-guestbook"}, updatesReceived)
assert.Contains(t, updatesReceived, "helm-guestbook")
}
func TestCircularReference(t *testing.T) {
@@ -316,7 +318,7 @@ func TestWatchCacheUpdated(t *testing.T) {
podGroupKind := testPod.GroupVersionKind().GroupKind()
cluster.updateCache(podGroupKind, "updated-list-version", []unstructured.Unstructured{*updated, *added})
cluster.replaceResourceCache(podGroupKind, "updated-list-version", []unstructured.Unstructured{*updated, *added})
_, ok := cluster.nodes[kube.GetResourceKey(removed)]
assert.False(t, ok)
@@ -327,6 +329,4 @@ func TestWatchCacheUpdated(t *testing.T) {
_, ok = cluster.nodes[kube.GetResourceKey(added)]
assert.True(t, ok)
assert.Equal(t, cluster.getResourceVersion(podGroupKind), "updated-list-version")
}

View File

@@ -3,7 +3,7 @@ package cache
import (
"fmt"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
k8snode "k8s.io/kubernetes/pkg/util/node"

View File

@@ -2,14 +2,11 @@
package mocks
import (
"context"
)
import "github.com/argoproj/argo-cd/util/kube"
import "github.com/stretchr/testify/mock"
import "k8s.io/apimachinery/pkg/runtime/schema"
import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
import "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
import context "context"
import kube "github.com/argoproj/argo-cd/util/kube"
import mock "github.com/stretchr/testify/mock"
import unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
import v1alpha1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
// LiveStateCache is an autogenerated mock type for the LiveStateCache type
type LiveStateCache struct {
@@ -81,20 +78,20 @@ func (_m *LiveStateCache) Invalidate() {
_m.Called()
}
// IsNamespaced provides a mock function with given fields: server, gvk
func (_m *LiveStateCache) IsNamespaced(server string, gvk schema.GroupVersionKind) (bool, error) {
ret := _m.Called(server, gvk)
// IsNamespaced provides a mock function with given fields: server, obj
func (_m *LiveStateCache) IsNamespaced(server string, obj *unstructured.Unstructured) (bool, error) {
ret := _m.Called(server, obj)
var r0 bool
if rf, ok := ret.Get(0).(func(string, schema.GroupVersionKind) bool); ok {
r0 = rf(server, gvk)
if rf, ok := ret.Get(0).(func(string, *unstructured.Unstructured) bool); ok {
r0 = rf(server, obj)
} else {
r0 = ret.Get(0).(bool)
}
var r1 error
if rf, ok := ret.Get(1).(func(string, schema.GroupVersionKind) error); ok {
r1 = rf(server, gvk)
if rf, ok := ret.Get(1).(func(string, *unstructured.Unstructured) error); ok {
r1 = rf(server, obj)
} else {
r1 = ret.Error(1)
}

View File

@@ -6,7 +6,7 @@ import (
appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/util/kube"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -26,8 +26,8 @@ func (n *node) resourceKey() kube.ResourceKey {
}
func (n *node) isParentOf(child *node) bool {
ownerGvk := n.ref.GroupVersionKind()
for _, ownerRef := range child.ownerRefs {
ownerGvk := schema.FromAPIVersionAndKind(ownerRef.APIVersion, ownerRef.Kind)
if kube.NewResourceKey(ownerGvk.Group, ownerRef.Kind, n.ref.Namespace, ownerRef.Name) == n.resourceKey() {
return true
}

25
controller/cache/node_test.go vendored Normal file
View File

@@ -0,0 +1,25 @@
package cache
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestIsParentOf(t *testing.T) {
child := createObjInfo(testPod, "")
parent := createObjInfo(testRS, "")
grandParent := createObjInfo(testDeploy, "")
assert.True(t, parent.isParentOf(child))
assert.False(t, grandParent.isParentOf(child))
}
func TestIsParentOfSameKindDifferentGroup(t *testing.T) {
rs := testRS.DeepCopy()
rs.SetAPIVersion("somecrd.io/v1")
child := createObjInfo(testPod, "")
invalidParent := createObjInfo(rs, "")
assert.False(t, invalidParent.isParentOf(child))
}

View File

@@ -220,7 +220,7 @@ func TestReconcileMetrics(t *testing.T) {
metricsServ := NewMetricsServer("localhost:8082", appLister)
fakeApp := newFakeApp(fakeApp)
metricsServ.IncReconcile(fakeApp, time.Duration(5*time.Second))
metricsServ.IncReconcile(fakeApp, 5*time.Second)
req, err := http.NewRequest("GET", "/metrics", nil)
assert.NoError(t, err)

View File

@@ -175,7 +175,7 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, revision st
for i, obj := range targetObjs {
gvk := obj.GroupVersionKind()
ns := util.FirstNonEmpty(obj.GetNamespace(), app.Spec.Destination.Namespace)
if namespaced, err := m.liveStateCache.IsNamespaced(app.Spec.Destination.Server, obj.GroupVersionKind()); err == nil && !namespaced {
if namespaced, err := m.liveStateCache.IsNamespaced(app.Spec.Destination.Server, obj); err == nil && !namespaced {
ns = ""
}
key := kubeutil.NewResourceKey(gvk.Group, gvk.Kind, ns, obj.GetName())
@@ -278,7 +278,9 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, revision st
conditions: conditions,
hooks: hooks,
diffNormalizer: diffNormalizer,
appSourceType: v1alpha1.ApplicationSourceType(manifestInfo.SourceType),
}
if manifestInfo != nil {
compRes.appSourceType = v1alpha1.ApplicationSourceType(manifestInfo.SourceType)
}
return &compRes, nil
}

View File

@@ -4,7 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"

View File

@@ -1,31 +0,0 @@
{
"Vendor": true,
"DisableAll": true,
"Deadline": "8m",
"Enable": [
"vet",
"gofmt",
"goimports",
"deadcode",
"errcheck",
"varcheck",
"structcheck",
"ineffassign",
"unconvert",
"misspell"
],
"Linters": {
"goimports": {"Command": "goimports -l --local github.com/argoproj/argo-cd"}
},
"Skip": [
"pkg/client",
"vendor/",
".pb.go"
],
"Exclude": [
"pkg/client",
"vendor/",
".pb.go",
".*warning.*fmt.Fprint"
]
}

7
hack/git-ask-pass.sh Executable file
View File

@@ -0,0 +1,7 @@
#!/bin/sh
# This script is used as the commaned supplied to GIT_ASKPASS as a way to supply username/password
# credentials to git, without having to use git credentials helpers, or having on-disk config.
case "$1" in
Username*) echo "${GIT_USERNAME}" ;;
Password*) echo "${GIT_PASSWORD}" ;;
esac

View File

@@ -12,7 +12,7 @@ bases:
images:
- name: argoproj/argocd
newName: argoproj/argocd
newTag: latest
newTag: v0.12.0-rc4
- name: argoproj/argocd-ui
newName: argoproj/argocd-ui
newTag: latest
newTag: v0.12.0-rc4

View File

@@ -17,7 +17,7 @@ patchesStrategicMerge:
images:
- name: argoproj/argocd
newName: argoproj/argocd
newTag: latest
newTag: v0.12.0-rc4
- name: argoproj/argocd-ui
newName: argoproj/argocd-ui
newTag: latest
newTag: v0.12.0-rc4

View File

@@ -646,7 +646,7 @@ spec:
- argocd-redis-ha-announce-2:26379
- --sentinelmaster
- argocd
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-application-controller
ports:
@@ -693,7 +693,7 @@ spec:
- cp
- /usr/local/bin/argocd-util
- /shared
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: copyutil
volumeMounts:
@@ -748,7 +748,7 @@ spec:
- argocd-redis-ha-announce-2:26379
- --sentinelmaster
- argocd
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-repo-server
ports:
@@ -804,7 +804,7 @@ spec:
- argocd-redis-ha-announce-2:26379
- --sentinelmaster
- argocd
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-server
ports:
@@ -825,7 +825,7 @@ spec:
- -r
- /app
- /shared
image: argoproj/argocd-ui:latest
image: argoproj/argocd-ui:v0.12.0-rc4
imagePullPolicy: Always
name: ui
volumeMounts:

View File

@@ -562,7 +562,7 @@ spec:
- argocd-redis-ha-announce-2:26379
- --sentinelmaster
- argocd
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-application-controller
ports:
@@ -609,7 +609,7 @@ spec:
- cp
- /usr/local/bin/argocd-util
- /shared
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: copyutil
volumeMounts:
@@ -664,7 +664,7 @@ spec:
- argocd-redis-ha-announce-2:26379
- --sentinelmaster
- argocd
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-repo-server
ports:
@@ -720,7 +720,7 @@ spec:
- argocd-redis-ha-announce-2:26379
- --sentinelmaster
- argocd
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-server
ports:
@@ -741,7 +741,7 @@ spec:
- -r
- /app
- /shared
image: argoproj/argocd-ui:latest
image: argoproj/argocd-ui:v0.12.0-rc4
imagePullPolicy: Always
name: ui
volumeMounts:

View File

@@ -451,7 +451,7 @@ spec:
- "20"
- --operation-processors
- "10"
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-application-controller
ports:
@@ -498,7 +498,7 @@ spec:
- cp
- /usr/local/bin/argocd-util
- /shared
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: copyutil
volumeMounts:
@@ -561,7 +561,7 @@ spec:
- argocd-repo-server
- --redis
- argocd-redis:6379
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-repo-server
ports:
@@ -594,7 +594,7 @@ spec:
- argocd-server
- --staticassets
- /shared/app
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-server
ports:
@@ -615,7 +615,7 @@ spec:
- -r
- /app
- /shared
image: argoproj/argocd-ui:latest
image: argoproj/argocd-ui:v0.12.0-rc4
imagePullPolicy: Always
name: ui
volumeMounts:

View File

@@ -367,7 +367,7 @@ spec:
- "20"
- --operation-processors
- "10"
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-application-controller
ports:
@@ -414,7 +414,7 @@ spec:
- cp
- /usr/local/bin/argocd-util
- /shared
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: copyutil
volumeMounts:
@@ -477,7 +477,7 @@ spec:
- argocd-repo-server
- --redis
- argocd-redis:6379
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-repo-server
ports:
@@ -510,7 +510,7 @@ spec:
- argocd-server
- --staticassets
- /shared/app
image: argoproj/argocd:latest
image: argoproj/argocd:v0.12.0-rc4
imagePullPolicy: Always
name: argocd-server
ports:
@@ -531,7 +531,7 @@ spec:
- -r
- /app
- /shared
image: argoproj/argocd-ui:latest
image: argoproj/argocd-ui:v0.12.0-rc4
imagePullPolicy: Always
name: ui
volumeMounts:

View File

@@ -25,7 +25,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"github.com/argoproj/argo-cd"
argocd "github.com/argoproj/argo-cd"
"github.com/argoproj/argo-cd/common"
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
argoappv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"

View File

@@ -251,7 +251,7 @@ func TestGetAppDetailsKustomize(t *testing.T) {
Path: "kustomization_yaml",
})
assert.NoError(t, err)
assert.Equal(t, "nginx", res.Kustomize.ImageTags[0].Name)
assert.Equal(t, "1.15.4", res.Kustomize.ImageTags[0].Value)
assert.Equal(t, "k8s.gcr.io/nginx-slim", res.Kustomize.ImageTags[0].Name)
assert.Equal(t, "0.8", res.Kustomize.ImageTags[0].Value)
assert.Equal(t, 2, len(res.Kustomize.ImageTags))
}

View File

@@ -492,18 +492,13 @@ func (s *Server) validateAndNormalizeApp(ctx context.Context, app *appv1.Applica
}
}
conditions, err := argo.GetSpecErrors(ctx, &app.Spec, proj, s.repoClientset, s.db)
conditions, appSourceType, err := argo.GetSpecErrors(ctx, &app.Spec, proj, s.repoClientset, s.db)
if err != nil {
return err
}
if len(conditions) > 0 {
return status.Errorf(codes.InvalidArgument, "application spec is invalid: %s", argo.FormatAppConditions(conditions))
}
appSourceType, err := argo.QueryAppSourceType(ctx, app, s.repoClientset, s.db)
if err != nil {
return err
}
app.Spec = *argo.NormalizeApplicationSpec(&app.Spec, appSourceType)
return nil
}

View File

@@ -140,20 +140,20 @@ func GetSpecErrors(
proj *argoappv1.AppProject,
repoClientset reposerver.Clientset,
db db.ArgoDB,
) ([]argoappv1.ApplicationCondition, error) {
) ([]argoappv1.ApplicationCondition, argoappv1.ApplicationSourceType, error) {
conditions := make([]argoappv1.ApplicationCondition, 0)
if spec.Source.RepoURL == "" || spec.Source.Path == "" {
conditions = append(conditions, argoappv1.ApplicationCondition{
Type: argoappv1.ApplicationConditionInvalidSpecError,
Message: "spec.source.repoURL and spec.source.path are required",
})
return conditions, nil
return conditions, "", nil
}
// Test the repo
conn, repoClient, err := repoClientset.NewRepoServerClient()
if err != nil {
return nil, err
return nil, "", err
}
defer util.Close(conn)
repoAccessable := false
@@ -174,12 +174,13 @@ func GetSpecErrors(
repoAccessable = true
}
} else {
return nil, err
return nil, "", err
}
} else {
repoAccessable = true
}
var appSourceType argoappv1.ApplicationSourceType
// Verify only one source type is defined
explicitSourceType, err := spec.Source.ExplicitType()
if err != nil {
@@ -190,7 +191,6 @@ func GetSpecErrors(
}
if repoAccessable {
var appSourceType argoappv1.ApplicationSourceType
if explicitSourceType != nil {
appSourceType = *explicitSourceType
} else {
@@ -249,11 +249,11 @@ func GetSpecErrors(
Message: fmt.Sprintf("cluster '%s' has not been configured", spec.Destination.Server),
})
} else {
return nil, err
return nil, "", err
}
}
}
return conditions, nil
return conditions, appSourceType, nil
}
// GetAppProject returns a project from an application
@@ -261,27 +261,6 @@ func GetAppProject(spec *argoappv1.ApplicationSpec, projLister applicationsv1.Ap
return projLister.AppProjects(ns).Get(spec.GetProject())
}
// QueryAppSourceType queries repo server for yaml files in a directory, and determines its
// application source type based on the files in the directory.
// This code is redundant to the logic in argo.GetSpecErrors, but since it's is hard to
// extract out of there. We will be throwing away this code when we remove
// componentParameterOverrides.
func QueryAppSourceType(ctx context.Context, app *argoappv1.Application, repoClientset reposerver.Clientset, db db.ArgoDB) (argoappv1.ApplicationSourceType, error) {
if t, _ := app.Spec.Source.ExplicitType(); t != nil {
return *t, nil
}
repoRes, err := db.GetRepository(ctx, app.Spec.Source.RepoURL)
if err != nil {
return "", err
}
conn, repoClient, err := repoClientset.NewRepoServerClient()
if err != nil {
return "", err
}
defer util.Close(conn)
return queryAppSourceType(ctx, &app.Spec, repoRes, repoClient)
}
func queryAppSourceType(ctx context.Context, spec *argoappv1.ApplicationSpec, repoRes *argoappv1.Repository, repoClient repository.RepoServerServiceClient) (argoappv1.ApplicationSourceType, error) {
req := repository.ListDirRequest{
Repo: &argoappv1.Repository{

View File

@@ -2,7 +2,7 @@ package argo
import (
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"

View File

@@ -28,7 +28,7 @@ func TestCache(t *testing.T) {
cacheObj.Foo = "baz"
err = c.Get("key", &obj)
assert.Nil(t, err)
assert.EqualValues(t, string(obj.Foo), "foo")
assert.EqualValues(t, obj.Foo, "foo")
assert.EqualValues(t, string(obj.Bar), "bar")
err = c.Delete("key")

View File

@@ -21,7 +21,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/kubernetes/pkg/kubectl/util/term"
"github.com/argoproj/argo-cd"
argocd "github.com/argoproj/argo-cd"
"github.com/argoproj/argo-cd/errors"
)

View File

@@ -3,7 +3,7 @@ package db
import (
"context"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"

View File

@@ -103,6 +103,18 @@ func (m *nativeGitClient) Init() error {
// Fetch fetches latest updates from origin
func (m *nativeGitClient) Fetch() error {
log.Debugf("Fetching repo %s at %s", m.repoURL, m.root)
// Two techniques are used for fetching the remote depending if the remote is SSH vs. HTTPS
// If http, we fork/exec the git CLI since the go-git client does not properly support git
// providers such as AWS CodeCommit and Azure DevOps.
if _, ok := m.auth.(*ssh2.PublicKeys); ok {
return m.goGitFetch()
}
_, err := m.runCredentialedCmd("git", "fetch", "origin", "--tags", "--force")
return err
}
// goGitFetch fetches the remote using go-git
func (m *nativeGitClient) goGitFetch() error {
repo, err := git.PlainOpen(m.root)
if err != nil {
return err
@@ -119,17 +131,6 @@ func (m *nativeGitClient) Fetch() error {
return nil
}
return err
// git fetch does not update the HEAD reference. The following command will update the local
// knowledge of what remote considers the “default branch”
// See: https://stackoverflow.com/questions/8839958/how-does-origin-head-get-set
// NOTE(jessesuen): disabling the following code because:
// 1. we no longer perform a `git checkout HEAD`, instead relying on `ls-remote` and checking
// out a specific SHA1.
// 2. This command is the only other command that we use (excluding fetch/ls-remote) which
// requires remote access, and there appears to be no go-git equivalent to this command.
// _, err = m.runCmd("git", "remote", "set-head", "origin", "-a")
// return err
}
// LsFiles lists the local working tree, including only files that are under source control
@@ -241,13 +242,27 @@ func (m *nativeGitClient) CommitSHA() (string, error) {
// runCmd is a convenience function to run a command in a given directory and return its output
func (m *nativeGitClient) runCmd(command string, args ...string) (string, error) {
cmd := exec.Command(command, args...)
return m.runCmdOutput(cmd)
}
// runCredentialedCmd is a convenience function to run a git command with username/password credentials
func (m *nativeGitClient) runCredentialedCmd(command string, args ...string) (string, error) {
cmd := exec.Command(command, args...)
if auth, ok := m.auth.(*http.BasicAuth); ok {
cmd.Env = append(cmd.Env, fmt.Sprintf("GIT_ASKPASS=git-ask-pass.sh"))
cmd.Env = append(cmd.Env, fmt.Sprintf("GIT_USERNAME=%s", auth.Username))
cmd.Env = append(cmd.Env, fmt.Sprintf("GIT_PASSWORD=%s", auth.Password))
}
return m.runCmdOutput(cmd)
}
func (m *nativeGitClient) runCmdOutput(cmd *exec.Cmd) (string, error) {
log.Debug(strings.Join(cmd.Args, " "))
cmd.Dir = m.root
env := os.Environ()
env = append(env, "HOME=/dev/null")
env = append(env, "GIT_CONFIG_NOSYSTEM=true")
env = append(env, "GIT_ASKPASS=")
cmd.Env = env
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, "HOME=/dev/null")
cmd.Env = append(cmd.Env, "GIT_CONFIG_NOSYSTEM=true")
cmd.Env = append(cmd.Env, "GIT_CONFIG_NOGLOBAL=true")
out, err := cmd.Output()
if len(out) > 0 {
log.Debug(string(out))

View File

@@ -1,8 +1,10 @@
package git
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
@@ -127,8 +129,7 @@ func TestLsRemote(t *testing.T) {
func TestGitClient(t *testing.T) {
testRepos := []string{
"https://github.com/argoproj/argocd-example-apps",
// TODO: add this back when azure repos are supported
//"https://jsuen0437@dev.azure.com/jsuen0437/jsuen/_git/jsuen",
"https://jsuen0437@dev.azure.com/jsuen0437/jsuen/_git/jsuen",
}
for _, repo := range testRepos {
dirName, err := ioutil.TempDir("", "git-client-test-")
@@ -142,22 +143,21 @@ func TestGitClient(t *testing.T) {
}
}
// TestPrivateGitRepo tests the ability to operate on a private git repo. This test needs to be run
// manually since we do not have a private git repo for testing
//
// export TEST_REPO=https://github.com/jessesuen/private-argocd-example-apps
// export GITHUB_TOKEN=<YOURGITHUBTOKEN>
// go test -v -run ^(TestPrivateGitRepo)$ ./util/git/...
// TestPrivateGitRepo tests the ability to operate on a private git repo.
func TestPrivateGitRepo(t *testing.T) {
repo := os.Getenv("TEST_REPO")
username := os.Getenv("TEST_USERNAME")
password := os.Getenv("GITHUB_TOKEN")
if username == "" {
username = "git" // username does not matter for tokens
}
if repo == "" || password == "" {
t.Skip("skipping private git repo test since no repo or password supplied")
}
repo := "https://gitlab.com/argo-cd-test/argocd-example-apps.git"
username := "blah"
// This is a personal access token generated with read only access in a throwaway gitlab test
// account/repo
password := "B5sBDeoqAVUouoHkrovy"
// add the hack path which has the git-ask-pass.sh shell script
osPath := os.Getenv("PATH")
hackPath, err := filepath.Abs("../../hack")
assert.NoError(t, err)
err = os.Setenv("PATH", fmt.Sprintf("%s:%s", osPath, hackPath))
assert.NoError(t, err)
defer func() { _ = os.Setenv("PATH", osPath) }()
dirName, err := ioutil.TempDir("", "git-client-test-")
assert.NoError(t, err)

View File

@@ -5,14 +5,13 @@ import (
"encoding/json"
"fmt"
"github.com/gogo/protobuf/proto"
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
ctx_logrus "github.com/grpc-ecosystem/go-grpc-middleware/tags/logrus"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/gogo/protobuf/proto"
"github.com/grpc-ecosystem/go-grpc-middleware/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/grpc-ecosystem/go-grpc-middleware/tags/logrus"
"github.com/sirupsen/logrus"
)
func logRequest(entry *logrus.Entry, info string, pbMsg interface{}, ctx context.Context, logClaims bool) {

View File

@@ -3,8 +3,8 @@ package health
import (
"fmt"
"k8s.io/api/apps/v1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
coreV1 "k8s.io/api/core/v1"
extv1beta1 "k8s.io/api/extensions/v1beta1"

View File

@@ -2,24 +2,18 @@ package kube
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os/exec"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@@ -29,170 +23,91 @@ import (
"github.com/argoproj/argo-cd/util/diff"
)
type ResourcesBatch struct {
GVK schema.GroupVersionKind
ResourceInfo metav1.APIResource
ListResourceVersion string
Objects []unstructured.Unstructured
Error error
}
type CacheRefreshEvent struct {
GVK schema.GroupVersionKind
ResourceVersion string
Objects []unstructured.Unstructured
}
type WatchEvent struct {
WatchEvent *watch.Event
CacheRefresh *CacheRefreshEvent
}
type CachedVersionSource func(gk schema.GroupKind) (string, error)
type Kubectl interface {
ApplyResource(config *rest.Config, obj *unstructured.Unstructured, namespace string, dryRun, force bool) (string, error)
ConvertToVersion(obj *unstructured.Unstructured, group, version string) (*unstructured.Unstructured, error)
DeleteResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string, forceDelete bool) error
GetResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string) (*unstructured.Unstructured, error)
PatchResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string, patchType types.PatchType, patchBytes []byte) (*unstructured.Unstructured, error)
WatchResources(ctx context.Context, config *rest.Config, resourceFilter ResourceFilter, versionSource CachedVersionSource) (chan WatchEvent, error)
GetResources(config *rest.Config, resourceFilter ResourceFilter, namespace string) (chan ResourcesBatch, error)
GetAPIResources(config *rest.Config) ([]*metav1.APIResourceList, error)
GetAPIResources(config *rest.Config, resourceFilter ResourceFilter) ([]APIResourceInfo, error)
}
type KubectlCmd struct{}
func (k KubectlCmd) GetAPIResources(config *rest.Config) ([]*metav1.APIResourceList, error) {
type APIResourceInfo struct {
GroupKind schema.GroupKind
Meta metav1.APIResource
Interface dynamic.ResourceInterface
}
type filterFunc func(apiResource *metav1.APIResource) bool
func filterAPIResources(config *rest.Config, resourceFilter ResourceFilter, filter filterFunc, namespace string) ([]APIResourceInfo, error) {
dynamicIf, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
disco, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
}
return disco.ServerResources()
}
// GetResources returns all kubernetes resources
func (k KubectlCmd) GetResources(config *rest.Config, resourceFilter ResourceFilter, namespace string) (chan ResourcesBatch, error) {
res := make(chan ResourcesBatch)
listSupported := func(apiResource *metav1.APIResource) bool {
return isSupportedVerb(apiResource, listVerb)
}
apiResIfs, err := filterAPIResources(config, resourceFilter, listSupported, namespace)
serverResources, err := disco.ServerPreferredResources()
if err != nil {
return nil, err
if len(serverResources) == 0 {
return nil, err
}
log.Warnf("Partial success when performing preferred resource discovery: %v", err)
}
go func() {
defer close(res)
var wg sync.WaitGroup
wg.Add(len(apiResIfs))
for _, apiResIf := range apiResIfs {
go func(apiResIf apiResourceInterface) {
defer wg.Done()
batch := ResourcesBatch{
GVK: apiResIf.groupVersion.WithKind(apiResIf.apiResource.Kind),
ResourceInfo: apiResIf.apiResource,
}
list, err := apiResIf.resourceIf.List(metav1.ListOptions{})
apiResIfs := make([]APIResourceInfo, 0)
for _, apiResourcesList := range serverResources {
gv, err := schema.ParseGroupVersion(apiResourcesList.GroupVersion)
if err != nil {
gv = schema.GroupVersion{}
}
for _, apiResource := range apiResourcesList.APIResources {
if resourceFilter.IsExcludedResource(gv.Group, apiResource.Kind, config.Host) {
continue
}
if _, ok := isObsoleteExtensionsGroupKind(gv.Group, apiResource.Kind); ok {
continue
}
if filter(&apiResource) {
resource := ToGroupVersionResource(apiResourcesList.GroupVersion, &apiResource)
resourceIf := ToResourceInterface(dynamicIf, &apiResource, resource, namespace)
gv, err := schema.ParseGroupVersion(apiResourcesList.GroupVersion)
if err != nil {
if !apierr.IsNotFound(err) {
batch.Error = err
res <- batch
}
return
return nil, err
}
batch.ListResourceVersion = list.GetResourceVersion()
batch.Objects = list.Items
res <- batch
}(apiResIf)
apiResIf := APIResourceInfo{
GroupKind: schema.GroupKind{Group: gv.Group, Kind: apiResource.Kind},
Meta: apiResource,
Interface: resourceIf,
}
apiResIfs = append(apiResIfs, apiResIf)
}
}
wg.Wait()
}()
return res, nil
}
return apiResIfs, nil
}
const watchResourcesRetryTimeout = 1 * time.Second
// WatchResources watches all the existing resources in the cluster provided by the config. Method retries watch with the most recent resource version stored in cache.
// The WatchResources returns channel which container either watch event with updated resource info or new list of resources if cached resource version had expired.
func (k KubectlCmd) WatchResources(
ctx context.Context,
config *rest.Config,
resourceFilter ResourceFilter,
versionSource CachedVersionSource,
) (chan WatchEvent, error) {
watchSupported := func(apiResource *metav1.APIResource) bool {
return isSupportedVerb(apiResource, watchVerb)
// isSupportedVerb returns whether or not a APIResource supports a specific verb
func isSupportedVerb(apiResource *metav1.APIResource, verb string) bool {
for _, v := range apiResource.Verbs {
if v == verb {
return true
}
}
log.Infof("Start watching for resources changes with in cluster %s", config.Host)
apiResIfs, err := filterAPIResources(config, resourceFilter, watchSupported, "")
return false
}
func (k KubectlCmd) GetAPIResources(config *rest.Config, resourceFilter ResourceFilter) ([]APIResourceInfo, error) {
apiResIfs, err := filterAPIResources(config, resourceFilter, func(apiResource *metav1.APIResource) bool {
return isSupportedVerb(apiResource, listVerb) && isSupportedVerb(apiResource, watchVerb)
}, "")
if err != nil {
return nil, err
}
ch := make(chan WatchEvent)
go func() {
var wg sync.WaitGroup
wg.Add(len(apiResIfs))
for _, a := range apiResIfs {
go func(apiResIf apiResourceInterface) {
defer wg.Done()
util.RetryUntilSucceed(func() (err error) {
defer func() {
if r := recover(); r != nil {
message := fmt.Sprintf("Recovered from panic: %+v\n%s", r, debug.Stack())
log.Error(message)
err = errors.New(message)
}
}()
watchCh := WatchWithRetry(ctx, func() (i watch.Interface, e error) {
gvk := apiResIf.groupVersion.WithKind(apiResIf.apiResource.Kind)
resourceVersion, err := versionSource(gvk.GroupKind())
if err != nil {
return nil, err
}
w, err := apiResIf.resourceIf.Watch(metav1.ListOptions{
ResourceVersion: resourceVersion,
})
if apierr.IsGone(err) {
log.Infof("Resource version of %s has expired at cluster %s, reloading list", gvk, config.Host)
list, err := apiResIf.resourceIf.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
ch <- WatchEvent{
CacheRefresh: &CacheRefreshEvent{
GVK: gvk,
ResourceVersion: list.GetResourceVersion(),
Objects: list.Items,
},
}
return apiResIf.resourceIf.Watch(metav1.ListOptions{ResourceVersion: list.GetResourceVersion()})
}
return w, err
})
for next := range watchCh {
if next.Error != nil {
return next.Error
}
ch <- WatchEvent{
WatchEvent: &watch.Event{
Object: next.Object,
Type: next.Type,
},
}
}
return nil
}, fmt.Sprintf("watch resources %s %s/%s", config.Host, apiResIf.groupVersion, apiResIf.apiResource.Kind), ctx, watchResourcesRetryTimeout)
}(a)
}
wg.Wait()
close(ch)
log.Infof("Stop watching for resources changes with in cluster %s", config.Host)
}()
return ch, nil
return apiResIfs, err
}
// GetResource returns resource

View File

@@ -75,6 +75,10 @@ func (k *ResourceKey) String() string {
return fmt.Sprintf("%s/%s/%s/%s", k.Group, k.Kind, k.Namespace, k.Name)
}
func (k ResourceKey) GroupKind() schema.GroupKind {
return schema.GroupKind{Group: k.Group, Kind: k.Kind}
}
func isObsoleteExtensionsGroupKind(group string, kind string) (string, bool) {
if group == "extensions" {
newGroup, ok := obsoleteExtensionsKinds[kind]
@@ -246,72 +250,6 @@ func IsCRD(obj *unstructured.Unstructured) bool {
return IsCRDGroupVersionKind(obj.GroupVersionKind())
}
type apiResourceInterface struct {
groupVersion schema.GroupVersion
apiResource metav1.APIResource
resourceIf dynamic.ResourceInterface
}
type filterFunc func(apiResource *metav1.APIResource) bool
func filterAPIResources(config *rest.Config, resourceFilter ResourceFilter, filter filterFunc, namespace string) ([]apiResourceInterface, error) {
dynamicIf, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
disco, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
}
serverResources, err := disco.ServerPreferredResources()
if err != nil {
if len(serverResources) == 0 {
return nil, err
}
log.Warnf("Partial success when performing preferred resource discovery: %v", err)
}
apiResIfs := make([]apiResourceInterface, 0)
for _, apiResourcesList := range serverResources {
gv, err := schema.ParseGroupVersion(apiResourcesList.GroupVersion)
if err != nil {
gv = schema.GroupVersion{}
}
if resourceFilter.IsExcludedResource(gv.Group, apiResourcesList.Kind, config.Host) {
continue
}
for _, apiResource := range apiResourcesList.APIResources {
if _, ok := isObsoleteExtensionsGroupKind(gv.Group, apiResource.Kind); ok || gv.Group == "" && apiResource.Kind == "Event" {
continue
}
if filter(&apiResource) {
resource := ToGroupVersionResource(apiResourcesList.GroupVersion, &apiResource)
resourceIf := ToResourceInterface(dynamicIf, &apiResource, resource, namespace)
gv, err := schema.ParseGroupVersion(apiResourcesList.GroupVersion)
if err != nil {
return nil, err
}
apiResIf := apiResourceInterface{
groupVersion: gv,
apiResource: apiResource,
resourceIf: resourceIf,
}
apiResIfs = append(apiResIfs, apiResIf)
}
}
}
return apiResIfs, nil
}
// isSupportedVerb returns whether or not a APIResource supports a specific verb
func isSupportedVerb(apiResource *metav1.APIResource, verb string) bool {
for _, v := range apiResource.Verbs {
if v == verb {
return true
}
}
return false
}
// See: https://github.com/ksonnet/ksonnet/blob/master/utils/client.go
func ServerResourceForGroupVersionKind(disco discovery.DiscoveryInterface, gvk schema.GroupVersionKind) (*metav1.APIResource, error) {
resources, err := disco.ServerResourcesForGroupVersion(gvk.GroupVersion().String())

View File

@@ -1,12 +1,10 @@
package kubetest
import (
"context"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"github.com/argoproj/argo-cd/util/kube"
@@ -18,27 +16,15 @@ type KubectlOutput struct {
}
type MockKubectlCmd struct {
APIResources []*v1.APIResourceList
Resources []kube.ResourcesBatch
APIResources []kube.APIResourceInfo
Commands map[string]KubectlOutput
Events chan kube.WatchEvent
Events chan watch.Event
}
func (k MockKubectlCmd) GetAPIResources(config *rest.Config) ([]*v1.APIResourceList, error) {
func (k MockKubectlCmd) GetAPIResources(config *rest.Config, resourceFilter kube.ResourceFilter) ([]kube.APIResourceInfo, error) {
return k.APIResources, nil
}
func (k MockKubectlCmd) GetResources(config *rest.Config, resourceFilter kube.ResourceFilter, namespace string) (chan kube.ResourcesBatch, error) {
res := make(chan kube.ResourcesBatch)
go func() {
defer close(res)
for i := range k.Resources {
res <- k.Resources[i]
}
}()
return res, nil
}
func (k MockKubectlCmd) GetResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string) (*unstructured.Unstructured, error) {
return nil, nil
}
@@ -47,12 +33,6 @@ func (k MockKubectlCmd) PatchResource(config *rest.Config, gvk schema.GroupVersi
return nil, nil
}
func (k MockKubectlCmd) WatchResources(
ctx context.Context, config *rest.Config, resourceFilter kube.ResourceFilter, versionSource kube.CachedVersionSource) (chan kube.WatchEvent, error) {
return k.Events, nil
}
func (k MockKubectlCmd) DeleteResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string, forceDelete bool) error {
command, ok := k.Commands[name]
if !ok {

View File

@@ -6,6 +6,7 @@ import (
"os"
"os/exec"
"path/filepath"
"sort"
"strings"
argoexec "github.com/argoproj/pkg/exec"
@@ -170,6 +171,9 @@ func getImageParameters(objs []*unstructured.Unstructured) []*v1alpha1.Kustomize
Value: version,
})
}
sort.Slice(params, func(i, j int) bool {
return params[i].Name < params[j].Name
})
return params
}

View File

@@ -62,11 +62,6 @@ type ClientApp struct {
cache *cache.Cache
}
type appState struct {
// ReturnURL is the URL in which to redirect a user back to after completing an OAuth2 login
ReturnURL string `json:"returnURL"`
}
// NewClientApp will register the Argo CD client app (either via Dex or external OIDC) and return an
// object which has HTTP handlers for handling the HTTP responses for login and callback
func NewClientApp(settings *settings.ArgoCDSettings, cache *cache.Cache, dexServerAddr string) (*ClientApp, error) {

View File

@@ -130,7 +130,10 @@ func RunAllAsync(count int, action func(i int) error) (err error) {
wg.Add(1)
go func(index int) {
defer wg.Done()
err = action(index)
actionErr := action(index)
if actionErr != nil {
err = actionErr
}
}(i)
if err != nil {
break