package cache

import (
	"context"
	"fmt"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"github.com/go-logr/logr"
	"golang.org/x/sync/semaphore"
	authorizationv1 "k8s.io/api/authorization/v1"
	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/managedfields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	authType1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/pager"
	watchutil "k8s.io/client-go/tools/watch"
	"k8s.io/client-go/util/retry"
	"k8s.io/klog/v2/textlogger"
	"k8s.io/kubectl/pkg/util/openapi"

	"github.com/argoproj/gitops-engine/pkg/utils/kube"
	"github.com/argoproj/gitops-engine/pkg/utils/tracing"
)

const (
	watchResourcesRetryTimeout = 1 * time.Second
	ClusterRetryTimeout        = 10 * time.Second

	// default duration before we invalidate the entire cluster cache. Can be set to 0 to never invalidate the cache
	defaultClusterResyncTimeout = 24 * time.Hour

	// default duration before restarting an individual resource watch
	defaultWatchResyncTimeout = 10 * time.Minute

	// Same page size as in k8s.io/client-go/tools/pager/pager.go
	defaultListPageSize = 500
	// Prefetch only a single page
	defaultListPageBufferSize = 1
	// listSemaphore is used to limit the number of concurrent memory-consuming operations on the
	// results of k8s list queries.
	// The limit is required to avoid memory spikes during cache initialization.
	// The default limit of 50 is chosen based on experiments.
	defaultListSemaphoreWeight = 50
	// defaultEventProcessingInterval is the default interval for processing events
	defaultEventProcessingInterval = 100 * time.Millisecond
)

const (
	// RespectRbacDisabled is the default value for respectRbac
	RespectRbacDisabled = iota
	// RespectRbacNormal checks only the api response for forbidden/unauthorized errors
	RespectRbacNormal
	// RespectRbacStrict checks both the api response for forbidden/unauthorized errors and SelfSubjectAccessReview
	RespectRbacStrict
)

type apiMeta struct {
	namespaced bool
	// watchCancel stops the watch of all resources for this API. This gets called when the cache is invalidated or when
	// the watched API ceases to exist (e.g. a CRD gets deleted).
	watchCancel context.CancelFunc
}

type eventMeta struct {
	event watch.EventType
	un    *unstructured.Unstructured
}

// ClusterInfo holds cluster cache stats
type ClusterInfo struct {
	// Server holds cluster API server URL
	Server string
	// K8SVersion holds Kubernetes version
	K8SVersion string
	// ResourcesCount holds the number of observed Kubernetes resources
	ResourcesCount int
	// APIsCount holds the number of observed Kubernetes APIs
	APIsCount int
	// LastCacheSyncTime holds the time of the most recent cache synchronization
	LastCacheSyncTime *time.Time
	// SyncError holds the most recent cache synchronization error
	SyncError error
	// APIResources holds the list of API resources supported by the cluster
	APIResources []kube.APIResourceInfo
}

// OnEventHandler is a function that handles a Kubernetes event
type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured)

// OnProcessEventsHandler handles a notification emitted after a batch of events has been processed
type OnProcessEventsHandler func(duration time.Duration, processedEventsNumber int)

// OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache
type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info any, cacheManifest bool)

// OnResourceUpdatedHandler handles resource update events
type (
	OnResourceUpdatedHandler func(newRes *Resource, oldRes *Resource, namespaceResources map[kube.ResourceKey]*Resource)
	Unsubscribe              func()
)

type ClusterCache interface {
	// EnsureSynced checks cache state and synchronizes it if necessary
	EnsureSynced() error
	// GetServerVersion returns observed cluster version
	GetServerVersion() string
	// GetAPIResources returns information about observed API resources
	GetAPIResources() []kube.APIResourceInfo
	// GetOpenAPISchema returns open API schema of supported API resources
	GetOpenAPISchema() openapi.Resources
	// GetGVKParser returns a parser able to build a TypedValue used in
	// structured merge diffs.
	GetGVKParser() *managedfields.GvkParser
	// Invalidate invalidates the cache and executes callbacks that can optionally update cache settings
	Invalidate(opts ...UpdateSettingsFunc)
	// FindResources returns resources that match the given list of predicates from the specified namespace, or from everywhere if the specified namespace is empty
	FindResources(namespace string, predicates ...func(r *Resource) bool) map[kube.ResourceKey]*Resource
	// IterateHierarchyV2 iterates the resource tree starting from the specified top-level resources and executes the callback for each resource in the tree.
	// The action callback returns true if iteration should continue and false otherwise.
	IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool)
	// IsNamespaced answers whether the specified group/kind is a namespaced resource API or not
	IsNamespaced(gk schema.GroupKind) (bool, error)
	// GetManagedLiveObjs helps find matching live K8S resources for a given resource list.
	// The function returns all cached resources for which the `isManaged` function returns true, plus the resources
	// specified in the targetObjs list.
	GetManagedLiveObjs(targetObjs []*unstructured.Unstructured, isManaged func(r *Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error)
	// GetClusterInfo returns cluster cache statistics
	GetClusterInfo() ClusterInfo
	// OnResourceUpdated registers an event handler that is executed every time a resource is updated in the cache
	OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
	// OnEvent registers an event handler that is executed every time a new K8S event is received
	OnEvent(handler OnEventHandler) Unsubscribe
	// OnProcessEventsHandler registers an event handler that is executed every time a batch of events has been processed
	OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe
}
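
// Illustrative sketch (not part of the upstream file): a minimal end-to-end use of the
// ClusterCache interface declared above. It only relies on identifiers defined in this
// package; error handling is reduced to a single return for brevity.
func exampleClusterCacheUsage(config *rest.Config) error {
	var cc ClusterCache = NewClusterCache(config)

	// EnsureSynced performs the initial list/watch of all monitored APIs on first use.
	if err := cc.EnsureSynced(); err != nil {
		return fmt.Errorf("initial sync failed: %w", err)
	}

	// Subscribe to cache updates; keep the Unsubscribe func to remove the handler later.
	unsubscribe := cc.OnResourceUpdated(func(newRes *Resource, oldRes *Resource, _ map[kube.ResourceKey]*Resource) {
		if newRes == nil && oldRes != nil {
			fmt.Printf("resource %s was deleted\n", oldRes.ResourceKey())
		}
	})
	defer unsubscribe()

	// Query the cache, e.g. all Deployments in the "default" namespace.
	deployments := cc.FindResources("default", func(r *Resource) bool {
		return r.Ref.Kind == "Deployment"
	})
	fmt.Printf("cluster %s has %d deployments in the default namespace\n", cc.GetClusterInfo().Server, len(deployments))
	return nil
}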

type WeightedSemaphore interface {
	Acquire(ctx context.Context, n int64) error
	TryAcquire(n int64) bool
	Release(n int64)
}

type ListRetryFunc func(err error) bool

// NewClusterCache creates a new instance of the cluster cache
func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache {
	log := textlogger.NewLogger(textlogger.NewConfig())
	cache := &clusterCache{
		settings:           Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
		apisMeta:           make(map[schema.GroupKind]*apiMeta),
		eventMetaCh:        nil,
		listPageSize:       defaultListPageSize,
		listPageBufferSize: defaultListPageBufferSize,
		listSemaphore:      semaphore.NewWeighted(defaultListSemaphoreWeight),
		resources:          make(map[kube.ResourceKey]*Resource),
		nsIndex:            make(map[string]map[kube.ResourceKey]*Resource),
		config:             config,
		kubectl: &kube.KubectlCmd{
			Log:    log,
			Tracer: tracing.NopTracer{},
		},
		syncStatus: clusterCacheSync{
			resyncTimeout: defaultClusterResyncTimeout,
			syncTime:      nil,
		},
		watchResyncTimeout:      defaultWatchResyncTimeout,
		clusterSyncRetryTimeout: ClusterRetryTimeout,
		eventProcessingInterval: defaultEventProcessingInterval,
		resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
		eventHandlers:           map[uint64]OnEventHandler{},
		processEventsHandlers:   map[uint64]OnProcessEventsHandler{},
		log:                     log,
		listRetryLimit:          1,
		listRetryUseBackoff:     false,
		listRetryFunc:           ListRetryFuncNever,
	}
	for i := range opts {
		opts[i](cache)
	}
	return cache
}
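
// Illustrative sketch (not part of the upstream file): UpdateSettingsFunc options are plain
// functions that mutate the cache, applied by NewClusterCache (and again by Invalidate).
// The literals below are hypothetical in-package examples, assuming UpdateSettingsFunc is a
// func over *clusterCache as its use above suggests; exported Set* helpers for these fields
// live elsewhere in this package.
func exampleNewClusterCacheWithOptions(config *rest.Config) *clusterCache {
	return NewClusterCache(config,
		// Watch only two namespaces instead of the whole cluster.
		func(c *clusterCache) { c.namespaces = []string{"default", "kube-system"} },
		// Use smaller list pages to reduce peak memory at the cost of more API calls.
		func(c *clusterCache) { c.listPageSize = 100 },
	)
}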

type clusterCache struct {
	syncStatus clusterCacheSync

	apisMeta              map[schema.GroupKind]*apiMeta
	batchEventsProcessing bool
	eventMetaCh           chan eventMeta
	serverVersion         string
	apiResources          []kube.APIResourceInfo
	// namespacedResources is a simple map which indicates whether a groupKind is namespaced
	namespacedResources map[schema.GroupKind]bool

	// maximum time we allow watches to run before relisting the group/kind and restarting the watch
	watchResyncTimeout time.Duration
	// sync retry timeout for the cluster when a sync error happens
	clusterSyncRetryTimeout time.Duration
	// ticker interval for events processing
	eventProcessingInterval time.Duration

	// size of a page for list operations pager.
	listPageSize int64
	// number of pages to prefetch for list pager.
	listPageBufferSize int32
	listSemaphore      WeightedSemaphore

	// retry options for list operations
	listRetryLimit      int32
	listRetryUseBackoff bool
	listRetryFunc       ListRetryFunc

	// lock is a rw lock which protects the resource-related fields of clusterCache
	lock      sync.RWMutex
	resources map[kube.ResourceKey]*Resource
	nsIndex   map[string]map[kube.ResourceKey]*Resource

	kubectl          kube.Kubectl
	log              logr.Logger
	config           *rest.Config
	namespaces       []string
	clusterResources bool
	settings         Settings

	handlersLock                sync.Mutex
	handlerKey                  uint64
	populateResourceInfoHandler OnPopulateResourceInfoHandler
	resourceUpdatedHandlers     map[uint64]OnResourceUpdatedHandler
	eventHandlers               map[uint64]OnEventHandler
	processEventsHandlers       map[uint64]OnProcessEventsHandler
	openAPISchema               openapi.Resources
	gvkParser                   *managedfields.GvkParser

	respectRBAC int
}

type clusterCacheSync struct {
	// When using this struct:
	// 1) 'lock' mutex should be acquired when reading/writing from fields of this struct.
	// 2) The parent 'clusterCache.lock' does NOT need to be owned to r/w from fields of this struct (if it is owned, that is fine, but see below)
	// 3) To prevent deadlocks, do not acquire parent 'clusterCache.lock' after acquiring this lock; if you need both locks, always acquire the parent lock first
	lock          sync.Mutex
	syncTime      *time.Time
	syncError     error
	resyncTimeout time.Duration
}
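
// Illustrative sketch (not part of the upstream file): the lock-ordering rule documented on
// clusterCacheSync above, written out as code. EnsureSynced below follows exactly this order;
// acquiring syncStatus.lock first and then clusterCache.lock risks a deadlock.
func exampleLockOrdering(c *clusterCache) {
	// Correct: parent lock first, then the sync-status lock.
	c.lock.Lock()
	c.syncStatus.lock.Lock()
	syncedOnce := c.syncStatus.syncTime != nil
	c.syncStatus.lock.Unlock()
	c.lock.Unlock()

	if syncedOnce {
		c.log.V(1).Info("cluster cache has completed at least one sync")
	}
}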

// ListRetryFuncNever never retries on errors
func ListRetryFuncNever(_ error) bool {
	return false
}

// ListRetryFuncAlways always retries on errors
func ListRetryFuncAlways(_ error) bool {
	return true
}
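
// Illustrative sketch (not part of the upstream file): a custom ListRetryFunc that retries only
// transient API errors. It can be installed wherever listRetryFunc is configured (see
// listResources below for how listRetryFunc, listRetryLimit and listRetryUseBackoff are consumed).
func listRetryFuncTransient(err error) bool {
	return apierrors.IsTooManyRequests(err) ||
		apierrors.IsServerTimeout(err) ||
		apierrors.IsServiceUnavailable(err) ||
		apierrors.IsTimeout(err)
}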

// OnResourceUpdated registers an event handler that is executed every time a resource is updated in the cache
func (c *clusterCache) OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	key := c.handlerKey
	c.handlerKey++
	c.resourceUpdatedHandlers[key] = handler
	return func() {
		c.handlersLock.Lock()
		defer c.handlersLock.Unlock()
		delete(c.resourceUpdatedHandlers, key)
	}
}

func (c *clusterCache) getResourceUpdatedHandlers() []OnResourceUpdatedHandler {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	var handlers []OnResourceUpdatedHandler
	for _, h := range c.resourceUpdatedHandlers {
		handlers = append(handlers, h)
	}
	return handlers
}

// OnEvent registers an event handler that is executed every time a new K8S event is received
func (c *clusterCache) OnEvent(handler OnEventHandler) Unsubscribe {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	key := c.handlerKey
	c.handlerKey++
	c.eventHandlers[key] = handler
	return func() {
		c.handlersLock.Lock()
		defer c.handlersLock.Unlock()
		delete(c.eventHandlers, key)
	}
}

func (c *clusterCache) getEventHandlers() []OnEventHandler {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	handlers := make([]OnEventHandler, 0, len(c.eventHandlers))
	for _, h := range c.eventHandlers {
		handlers = append(handlers, h)
	}
	return handlers
}

// OnProcessEventsHandler registers an event handler that is executed every time a batch of events has been processed
func (c *clusterCache) OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	key := c.handlerKey
	c.handlerKey++
	c.processEventsHandlers[key] = handler
	return func() {
		c.handlersLock.Lock()
		defer c.handlersLock.Unlock()
		delete(c.processEventsHandlers, key)
	}
}

func (c *clusterCache) getProcessEventsHandlers() []OnProcessEventsHandler {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	handlers := make([]OnProcessEventsHandler, 0, len(c.processEventsHandlers))
	for _, h := range c.processEventsHandlers {
		handlers = append(handlers, h)
	}
	return handlers
}

// GetServerVersion returns observed cluster version
func (c *clusterCache) GetServerVersion() string {
	return c.serverVersion
}

// GetAPIResources returns information about observed API resources
// This method is called frequently during reconciliation to pass API resource info to `helm template`
// NOTE: we do not provide any consistency guarantees about the returned list. The list might be
// updated in place (anytime new CRDs are introduced or removed). If necessary, a separate method
// would need to be introduced to return a copy of the list so it can be iterated consistently.
func (c *clusterCache) GetAPIResources() []kube.APIResourceInfo {
	return c.apiResources
}
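
// Illustrative sketch (not part of the upstream file): because GetAPIResources may be updated in
// place when CRDs come and go, a caller that needs a stable view can copy the slice before
// iterating, as the NOTE above suggests.
func exampleSnapshotAPIResources(c ClusterCache) []kube.APIResourceInfo {
	live := c.GetAPIResources()
	snapshot := make([]kube.APIResourceInfo, len(live))
	copy(snapshot, live)
	return snapshot
}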

// GetOpenAPISchema returns open API schema of supported API resources
func (c *clusterCache) GetOpenAPISchema() openapi.Resources {
	return c.openAPISchema
}

// GetGVKParser returns a parser able to build a TypedValue used in
// structured merge diffs.
func (c *clusterCache) GetGVKParser() *managedfields.GvkParser {
	return c.gvkParser
}

func (c *clusterCache) appendAPIResource(info kube.APIResourceInfo) {
	exists := false
	for i := range c.apiResources {
		if c.apiResources[i].GroupKind == info.GroupKind && c.apiResources[i].GroupVersionResource.Version == info.GroupVersionResource.Version {
			exists = true
			break
		}
	}
	if !exists {
		c.apiResources = append(c.apiResources, info)
	}
}

func (c *clusterCache) deleteAPIResource(info kube.APIResourceInfo) {
	for i := range c.apiResources {
		if c.apiResources[i].GroupKind == info.GroupKind && c.apiResources[i].GroupVersionResource.Version == info.GroupVersionResource.Version {
			c.apiResources[i] = c.apiResources[len(c.apiResources)-1]
			c.apiResources = c.apiResources[:len(c.apiResources)-1]
			break
		}
	}
}

func (c *clusterCache) replaceResourceCache(gk schema.GroupKind, resources []*Resource, ns string) {
	objByKey := make(map[kube.ResourceKey]*Resource)
	for i := range resources {
		objByKey[resources[i].ResourceKey()] = resources[i]
	}

	// update existing nodes
	for i := range resources {
		res := resources[i]
		oldRes := c.resources[res.ResourceKey()]
		if oldRes == nil || oldRes.ResourceVersion != res.ResourceVersion {
			c.onNodeUpdated(oldRes, res)
		}
	}

	for key := range c.resources {
		if key.Kind != gk.Kind || key.Group != gk.Group || ns != "" && key.Namespace != ns {
			continue
		}

		if _, ok := objByKey[key]; !ok {
			c.onNodeRemoved(key)
		}
	}
}

func (c *clusterCache) newResource(un *unstructured.Unstructured) *Resource {
	ownerRefs, isInferredParentOf := c.resolveResourceReferences(un)

	cacheManifest := false
	var info any
	if c.populateResourceInfoHandler != nil {
		info, cacheManifest = c.populateResourceInfoHandler(un, len(ownerRefs) == 0)
	}
	var creationTimestamp *metav1.Time
	ct := un.GetCreationTimestamp()
	if !ct.IsZero() {
		creationTimestamp = &ct
	}
	resource := &Resource{
		ResourceVersion:    un.GetResourceVersion(),
		Ref:                kube.GetObjectRef(un),
		OwnerRefs:          ownerRefs,
		Info:               info,
		CreationTimestamp:  creationTimestamp,
		isInferredParentOf: isInferredParentOf,
	}
	if cacheManifest {
		resource.Resource = un
	}

	return resource
}

func (c *clusterCache) setNode(n *Resource) {
	key := n.ResourceKey()
	c.resources[key] = n
	ns, ok := c.nsIndex[key.Namespace]
	if !ok {
		ns = make(map[kube.ResourceKey]*Resource)
		c.nsIndex[key.Namespace] = ns
	}
	ns[key] = n

	// update inferred parent references
	if n.isInferredParentOf != nil || mightHaveInferredOwner(n) {
		for k, v := range ns {
			// update child resource owner references
			if n.isInferredParentOf != nil && mightHaveInferredOwner(v) {
				v.setOwnerRef(n.toOwnerRef(), n.isInferredParentOf(k))
			}
			if mightHaveInferredOwner(n) && v.isInferredParentOf != nil {
				n.setOwnerRef(v.toOwnerRef(), v.isInferredParentOf(n.ResourceKey()))
			}
		}
	}
}

// Invalidate invalidates the cache and executes callbacks that can optionally update cache settings
func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.syncStatus.lock.Lock()
	c.syncStatus.syncTime = nil
	c.syncStatus.lock.Unlock()

	for i := range c.apisMeta {
		c.apisMeta[i].watchCancel()
	}
	for i := range opts {
		opts[i](c)
	}

	if c.batchEventsProcessing {
		c.invalidateEventMeta()
	}
	c.apisMeta = nil
	c.namespacedResources = nil
	c.log.Info("Invalidated cluster")
}

// clusterCacheSync's lock should be held before calling this method
func (syncStatus *clusterCacheSync) synced(clusterRetryTimeout time.Duration) bool {
	syncTime := syncStatus.syncTime

	if syncTime == nil {
		return false
	}
	if syncStatus.syncError != nil {
		return time.Now().Before(syncTime.Add(clusterRetryTimeout))
	}
	if syncStatus.resyncTimeout == 0 {
		// cluster resync timeout has been disabled
		return true
	}
	return time.Now().Before(syncTime.Add(syncStatus.resyncTimeout))
}

func (c *clusterCache) stopWatching(gk schema.GroupKind, ns string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if info, ok := c.apisMeta[gk]; ok {
		info.watchCancel()
		delete(c.apisMeta, gk)
		c.replaceResourceCache(gk, nil, ns)
		c.log.Info(fmt.Sprintf("Stop watching: %s not found", gk))
	}
}

// startMissingWatches lists supported cluster resources and starts watching for changes unless watch is already running
func (c *clusterCache) startMissingWatches() error {
	apis, err := c.kubectl.GetAPIResources(c.config, true, c.settings.ResourcesFilter)
	if err != nil {
		return fmt.Errorf("failed to get APIResources: %w", err)
	}
	client, err := c.kubectl.NewDynamicClient(c.config)
	if err != nil {
		return fmt.Errorf("failed to create client: %w", err)
	}
	clientset, err := kubernetes.NewForConfig(c.config)
	if err != nil {
		return fmt.Errorf("failed to create clientset: %w", err)
	}
	namespacedResources := make(map[schema.GroupKind]bool)
	for i := range apis {
		api := apis[i]
		namespacedResources[api.GroupKind] = api.Meta.Namespaced
		if _, ok := c.apisMeta[api.GroupKind]; !ok {
			ctx, cancel := context.WithCancel(context.Background())
			c.apisMeta[api.GroupKind] = &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}

			err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
				resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns, false) // don't lock here, we are already in a lock before startMissingWatches is called inside watchEvents
				if err != nil && c.isRestrictedResource(err) {
					keep := false
					if c.respectRBAC == RespectRbacStrict {
						k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api)
						if permErr != nil {
							return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error())
						}
						keep = k
					}
					// if we are not allowed to list the resource, remove it from the watch list
					if !keep {
						delete(c.apisMeta, api.GroupKind)
						delete(namespacedResources, api.GroupKind)
						return nil
					}
				}
				go c.watchEvents(ctx, api, resClient, ns, resourceVersion)
				return nil
			})
			if err != nil {
				return err
			}
		}
	}
	c.namespacedResources = namespacedResources
	return nil
}

func runSynced(lock sync.Locker, action func() error) error {
	lock.Lock()
	defer lock.Unlock()
	return action()
}

// listResources creates a list pager and enforces the limit on concurrent list requests.
// The callback should not wait on any locks that may be held by other callers.
func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.ResourceInterface, callback func(*pager.ListPager) error) (string, error) {
	if err := c.listSemaphore.Acquire(ctx, 1); err != nil {
		return "", fmt.Errorf("failed to acquire list semaphore: %w", err)
	}
	defer c.listSemaphore.Release(1)

	var retryCount int64
	resourceVersion := ""
	listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
		var res *unstructured.UnstructuredList
		var listRetry wait.Backoff

		if c.listRetryUseBackoff {
			listRetry = retry.DefaultBackoff
		} else {
			listRetry = retry.DefaultRetry
		}

		listRetry.Steps = int(c.listRetryLimit)
		err := retry.OnError(listRetry, c.listRetryFunc, func() error {
			var ierr error
			res, ierr = resClient.List(ctx, opts)
			if ierr != nil {
				// Log out a retry
				if c.listRetryLimit > 1 && c.listRetryFunc(ierr) {
					retryCount++
					c.log.Info(fmt.Sprintf("Error while listing resources: %v (try %d/%d)", ierr, retryCount, c.listRetryLimit))
				}
				//nolint:wrapcheck // wrap outside the retry
				return ierr
			}
			resourceVersion = res.GetResourceVersion()
			return nil
		})
		if err != nil {
			return res, fmt.Errorf("failed to list resources: %w", err)
		}
		return res, nil
	})
	listPager.PageBufferSize = c.listPageBufferSize
	listPager.PageSize = c.listPageSize

	return resourceVersion, callback(listPager)
}
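
// Illustrative sketch (not part of the upstream file): how the list-related knobs above fit
// together. The option literals are hypothetical in-package examples; exported Set* helpers for
// these fields live elsewhere in this package.
func exampleListTuning(config *rest.Config) *clusterCache {
	return NewClusterCache(config,
		// Retry transient list failures up to 5 times with exponential backoff
		// (retry.DefaultBackoff) instead of the fixed retry.DefaultRetry delays.
		func(c *clusterCache) {
			c.listRetryLimit = 5
			c.listRetryUseBackoff = true
			c.listRetryFunc = listRetryFuncTransient
		},
		// Allow only 10 concurrent list operations and prefetch 2 pages per pager.
		func(c *clusterCache) {
			c.listSemaphore = semaphore.NewWeighted(10)
			c.listPageBufferSize = 2
		},
	)
}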

// loadInitialState loads the state of all the resources retrieved by the given resource client.
func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
	var items []*Resource
	resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
		return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
			un, ok := obj.(*unstructured.Unstructured)
			if !ok {
				return fmt.Errorf("object %v has an unexpected type %T", obj.GetObjectKind().GroupVersionKind(), obj)
			}
			items = append(items, c.newResource(un))
			return nil
		})
	})
	if err != nil {
		return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
	}

	if lock {
		return resourceVersion, runSynced(&c.lock, func() error {
			c.replaceResourceCache(api.GroupKind, items, ns)
			return nil
		})
	}
	c.replaceResourceCache(api.GroupKind, items, ns)
	return resourceVersion, nil
}

func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
	kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("recovered from panic: %+v\n%s", r, debug.Stack())
			}
		}()

		// load API initial state if no resource version provided
		if resourceVersion == "" {
			resourceVersion, err = c.loadInitialState(ctx, api, resClient, ns, true)
			if err != nil {
				return err
			}
		}

		w, err := watchutil.NewRetryWatcherWithContext(ctx, resourceVersion, &cache.ListWatch{
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				res, err := resClient.Watch(ctx, options)
				if apierrors.IsNotFound(err) {
					c.stopWatching(api.GroupKind, ns)
				}
				//nolint:wrapcheck // wrap outside the retry
				return res, err
			},
		})
		if err != nil {
			return fmt.Errorf("failed to create resource watcher: %w", err)
		}

		defer func() {
			w.Stop()
			resourceVersion = ""
		}()

		var watchResyncTimeoutCh <-chan time.Time
		if c.watchResyncTimeout > 0 {
			shouldResync := time.NewTimer(c.watchResyncTimeout)
			defer shouldResync.Stop()
			watchResyncTimeoutCh = shouldResync.C
		}

		for {
			select {
			// stop watching when parent context got cancelled
			case <-ctx.Done():
				return nil

			// re-synchronize API state and restart watch periodically
			case <-watchResyncTimeoutCh:
				return fmt.Errorf("resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)

			// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
			case <-w.Done():
				return fmt.Errorf("watch %s on %s has closed", api.GroupKind, c.config.Host)

			case event, ok := <-w.ResultChan():
				if !ok {
					return fmt.Errorf("watch %s on %s has closed", api.GroupKind, c.config.Host)
				}

				obj, ok := event.Object.(*unstructured.Unstructured)
				if !ok {
					return fmt.Errorf("failed to convert to *unstructured.Unstructured: %v", event.Object)
				}

				c.recordEvent(event.Type, obj)
				if kube.IsCRD(obj) {
					var resources []kube.APIResourceInfo
					crd := apiextensionsv1.CustomResourceDefinition{}
					err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &crd)
					if err != nil {
						c.log.Error(err, "Failed to extract CRD resources")
					}
					for _, v := range crd.Spec.Versions {
						resources = append(resources, kube.APIResourceInfo{
							GroupKind: schema.GroupKind{
								Group: crd.Spec.Group, Kind: crd.Spec.Names.Kind,
							},
							GroupVersionResource: schema.GroupVersionResource{
								Group: crd.Spec.Group, Version: v.Name, Resource: crd.Spec.Names.Plural,
							},
							Meta: metav1.APIResource{
								Group:        crd.Spec.Group,
								SingularName: crd.Spec.Names.Singular,
								Namespaced:   crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
								Name:         crd.Spec.Names.Plural,
								Kind:         crd.Spec.Names.Singular,
								Version:      v.Name,
								ShortNames:   crd.Spec.Names.ShortNames,
							},
						})
					}

					if event.Type == watch.Deleted {
						for i := range resources {
							c.deleteAPIResource(resources[i])
						}
					} else {
						c.log.Info("Updating Kubernetes APIs, watches, and Open API schemas due to CRD event", "eventType", event.Type, "groupKind", crd.GroupVersionKind().GroupKind().String())
						// add new CRD's groupkind to c.apigroups
						if event.Type == watch.Added {
							for i := range resources {
								c.appendAPIResource(resources[i])
							}
						}
						err = runSynced(&c.lock, func() error {
							return c.startMissingWatches()
						})
						if err != nil {
							c.log.Error(err, "Failed to start missing watch")
						}
					}
					err = runSynced(&c.lock, func() error {
						openAPISchema, gvkParser, err := c.kubectl.LoadOpenAPISchema(c.config)
						if err != nil {
							return fmt.Errorf("failed to load open api schema while handling CRD change: %w", err)
						}
						if gvkParser != nil {
							c.gvkParser = gvkParser
						}
						c.openAPISchema = openAPISchema
						return nil
					})
					if err != nil {
						c.log.Error(err, "Failed to reload open api schema")
					}
				}
			}
		}
	})
}

// processApi processes all the resources for a given API. First we construct an API client for the given API. Then we
// call the callback. If we're managing the whole cluster, we call the callback with the client and an empty namespace.
// If we're managing specific namespaces, we call the callback for each namespace.
func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
	resClient := client.Resource(api.GroupVersionResource)
	switch {
	// if manage whole cluster or resource is cluster level and cluster resources enabled
	case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
		return callback(resClient, "")
	// if manage some namespaces and resource is namespaced
	case len(c.namespaces) != 0 && api.Meta.Namespaced:
		for _, ns := range c.namespaces {
			err := callback(resClient.Namespace(ns), ns)
			if err != nil {
				return err
			}
		}
	}

	return nil
}

// isRestrictedResource checks if the kube api call is unauthorized or forbidden
func (c *clusterCache) isRestrictedResource(err error) bool {
	return c.respectRBAC != RespectRbacDisabled && (apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err))
}

// checkPermission runs a self subject access review to check if the controller has permissions to list the resource
func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface authType1.SelfSubjectAccessReviewInterface, api kube.APIResourceInfo) (keep bool, err error) {
	sar := &authorizationv1.SelfSubjectAccessReview{
		Spec: authorizationv1.SelfSubjectAccessReviewSpec{
			ResourceAttributes: &authorizationv1.ResourceAttributes{
				Namespace: "*",
				Verb:      "list", // uses list verb to check for permissions
				Resource:  api.GroupVersionResource.Resource,
			},
		},
	}

	switch {
	// if manage whole cluster or resource is cluster level and cluster resources enabled
	case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
		resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
		if err != nil {
			return false, fmt.Errorf("failed to create self subject access review: %w", err)
		}
		if resp != nil && resp.Status.Allowed {
			return true, nil
		}
		// unsupported, remove from watch list
		return false, nil
	// if manage some namespaces and resource is namespaced
	case len(c.namespaces) != 0 && api.Meta.Namespaced:
		for _, ns := range c.namespaces {
			sar.Spec.ResourceAttributes.Namespace = ns
			resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
			if err != nil {
				return false, fmt.Errorf("failed to create self subject access review: %w", err)
			}
			if resp != nil && resp.Status.Allowed {
				return true, nil
			}
			// unsupported, remove from watch list
			//nolint:staticcheck //FIXME
			return false, nil
		}
	}
	// checkPermission follows the same logic of determining namespace/cluster resource as the processApi function
	// so if neither of the cases match it means the controller will not watch for it so it is safe to return true.
	return true, nil
}

// sync retrieves the current state of the cluster and stores relevant information in the clusterCache fields.
//
// First we get some metadata from the cluster, like the server version, OpenAPI document, and the list of all API
// resources.
//
// Then we get a list of the preferred versions of all API resources which are to be monitored (it's possible to exclude
// resources from monitoring). We loop through those APIs asynchronously and for each API we list all resources. We also
// kick off a goroutine to watch the resources for that API and update the cache constantly.
//
// When this function exits, the cluster cache is up to date, and the appropriate resources are being watched for
// changes.
func (c *clusterCache) sync() error {
	c.log.Info("Start syncing cluster")

	for i := range c.apisMeta {
		c.apisMeta[i].watchCancel()
	}

	if c.batchEventsProcessing {
		c.invalidateEventMeta()
		c.eventMetaCh = make(chan eventMeta)
	}

	c.apisMeta = make(map[schema.GroupKind]*apiMeta)
	c.resources = make(map[kube.ResourceKey]*Resource)
	c.namespacedResources = make(map[schema.GroupKind]bool)
	config := c.config
	version, err := c.kubectl.GetServerVersion(config)
	if err != nil {
		return fmt.Errorf("failed to get server version: %w", err)
	}
	c.serverVersion = version
	apiResources, err := c.kubectl.GetAPIResources(config, false, NewNoopSettings())
	if err != nil {
		return fmt.Errorf("failed to get api resources: %w", err)
	}
	c.apiResources = apiResources

	openAPISchema, gvkParser, err := c.kubectl.LoadOpenAPISchema(config)
	if err != nil {
		return fmt.Errorf("failed to load open api schema while syncing cluster cache: %w", err)
	}

	if gvkParser != nil {
		c.gvkParser = gvkParser
	}

	c.openAPISchema = openAPISchema

	apis, err := c.kubectl.GetAPIResources(c.config, true, c.settings.ResourcesFilter)
	if err != nil {
		return fmt.Errorf("failed to get api resources: %w", err)
	}
	client, err := c.kubectl.NewDynamicClient(c.config)
	if err != nil {
		return fmt.Errorf("failed to create client: %w", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return fmt.Errorf("failed to create clientset: %w", err)
	}

	if c.batchEventsProcessing {
		go c.processEvents()
	}

	// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
	lock := sync.Mutex{}
	err = kube.RunAllAsync(len(apis), func(i int) error {
		api := apis[i]

		lock.Lock()
		ctx, cancel := context.WithCancel(context.Background())
		info := &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}
		c.apisMeta[api.GroupKind] = info
		c.namespacedResources[api.GroupKind] = api.Meta.Namespaced
		lock.Unlock()

		return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
			resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
				return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
					un, ok := obj.(*unstructured.Unstructured)
					if !ok {
						return fmt.Errorf("object %v has an unexpected type %T", obj.GetObjectKind().GroupVersionKind(), obj)
					}
					newRes := c.newResource(un)
					lock.Lock()
					c.setNode(newRes)
					lock.Unlock()
					return nil
				})
			})
			if err != nil {
				if c.isRestrictedResource(err) {
					keep := false
					if c.respectRBAC == RespectRbacStrict {
						k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api)
						if permErr != nil {
							return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error())
						}
						keep = k
					}
					// if we are not allowed to list the resource, remove it from the watch list
					if !keep {
						lock.Lock()
						delete(c.apisMeta, api.GroupKind)
						delete(c.namespacedResources, api.GroupKind)
						lock.Unlock()
						return nil
					}
				}
				return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
			}

			go c.watchEvents(ctx, api, resClient, ns, resourceVersion)

			return nil
		})
	})
	if err != nil {
		c.log.Error(err, "Failed to sync cluster")
		return fmt.Errorf("failed to sync cluster %s: %w", c.config.Host, err)
	}

	c.log.Info("Cluster successfully synced")
	return nil
}

// invalidateEventMeta closes the eventMeta channel if it is open
func (c *clusterCache) invalidateEventMeta() {
	if c.eventMetaCh != nil {
		close(c.eventMetaCh)
		c.eventMetaCh = nil
	}
}

// EnsureSynced checks cache state and synchronizes it if necessary
func (c *clusterCache) EnsureSynced() error {
	syncStatus := &c.syncStatus

	// first check if cluster is synced *without acquiring the full clusterCache lock*
	syncStatus.lock.Lock()
	if syncStatus.synced(c.clusterSyncRetryTimeout) {
		syncError := syncStatus.syncError
		syncStatus.lock.Unlock()
		return syncError
	}
	syncStatus.lock.Unlock() // release the lock, so that we can acquire the parent lock (see struct comment re: lock acquisition ordering)

	c.lock.Lock()
	defer c.lock.Unlock()
	syncStatus.lock.Lock()
	defer syncStatus.lock.Unlock()

	// before doing any work, check once again now that we have the lock, to see if it got
	// synced between the first check and now
	if syncStatus.synced(c.clusterSyncRetryTimeout) {
		return syncStatus.syncError
	}
	err := c.sync()
	syncTime := time.Now()
	syncStatus.syncTime = &syncTime
	syncStatus.syncError = err
	return syncStatus.syncError
}

func (c *clusterCache) FindResources(namespace string, predicates ...func(r *Resource) bool) map[kube.ResourceKey]*Resource {
	c.lock.RLock()
	defer c.lock.RUnlock()
	result := map[kube.ResourceKey]*Resource{}
	resources := map[kube.ResourceKey]*Resource{}
	if namespace != "" {
		if ns, ok := c.nsIndex[namespace]; ok {
			resources = ns
		}
	} else {
		resources = c.resources
	}

	for k := range resources {
		r := resources[k]
		matches := true
		for i := range predicates {
			if !predicates[i](r) {
				matches = false
				break
			}
		}

		if matches {
			result[k] = r
		}
	}
	return result
}

// IterateHierarchyV2 iterates the resource tree starting from the specified top-level resources and executes the callback for each resource in the tree
func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	keysPerNamespace := make(map[string][]kube.ResourceKey)
	for _, key := range keys {
		_, ok := c.resources[key]
		if !ok {
			continue
		}
		keysPerNamespace[key.Namespace] = append(keysPerNamespace[key.Namespace], key)
	}
	for namespace, namespaceKeys := range keysPerNamespace {
		nsNodes := c.nsIndex[namespace]
		graph := buildGraph(nsNodes)
		visited := make(map[kube.ResourceKey]int)
		for _, key := range namespaceKeys {
			visited[key] = 0
		}
		for _, key := range namespaceKeys {
			// The check for existence of key is done above.
			res := c.resources[key]
			if visited[key] == 2 || !action(res, nsNodes) {
				continue
			}
			visited[key] = 1
			if _, ok := graph[key]; ok {
				for _, child := range graph[key] {
					if visited[child.ResourceKey()] == 0 && action(child, nsNodes) {
						child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool {
							if err != nil {
								c.log.V(2).Info(err.Error())
								return false
							}
							return action(child, namespaceResources)
						})
					}
				}
			}
			visited[key] = 2
		}
	}
}
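
// Illustrative sketch (not part of the upstream file): walking the hierarchy rooted at a set of
// top-level resources, e.g. everything owned (directly or transitively) by the Deployments found
// in one namespace. Returning true from the action keeps descending into children.
func exampleWalkDeploymentTrees(cc ClusterCache, namespace string) []kube.ResourceKey {
	roots := cc.FindResources(namespace, func(r *Resource) bool {
		return r.Ref.Kind == "Deployment" && len(r.OwnerRefs) == 0
	})
	rootKeys := make([]kube.ResourceKey, 0, len(roots))
	for key := range roots {
		rootKeys = append(rootKeys, key)
	}

	var tree []kube.ResourceKey
	cc.IterateHierarchyV2(rootKeys, func(resource *Resource, _ map[kube.ResourceKey]*Resource) bool {
		tree = append(tree, resource.ResourceKey())
		return true
	})
	return tree
}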

func buildGraph(nsNodes map[kube.ResourceKey]*Resource) map[kube.ResourceKey]map[types.UID]*Resource {
	// Prepare to construct a graph
	nodesByUID := make(map[types.UID][]*Resource, len(nsNodes))
	for _, node := range nsNodes {
		nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node)
	}

	// In the graph, the key is the parent and the value is a list of children.
	graph := make(map[kube.ResourceKey]map[types.UID]*Resource)

	// Loop through all nodes, calling each one "childNode," because we're only bothering with it if it has a parent.
	for _, childNode := range nsNodes {
		for i, ownerRef := range childNode.OwnerRefs {
			// First, backfill UID of inferred owner child references.
			if ownerRef.UID == "" {
				group, err := schema.ParseGroupVersion(ownerRef.APIVersion)
				if err != nil {
					// APIVersion is invalid, so we couldn't find the parent.
					continue
				}
				graphKeyNode, ok := nsNodes[kube.ResourceKey{Group: group.Group, Kind: ownerRef.Kind, Namespace: childNode.Ref.Namespace, Name: ownerRef.Name}]
				if !ok {
					// No resource found with the given graph key, so move on.
					continue
				}
				ownerRef.UID = graphKeyNode.Ref.UID
				childNode.OwnerRefs[i] = ownerRef
			}

			// Now that we have the UID of the parent, update the graph.
			uidNodes, ok := nodesByUID[ownerRef.UID]
			if ok {
				for _, uidNode := range uidNodes {
					// Update the graph for this owner to include the child.
					if _, ok := graph[uidNode.ResourceKey()]; !ok {
						graph[uidNode.ResourceKey()] = make(map[types.UID]*Resource)
					}
					r, ok := graph[uidNode.ResourceKey()][childNode.Ref.UID]
					if !ok {
						graph[uidNode.ResourceKey()][childNode.Ref.UID] = childNode
					} else if r != nil {
						// The object might have multiple children with the same UID (e.g. replicaset from apps and extensions group).
						// It is ok to pick any object, but we need to make sure we pick the same child after every refresh.
						key1 := r.ResourceKey()
						key2 := childNode.ResourceKey()
						if strings.Compare(key1.String(), key2.String()) > 0 {
							graph[uidNode.ResourceKey()][childNode.Ref.UID] = childNode
						}
					}
				}
			}
		}
	}
	return graph
}

// IsNamespaced answers whether the specified group/kind is a namespaced resource API or not
func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) {
	if isNamespaced, ok := c.namespacedResources[gk]; ok {
		return isNamespaced, nil
	}
	return false, apierrors.NewNotFound(schema.GroupResource{Group: gk.Group}, "")
}

func (c *clusterCache) managesNamespace(namespace string) bool {
	for _, ns := range c.namespaces {
		if ns == namespace {
			return true
		}
	}
	return false
}

// GetManagedLiveObjs helps find matching live K8S resources for a given resource list.
// The function returns all cached resources for which the `isManaged` function returns true, plus the resources
// specified in the targetObjs list.
func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructured, isManaged func(r *Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	for _, o := range targetObjs {
		if len(c.namespaces) > 0 {
			if o.GetNamespace() == "" && !c.clusterResources {
				return nil, fmt.Errorf("cluster level %s %q can not be managed when in namespaced mode", o.GetKind(), o.GetName())
			} else if o.GetNamespace() != "" && !c.managesNamespace(o.GetNamespace()) {
				return nil, fmt.Errorf("namespace %q for %s %q is not managed", o.GetNamespace(), o.GetKind(), o.GetName())
			}
		}
	}

	managedObjs := make(map[kube.ResourceKey]*unstructured.Unstructured)
	// iterate all objects in live state cache to find ones associated with app
	for key, o := range c.resources {
		if isManaged(o) && o.Resource != nil && len(o.OwnerRefs) == 0 {
			managedObjs[key] = o.Resource
		}
	}
	// iterate target objects and identify ones that already exist in the cluster but are simply missing our label
	lock := &sync.Mutex{}
	err := kube.RunAllAsync(len(targetObjs), func(i int) error {
		targetObj := targetObjs[i]
		key := kube.GetResourceKey(targetObj)
		lock.Lock()
		managedObj := managedObjs[key]
		lock.Unlock()

		if managedObj == nil {
			if existingObj, exists := c.resources[key]; exists {
				if existingObj.Resource != nil {
					managedObj = existingObj.Resource
				} else {
					var err error
					managedObj, err = c.kubectl.GetResource(context.TODO(), c.config, targetObj.GroupVersionKind(), existingObj.Ref.Name, existingObj.Ref.Namespace)
					if err != nil {
						if apierrors.IsNotFound(err) {
							return nil
						}
						return fmt.Errorf("unexpected error getting managed object: %w", err)
					}
				}
			} else if _, watched := c.apisMeta[key.GroupKind()]; !watched {
				var err error
				managedObj, err = c.kubectl.GetResource(context.TODO(), c.config, targetObj.GroupVersionKind(), targetObj.GetName(), targetObj.GetNamespace())
				if err != nil {
					if apierrors.IsNotFound(err) {
						return nil
					}
					return fmt.Errorf("unexpected error getting managed object: %w", err)
				}
			}
		}

		if managedObj != nil {
			converted, err := c.kubectl.ConvertToVersion(managedObj, targetObj.GroupVersionKind().Group, targetObj.GroupVersionKind().Version)
			if err != nil {
				// fallback to loading resource from kubernetes if conversion fails
				c.log.V(1).Info(fmt.Sprintf("Failed to convert resource: %v", err))
				managedObj, err = c.kubectl.GetResource(context.TODO(), c.config, targetObj.GroupVersionKind(), managedObj.GetName(), managedObj.GetNamespace())
				if err != nil {
					if apierrors.IsNotFound(err) {
						return nil
					}
					return fmt.Errorf("unexpected error getting managed object: %w", err)
				}
			} else {
				managedObj = converted
			}
			lock.Lock()
			managedObjs[key] = managedObj
			lock.Unlock()
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get managed objects: %w", err)
	}

	return managedObjs, nil
}
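
// Illustrative sketch (not part of the upstream file): resolving live objects for a set of target
// manifests. The "managed-by" label name is a hypothetical example; any predicate over the cached
// Resource works as the isManaged function.
func exampleGetManagedLiveObjs(cc ClusterCache, targetObjs []*unstructured.Unstructured) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
	return cc.GetManagedLiveObjs(targetObjs, func(r *Resource) bool {
		// Treat every cached resource carrying the (hypothetical) label below as managed.
		if r.Resource == nil {
			return false
		}
		return r.Resource.GetLabels()["example.io/managed-by"] == "my-controller"
	})
}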

func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstructured) {
	for _, h := range c.getEventHandlers() {
		h(event, un)
	}
	key := kube.GetResourceKey(un)
	if event == watch.Modified && skipAppRequeuing(key) {
		return
	}

	if c.batchEventsProcessing {
		c.eventMetaCh <- eventMeta{event, un}
	} else {
		c.lock.Lock()
		defer c.lock.Unlock()
		c.processEvent(key, eventMeta{event, un})
	}
}

func (c *clusterCache) processEvents() {
	log := c.log.WithValues("functionName", "processEvents")
	log.V(1).Info("Start processing events")

	c.lock.Lock()
	ch := c.eventMetaCh
	c.lock.Unlock()

	eventMetas := make([]eventMeta, 0)
	ticker := time.NewTicker(c.eventProcessingInterval)
	defer ticker.Stop()

	for {
		select {
		case evMeta, ok := <-ch:
			if !ok {
				log.V(2).Info("Event processing channel closed, finish processing")
				return
			}
			eventMetas = append(eventMetas, evMeta)
		case <-ticker.C:
			if len(eventMetas) > 0 {
				c.processEventsBatch(eventMetas)
				eventMetas = eventMetas[:0]
			}
		}
	}
}
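
// Illustrative sketch (not part of the upstream file): enabling batched event processing. The
// option literal is a hypothetical in-package example. With batching on, watch events are queued
// on eventMetaCh and applied under a single lock acquisition every eventProcessingInterval, and
// OnProcessEventsHandler callbacks report each batch's size and processing duration.
func exampleBatchedEventProcessing(config *rest.Config) *clusterCache {
	cc := NewClusterCache(config,
		func(c *clusterCache) {
			c.batchEventsProcessing = true
			c.eventProcessingInterval = 250 * time.Millisecond
		},
	)
	cc.OnProcessEventsHandler(func(duration time.Duration, processedEventsNumber int) {
		cc.log.V(1).Info("processed batch", "count", processedEventsNumber, "ms", duration.Milliseconds())
	})
	return cc
}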

func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) {
	log := c.log.WithValues("functionName", "processEventsBatch")
	start := time.Now()
	c.lock.Lock()
	log.V(1).Info("Lock acquired (ms)", "duration", time.Since(start).Milliseconds())
	defer func() {
		c.lock.Unlock()
		duration := time.Since(start)
		// Update the metric with the duration of the events processing
		for _, handler := range c.getProcessEventsHandlers() {
			handler(duration, len(eventMetas))
		}
	}()

	for _, evMeta := range eventMetas {
		key := kube.GetResourceKey(evMeta.un)
		c.processEvent(key, evMeta)
	}

	log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds())
}

func (c *clusterCache) processEvent(key kube.ResourceKey, evMeta eventMeta) {
	existingNode, exists := c.resources[key]
	if evMeta.event == watch.Deleted {
		if exists {
			c.onNodeRemoved(key)
		}
	} else {
		c.onNodeUpdated(existingNode, c.newResource(evMeta.un))
	}
}

func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
	c.setNode(newRes)
	for _, h := range c.getResourceUpdatedHandlers() {
		h(newRes, oldRes, c.nsIndex[newRes.Ref.Namespace])
	}
}

func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
	existing, ok := c.resources[key]
	if ok {
		delete(c.resources, key)
		ns, ok := c.nsIndex[key.Namespace]
		if ok {
			delete(ns, key)
			if len(ns) == 0 {
				delete(c.nsIndex, key.Namespace)
			}
			// remove ownership references from children with inferred references
			if existing.isInferredParentOf != nil {
				for k, v := range ns {
					if mightHaveInferredOwner(v) && existing.isInferredParentOf(k) {
						v.setOwnerRef(existing.toOwnerRef(), false)
					}
				}
			}
		}
		for _, h := range c.getResourceUpdatedHandlers() {
			h(nil, existing, ns)
		}
	}
}

var ignoredRefreshResources = map[string]bool{
	"/" + kube.EndpointsKind: true,
}

// GetClusterInfo returns cluster cache statistics
func (c *clusterCache) GetClusterInfo() ClusterInfo {
	c.lock.RLock()
	defer c.lock.RUnlock()
	c.syncStatus.lock.Lock()
	defer c.syncStatus.lock.Unlock()

	return ClusterInfo{
		APIsCount:         len(c.apisMeta),
		K8SVersion:        c.serverVersion,
		ResourcesCount:    len(c.resources),
		Server:            c.config.Host,
		LastCacheSyncTime: c.syncStatus.syncTime,
		SyncError:         c.syncStatus.syncError,
		APIResources:      c.apiResources,
	}
}

// skipAppRequeuing checks if the object is an API type which we want to skip requeuing against.
// We ignore API types which have a high churn rate, and/or whose updates are irrelevant to the app
func skipAppRequeuing(key kube.ResourceKey) bool {
	return ignoredRefreshResources[key.Group+"/"+key.Kind]
}
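
// Illustrative sketch (not part of the upstream file): the lookup key used by skipAppRequeuing is
// "<group>/<kind>", so core-group (empty group) Endpoints map to "/Endpoints" and their Modified
// events are dropped by recordEvent before reaching the resource-updated handlers.
func exampleSkipAppRequeuing() bool {
	endpointsKey := kube.ResourceKey{Group: "", Kind: kube.EndpointsKind, Namespace: "default", Name: "my-service"}
	return skipAppRequeuing(endpointsKey) // true: high-churn Endpoints updates are ignored
}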