Mirror of https://github.com/argoproj/argo-cd.git (synced 2026-02-26 12:38:47 +01:00)

Compare commits: appsetdocs...v0.12.0-rc (12 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 22fe538645 | |
| | 24d73e56f1 | |
| | 8f93bdc2a5 | |
| | 6d982ca397 | |
| | eff67bffad | |
| | 2b781eea49 | |
| | 2f0dc20235 | |
| | 36dc50c121 | |
| | f8f974e871 | |
| | c4b474ae98 | |
| | 8d98d6e058 | |
| | 233708ecdd | |
@@ -44,7 +44,7 @@ spec:
       repo: "{{workflow.parameters.repo}}"
       revision: "{{workflow.parameters.revision}}"
     container:
-      image: argoproj/argo-cd-ci-builder:v0.12.0
+      image: argoproj/argo-cd-ci-builder:v0.13.0
       imagePullPolicy: Always
       command: [bash, -c]
       args: ["{{inputs.parameters.cmd}}"]
@@ -73,7 +73,7 @@ spec:
       repo: "{{workflow.parameters.repo}}"
       revision: "{{workflow.parameters.revision}}"
     container:
-      image: argoproj/argo-cd-ci-builder:v0.12.0
+      image: argoproj/argo-cd-ci-builder:v0.13.0
       imagePullPolicy: Always
       command: [sh, -c]
       args: ["until docker ps; do sleep 3; done && {{inputs.parameters.cmd}}"]
.golangci.yml (new file, 22 lines)
@@ -0,0 +1,22 @@
+run:
+  deadline: 8m
+  skip-files:
+  - ".*\\.pb\\.go"
+  skip-dirs:
+  - pkg/client
+  - vendor
+linter-settings:
+  goimports:
+    local-prefixes: github.com/argoproj/argo-cd
+linters:
+  enable:
+  - vet
+  - gofmt
+  - goimports
+  - deadcode
+  - varcheck
+  - structcheck
+  - ineffassign
+  - unconvert
+  - misspell
+  disable-all: true
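The `local-prefixes` setting above is likely what drives the many import-grouping and explicit-alias hunks later in this compare (the `argocd "github.com/argoproj/argo-cd"` and `v1 "k8s.io/api/core/v1"` changes). A minimal sketch of the grouping goimports enforces with this config; the package name and the trailing uses are illustrative only:

```go
package example

import (
	// standard library first
	"context"

	// third-party packages next
	"github.com/spf13/cobra"

	// local github.com/argoproj/argo-cd packages in their own group
	"github.com/argoproj/argo-cd/errors"
)

// References so the illustrative file compiles.
var (
	_ context.Context
	_ = cobra.Command{}
	_ = errors.CheckError
)
```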
@@ -40,10 +40,8 @@ go get -u github.com/golang/protobuf/protoc-gen-go
 go get -u github.com/go-swagger/go-swagger/cmd/swagger
 go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
 go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
-go get -u gopkg.in/alecthomas/gometalinter.v2
+go get -u github.com/golangci/golangci-lint/cmd/golangci-lint
 go get -u github.com/mattn/goreman
-
-gometalinter.v2 --install
 ```
 
 ## Building
Dockerfile (12 changes)
@@ -79,6 +79,17 @@ ENV AWS_IAM_AUTHENTICATOR_VERSION=0.4.0-alpha.1
 RUN curl -L -o /usr/local/bin/aws-iam-authenticator https://github.com/kubernetes-sigs/aws-iam-authenticator/releases/download/${AWS_IAM_AUTHENTICATOR_VERSION}/aws-iam-authenticator_${AWS_IAM_AUTHENTICATOR_VERSION}_linux_amd64 && \
     chmod +x /usr/local/bin/aws-iam-authenticator
 
+# Install golangci-lint
+RUN wget https://install.goreleaser.com/github.com/golangci/golangci-lint.sh && \
+    chmod +x ./golangci-lint.sh && \
+    ./golangci-lint.sh -b $GOPATH/bin && \
+    golangci-lint linters
+
+COPY .golangci.yml ${GOPATH}/src/dummy/.golangci.yml
+
+RUN cd ${GOPATH}/src/dummy && \
+    touch dummy.go \
+    golangci-lint run
+
 ####################################################################################################
 # Argo CD Base - used as the base for both the release and dev argocd images
@@ -94,6 +105,7 @@ RUN groupadd -g 999 argocd && \
     apt-get clean && \
     rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
 
+COPY hack/git-ask-pass.sh /usr/local/bin/git-ask-pass.sh
 COPY --from=builder /usr/local/bin/ks /usr/local/bin/ks
 COPY --from=builder /usr/local/bin/helm /usr/local/bin/helm
 COPY --from=builder /usr/local/bin/kubectl /usr/local/bin/kubectl
Gopkg.lock (generated, 33 changes)
@@ -48,25 +48,6 @@
   pruneopts = ""
   revision = "09c41003ee1d5015b75f331e52215512e7145b8d"
 
-[[projects]]
-  branch = "master"
-  digest = "1:a74730e052a45a3fab1d310fdef2ec17ae3d6af16228421e238320846f2aaec8"
-  name = "github.com/alecthomas/template"
-  packages = [
-    ".",
-    "parse",
-  ]
-  pruneopts = ""
-  revision = "a0175ee3bccc567396460bf5acd36800cb10c49c"
-
-[[projects]]
-  branch = "master"
-  digest = "1:8483994d21404c8a1d489f6be756e25bfccd3b45d65821f25695577791a08e68"
-  name = "github.com/alecthomas/units"
-  packages = ["."]
-  pruneopts = ""
-  revision = "2efee857e7cfd4f3d0138cc3cbb1b4966962b93a"
-
 [[projects]]
   branch = "master"
   digest = "1:0caf9208419fa5db5a0ca7112affaa9550c54291dda8e2abac0c0e76181c959e"
@@ -760,7 +741,6 @@
   packages = [
     "expfmt",
    "internal/bitbucket.org/ww/goautoneg",
-    "log",
     "model",
   ]
   pruneopts = ""
@@ -992,8 +972,6 @@
   packages = [
     "unix",
     "windows",
-    "windows/registry",
-    "windows/svc/eventlog",
   ]
   pruneopts = ""
   revision = "d0be0721c37eeb5299f245a996a483160fc36940"
@@ -1115,14 +1093,6 @@
   revision = "8dea3dc473e90c8179e519d91302d0597c0ca1d1"
   version = "v1.15.0"
 
-[[projects]]
-  digest = "1:15d017551627c8bb091bde628215b2861bed128855343fdd570c62d08871f6e1"
-  name = "gopkg.in/alecthomas/kingpin.v2"
-  packages = ["."]
-  pruneopts = ""
-  revision = "947dcec5ba9c011838740e680966fd7087a71d0d"
-  version = "v2.2.6"
-
 [[projects]]
   digest = "1:bf7444e1e6a36e633f4f1624a67b9e4734cfb879c27ac0a2082ac16aff8462ac"
   name = "gopkg.in/go-playground/webhooks.v3"
@@ -1378,6 +1348,7 @@
     "discovery",
     "discovery/fake",
     "dynamic",
+    "dynamic/fake",
     "informers/core/v1",
     "informers/internalinterfaces",
     "kubernetes",
@@ -1610,7 +1581,6 @@
     "github.com/pkg/errors",
     "github.com/prometheus/client_golang/prometheus",
     "github.com/prometheus/client_golang/prometheus/promhttp",
-    "github.com/prometheus/common/log",
     "github.com/sirupsen/logrus",
     "github.com/skratchdot/open-golang/open",
     "github.com/soheilhy/cmux",
@@ -1673,6 +1643,7 @@
     "k8s.io/client-go/discovery",
     "k8s.io/client-go/discovery/fake",
     "k8s.io/client-go/dynamic",
+    "k8s.io/client-go/dynamic/fake",
     "k8s.io/client-go/informers/core/v1",
     "k8s.io/client-go/kubernetes",
     "k8s.io/client-go/kubernetes/fake",
Makefile (7 changes)
@@ -119,10 +119,11 @@ endif
 .PHONY: builder-image
 builder-image:
 	docker build -t $(IMAGE_PREFIX)argo-cd-ci-builder:$(IMAGE_TAG) --target builder .
 	docker push $(IMAGE_PREFIX)argo-cd-ci-builder:$(IMAGE_TAG)
 
 .PHONY: dep-ensure
 dep-ensure:
-	dep ensure
+	dep ensure -no-vendor
 
 .PHONY: format-code
 format-code:
@@ -130,7 +131,7 @@ format-code:
 
 .PHONY: lint
 lint:
-	gometalinter.v2 --config gometalinter.json ./...
+	golangci-lint run
 
 .PHONY: test
 test:
@@ -159,4 +160,4 @@ release-precheck: manifests
 	@if [ "$(GIT_TAG)" != "v`cat VERSION`" ]; then echo 'VERSION does not match git tag'; exit 1; fi
 
 .PHONY: release
-release: release-precheck precheckin image release-cli
+release: release-precheck pre-commit image release-cli
@@ -9,7 +9,7 @@ import (
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 
-	"github.com/argoproj/argo-cd"
+	argocd "github.com/argoproj/argo-cd"
 	"github.com/argoproj/argo-cd/errors"
 	"github.com/argoproj/argo-cd/reposerver"
 	"github.com/argoproj/argo-cd/util/cache"
@@ -15,7 +15,7 @@ import (
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
-	"k8s.io/apimachinery/pkg/apis/meta/v1"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/argoproj/argo-cd/errors"
 	argocdclient "github.com/argoproj/argo-cd/pkg/apiclient"
@@ -659,7 +659,7 @@ func (ctrl *ApplicationController) refreshAppConditions(app *appv1.Application)
 			})
 		}
 	} else {
-		specConditions, err := argo.GetSpecErrors(context.Background(), &app.Spec, proj, ctrl.repoClientset, ctrl.db)
+		specConditions, _, err := argo.GetSpecErrors(context.Background(), &app.Spec, proj, ctrl.repoClientset, ctrl.db)
 		if err != nil {
 			conditions = append(conditions, appv1.ApplicationCondition{
 				Type: appv1.ApplicationConditionUnknownError,
controller/cache/cache.go (150 changes)
@@ -2,7 +2,6 @@ package cache
 
 import (
 	"context"
-	"fmt"
 	"sync"
 
 	log "github.com/sirupsen/logrus"
@@ -19,7 +18,7 @@
 )
 
 type LiveStateCache interface {
-	IsNamespaced(server string, gvk schema.GroupVersionKind) (bool, error)
+	IsNamespaced(server string, obj *unstructured.Unstructured) (bool, error)
 	// Returns child nodes for a given k8s resource
 	GetChildren(server string, obj *unstructured.Unstructured) ([]appv1.ResourceNode, error)
 	// Returns state of live nodes which correspond for target nodes of specified application.
@@ -73,13 +72,6 @@ func (c *liveStateCache) processEvent(event watch.EventType, obj *unstructured.U
 	return info.processEvent(event, obj)
 }
 
-func (c *liveStateCache) removeCluster(server string) {
-	c.lock.Lock()
-	defer c.lock.Unlock()
-	delete(c.clusters, server)
-	log.Infof("Dropped cluster %s cache", server)
-}
-
 func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
@@ -90,7 +82,7 @@ func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) {
 		return nil, err
 	}
 	info = &clusterInfo{
-		apis:     make(map[schema.GroupKind]*gkInfo),
+		apisMeta: make(map[schema.GroupKind]*apiMeta),
 		lock:     &sync.Mutex{},
 		nodes:    make(map[kube.ResourceKey]*node),
 		nsIndex:  make(map[string]map[kube.ResourceKey]*node),
@@ -140,12 +132,12 @@ func (c *liveStateCache) Delete(server string, obj *unstructured.Unstructured) e
 	return clusterInfo.delete(obj)
 }
 
-func (c *liveStateCache) IsNamespaced(server string, gvk schema.GroupVersionKind) (bool, error) {
+func (c *liveStateCache) IsNamespaced(server string, obj *unstructured.Unstructured) (bool, error) {
 	clusterInfo, err := c.getSyncedCluster(server)
 	if err != nil {
 		return false, err
 	}
-	return clusterInfo.isNamespaced(gvk.GroupKind()), nil
+	return clusterInfo.isNamespaced(obj), nil
 }
 
 func (c *liveStateCache) GetChildren(server string, obj *unstructured.Unstructured) ([]appv1.ResourceNode, error) {
@@ -175,135 +167,29 @@ func isClusterHasApps(apps []interface{}, cluster *appv1.Cluster) bool {
 
 // Run watches for resource changes annotated with application label on all registered clusters and schedule corresponding app refresh.
 func (c *liveStateCache) Run(ctx context.Context) {
-	watchingClustersLock := sync.Mutex{}
-	watchingClusters := make(map[string]struct {
-		cancel  context.CancelFunc
-		cluster *appv1.Cluster
-	})
-
 	util.RetryUntilSucceed(func() error {
 		clusterEventCallback := func(event *db.ClusterEvent) {
-			info, ok := watchingClusters[event.Cluster.Server]
-			hasApps := isClusterHasApps(c.appInformer.GetStore().List(), event.Cluster)
-
-			// cluster resources must be watched only if cluster has at least one app
-			if (event.Type == watch.Deleted || !hasApps) && ok {
-				info.cancel()
-				watchingClustersLock.Lock()
-				delete(watchingClusters, event.Cluster.Server)
-				watchingClustersLock.Unlock()
-			} else if event.Type != watch.Deleted && !ok && hasApps {
-				ctx, cancel := context.WithCancel(ctx)
-				watchingClustersLock.Lock()
-				watchingClusters[event.Cluster.Server] = struct {
-					cancel  context.CancelFunc
-					cluster *appv1.Cluster
-				}{
-					cancel: func() {
-						c.removeCluster(event.Cluster.Server)
-						cancel()
-					},
-					cluster: event.Cluster,
-				}
-				watchingClustersLock.Unlock()
-				go c.watchClusterResources(ctx, *event.Cluster)
-			}
-		}
-
-		onAppModified := func(obj interface{}) {
-			if app, ok := obj.(*appv1.Application); ok {
-				var cluster *appv1.Cluster
-				info, infoOk := watchingClusters[app.Spec.Destination.Server]
-				if infoOk {
-					cluster = info.cluster
-				} else {
-					cluster, _ = c.db.GetCluster(ctx, app.Spec.Destination.Server)
-				}
-				if cluster != nil {
-					// trigger cluster event every time when app created/deleted to either start or stop watching resources
-					clusterEventCallback(&db.ClusterEvent{Cluster: cluster, Type: watch.Modified})
+			c.lock.Lock()
+			defer c.lock.Unlock()
+			if cluster, ok := c.clusters[event.Cluster.Server]; ok {
+				if event.Type == watch.Deleted {
+					cluster.invalidate()
+					delete(c.clusters, event.Cluster.Server)
+				} else if event.Type == watch.Modified {
+					cluster.cluster = event.Cluster
+					cluster.invalidate()
+				} else if event.Type == watch.Added && isClusterHasApps(c.appInformer.GetStore().List(), event.Cluster) {
+					go func() {
+						// warm up cache for cluster with apps
+						_ = cluster.ensureSynced()
+					}()
 				}
 			}
 		}
-
-		c.appInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: onAppModified,
-			UpdateFunc: func(oldObj, newObj interface{}) {
-				oldApp, oldOk := oldObj.(*appv1.Application)
-				newApp, newOk := newObj.(*appv1.Application)
-				if oldOk && newOk {
-					if oldApp.Spec.Destination.Server != newApp.Spec.Destination.Server {
-						onAppModified(oldObj)
-						onAppModified(newApp)
-					}
-				}
-			},
-			DeleteFunc: onAppModified,
-		})
-
 		return c.db.WatchClusters(ctx, clusterEventCallback)
 	}, "watch clusters", ctx, clusterRetryTimeout)
 
 	<-ctx.Done()
 }
-
-// watchClusterResources watches for resource changes annotated with application label on specified cluster and schedule corresponding app refresh.
-func (c *liveStateCache) watchClusterResources(ctx context.Context, item appv1.Cluster) {
-	util.RetryUntilSucceed(func() (err error) {
-		defer func() {
-			if r := recover(); r != nil {
-				err = fmt.Errorf("Recovered from panic: %v\n", r)
-			}
-		}()
-		config := item.RESTConfig()
-		ctx, cancel := context.WithCancel(ctx)
-		defer cancel()
-
-		ch, err := c.kubectl.WatchResources(ctx, config, c.settings, func(gk schema.GroupKind) (s string, e error) {
-			clusterInfo, err := c.getSyncedCluster(item.Server)
-			if err != nil {
-				return "", err
-			}
-			return clusterInfo.getResourceVersion(gk), nil
-		})
-		if err != nil {
-			return err
-		}
-		for event := range ch {
-			if event.WatchEvent != nil {
-				eventObj := event.WatchEvent.Object.(*unstructured.Unstructured)
-				if kube.IsCRD(eventObj) {
-					// restart if new CRD has been created after watch started
-					if event.WatchEvent.Type == watch.Added {
-						c.removeCluster(item.Server)
-						return fmt.Errorf("Restarting the watch because a new CRD %s was added", eventObj.GetName())
-					} else if event.WatchEvent.Type == watch.Deleted {
-						c.removeCluster(item.Server)
-						return fmt.Errorf("Restarting the watch because CRD %s was deleted", eventObj.GetName())
-					}
-				}
-				err = c.processEvent(event.WatchEvent.Type, eventObj, item.Server)
-				if err != nil {
-					log.Warnf("Failed to process event %s for obj %v: %v", event.WatchEvent.Type, event.WatchEvent.Object, err)
-				}
-			} else {
-				err = c.updateCache(item.Server, event.CacheRefresh.GVK.GroupKind(), event.CacheRefresh.ResourceVersion, event.CacheRefresh.Objects)
-				if err != nil {
-					log.Warnf("Failed to process event %s for obj %v: %v", event.WatchEvent.Type, event.WatchEvent.Object, err)
-				}
-			}
-
-		}
-		return fmt.Errorf("resource updates channel has closed")
-	}, fmt.Sprintf("watch app resources on %s", item.Server), ctx, clusterRetryTimeout)
-}
-
-func (c *liveStateCache) updateCache(server string, gk schema.GroupKind, resourceVersion string, objs []unstructured.Unstructured) error {
-	clusterInfo, err := c.getSyncedCluster(server)
-	if err != nil {
-		return err
-	}
-	clusterInfo.updateCache(gk, resourceVersion, objs)
-	return nil
-}
controller/cache/cache_test.go (deleted, 129 lines)
@@ -1,129 +0,0 @@
-package cache
-
-import (
-	"context"
-	"sync"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/apimachinery/pkg/watch"
-
-	"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
-	"github.com/argoproj/argo-cd/util/kube"
-	"github.com/argoproj/argo-cd/util/kube/kubetest"
-)
-
-const (
-	pollInterval = 500 * time.Millisecond
-)
-
-func TestWatchClusterResourcesHandlesResourceEvents(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	events := make(chan kube.WatchEvent)
-	defer func() {
-		cancel()
-		close(events)
-	}()
-
-	pod := testPod.DeepCopy()
-
-	kubeMock := &kubetest.MockKubectlCmd{
-		Resources: []kube.ResourcesBatch{{
-			GVK:     pod.GroupVersionKind(),
-			Objects: make([]unstructured.Unstructured, 0),
-		}},
-		Events: events,
-	}
-
-	server := "https://test"
-	clusterCache := newClusterExt(kubeMock)
-
-	cache := &liveStateCache{
-		clusters: map[string]*clusterInfo{server: clusterCache},
-		lock:     &sync.Mutex{},
-		kubectl:  kubeMock,
-	}
-
-	go cache.watchClusterResources(ctx, v1alpha1.Cluster{Server: server})
-
-	assert.False(t, clusterCache.synced())
-
-	events <- kube.WatchEvent{WatchEvent: &watch.Event{Object: pod, Type: watch.Added}}
-
-	err := wait.Poll(pollInterval, wait.ForeverTestTimeout, func() (bool, error) {
-		_, hasPod := clusterCache.nodes[kube.GetResourceKey(pod)]
-		return hasPod, nil
-	})
-	assert.Nil(t, err)
-
-	pod.SetResourceVersion("updated-resource-version")
-	events <- kube.WatchEvent{WatchEvent: &watch.Event{Object: pod, Type: watch.Modified}}
-
-	err = wait.Poll(pollInterval, wait.ForeverTestTimeout, func() (bool, error) {
-		updatedPodInfo, hasPod := clusterCache.nodes[kube.GetResourceKey(pod)]
-		return hasPod && updatedPodInfo.resourceVersion == "updated-resource-version", nil
-	})
-	assert.Nil(t, err)
-
-	events <- kube.WatchEvent{WatchEvent: &watch.Event{Object: pod, Type: watch.Deleted}}
-
-	err = wait.Poll(pollInterval, wait.ForeverTestTimeout, func() (bool, error) {
-		_, hasPod := clusterCache.nodes[kube.GetResourceKey(pod)]
-		return !hasPod, nil
-	})
-	assert.Nil(t, err)
-}
-
-func TestClusterCacheDroppedOnCreatedDeletedCRD(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	events := make(chan kube.WatchEvent)
-	defer func() {
-		cancel()
-		close(events)
-	}()
-
-	kubeMock := &kubetest.MockKubectlCmd{
-		Resources: []kube.ResourcesBatch{{
-			GVK:     testCRD.GroupVersionKind(),
-			Objects: make([]unstructured.Unstructured, 0),
-		}},
-		Events: events,
-	}
-
-	server := "https://test"
-	clusterCache := newClusterExt(kubeMock)
-
-	cache := &liveStateCache{
-		clusters: map[string]*clusterInfo{server: clusterCache},
-		lock:     &sync.Mutex{},
-		kubectl:  kubeMock,
-	}
-
-	go cache.watchClusterResources(ctx, v1alpha1.Cluster{Server: server})
-
-	err := clusterCache.ensureSynced()
-	assert.Nil(t, err)
-
-	events <- kube.WatchEvent{WatchEvent: &watch.Event{Object: testCRD, Type: watch.Added}}
-	err = wait.Poll(pollInterval, wait.ForeverTestTimeout, func() (bool, error) {
-		cache.lock.Lock()
-		defer cache.lock.Unlock()
-		_, hasCache := cache.clusters[server]
-		return !hasCache, nil
-	})
-	assert.Nil(t, err)
-
-	cache.clusters[server] = clusterCache
-
-	events <- kube.WatchEvent{WatchEvent: &watch.Event{Object: testCRD, Type: watch.Deleted}}
-	err = wait.Poll(pollInterval, wait.ForeverTestTimeout, func() (bool, error) {
-		cache.lock.Lock()
-		defer cache.lock.Unlock()
-		_, hasCache := cache.clusters[server]
-		return !hasCache, nil
-	})
-	assert.Nil(t, err)
-}
controller/cache/cluster.go (222 changes)
@@ -1,6 +1,7 @@
 package cache
 
 import (
+	"context"
 	"fmt"
 	"runtime/debug"
 	"sync"
@@ -21,44 +22,38 @@
 )
 
 const (
-	clusterSyncTimeout  = 24 * time.Hour
-	clusterRetryTimeout = 10 * time.Second
+	clusterSyncTimeout         = 24 * time.Hour
+	clusterRetryTimeout        = 10 * time.Second
+	watchResourcesRetryTimeout = 1 * time.Second
 )
 
-type gkInfo struct {
-	resource        metav1.APIResource
+type apiMeta struct {
+	namespaced      bool
 	resourceVersion string
+	watchCancel     context.CancelFunc
 }
 
 type clusterInfo struct {
-	apis      map[schema.GroupKind]*gkInfo
-	nodes     map[kube.ResourceKey]*node
-	nsIndex   map[string]map[kube.ResourceKey]*node
-	lock      *sync.Mutex
-	syncLock  *sync.Mutex
-	syncTime  *time.Time
-	syncError error
+	apisMeta map[schema.GroupKind]*apiMeta
+
+	lock    *sync.Mutex
+	nodes   map[kube.ResourceKey]*node
+	nsIndex map[string]map[kube.ResourceKey]*node
+
 	onAppUpdated func(appName string)
 	kubectl      kube.Kubectl
 	cluster      *appv1.Cluster
+	syncLock     *sync.Mutex
+	syncTime     *time.Time
+	syncError    error
 	log          *log.Entry
 	settings     *settings.ArgoCDSettings
 }
 
-func (c *clusterInfo) getResourceVersion(gk schema.GroupKind) string {
+func (c *clusterInfo) replaceResourceCache(gk schema.GroupKind, resourceVersion string, objs []unstructured.Unstructured) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	info, ok := c.apis[gk]
-	if ok {
-		return info.resourceVersion
-	}
-	return ""
-}
-
-func (c *clusterInfo) updateCache(gk schema.GroupKind, resourceVersion string, objs []unstructured.Unstructured) {
-	c.lock.Lock()
-	defer c.lock.Unlock()
-	info, ok := c.apis[gk]
+	info, ok := c.apisMeta[gk]
 	if ok {
 		objByKind := make(map[kube.ResourceKey]*unstructured.Unstructured)
 		for i := range objs {
@@ -136,7 +131,13 @@ func (c *clusterInfo) removeNode(key kube.ResourceKey) {
 }
 
 func (c *clusterInfo) invalidate() {
 	c.syncLock.Lock()
 	defer c.syncLock.Unlock()
 	c.syncTime = nil
+	for i := range c.apisMeta {
+		c.apisMeta[i].watchCancel()
+	}
+	c.apisMeta = nil
 }
 
 func (c *clusterInfo) synced() bool {
@@ -149,38 +150,160 @@ func (c *clusterInfo) synced() bool {
 	return time.Now().Before(c.syncTime.Add(clusterSyncTimeout))
 }
 
-func (c *clusterInfo) sync() (err error) {
-	defer func() {
-		if r := recover(); r != nil {
-			err = fmt.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
-		}
-	}()
+func (c *clusterInfo) stopWatching(gk schema.GroupKind) {
 	c.syncLock.Lock()
 	defer c.syncLock.Unlock()
+	if info, ok := c.apisMeta[gk]; ok {
+		info.watchCancel()
+		delete(c.apisMeta, gk)
+		c.replaceResourceCache(gk, "", []unstructured.Unstructured{})
+		log.Warnf("Stop watching %s not found on %s.", gk, c.cluster.Server)
+	}
+}
 
-	c.log.Info("Start syncing cluster")
+// startMissingWatches lists supported cluster resources and start watching for changes unless watch is already running
+func (c *clusterInfo) startMissingWatches() error {
+	c.syncLock.Lock()
+	defer c.syncLock.Unlock()
 
-	c.apis = make(map[schema.GroupKind]*gkInfo)
-	c.nodes = make(map[kube.ResourceKey]*node)
-
-	resources, err := c.kubectl.GetResources(c.cluster.RESTConfig(), c.settings, "")
+	apis, err := c.kubectl.GetAPIResources(c.cluster.RESTConfig(), c.settings)
 	if err != nil {
-		log.Errorf("Failed to sync cluster %s: %v", c.cluster.Server, err)
 		return err
 	}
 
-	appLabelKey := c.settings.GetAppInstanceLabelKey()
-	for res := range resources {
-		if res.Error != nil {
-			return res.Error
-		}
-		if _, ok := c.apis[res.GVK.GroupKind()]; !ok {
-			c.apis[res.GVK.GroupKind()] = &gkInfo{
-				resourceVersion: res.ListResourceVersion,
-				resource:        res.ResourceInfo,
-			}
-		}
-		for i := range res.Objects {
-			c.setNode(createObjInfo(&res.Objects[i], appLabelKey))
-		}
+	for i := range apis {
+		api := apis[i]
+		if _, ok := c.apisMeta[api.GroupKind]; !ok {
+			ctx, cancel := context.WithCancel(context.Background())
+			info := &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}
+			c.apisMeta[api.GroupKind] = info
+			go c.watchEvents(ctx, api, info)
+		}
 	}
+	return nil
+}
+
+func runSynced(lock *sync.Mutex, action func() error) error {
+	lock.Lock()
+	defer lock.Unlock()
+	return action()
+}
+
+func (c *clusterInfo) watchEvents(ctx context.Context, api kube.APIResourceInfo, info *apiMeta) {
+	util.RetryUntilSucceed(func() (err error) {
+		defer func() {
+			if r := recover(); r != nil {
+				err = fmt.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
+			}
+		}()
+
+		err = runSynced(c.syncLock, func() error {
+			if info.resourceVersion == "" {
+				list, err := api.Interface.List(metav1.ListOptions{})
+				if err != nil {
+					return err
+				}
+				c.replaceResourceCache(api.GroupKind, list.GetResourceVersion(), list.Items)
+			}
+			return nil
+		})
+
+		if err != nil {
+			return err
+		}
+
+		w, err := api.Interface.Watch(metav1.ListOptions{ResourceVersion: info.resourceVersion})
+		if errors.IsNotFound(err) {
+			c.stopWatching(api.GroupKind)
+			return nil
+		}
+
+		err = runSynced(c.syncLock, func() error {
+			if errors.IsGone(err) {
+				info.resourceVersion = ""
+				log.Warnf("Resource version of %s on %s is too old.", api.GroupKind, c.cluster.Server)
+			}
+			return err
+		})
+
+		if err != nil {
+			return err
+		}
+
+		defer w.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return nil
+			case event, ok := <-w.ResultChan():
+				if ok {
+					obj := event.Object.(*unstructured.Unstructured)
+					info.resourceVersion = obj.GetResourceVersion()
+					err = c.processEvent(event.Type, obj)
+					if err != nil {
+						log.Warnf("Failed to process event %s %s/%s/%s: %v", event.Type, obj.GroupVersionKind(), obj.GetNamespace(), obj.GetName(), err)
+						continue
+					}
+
+					if kube.IsCRD(obj) {
+						if event.Type == watch.Deleted {
+							group, groupOk, groupErr := unstructured.NestedString(obj.Object, "spec", "group")
+							kind, kindOk, kindErr := unstructured.NestedString(obj.Object, "spec", "names", "kind")
+
+							if groupOk && groupErr == nil && kindOk && kindErr == nil {
+								gk := schema.GroupKind{Group: group, Kind: kind}
+								c.stopWatching(gk)
+							}
+						} else {
+							err = c.startMissingWatches()
+						}
+					}
+					if err != nil {
+						log.Warnf("Failed to start missing watch: %v", err)
+					}
+				} else {
+					return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.cluster.Server)
+				}
+			}
+		}
+
+	}, fmt.Sprintf("watch %s on %s", api.GroupKind, c.cluster.Server), ctx, watchResourcesRetryTimeout)
+}
+
+func (c *clusterInfo) sync() (err error) {
+
+	c.log.Info("Start syncing cluster")
+
+	c.apisMeta = make(map[schema.GroupKind]*apiMeta)
+	c.nodes = make(map[kube.ResourceKey]*node)
+
+	apis, err := c.kubectl.GetAPIResources(c.cluster.RESTConfig(), c.settings)
+	if err != nil {
+		return err
+	}
+	lock := sync.Mutex{}
+	err = util.RunAllAsync(len(apis), func(i int) error {
+		api := apis[i]
+		list, err := api.Interface.List(metav1.ListOptions{})
+		if err != nil {
+			return err
+		}
+
+		lock.Lock()
+		for i := range list.Items {
+			c.setNode(createObjInfo(&list.Items[i], c.settings.GetAppInstanceLabelKey()))
+		}
+		ctx, cancel := context.WithCancel(context.Background())
+		info := &apiMeta{namespaced: api.Meta.Namespaced, resourceVersion: list.GetResourceVersion(), watchCancel: cancel}
+		c.apisMeta[api.GroupKind] = info
+		lock.Unlock()
+
+		go c.watchEvents(ctx, api, info)
+
+		return nil
+	})
+	if err != nil {
+		log.Errorf("Failed to sync cluster %s: %v", c.cluster.Server, err)
+		return err
+	}
 
 	c.log.Info("Cluster successfully synced")
@@ -219,8 +342,8 @@ func (c *clusterInfo) getChildren(obj *unstructured.Unstructured) []appv1.Resour
 	return children
 }
 
-func (c *clusterInfo) isNamespaced(gk schema.GroupKind) bool {
-	if api, ok := c.apis[gk]; ok && !api.resource.Namespaced {
+func (c *clusterInfo) isNamespaced(obj *unstructured.Unstructured) bool {
+	if api, ok := c.apisMeta[kube.GetResourceKey(obj).GroupKind()]; ok && !api.namespaced {
 		return false
 	}
 	return true
@@ -242,7 +365,7 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns
 	lock := &sync.Mutex{}
 	err := util.RunAllAsync(len(targetObjs), func(i int) error {
 		targetObj := targetObjs[i]
-		key := GetTargetObjKey(a, targetObj, c.isNamespaced(targetObj.GroupVersionKind().GroupKind()))
+		key := GetTargetObjKey(a, targetObj, c.isNamespaced(targetObj))
 		lock.Lock()
 		managedObj := managedObjs[key]
 		lock.Unlock()
@@ -311,9 +434,6 @@ func (c *clusterInfo) processEvent(event watch.EventType, un *unstructured.Unstr
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	key := kube.GetResourceKey(un)
-	if info, ok := c.apis[schema.GroupKind{Group: key.Group, Kind: key.Kind}]; ok {
-		info.resourceVersion = un.GetResourceVersion()
-	}
 	existingNode, exists := c.nodes[key]
 	if event == watch.Deleted {
 		if exists {
controller/cache/cluster_test.go (62 changes)
@@ -6,6 +6,9 @@ import (
 	"sync"
 	"testing"
 
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/dynamic/fake"
+
 	"github.com/ghodss/yaml"
 	log "github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
@@ -49,8 +52,7 @@
 	resourceVersion: "123"`)
 
 	testRS = strToUnstructured(`
-apiVersion: v1
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: ReplicaSet
 metadata:
   name: helm-guestbook-rs
@@ -62,39 +64,39 @@
 	resourceVersion: "123"`)
 
 	testDeploy = strToUnstructured(`
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   labels:
     app.kubernetes.io/instance: helm-guestbook
   name: helm-guestbook
   namespace: default
   resourceVersion: "123"`)
 
 	testCRD = strToUnstructured(`
 apiVersion: apiextensions.k8s.io/v1beta1
 kind: CustomResourceDefinition
 metadata:
   name: my-custom-resource-definition
   resourceVersion: "123"`)
 )
 
 func newCluster(objs ...*unstructured.Unstructured) *clusterInfo {
-	resByGVK := make(map[schema.GroupVersionKind][]unstructured.Unstructured)
+	runtimeObjs := make([]runtime.Object, len(objs))
 	for i := range objs {
-		resByGVK[objs[i].GroupVersionKind()] = append(resByGVK[objs[i].GroupVersionKind()], *objs[i])
+		runtimeObjs[i] = objs[i]
 	}
-	resources := make([]kube.ResourcesBatch, 0)
-	for gvk, objects := range resByGVK {
-		resources = append(resources, kube.ResourcesBatch{
-			ListResourceVersion: "1",
-			GVK:                 gvk,
-			Objects:             objects,
-		})
-	}
-	return newClusterExt(kubetest.MockKubectlCmd{
-		Resources: resources,
-	})
+	scheme := runtime.NewScheme()
+	client := fake.NewSimpleDynamicClient(scheme, runtimeObjs...)
+
+	apiResources := []kube.APIResourceInfo{{
+		GroupKind: schema.GroupKind{Group: "", Kind: "Pod"},
+		Interface: client.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}),
+		Meta:      metav1.APIResource{Namespaced: true},
+	}, {
+		GroupKind: schema.GroupKind{Group: "apps", Kind: "ReplicaSet"},
+		Interface: client.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}),
+		Meta:      metav1.APIResource{Namespaced: true},
+	}, {
+		GroupKind: schema.GroupKind{Group: "apps", Kind: "Deployment"},
+		Interface: client.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}),
+		Meta:      metav1.APIResource{Namespaced: true},
+	}}
+
+	return newClusterExt(kubetest.MockKubectlCmd{APIResources: apiResources})
 }
 
 func newClusterExt(kubectl kube.Kubectl) *clusterInfo {
@@ -107,7 +109,7 @@ func newClusterExt(kubectl kube.Kubectl) *clusterInfo {
 		cluster:  &appv1.Cluster{},
 		syncTime: nil,
 		syncLock: &sync.Mutex{},
-		apis:     make(map[schema.GroupKind]*gkInfo),
+		apisMeta: make(map[schema.GroupKind]*apiMeta),
 		log:      log.WithField("cluster", "test"),
 		settings: &settings.ArgoCDSettings{},
 	}
@@ -135,8 +137,8 @@ func TestGetChildren(t *testing.T) {
 		Kind:      "ReplicaSet",
 		Namespace: "default",
 		Name:      "helm-guestbook-rs",
-		Group:     "extensions",
-		Version:   "v1beta1",
+		Group:     "apps",
+		Version:   "v1",
 		ResourceVersion: "123",
 		Children:        rsChildren,
 		Info:            []appv1.InfoItem{},
@@ -149,7 +151,7 @@ func TestGetManagedLiveObjs(t *testing.T) {
 	assert.Nil(t, err)
 
 	targetDeploy := strToUnstructured(`
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: helm-guestbook
@@ -279,7 +281,7 @@ func TestUpdateAppResource(t *testing.T) {
 	err = cluster.processEvent(watch.Modified, mustToUnstructured(testPod))
 	assert.Nil(t, err)
 
-	assert.Equal(t, []string{"helm-guestbook"}, updatesReceived)
+	assert.Contains(t, updatesReceived, "helm-guestbook")
 }
 
 func TestCircularReference(t *testing.T) {
@@ -316,7 +318,7 @@ func TestWatchCacheUpdated(t *testing.T) {
 
 	podGroupKind := testPod.GroupVersionKind().GroupKind()
 
-	cluster.updateCache(podGroupKind, "updated-list-version", []unstructured.Unstructured{*updated, *added})
+	cluster.replaceResourceCache(podGroupKind, "updated-list-version", []unstructured.Unstructured{*updated, *added})
 
 	_, ok := cluster.nodes[kube.GetResourceKey(removed)]
 	assert.False(t, ok)
@@ -327,6 +329,4 @@
 
 	_, ok = cluster.nodes[kube.GetResourceKey(added)]
 	assert.True(t, ok)
-
-	assert.Equal(t, cluster.getResourceVersion(podGroupKind), "updated-list-version")
 }
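For reference, here is the fixture pattern the rewritten `newCluster` adopts above, as a standalone sketch. It mirrors the diff's use of `fake.NewSimpleDynamicClient`; the pod object and the no-context `List(metav1.ListOptions{})` signature assume the client-go vintage vendored by this compare:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic/fake"
)

func main() {
	// Seed the fake dynamic client with one unstructured Pod.
	pod := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata":   map[string]interface{}{"name": "demo", "namespace": "default"},
	}}
	client := fake.NewSimpleDynamicClient(runtime.NewScheme(), pod)

	// List through the same GroupVersionResource interface the tests hand to MockKubectlCmd.
	pods, err := client.Resource(schema.GroupVersionResource{Version: "v1", Resource: "pods"}).
		Namespace("default").List(metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(pods.Items)) // 1
}
```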
controller/cache/info.go (2 changes)
@@ -3,7 +3,7 @@ package cache
 import (
 	"fmt"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 	k8snode "k8s.io/kubernetes/pkg/util/node"
controller/cache/mocks/LiveStateCache.go (27 changes)
@@ -2,14 +2,11 @@
 
 package mocks
 
-import (
-	"context"
-)
-import "github.com/argoproj/argo-cd/util/kube"
-import "github.com/stretchr/testify/mock"
-import "k8s.io/apimachinery/pkg/runtime/schema"
-import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-import "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
+import context "context"
+import kube "github.com/argoproj/argo-cd/util/kube"
+import mock "github.com/stretchr/testify/mock"
+import unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+import v1alpha1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
 
 // LiveStateCache is an autogenerated mock type for the LiveStateCache type
 type LiveStateCache struct {
@@ -81,20 +78,20 @@ func (_m *LiveStateCache) Invalidate() {
 	_m.Called()
 }
 
-// IsNamespaced provides a mock function with given fields: server, gvk
-func (_m *LiveStateCache) IsNamespaced(server string, gvk schema.GroupVersionKind) (bool, error) {
-	ret := _m.Called(server, gvk)
+// IsNamespaced provides a mock function with given fields: server, obj
+func (_m *LiveStateCache) IsNamespaced(server string, obj *unstructured.Unstructured) (bool, error) {
+	ret := _m.Called(server, obj)
 
 	var r0 bool
-	if rf, ok := ret.Get(0).(func(string, schema.GroupVersionKind) bool); ok {
-		r0 = rf(server, gvk)
+	if rf, ok := ret.Get(0).(func(string, *unstructured.Unstructured) bool); ok {
+		r0 = rf(server, obj)
 	} else {
 		r0 = ret.Get(0).(bool)
 	}
 
 	var r1 error
-	if rf, ok := ret.Get(1).(func(string, schema.GroupVersionKind) error); ok {
-		r1 = rf(server, gvk)
+	if rf, ok := ret.Get(1).(func(string, *unstructured.Unstructured) error); ok {
		r1 = rf(server, obj)
 	} else {
 		r1 = ret.Error(1)
 	}
controller/cache/node.go (4 changes)
@@ -6,7 +6,7 @@ import (
 	appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
 	"github.com/argoproj/argo-cd/util/kube"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -26,8 +26,8 @@ func (n *node) resourceKey() kube.ResourceKey {
 }
 
 func (n *node) isParentOf(child *node) bool {
-	ownerGvk := n.ref.GroupVersionKind()
 	for _, ownerRef := range child.ownerRefs {
+		ownerGvk := schema.FromAPIVersionAndKind(ownerRef.APIVersion, ownerRef.Kind)
 		if kube.NewResourceKey(ownerGvk.Group, ownerRef.Kind, n.ref.Namespace, ownerRef.Name) == n.resourceKey() {
 			return true
 		}
controller/cache/node_test.go (new file, 25 lines)
@@ -0,0 +1,25 @@
+package cache
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestIsParentOf(t *testing.T) {
+	child := createObjInfo(testPod, "")
+	parent := createObjInfo(testRS, "")
+	grandParent := createObjInfo(testDeploy, "")
+
+	assert.True(t, parent.isParentOf(child))
+	assert.False(t, grandParent.isParentOf(child))
+}
+
+func TestIsParentOfSameKindDifferentGroup(t *testing.T) {
+	rs := testRS.DeepCopy()
+	rs.SetAPIVersion("somecrd.io/v1")
+	child := createObjInfo(testPod, "")
+	invalidParent := createObjInfo(rs, "")
+
+	assert.False(t, invalidParent.isParentOf(child))
+}
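The `isParentOf` fix above and these two tests hinge on one behavior of `schema.FromAPIVersionAndKind`: the owner's group is now derived from the ownerRef's own apiVersion rather than inherited from the candidate parent. A tiny illustration (the CRD group is hypothetical):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// Same kind, different groups: the derived resource keys no longer collide.
	fmt.Println(schema.FromAPIVersionAndKind("apps/v1", "ReplicaSet").Group)       // "apps"
	fmt.Println(schema.FromAPIVersionAndKind("somecrd.io/v1", "ReplicaSet").Group) // "somecrd.io"
}
```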
@@ -220,7 +220,7 @@ func TestReconcileMetrics(t *testing.T) {
 	metricsServ := NewMetricsServer("localhost:8082", appLister)
 
 	fakeApp := newFakeApp(fakeApp)
-	metricsServ.IncReconcile(fakeApp, time.Duration(5*time.Second))
+	metricsServ.IncReconcile(fakeApp, 5*time.Second)
 
 	req, err := http.NewRequest("GET", "/metrics", nil)
 	assert.NoError(t, err)
@@ -175,7 +175,7 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, revision st
 	for i, obj := range targetObjs {
 		gvk := obj.GroupVersionKind()
 		ns := util.FirstNonEmpty(obj.GetNamespace(), app.Spec.Destination.Namespace)
-		if namespaced, err := m.liveStateCache.IsNamespaced(app.Spec.Destination.Server, obj.GroupVersionKind()); err == nil && !namespaced {
+		if namespaced, err := m.liveStateCache.IsNamespaced(app.Spec.Destination.Server, obj); err == nil && !namespaced {
 			ns = ""
 		}
 		key := kubeutil.NewResourceKey(gvk.Group, gvk.Kind, ns, obj.GetName())
@@ -278,7 +278,9 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, revision st
 		conditions:     conditions,
 		hooks:          hooks,
 		diffNormalizer: diffNormalizer,
-		appSourceType:  v1alpha1.ApplicationSourceType(manifestInfo.SourceType),
 	}
+	if manifestInfo != nil {
+		compRes.appSourceType = v1alpha1.ApplicationSourceType(manifestInfo.SourceType)
+	}
 	return &compRes, nil
 }
@@ -4,7 +4,7 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/assert"
-	"k8s.io/apimachinery/pkg/apis/meta/v1"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 
 	"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
@@ -1,31 +0,0 @@
-{
-  "Vendor": true,
-  "DisableAll": true,
-  "Deadline": "8m",
-  "Enable": [
-    "vet",
-    "gofmt",
-    "goimports",
-    "deadcode",
-    "errcheck",
-    "varcheck",
-    "structcheck",
-    "ineffassign",
-    "unconvert",
-    "misspell"
-  ],
-  "Linters": {
-    "goimports": {"Command": "goimports -l --local github.com/argoproj/argo-cd"}
-  },
-  "Skip": [
-    "pkg/client",
-    "vendor/",
-    ".pb.go"
-  ],
-  "Exclude": [
-    "pkg/client",
-    "vendor/",
-    ".pb.go",
-    ".*warning.*fmt.Fprint"
-  ]
-}
hack/git-ask-pass.sh (new executable file, 7 lines)
@@ -0,0 +1,7 @@
+#!/bin/sh
+# This script is used as the command supplied to GIT_ASKPASS as a way to supply username/password
+# credentials to git, without having to use git credentials helpers, or having on-disk config.
+case "$1" in
+	Username*) echo "${GIT_USERNAME}" ;;
+	Password*) echo "${GIT_PASSWORD}" ;;
+esac
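A minimal sketch of how a caller hands credentials to git through this script; git invokes the GIT_ASKPASS command with a prompt such as "Username for 'https://example.com':" as its argument, which the `case` above matches. The URL and credentials here are placeholders, and the real wiring appears in the `runCredentialedCmd` hunk further down this compare:

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
)

func main() {
	cmd := exec.Command("git", "ls-remote", "https://example.com/repo.git") // placeholder URL
	cmd.Env = append(os.Environ(),
		"GIT_ASKPASS=git-ask-pass.sh", // assumes the script is on PATH, as the Dockerfile change installs it
		"GIT_USERNAME=someuser",       // placeholder credentials echoed back by the script
		"GIT_PASSWORD=somepassword",
	)
	out, err := cmd.CombinedOutput()
	fmt.Println(string(out), err)
}
```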
@@ -12,7 +12,7 @@ bases:
 images:
 - name: argoproj/argocd
   newName: argoproj/argocd
-  newTag: latest
+  newTag: v0.12.0-rc4
 - name: argoproj/argocd-ui
   newName: argoproj/argocd-ui
-  newTag: latest
+  newTag: v0.12.0-rc4
@@ -17,7 +17,7 @@ patchesStrategicMerge:
 images:
 - name: argoproj/argocd
   newName: argoproj/argocd
-  newTag: latest
+  newTag: v0.12.0-rc4
 - name: argoproj/argocd-ui
   newName: argoproj/argocd-ui
-  newTag: latest
+  newTag: v0.12.0-rc4
@@ -646,7 +646,7 @@ spec:
         - argocd-redis-ha-announce-2:26379
         - --sentinelmaster
         - argocd
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-application-controller
         ports:
@@ -693,7 +693,7 @@ spec:
         - cp
         - /usr/local/bin/argocd-util
         - /shared
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: copyutil
         volumeMounts:
@@ -748,7 +748,7 @@ spec:
         - argocd-redis-ha-announce-2:26379
         - --sentinelmaster
         - argocd
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-repo-server
         ports:
@@ -804,7 +804,7 @@ spec:
         - argocd-redis-ha-announce-2:26379
         - --sentinelmaster
         - argocd
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-server
         ports:
@@ -825,7 +825,7 @@ spec:
         - -r
         - /app
         - /shared
-        image: argoproj/argocd-ui:latest
+        image: argoproj/argocd-ui:v0.12.0-rc4
         imagePullPolicy: Always
         name: ui
         volumeMounts:
@@ -562,7 +562,7 @@ spec:
         - argocd-redis-ha-announce-2:26379
         - --sentinelmaster
         - argocd
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-application-controller
         ports:
@@ -609,7 +609,7 @@ spec:
         - cp
         - /usr/local/bin/argocd-util
         - /shared
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: copyutil
         volumeMounts:
@@ -664,7 +664,7 @@ spec:
         - argocd-redis-ha-announce-2:26379
         - --sentinelmaster
         - argocd
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-repo-server
         ports:
@@ -720,7 +720,7 @@ spec:
         - argocd-redis-ha-announce-2:26379
         - --sentinelmaster
         - argocd
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-server
         ports:
@@ -741,7 +741,7 @@ spec:
         - -r
         - /app
         - /shared
-        image: argoproj/argocd-ui:latest
+        image: argoproj/argocd-ui:v0.12.0-rc4
         imagePullPolicy: Always
         name: ui
         volumeMounts:
@@ -451,7 +451,7 @@ spec:
        - "20"
        - --operation-processors
        - "10"
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-application-controller
         ports:
@@ -498,7 +498,7 @@ spec:
        - cp
        - /usr/local/bin/argocd-util
        - /shared
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: copyutil
         volumeMounts:
@@ -561,7 +561,7 @@ spec:
        - argocd-repo-server
        - --redis
        - argocd-redis:6379
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-repo-server
         ports:
@@ -594,7 +594,7 @@ spec:
        - argocd-server
        - --staticassets
        - /shared/app
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-server
         ports:
@@ -615,7 +615,7 @@ spec:
        - -r
        - /app
        - /shared
-        image: argoproj/argocd-ui:latest
+        image: argoproj/argocd-ui:v0.12.0-rc4
         imagePullPolicy: Always
         name: ui
         volumeMounts:
@@ -367,7 +367,7 @@ spec:
        - "20"
        - --operation-processors
        - "10"
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-application-controller
         ports:
@@ -414,7 +414,7 @@ spec:
        - cp
        - /usr/local/bin/argocd-util
        - /shared
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: copyutil
         volumeMounts:
@@ -477,7 +477,7 @@ spec:
        - argocd-repo-server
        - --redis
        - argocd-redis:6379
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-repo-server
         ports:
@@ -510,7 +510,7 @@ spec:
        - argocd-server
        - --staticassets
        - /shared/app
-        image: argoproj/argocd:latest
+        image: argoproj/argocd:v0.12.0-rc4
         imagePullPolicy: Always
         name: argocd-server
         ports:
@@ -531,7 +531,7 @@ spec:
        - -r
        - /app
        - /shared
-        image: argoproj/argocd-ui:latest
+        image: argoproj/argocd-ui:v0.12.0-rc4
         imagePullPolicy: Always
         name: ui
         volumeMounts:
@@ -25,7 +25,7 @@ import (
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/status"
 
-	"github.com/argoproj/argo-cd"
+	argocd "github.com/argoproj/argo-cd"
 	"github.com/argoproj/argo-cd/common"
-	"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
+	argoappv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
@@ -251,7 +251,7 @@ func TestGetAppDetailsKustomize(t *testing.T) {
 		Path: "kustomization_yaml",
 	})
 	assert.NoError(t, err)
-	assert.Equal(t, "nginx", res.Kustomize.ImageTags[0].Name)
-	assert.Equal(t, "1.15.4", res.Kustomize.ImageTags[0].Value)
+	assert.Equal(t, "k8s.gcr.io/nginx-slim", res.Kustomize.ImageTags[0].Name)
+	assert.Equal(t, "0.8", res.Kustomize.ImageTags[0].Value)
+	assert.Equal(t, 2, len(res.Kustomize.ImageTags))
 }
@@ -492,18 +492,13 @@ func (s *Server) validateAndNormalizeApp(ctx context.Context, app *appv1.Applica
 		}
 	}
 
-	conditions, err := argo.GetSpecErrors(ctx, &app.Spec, proj, s.repoClientset, s.db)
+	conditions, appSourceType, err := argo.GetSpecErrors(ctx, &app.Spec, proj, s.repoClientset, s.db)
 	if err != nil {
 		return err
 	}
 	if len(conditions) > 0 {
 		return status.Errorf(codes.InvalidArgument, "application spec is invalid: %s", argo.FormatAppConditions(conditions))
 	}
-
-	appSourceType, err := argo.QueryAppSourceType(ctx, app, s.repoClientset, s.db)
-	if err != nil {
-		return err
-	}
 	app.Spec = *argo.NormalizeApplicationSpec(&app.Spec, appSourceType)
 	return nil
 }
@@ -140,20 +140,20 @@ func GetSpecErrors(
 	proj *argoappv1.AppProject,
 	repoClientset reposerver.Clientset,
 	db db.ArgoDB,
-) ([]argoappv1.ApplicationCondition, error) {
+) ([]argoappv1.ApplicationCondition, argoappv1.ApplicationSourceType, error) {
 	conditions := make([]argoappv1.ApplicationCondition, 0)
 	if spec.Source.RepoURL == "" || spec.Source.Path == "" {
 		conditions = append(conditions, argoappv1.ApplicationCondition{
 			Type:    argoappv1.ApplicationConditionInvalidSpecError,
 			Message: "spec.source.repoURL and spec.source.path are required",
 		})
-		return conditions, nil
+		return conditions, "", nil
 	}
 
 	// Test the repo
 	conn, repoClient, err := repoClientset.NewRepoServerClient()
 	if err != nil {
-		return nil, err
+		return nil, "", err
 	}
 	defer util.Close(conn)
 	repoAccessable := false
@@ -174,12 +174,13 @@ func GetSpecErrors(
 			repoAccessable = true
 		}
 		} else {
-			return nil, err
+			return nil, "", err
 		}
 	} else {
 		repoAccessable = true
 	}
 
+	var appSourceType argoappv1.ApplicationSourceType
 	// Verify only one source type is defined
 	explicitSourceType, err := spec.Source.ExplicitType()
 	if err != nil {
@@ -190,7 +191,6 @@
 	}
 
 	if repoAccessable {
-		var appSourceType argoappv1.ApplicationSourceType
 		if explicitSourceType != nil {
 			appSourceType = *explicitSourceType
 		} else {
@@ -249,11 +249,11 @@
 				Message: fmt.Sprintf("cluster '%s' has not been configured", spec.Destination.Server),
 			})
 		} else {
-			return nil, err
+			return nil, "", err
 		}
 		}
 	}
-	return conditions, nil
+	return conditions, appSourceType, nil
 }
 
 // GetAppProject returns a project from an application
@@ -261,27 +261,6 @@ func GetAppProject(spec *argoappv1.ApplicationSpec, projLister applicationsv1.Ap
 	return projLister.AppProjects(ns).Get(spec.GetProject())
 }
 
-// QueryAppSourceType queries repo server for yaml files in a directory, and determines its
-// application source type based on the files in the directory.
-// This code is redundant to the logic in argo.GetSpecErrors, but since it's is hard to
-// extract out of there. We will be throwing away this code when we remove
-// componentParameterOverrides.
-func QueryAppSourceType(ctx context.Context, app *argoappv1.Application, repoClientset reposerver.Clientset, db db.ArgoDB) (argoappv1.ApplicationSourceType, error) {
-	if t, _ := app.Spec.Source.ExplicitType(); t != nil {
-		return *t, nil
-	}
-	repoRes, err := db.GetRepository(ctx, app.Spec.Source.RepoURL)
-	if err != nil {
-		return "", err
-	}
-	conn, repoClient, err := repoClientset.NewRepoServerClient()
-	if err != nil {
-		return "", err
-	}
-	defer util.Close(conn)
-	return queryAppSourceType(ctx, &app.Spec, repoRes, repoClient)
-}
-
 func queryAppSourceType(ctx context.Context, spec *argoappv1.ApplicationSpec, repoRes *argoappv1.Repository, repoClient repository.RepoServerServiceClient) (argoappv1.ApplicationSourceType, error) {
 	req := repository.ListDirRequest{
 		Repo: &argoappv1.Repository{
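Taken together, the hunks above fold source-type detection into validation: `GetSpecErrors` now returns the detected `ApplicationSourceType`, and the separate `QueryAppSourceType` round trip to the repo server goes away. A sketch of the resulting call site; it restates the `validateAndNormalizeApp` hunk, with imports and types following the diff:

```go
package server

import (
	"context"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	argoappv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
	"github.com/argoproj/argo-cd/reposerver"
	"github.com/argoproj/argo-cd/util/argo"
	"github.com/argoproj/argo-cd/util/db"
)

func validateAndNormalize(ctx context.Context, app *argoappv1.Application, proj *argoappv1.AppProject,
	repoClientset reposerver.Clientset, db db.ArgoDB) error {
	// One validation pass now yields both the spec conditions and the detected source type.
	conditions, appSourceType, err := argo.GetSpecErrors(ctx, &app.Spec, proj, repoClientset, db)
	if err != nil {
		return err
	}
	if len(conditions) > 0 {
		return status.Errorf(codes.InvalidArgument, "application spec is invalid: %s", argo.FormatAppConditions(conditions))
	}
	app.Spec = *argo.NormalizeApplicationSpec(&app.Spec, appSourceType)
	return nil
}
```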
@@ -2,7 +2,7 @@ package argo
 
 import (
 	log "github.com/sirupsen/logrus"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/kubernetes"
util/cache/client_test.go (2 changes)
@@ -28,7 +28,7 @@ func TestCache(t *testing.T) {
 	cacheObj.Foo = "baz"
 	err = c.Get("key", &obj)
 	assert.Nil(t, err)
-	assert.EqualValues(t, string(obj.Foo), "foo")
+	assert.EqualValues(t, obj.Foo, "foo")
 	assert.EqualValues(t, string(obj.Bar), "bar")
 
 	err = c.Delete("key")
@@ -21,7 +21,7 @@ import (
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/kubernetes/pkg/kubectl/util/term"
 
-	"github.com/argoproj/argo-cd"
+	argocd "github.com/argoproj/argo-cd"
 	"github.com/argoproj/argo-cd/errors"
 )
@@ -3,7 +3,7 @@ package db
 import (
 	"context"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/kubernetes"
 
 	appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
@@ -103,6 +103,18 @@ func (m *nativeGitClient) Init() error {
 // Fetch fetches latest updates from origin
 func (m *nativeGitClient) Fetch() error {
 	log.Debugf("Fetching repo %s at %s", m.repoURL, m.root)
+	// Two techniques are used for fetching the remote, depending on whether the remote is SSH or HTTPS.
+	// If HTTPS, we fork/exec the git CLI, since the go-git client does not properly support git
+	// providers such as AWS CodeCommit and Azure DevOps.
+	if _, ok := m.auth.(*ssh2.PublicKeys); ok {
+		return m.goGitFetch()
+	}
+	_, err := m.runCredentialedCmd("git", "fetch", "origin", "--tags", "--force")
+	return err
+}
+
+// goGitFetch fetches the remote using go-git
+func (m *nativeGitClient) goGitFetch() error {
 	repo, err := git.PlainOpen(m.root)
 	if err != nil {
 		return err
@@ -119,17 +131,6 @@ func (m *nativeGitClient) Fetch() error {
 		return nil
 	}
 	return err
-
-	// git fetch does not update the HEAD reference. The following command will update the local
-	// knowledge of what the remote considers the "default branch".
-	// See: https://stackoverflow.com/questions/8839958/how-does-origin-head-get-set
-	// NOTE(jessesuen): disabling the following code because:
-	// 1. we no longer perform a `git checkout HEAD`, instead relying on `ls-remote` and checking
-	//    out a specific SHA1.
-	// 2. This command is the only other command that we use (excluding fetch/ls-remote) which
-	//    requires remote access, and there appears to be no go-git equivalent to this command.
-	// _, err = m.runCmd("git", "remote", "set-head", "origin", "-a")
-	// return err
 }

 // LsFiles lists the local working tree, including only files that are under source control
@@ -241,13 +242,27 @@ func (m *nativeGitClient) CommitSHA() (string, error) {
 // runCmd is a convenience function to run a command in a given directory and return its output
 func (m *nativeGitClient) runCmd(command string, args ...string) (string, error) {
 	cmd := exec.Command(command, args...)
+	return m.runCmdOutput(cmd)
+}
+
+// runCredentialedCmd is a convenience function to run a git command with username/password credentials
+func (m *nativeGitClient) runCredentialedCmd(command string, args ...string) (string, error) {
+	cmd := exec.Command(command, args...)
+	if auth, ok := m.auth.(*http.BasicAuth); ok {
+		cmd.Env = append(cmd.Env, fmt.Sprintf("GIT_ASKPASS=git-ask-pass.sh"))
+		cmd.Env = append(cmd.Env, fmt.Sprintf("GIT_USERNAME=%s", auth.Username))
+		cmd.Env = append(cmd.Env, fmt.Sprintf("GIT_PASSWORD=%s", auth.Password))
+	}
+	return m.runCmdOutput(cmd)
+}
+
+func (m *nativeGitClient) runCmdOutput(cmd *exec.Cmd) (string, error) {
 	log.Debug(strings.Join(cmd.Args, " "))
 	cmd.Dir = m.root
-	env := os.Environ()
-	env = append(env, "HOME=/dev/null")
-	env = append(env, "GIT_CONFIG_NOSYSTEM=true")
-	env = append(env, "GIT_ASKPASS=")
-	cmd.Env = env
+	cmd.Env = append(cmd.Env, os.Environ()...)
+	cmd.Env = append(cmd.Env, "HOME=/dev/null")
+	cmd.Env = append(cmd.Env, "GIT_CONFIG_NOSYSTEM=true")
+	cmd.Env = append(cmd.Env, "GIT_CONFIG_NOGLOBAL=true")
 	out, err := cmd.Output()
 	if len(out) > 0 {
 		log.Debug(string(out))
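For context, a sketch of the GIT_ASKPASS handoff that runCredentialedCmd relies on above: git invokes the program named by GIT_ASKPASS once per credential prompt, and the helper answers from the environment. The helper name matches hack/git-ask-pass.sh from this change, but its exact contents are assumed here.

package main

import (
	"fmt"
	"os"
	"os/exec"
)

func main() {
	// Mirrors the environment set up by runCredentialedCmd.
	cmd := exec.Command("git", "fetch", "origin", "--tags", "--force")
	cmd.Env = append(os.Environ(),
		"GIT_ASKPASS=git-ask-pass.sh", // helper must be resolvable via PATH
		"GIT_USERNAME=git",            // placeholder username
		"GIT_PASSWORD=<token>",        // placeholder, never commit a real secret
	)
	out, err := cmd.CombinedOutput()
	fmt.Println(string(out), err)
}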
@@ -1,8 +1,10 @@
 package git

 import (
+	"fmt"
 	"io/ioutil"
 	"os"
+	"path/filepath"
 	"testing"

 	"github.com/stretchr/testify/assert"
@@ -127,8 +129,7 @@ func TestLsRemote(t *testing.T) {
 func TestGitClient(t *testing.T) {
 	testRepos := []string{
 		"https://github.com/argoproj/argocd-example-apps",
-		// TODO: add this back when azure repos are supported
-		//"https://jsuen0437@dev.azure.com/jsuen0437/jsuen/_git/jsuen",
+		"https://jsuen0437@dev.azure.com/jsuen0437/jsuen/_git/jsuen",
 	}
 	for _, repo := range testRepos {
 		dirName, err := ioutil.TempDir("", "git-client-test-")
@@ -142,22 +143,21 @@ func TestGitClient(t *testing.T) {
 	}
 }

-// TestPrivateGitRepo tests the ability to operate on a private git repo. This test needs to be run
-// manually since we do not have a private git repo for testing
-//
-// export TEST_REPO=https://github.com/jessesuen/private-argocd-example-apps
-// export GITHUB_TOKEN=<YOURGITHUBTOKEN>
-// go test -v -run ^(TestPrivateGitRepo)$ ./util/git/...
+// TestPrivateGitRepo tests the ability to operate on a private git repo.
 func TestPrivateGitRepo(t *testing.T) {
-	repo := os.Getenv("TEST_REPO")
-	username := os.Getenv("TEST_USERNAME")
-	password := os.Getenv("GITHUB_TOKEN")
-	if username == "" {
-		username = "git" // username does not matter for tokens
-	}
-	if repo == "" || password == "" {
-		t.Skip("skipping private git repo test since no repo or password supplied")
-	}
+	repo := "https://gitlab.com/argo-cd-test/argocd-example-apps.git"
+	username := "blah"
+	// This is a personal access token generated with read only access in a throwaway gitlab test
+	// account/repo
+	password := "B5sBDeoqAVUouoHkrovy"
+
+	// add the hack path which has the git-ask-pass.sh shell script
+	osPath := os.Getenv("PATH")
+	hackPath, err := filepath.Abs("../../hack")
+	assert.NoError(t, err)
+	err = os.Setenv("PATH", fmt.Sprintf("%s:%s", osPath, hackPath))
+	assert.NoError(t, err)
+	defer func() { _ = os.Setenv("PATH", osPath) }()

 	dirName, err := ioutil.TempDir("", "git-client-test-")
 	assert.NoError(t, err)
@@ -5,14 +5,13 @@ import (
 	"encoding/json"
 	"fmt"

+	"github.com/gogo/protobuf/proto"
+	grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
+	grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
+	ctx_logrus "github.com/grpc-ecosystem/go-grpc-middleware/tags/logrus"
+	"github.com/sirupsen/logrus"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
-
-	"github.com/gogo/protobuf/proto"
-	"github.com/grpc-ecosystem/go-grpc-middleware/logging"
-	"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
-	"github.com/grpc-ecosystem/go-grpc-middleware/tags/logrus"
-	"github.com/sirupsen/logrus"
 )

 func logRequest(entry *logrus.Entry, info string, pbMsg interface{}, ctx context.Context, logClaims bool) {
@@ -3,8 +3,8 @@ package health

 import (
 	"fmt"

-	"k8s.io/api/apps/v1"
 	appsv1 "k8s.io/api/apps/v1"
+	v1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	coreV1 "k8s.io/api/core/v1"
 	extv1beta1 "k8s.io/api/extensions/v1beta1"
util/kube/ctl.go (205 changes)
@@ -2,24 +2,18 @@ package kube

 import (
 	"bytes"
-	"context"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"os/exec"
-	"runtime/debug"
 	"strings"
-	"sync"
-	"time"

-	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 	apierr "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
@@ -29,170 +23,91 @@ import (
 	"github.com/argoproj/argo-cd/util/diff"
 )

-type ResourcesBatch struct {
-	GVK                 schema.GroupVersionKind
-	ResourceInfo        metav1.APIResource
-	ListResourceVersion string
-	Objects             []unstructured.Unstructured
-	Error               error
-}
-
-type CacheRefreshEvent struct {
-	GVK             schema.GroupVersionKind
-	ResourceVersion string
-	Objects         []unstructured.Unstructured
-}
-
-type WatchEvent struct {
-	WatchEvent   *watch.Event
-	CacheRefresh *CacheRefreshEvent
-}
-
-type CachedVersionSource func(gk schema.GroupKind) (string, error)
-
 type Kubectl interface {
 	ApplyResource(config *rest.Config, obj *unstructured.Unstructured, namespace string, dryRun, force bool) (string, error)
 	ConvertToVersion(obj *unstructured.Unstructured, group, version string) (*unstructured.Unstructured, error)
 	DeleteResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string, forceDelete bool) error
 	GetResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string) (*unstructured.Unstructured, error)
 	PatchResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string, patchType types.PatchType, patchBytes []byte) (*unstructured.Unstructured, error)
-	WatchResources(ctx context.Context, config *rest.Config, resourceFilter ResourceFilter, versionSource CachedVersionSource) (chan WatchEvent, error)
-	GetResources(config *rest.Config, resourceFilter ResourceFilter, namespace string) (chan ResourcesBatch, error)
-	GetAPIResources(config *rest.Config) ([]*metav1.APIResourceList, error)
+	GetAPIResources(config *rest.Config, resourceFilter ResourceFilter) ([]APIResourceInfo, error)
 }

 type KubectlCmd struct{}

-func (k KubectlCmd) GetAPIResources(config *rest.Config) ([]*metav1.APIResourceList, error) {
-	disco, err := discovery.NewDiscoveryClientForConfig(config)
-	if err != nil {
-		return nil, err
-	}
-	return disco.ServerResources()
-}
-
-// GetResources returns all kubernetes resources
-func (k KubectlCmd) GetResources(config *rest.Config, resourceFilter ResourceFilter, namespace string) (chan ResourcesBatch, error) {
-	res := make(chan ResourcesBatch)
-
-	listSupported := func(apiResource *metav1.APIResource) bool {
-		return isSupportedVerb(apiResource, listVerb)
-	}
-	apiResIfs, err := filterAPIResources(config, resourceFilter, listSupported, namespace)
-	if err != nil {
-		return nil, err
-	}
-
-	go func() {
-		defer close(res)
-		var wg sync.WaitGroup
-		wg.Add(len(apiResIfs))
-		for _, apiResIf := range apiResIfs {
-			go func(apiResIf apiResourceInterface) {
-				defer wg.Done()
-				batch := ResourcesBatch{
-					GVK:          apiResIf.groupVersion.WithKind(apiResIf.apiResource.Kind),
-					ResourceInfo: apiResIf.apiResource,
-				}
-				list, err := apiResIf.resourceIf.List(metav1.ListOptions{})
-				if err != nil {
-					if !apierr.IsNotFound(err) {
-						batch.Error = err
-						res <- batch
-					}
-					return
-				}
-				batch.ListResourceVersion = list.GetResourceVersion()
-				batch.Objects = list.Items
-				res <- batch
-			}(apiResIf)
-		}
-		wg.Wait()
-	}()
-
-	return res, nil
-}
-
-const watchResourcesRetryTimeout = 1 * time.Second
-
-// WatchResources watches all the existing resources in the cluster provided by the config. The method retries the watch with the most recent resource version stored in the cache.
-// WatchResources returns a channel which contains either a watch event with updated resource info, or a new list of resources if the cached resource version has expired.
-func (k KubectlCmd) WatchResources(
-	ctx context.Context,
-	config *rest.Config,
-	resourceFilter ResourceFilter,
-	versionSource CachedVersionSource,
-) (chan WatchEvent, error) {
-	watchSupported := func(apiResource *metav1.APIResource) bool {
-		return isSupportedVerb(apiResource, watchVerb)
-	}
-	log.Infof("Start watching for resources changes with in cluster %s", config.Host)
-	apiResIfs, err := filterAPIResources(config, resourceFilter, watchSupported, "")
-	if err != nil {
-		return nil, err
-	}
-	ch := make(chan WatchEvent)
-	go func() {
-		var wg sync.WaitGroup
-		wg.Add(len(apiResIfs))
-		for _, a := range apiResIfs {
-			go func(apiResIf apiResourceInterface) {
-				defer wg.Done()
-
-				util.RetryUntilSucceed(func() (err error) {
-					defer func() {
-						if r := recover(); r != nil {
-							message := fmt.Sprintf("Recovered from panic: %+v\n%s", r, debug.Stack())
-							log.Error(message)
-							err = errors.New(message)
-						}
-					}()
-					watchCh := WatchWithRetry(ctx, func() (i watch.Interface, e error) {
-						gvk := apiResIf.groupVersion.WithKind(apiResIf.apiResource.Kind)
-						resourceVersion, err := versionSource(gvk.GroupKind())
-						if err != nil {
-							return nil, err
-						}
-						w, err := apiResIf.resourceIf.Watch(metav1.ListOptions{
-							ResourceVersion: resourceVersion,
-						})
-						if apierr.IsGone(err) {
-							log.Infof("Resource version of %s has expired at cluster %s, reloading list", gvk, config.Host)
-							list, err := apiResIf.resourceIf.List(metav1.ListOptions{})
-							if err != nil {
-								return nil, err
-							}
-							ch <- WatchEvent{
-								CacheRefresh: &CacheRefreshEvent{
-									GVK:             gvk,
-									ResourceVersion: list.GetResourceVersion(),
-									Objects:         list.Items,
-								},
-							}
-							return apiResIf.resourceIf.Watch(metav1.ListOptions{ResourceVersion: list.GetResourceVersion()})
-						}
-						return w, err
-					})
-					for next := range watchCh {
-						if next.Error != nil {
-							return next.Error
-						}
-						ch <- WatchEvent{
-							WatchEvent: &watch.Event{
-								Object: next.Object,
-								Type:   next.Type,
-							},
-						}
-					}
-					return nil
-				}, fmt.Sprintf("watch resources %s %s/%s", config.Host, apiResIf.groupVersion, apiResIf.apiResource.Kind), ctx, watchResourcesRetryTimeout)
-			}(a)
-		}
-		wg.Wait()
-		close(ch)
-		log.Infof("Stop watching for resources changes with in cluster %s", config.Host)
-	}()
-	return ch, nil
-}
+type APIResourceInfo struct {
+	GroupKind schema.GroupKind
+	Meta      metav1.APIResource
+	Interface dynamic.ResourceInterface
+}
+
+type filterFunc func(apiResource *metav1.APIResource) bool
+
+func filterAPIResources(config *rest.Config, resourceFilter ResourceFilter, filter filterFunc, namespace string) ([]APIResourceInfo, error) {
+	dynamicIf, err := dynamic.NewForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+	disco, err := discovery.NewDiscoveryClientForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+	serverResources, err := disco.ServerPreferredResources()
+	if err != nil {
+		if len(serverResources) == 0 {
+			return nil, err
+		}
+		log.Warnf("Partial success when performing preferred resource discovery: %v", err)
+	}
+	apiResIfs := make([]APIResourceInfo, 0)
+	for _, apiResourcesList := range serverResources {
+		gv, err := schema.ParseGroupVersion(apiResourcesList.GroupVersion)
+		if err != nil {
+			gv = schema.GroupVersion{}
+		}
+		for _, apiResource := range apiResourcesList.APIResources {
+			if resourceFilter.IsExcludedResource(gv.Group, apiResource.Kind, config.Host) {
+				continue
+			}
+			if _, ok := isObsoleteExtensionsGroupKind(gv.Group, apiResource.Kind); ok {
+				continue
+			}
+			if filter(&apiResource) {
+				resource := ToGroupVersionResource(apiResourcesList.GroupVersion, &apiResource)
+				resourceIf := ToResourceInterface(dynamicIf, &apiResource, resource, namespace)
+				gv, err := schema.ParseGroupVersion(apiResourcesList.GroupVersion)
+				if err != nil {
+					return nil, err
+				}
+				apiResIf := APIResourceInfo{
+					GroupKind: schema.GroupKind{Group: gv.Group, Kind: apiResource.Kind},
+					Meta:      apiResource,
+					Interface: resourceIf,
+				}
+				apiResIfs = append(apiResIfs, apiResIf)
+			}
+		}
+	}
+	return apiResIfs, nil
+}
+
+// isSupportedVerb returns whether or not an APIResource supports a specific verb
+func isSupportedVerb(apiResource *metav1.APIResource, verb string) bool {
+	for _, v := range apiResource.Verbs {
+		if v == verb {
+			return true
+		}
+	}
+	return false
+}
+
+func (k KubectlCmd) GetAPIResources(config *rest.Config, resourceFilter ResourceFilter) ([]APIResourceInfo, error) {
+	apiResIfs, err := filterAPIResources(config, resourceFilter, func(apiResource *metav1.APIResource) bool {
+		return isSupportedVerb(apiResource, listVerb) && isSupportedVerb(apiResource, watchVerb)
+	}, "")
+	if err != nil {
+		return nil, err
+	}
+	return apiResIfs, err
+}

 // GetResource returns resource
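A hypothetical caller of the new GetAPIResources signature, to show how the refactor is consumed. Assumptions for this sketch: KUBECONFIG points at a reachable cluster, and kube.ResourceFilter is usable as a zero value that excludes nothing.

package main

import (
	"fmt"
	"log"
	"os"

	"k8s.io/client-go/tools/clientcmd"

	"github.com/argoproj/argo-cd/util/kube"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatal(err)
	}
	// Lists every group/kind that supports both list and watch, minus exclusions.
	infos, err := kube.KubectlCmd{}.GetAPIResources(config, kube.ResourceFilter{})
	if err != nil {
		log.Fatal(err)
	}
	for _, info := range infos {
		fmt.Printf("%s/%s namespaced=%v\n", info.GroupKind.Group, info.GroupKind.Kind, info.Meta.Namespaced)
	}
}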
@@ -75,6 +75,10 @@ func (k *ResourceKey) String() string {
 	return fmt.Sprintf("%s/%s/%s/%s", k.Group, k.Kind, k.Namespace, k.Name)
 }

+func (k ResourceKey) GroupKind() schema.GroupKind {
+	return schema.GroupKind{Group: k.Group, Kind: k.Kind}
+}
+
 func isObsoleteExtensionsGroupKind(group string, kind string) (string, bool) {
 	if group == "extensions" {
 		newGroup, ok := obsoleteExtensionsKinds[kind]
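The obsoleteExtensionsKinds map consulted above is not shown in this diff; a plausible, self-contained sketch of its shape, with the entries being assumptions:

package main

import "fmt"

// Kinds that migrated out of the legacy "extensions" API group, mapped to the
// group that now serves them. These entries are illustrative assumptions.
var obsoleteExtensionsKinds = map[string]string{
	"Deployment": "apps",
	"ReplicaSet": "apps",
	"DaemonSet":  "apps",
}

func isObsoleteExtensionsGroupKind(group string, kind string) (string, bool) {
	if group == "extensions" {
		newGroup, ok := obsoleteExtensionsKinds[kind]
		return newGroup, ok
	}
	return "", false
}

func main() {
	fmt.Println(isObsoleteExtensionsGroupKind("extensions", "Deployment")) // apps true
}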
@@ -246,72 +250,6 @@ func IsCRD(obj *unstructured.Unstructured) bool {
 	return IsCRDGroupVersionKind(obj.GroupVersionKind())
 }

-type apiResourceInterface struct {
-	groupVersion schema.GroupVersion
-	apiResource  metav1.APIResource
-	resourceIf   dynamic.ResourceInterface
-}
-
-type filterFunc func(apiResource *metav1.APIResource) bool
-
-func filterAPIResources(config *rest.Config, resourceFilter ResourceFilter, filter filterFunc, namespace string) ([]apiResourceInterface, error) {
-	dynamicIf, err := dynamic.NewForConfig(config)
-	if err != nil {
-		return nil, err
-	}
-	disco, err := discovery.NewDiscoveryClientForConfig(config)
-	if err != nil {
-		return nil, err
-	}
-	serverResources, err := disco.ServerPreferredResources()
-	if err != nil {
-		if len(serverResources) == 0 {
-			return nil, err
-		}
-		log.Warnf("Partial success when performing preferred resource discovery: %v", err)
-	}
-	apiResIfs := make([]apiResourceInterface, 0)
-	for _, apiResourcesList := range serverResources {
-		gv, err := schema.ParseGroupVersion(apiResourcesList.GroupVersion)
-		if err != nil {
-			gv = schema.GroupVersion{}
-		}
-		if resourceFilter.IsExcludedResource(gv.Group, apiResourcesList.Kind, config.Host) {
-			continue
-		}
-		for _, apiResource := range apiResourcesList.APIResources {
-			if _, ok := isObsoleteExtensionsGroupKind(gv.Group, apiResource.Kind); ok || gv.Group == "" && apiResource.Kind == "Event" {
-				continue
-			}
-			if filter(&apiResource) {
-				resource := ToGroupVersionResource(apiResourcesList.GroupVersion, &apiResource)
-				resourceIf := ToResourceInterface(dynamicIf, &apiResource, resource, namespace)
-				gv, err := schema.ParseGroupVersion(apiResourcesList.GroupVersion)
-				if err != nil {
-					return nil, err
-				}
-				apiResIf := apiResourceInterface{
-					groupVersion: gv,
-					apiResource:  apiResource,
-					resourceIf:   resourceIf,
-				}
-				apiResIfs = append(apiResIfs, apiResIf)
-			}
-		}
-	}
-	return apiResIfs, nil
-}
-
-// isSupportedVerb returns whether or not an APIResource supports a specific verb
-func isSupportedVerb(apiResource *metav1.APIResource, verb string) bool {
-	for _, v := range apiResource.Verbs {
-		if v == verb {
-			return true
-		}
-	}
-	return false
-}
-
 // See: https://github.com/ksonnet/ksonnet/blob/master/utils/client.go
 func ServerResourceForGroupVersionKind(disco discovery.DiscoveryInterface, gvk schema.GroupVersionKind) (*metav1.APIResource, error) {
 	resources, err := disco.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
@@ -1,12 +1,10 @@
 package kubetest

 import (
-	"context"
-
 	"k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/rest"

 	"github.com/argoproj/argo-cd/util/kube"
@@ -18,27 +16,15 @@ type KubectlOutput struct {
 }

 type MockKubectlCmd struct {
-	APIResources []*v1.APIResourceList
-	Resources    []kube.ResourcesBatch
+	APIResources []kube.APIResourceInfo
 	Commands     map[string]KubectlOutput
-	Events       chan kube.WatchEvent
+	Events       chan watch.Event
 }

-func (k MockKubectlCmd) GetAPIResources(config *rest.Config) ([]*v1.APIResourceList, error) {
+func (k MockKubectlCmd) GetAPIResources(config *rest.Config, resourceFilter kube.ResourceFilter) ([]kube.APIResourceInfo, error) {
 	return k.APIResources, nil
 }

-func (k MockKubectlCmd) GetResources(config *rest.Config, resourceFilter kube.ResourceFilter, namespace string) (chan kube.ResourcesBatch, error) {
-	res := make(chan kube.ResourcesBatch)
-	go func() {
-		defer close(res)
-		for i := range k.Resources {
-			res <- k.Resources[i]
-		}
-	}()
-	return res, nil
-}
-
 func (k MockKubectlCmd) GetResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string) (*unstructured.Unstructured, error) {
 	return nil, nil
 }
@@ -47,12 +33,6 @@ func (k MockKubectlCmd) PatchResource(config *rest.Config, gvk schema.GroupVersi
 	return nil, nil
 }

-func (k MockKubectlCmd) WatchResources(
-	ctx context.Context, config *rest.Config, resourceFilter kube.ResourceFilter, versionSource kube.CachedVersionSource) (chan kube.WatchEvent, error) {
-
-	return k.Events, nil
-}
-
 func (k MockKubectlCmd) DeleteResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string, forceDelete bool) error {
 	command, ok := k.Commands[name]
 	if !ok {
@@ -6,6 +6,7 @@ import (
 	"os"
 	"os/exec"
 	"path/filepath"
+	"sort"
 	"strings"

 	argoexec "github.com/argoproj/pkg/exec"
@@ -170,6 +171,9 @@ func getImageParameters(objs []*unstructured.Unstructured) []*v1alpha1.Kustomize
 			Value: version,
 		})
 	}
+	sort.Slice(params, func(i, j int) bool {
+		return params[i].Name < params[j].Name
+	})
 	return params
 }
@@ -62,11 +62,6 @@ type ClientApp struct {
 	cache *cache.Cache
 }

-type appState struct {
-	// ReturnURL is the URL to redirect a user back to after completing an OAuth2 login
-	ReturnURL string `json:"returnURL"`
-}
-
 // NewClientApp will register the Argo CD client app (either via Dex or external OIDC) and return an
 // object which has HTTP handlers for handling the HTTP responses for login and callback
 func NewClientApp(settings *settings.ArgoCDSettings, cache *cache.Cache, dexServerAddr string) (*ClientApp, error) {
@@ -130,7 +130,10 @@ func RunAllAsync(count int, action func(i int) error) (err error) {
 	wg.Add(1)
 	go func(index int) {
 		defer wg.Done()
-		err = action(index)
+		actionErr := action(index)
+		if actionErr != nil {
+			err = actionErr
+		}
 	}(i)
 	if err != nil {
 		break
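A hypothetical use of RunAllAsync as patched above, assuming it lives in this repo's util package: the new guard keeps a later successful action from clearing an earlier failure, so the last non-nil error wins.

package main

import (
	"fmt"

	"github.com/argoproj/argo-cd/util"
)

func main() {
	// Run three actions concurrently; the returned err reflects any failure.
	err := util.RunAllAsync(3, func(i int) error {
		fmt.Println("processing item", i) // placeholder action
		return nil
	})
	if err != nil {
		fmt.Println("at least one action failed:", err)
	}
}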