Files
argo-cd/server/server.go
Mike Cutsail 2793097480 feat: add OTEL instrumentation for authentication and handlers (#25296)
Signed-off-by: Mike Cutsail <mcutsail15@apple.com>
Signed-off-by: Alexandre Gaudreault <alexandre_gaudreault@intuit.com>
Co-authored-by: Alexandre Gaudreault <alexandre_gaudreault@intuit.com>
2026-02-10 13:31:55 +00:00

1746 lines
65 KiB
Go

package server
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
goio "io"
"io/fs"
"math"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"reflect"
"regexp"
go_runtime "runtime"
"runtime/debug"
"strings"
gosync "sync"
"sync/atomic"
"syscall"
"time"
"github.com/argoproj/notifications-engine/pkg/api"
"github.com/argoproj/pkg/v2/sync"
"github.com/golang-jwt/jwt/v5"
golang_proto "github.com/golang/protobuf/proto" //nolint:staticcheck
"github.com/gorilla/handlers"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
otel_codes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/argoproj/argo-cd/v3/common"
"github.com/argoproj/argo-cd/v3/pkg/apiclient"
accountpkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/account"
applicationpkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/application"
applicationsetpkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/applicationset"
certificatepkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/certificate"
clusterpkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/cluster"
gpgkeypkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/gpgkey"
notificationpkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/notification"
projectpkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/project"
repocredspkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/repocreds"
repositorypkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/repository"
sessionpkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/session"
settingspkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/settings"
versionpkg "github.com/argoproj/argo-cd/v3/pkg/apiclient/version"
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned"
appinformer "github.com/argoproj/argo-cd/v3/pkg/client/informers/externalversions"
applisters "github.com/argoproj/argo-cd/v3/pkg/client/listers/application/v1alpha1"
repoapiclient "github.com/argoproj/argo-cd/v3/reposerver/apiclient"
repocache "github.com/argoproj/argo-cd/v3/reposerver/cache"
"github.com/argoproj/argo-cd/v3/server/account"
"github.com/argoproj/argo-cd/v3/server/application"
"github.com/argoproj/argo-cd/v3/server/applicationset"
"github.com/argoproj/argo-cd/v3/server/badge"
servercache "github.com/argoproj/argo-cd/v3/server/cache"
"github.com/argoproj/argo-cd/v3/server/certificate"
"github.com/argoproj/argo-cd/v3/server/cluster"
"github.com/argoproj/argo-cd/v3/server/extension"
"github.com/argoproj/argo-cd/v3/server/gpgkey"
"github.com/argoproj/argo-cd/v3/server/logout"
"github.com/argoproj/argo-cd/v3/server/metrics"
"github.com/argoproj/argo-cd/v3/server/notification"
"github.com/argoproj/argo-cd/v3/server/project"
"github.com/argoproj/argo-cd/v3/server/rbacpolicy"
"github.com/argoproj/argo-cd/v3/server/repocreds"
"github.com/argoproj/argo-cd/v3/server/repository"
"github.com/argoproj/argo-cd/v3/server/session"
"github.com/argoproj/argo-cd/v3/server/settings"
"github.com/argoproj/argo-cd/v3/server/version"
"github.com/argoproj/argo-cd/v3/ui"
"github.com/argoproj/argo-cd/v3/util/assets"
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
"github.com/argoproj/argo-cd/v3/util/db"
dexutil "github.com/argoproj/argo-cd/v3/util/dex"
"github.com/argoproj/argo-cd/v3/util/env"
errorsutil "github.com/argoproj/argo-cd/v3/util/errors"
grpc_util "github.com/argoproj/argo-cd/v3/util/grpc"
"github.com/argoproj/argo-cd/v3/util/healthz"
httputil "github.com/argoproj/argo-cd/v3/util/http"
utilio "github.com/argoproj/argo-cd/v3/util/io"
"github.com/argoproj/argo-cd/v3/util/io/files"
jwtutil "github.com/argoproj/argo-cd/v3/util/jwt"
kubeutil "github.com/argoproj/argo-cd/v3/util/kube"
service "github.com/argoproj/argo-cd/v3/util/notification/argocd"
"github.com/argoproj/argo-cd/v3/util/notification/k8s"
settings_notif "github.com/argoproj/argo-cd/v3/util/notification/settings"
"github.com/argoproj/argo-cd/v3/util/oidc"
"github.com/argoproj/argo-cd/v3/util/rbac"
util_session "github.com/argoproj/argo-cd/v3/util/session"
settings_util "github.com/argoproj/argo-cd/v3/util/settings"
"github.com/argoproj/argo-cd/v3/util/swagger"
tlsutil "github.com/argoproj/argo-cd/v3/util/tls"
"github.com/argoproj/argo-cd/v3/util/webhook"
)
const (
maxConcurrentLoginRequestsCountEnv = "ARGOCD_MAX_CONCURRENT_LOGIN_REQUESTS_COUNT"
replicasCountEnv = "ARGOCD_API_SERVER_REPLICAS"
renewTokenKey = "renew-token"
)
// ErrNoSession indicates no auth token was supplied as part of a request
var ErrNoSession = status.Errorf(codes.Unauthenticated, "no session information")
var noCacheHeaders = map[string]string{
"Expires": time.Unix(0, 0).Format(time.RFC1123),
"Cache-Control": "no-cache, private, max-age=0",
"Pragma": "no-cache",
"X-Accel-Expires": "0",
}
var backoff = wait.Backoff{
Steps: 5,
Duration: 500 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}
var (
clientConstraint = ">= " + common.MinClientVersion
baseHRefRegex = regexp.MustCompile(`<base href="(.*?)">`)
// limits number of concurrent login requests to prevent password brute forcing. If set to 0 then no limit is enforced.
maxConcurrentLoginRequestsCount = 50
replicasCount = 1
enableGRPCTimeHistogram = true
)
// OpenTelemetry tracer for this package
var tracer trace.Tracer
func init() {
maxConcurrentLoginRequestsCount = env.ParseNumFromEnv(maxConcurrentLoginRequestsCountEnv, maxConcurrentLoginRequestsCount, 0, math.MaxInt32)
replicasCount = env.ParseNumFromEnv(replicasCountEnv, replicasCount, 0, math.MaxInt32)
if replicasCount > 0 {
maxConcurrentLoginRequestsCount = maxConcurrentLoginRequestsCount / replicasCount
}
enableGRPCTimeHistogram = env.ParseBoolFromEnv(common.EnvEnableGRPCTimeHistogramEnv, false)
tracer = otel.Tracer("github.com/argoproj/argo-cd/v3/server")
}
// ArgoCDServer is the API server for Argo CD
type ArgoCDServer struct {
ArgoCDServerOpts
ApplicationSetOpts
ssoClientApp *oidc.ClientApp
settings *settings_util.ArgoCDSettings
log *log.Entry
sessionMgr *util_session.SessionManager
settingsMgr *settings_util.SettingsManager
enf *rbac.Enforcer
projInformer cache.SharedIndexInformer
policyEnforcer *rbacpolicy.RBACPolicyEnforcer
clusterInformer *settings_util.ClusterInformer
appInformer cache.SharedIndexInformer
appLister applisters.ApplicationLister
appsetInformer cache.SharedIndexInformer
appsetLister applisters.ApplicationSetLister
db db.ArgoDB
// stopCh is the channel which when closed, will shutdown the Argo CD server
stopCh chan os.Signal
userStateStorage util_session.UserStateStorage
indexDataInit gosync.Once
indexData []byte
indexDataErr error
staticAssets http.FileSystem
apiFactory api.Factory
secretInformer cache.SharedIndexInformer
configMapInformer cache.SharedIndexInformer
serviceSet *ArgoCDServiceSet
extensionManager *extension.Manager
Shutdown func()
terminateRequested atomic.Bool
available atomic.Bool
}
type ArgoCDServerOpts struct {
DisableAuth bool
ContentTypes []string
EnableGZip bool
Insecure bool
StaticAssetsDir string
ListenPort int
ListenHost string
MetricsPort int
MetricsHost string
Namespace string
DexServerAddr string
DexTLSConfig *dexutil.DexTLSConfig
BaseHRef string
RootPath string
DynamicClientset dynamic.Interface
KubeControllerClientset client.Client
KubeClientset kubernetes.Interface
AppClientset appclientset.Interface
RepoClientset repoapiclient.Clientset
Cache *servercache.Cache
RepoServerCache *repocache.Cache
RedisClient *redis.Client
TLSConfigCustomizer tlsutil.ConfigCustomizer
XFrameOptions string
ContentSecurityPolicy string
ApplicationNamespaces []string
EnableProxyExtension bool
WebhookParallelism int
EnableK8sEvent []string
HydratorEnabled bool
SyncWithReplaceAllowed bool
}
type ApplicationSetOpts struct {
GitSubmoduleEnabled bool
EnableNewGitFileGlobbing bool
ScmRootCAPath string
AllowedScmProviders []string
EnableScmProviders bool
EnableGitHubAPIMetrics bool
}
// GracefulRestartSignal implements a signal to be used for a graceful restart trigger.
type GracefulRestartSignal struct{}
// HTTPMetricsRegistry exposes operations to update http metrics in the Argo CD
// API server.
type HTTPMetricsRegistry interface {
// IncExtensionRequestCounter will increase the request counter for the given
// extension with the given status.
IncExtensionRequestCounter(extension string, status int)
// ObserveExtensionRequestDuration will register the request roundtrip duration
// between Argo CD API Server and the extension backend service for the given
// extension.
ObserveExtensionRequestDuration(extension string, duration time.Duration)
}
// String is a part of os.Signal interface to represent a signal as a string.
func (g GracefulRestartSignal) String() string {
return "GracefulRestartSignal"
}
// Signal is a part of os.Signal interface doing nothing.
func (g GracefulRestartSignal) Signal() {}
// initializeDefaultProject creates the default project if it does not already exist
func initializeDefaultProject(opts ArgoCDServerOpts) error {
defaultProj := &v1alpha1.AppProject{
ObjectMeta: metav1.ObjectMeta{Name: v1alpha1.DefaultAppProjectName, Namespace: opts.Namespace},
Spec: v1alpha1.AppProjectSpec{
SourceRepos: []string{"*"},
Destinations: []v1alpha1.ApplicationDestination{{Server: "*", Namespace: "*"}},
ClusterResourceWhitelist: []v1alpha1.ClusterResourceRestrictionItem{{Group: "*", Kind: "*"}},
},
}
_, err := opts.AppClientset.ArgoprojV1alpha1().AppProjects(opts.Namespace).Get(context.Background(), defaultProj.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
_, err = opts.AppClientset.ArgoprojV1alpha1().AppProjects(opts.Namespace).Create(context.Background(), defaultProj, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
return nil
}
}
return err
}
// NewServer returns a new instance of the Argo CD API server
func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts ApplicationSetOpts) *ArgoCDServer {
settingsMgr := settings_util.NewSettingsManager(ctx, opts.KubeClientset, opts.Namespace)
settings, err := settingsMgr.InitializeSettings(opts.Insecure)
errorsutil.CheckError(err)
err = initializeDefaultProject(opts)
errorsutil.CheckError(err)
clusterInformer, err := settings_util.NewClusterInformer(opts.KubeClientset, opts.Namespace)
errorsutil.CheckError(err)
appInformerNs := opts.Namespace
if len(opts.ApplicationNamespaces) > 0 {
appInformerNs = ""
}
projFactory := appinformer.NewSharedInformerFactoryWithOptions(opts.AppClientset, 0, appinformer.WithNamespace(opts.Namespace), appinformer.WithTweakListOptions(func(_ *metav1.ListOptions) {}))
appFactory := appinformer.NewSharedInformerFactoryWithOptions(opts.AppClientset, 0, appinformer.WithNamespace(appInformerNs), appinformer.WithTweakListOptions(func(_ *metav1.ListOptions) {}))
projInformer := projFactory.Argoproj().V1alpha1().AppProjects().Informer()
projLister := projFactory.Argoproj().V1alpha1().AppProjects().Lister().AppProjects(opts.Namespace)
appInformer := appFactory.Argoproj().V1alpha1().Applications().Informer()
appLister := appFactory.Argoproj().V1alpha1().Applications().Lister()
appsetInformer := appFactory.Argoproj().V1alpha1().ApplicationSets().Informer()
appsetLister := appFactory.Argoproj().V1alpha1().ApplicationSets().Lister()
userStateStorage := util_session.NewUserStateStorage(opts.RedisClient)
ssoClientApp, err := oidc.NewClientApp(settings, opts.DexServerAddr, opts.DexTLSConfig, opts.BaseHRef, cacheutil.NewRedisCache(opts.RedisClient, settings.UserInfoCacheExpiration(), cacheutil.RedisCompressionNone))
errorsutil.CheckError(err)
sessionMgr := util_session.NewSessionManager(settingsMgr, projLister, opts.DexServerAddr, opts.DexTLSConfig, userStateStorage)
enf := rbac.NewEnforcer(opts.KubeClientset, opts.Namespace, common.ArgoCDRBACConfigMapName, nil)
enf.EnableEnforce(!opts.DisableAuth)
err = enf.SetBuiltinPolicy(assets.BuiltinPolicyCSV)
errorsutil.CheckError(err)
enf.EnableLog(os.Getenv(common.EnvVarRBACDebug) == "1")
policyEnf := rbacpolicy.NewRBACPolicyEnforcer(enf, projLister)
enf.SetClaimsEnforcerFunc(policyEnf.EnforceClaims)
staticFS, err := fs.Sub(ui.Embedded, "dist/app")
errorsutil.CheckError(err)
root, err := os.OpenRoot(opts.StaticAssetsDir)
if err != nil {
if os.IsNotExist(err) {
log.Warnf("Static assets directory %q does not exist, using only embedded assets", opts.StaticAssetsDir)
} else {
errorsutil.CheckError(err)
}
} else {
staticFS = utilio.NewComposableFS(staticFS, root.FS())
}
argocdService, err := service.NewArgoCDService(opts.KubeClientset, opts.Namespace, opts.RepoClientset)
errorsutil.CheckError(err)
secretInformer := k8s.NewSecretInformer(opts.KubeClientset, opts.Namespace, "argocd-notifications-secret")
configMapInformer := k8s.NewConfigMapInformer(opts.KubeClientset, opts.Namespace, "argocd-notifications-cm")
apiFactory := api.NewFactory(settings_notif.GetFactorySettings(argocdService, "argocd-notifications-secret", "argocd-notifications-cm", false), opts.Namespace, secretInformer, configMapInformer)
dbInstance := db.NewDB(opts.Namespace, settingsMgr, opts.KubeClientset)
logger := log.NewEntry(log.StandardLogger())
sg := extension.NewDefaultSettingsGetter(settingsMgr)
ag := extension.NewDefaultApplicationGetter(appLister)
pg := extension.NewDefaultProjectGetter(projLister, dbInstance)
ug := extension.NewDefaultUserGetter(policyEnf)
em := extension.NewManager(logger, opts.Namespace, sg, ag, pg, dbInstance, enf, ug)
noopShutdown := func() {
log.Error("API Server Shutdown function called but server is not started yet.")
}
a := &ArgoCDServer{
ArgoCDServerOpts: opts,
ApplicationSetOpts: appsetOpts,
ssoClientApp: ssoClientApp,
log: logger,
settings: settings,
sessionMgr: sessionMgr,
settingsMgr: settingsMgr,
enf: enf,
projInformer: projInformer,
appInformer: appInformer,
appLister: appLister,
appsetInformer: appsetInformer,
appsetLister: appsetLister,
policyEnforcer: policyEnf,
clusterInformer: clusterInformer,
userStateStorage: userStateStorage,
staticAssets: http.FS(staticFS),
db: dbInstance,
apiFactory: apiFactory,
secretInformer: secretInformer,
configMapInformer: configMapInformer,
extensionManager: em,
Shutdown: noopShutdown,
stopCh: make(chan os.Signal, 1),
}
err = a.logInClusterWarnings()
if err != nil {
// Just log. It's not critical.
log.Warnf("Failed to log in-cluster warnings: %v", err)
}
return a
}
const (
// catches corrupted informer state; see https://github.com/argoproj/argo-cd/issues/4960 for more information
notObjectErrMsg = "object does not implement the Object interfaces"
)
func (server *ArgoCDServer) healthCheck(r *http.Request) error {
if server.terminateRequested.Load() {
return errors.New("API Server is terminating and unable to serve requests")
}
if !server.available.Load() {
return errors.New("API Server is not available: it either hasn't started or is restarting")
}
if val, ok := r.URL.Query()["full"]; ok && len(val) > 0 && val[0] == "true" {
argoDB := db.NewDB(server.Namespace, server.settingsMgr, server.KubeClientset)
_, err := argoDB.ListClusters(r.Context())
if err != nil && strings.Contains(err.Error(), notObjectErrMsg) {
return err
}
}
return nil
}
type Listeners struct {
Main net.Listener
Metrics net.Listener
GatewayConn *grpc.ClientConn
}
func (l *Listeners) Close() error {
if l.Main != nil {
if err := l.Main.Close(); err != nil {
return err
}
l.Main = nil
}
if l.Metrics != nil {
if err := l.Metrics.Close(); err != nil {
return err
}
l.Metrics = nil
}
if l.GatewayConn != nil {
if err := l.GatewayConn.Close(); err != nil {
return err
}
l.GatewayConn = nil
}
return nil
}
// logInClusterWarnings checks the in-cluster configuration and prints out any warnings.
func (server *ArgoCDServer) logInClusterWarnings() error {
labelSelector := labels.NewSelector()
req, err := labels.NewRequirement(common.LabelKeySecretType, selection.Equals, []string{common.LabelValueSecretTypeCluster})
if err != nil {
return fmt.Errorf("failed to construct cluster-type label selector: %w", err)
}
labelSelector = labelSelector.Add(*req)
secretsLister, err := server.settingsMgr.GetSecretsLister()
if err != nil {
return fmt.Errorf("failed to get secrets lister: %w", err)
}
clusterSecrets, err := secretsLister.Secrets(server.ArgoCDServerOpts.Namespace).List(labelSelector)
if err != nil {
return fmt.Errorf("failed to list cluster secrets: %w", err)
}
var inClusterSecrets []string
for _, clusterSecret := range clusterSecrets {
cluster, err := db.SecretToCluster(clusterSecret)
if err != nil {
return fmt.Errorf("could not unmarshal cluster secret %q: %w", clusterSecret.Name, err)
}
if cluster.Server == v1alpha1.KubernetesInternalAPIServerAddr {
inClusterSecrets = append(inClusterSecrets, clusterSecret.Name)
}
}
if len(inClusterSecrets) > 0 {
// Don't make this call unless we actually have in-cluster secrets, to save time.
dbSettings, err := server.settingsMgr.GetSettings()
if err != nil {
return fmt.Errorf("could not get DB settings: %w", err)
}
if !dbSettings.InClusterEnabled {
for _, clusterName := range inClusterSecrets {
log.Warnf("cluster %q uses in-cluster server address but it's disabled in Argo CD settings", clusterName)
}
}
}
return nil
}
func startListener(host string, port int) (net.Listener, error) {
var conn net.Listener
var realErr error
lc := net.ListenConfig{}
_ = wait.ExponentialBackoff(backoff, func() (bool, error) {
conn, realErr = lc.Listen(context.Background(), "tcp", fmt.Sprintf("%s:%d", host, port))
if realErr != nil {
return false, nil
}
return true, nil
})
return conn, realErr
}
func (server *ArgoCDServer) Listen() (*Listeners, error) {
mainLn, err := startListener(server.ListenHost, server.ListenPort)
if err != nil {
return nil, err
}
metricsLn, err := startListener(server.ListenHost, server.MetricsPort)
if err != nil {
utilio.Close(mainLn)
return nil, err
}
var dOpts []grpc.DialOption
dOpts = append(dOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(apiclient.MaxGRPCMessageSize)))
dOpts = append(dOpts, grpc.WithUserAgent(fmt.Sprintf("%s/%s", common.ArgoCDUserAgentName, common.GetVersion().Version)))
dOpts = append(dOpts, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
if server.useTLS() {
// The following sets up the dial Options for grpc-gateway to talk to gRPC server over TLS.
// grpc-gateway is just translating HTTP/HTTPS requests as gRPC requests over localhost,
// so we need to supply the same certificates to establish the connections that a normal,
// external gRPC client would need.
tlsConfig := server.settings.TLSConfig()
if server.TLSConfigCustomizer != nil {
server.TLSConfigCustomizer(tlsConfig)
}
tlsConfig.InsecureSkipVerify = true
dCreds := credentials.NewTLS(tlsConfig)
dOpts = append(dOpts, grpc.WithTransportCredentials(dCreds))
} else {
dOpts = append(dOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", server.ListenPort), dOpts...)
if err != nil {
utilio.Close(mainLn)
utilio.Close(metricsLn)
return nil, err
}
return &Listeners{Main: mainLn, Metrics: metricsLn, GatewayConn: conn}, nil
}
// Init starts informers used by the API server
func (server *ArgoCDServer) Init(ctx context.Context) {
go server.projInformer.Run(ctx.Done())
go server.appInformer.Run(ctx.Done())
go server.appsetInformer.Run(ctx.Done())
go server.clusterInformer.Run(ctx.Done())
go server.configMapInformer.Run(ctx.Done())
go server.secretInformer.Run(ctx.Done())
}
// Run runs the API Server
// We use k8s.io/code-generator/cmd/go-to-protobuf to generate the .proto files from the API types.
// k8s.io/ go-to-protobuf uses protoc-gen-gogo, which comes from gogo/protobuf (a fork of
// golang/protobuf).
func (server *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) {
defer func() {
if r := recover(); r != nil {
log.WithField("trace", string(debug.Stack())).Error("Recovered from panic: ", r)
server.terminateRequested.Store(true)
server.Shutdown()
}
}()
metricsServ := metrics.NewMetricsServer(server.MetricsHost, server.MetricsPort)
if server.RedisClient != nil {
cacheutil.CollectMetrics(server.RedisClient, metricsServ, server.userStateStorage.GetLockObject())
}
// Don't init storage until after CollectMetrics. CollectMetrics adds hooks to the Redis client, and Init
// reads those hooks. If this is called first, there may be a data race.
server.userStateStorage.Init(ctx)
svcSet := newArgoCDServiceSet(server)
if server.sessionMgr != nil {
server.sessionMgr.CollectMetrics(metricsServ)
}
server.serviceSet = svcSet
grpcS, appResourceTreeFn := server.newGRPCServer(metricsServ.PrometheusRegistry)
grpcWebS := grpcweb.WrapServer(grpcS)
var httpS *http.Server
var httpsS *http.Server
if server.useTLS() {
httpS = newRedirectServer(server.ListenPort, server.RootPath)
httpsS = server.newHTTPServer(ctx, server.ListenPort, grpcWebS, appResourceTreeFn, listeners.GatewayConn, metricsServ)
} else {
httpS = server.newHTTPServer(ctx, server.ListenPort, grpcWebS, appResourceTreeFn, listeners.GatewayConn, metricsServ)
}
if server.RootPath != "" {
httpS.Handler = withRootPath(httpS.Handler, server)
if httpsS != nil {
httpsS.Handler = withRootPath(httpsS.Handler, server)
}
}
httpS.Handler = &bug21955Workaround{handler: httpS.Handler}
if httpsS != nil {
httpsS.Handler = &bug21955Workaround{handler: httpsS.Handler}
}
// CMux is used to support servicing gRPC and HTTP1.1+JSON on the same port
tcpm := cmux.New(listeners.Main)
var tlsm cmux.CMux
var grpcL net.Listener
var httpL net.Listener
var httpsL net.Listener
if !server.useTLS() {
httpL = tcpm.Match(cmux.HTTP1Fast("PATCH"))
grpcL = tcpm.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
} else {
// We first match on HTTP 1.1 methods.
httpL = tcpm.Match(cmux.HTTP1Fast("PATCH"))
// If not matched, we assume that its TLS.
tlsl := tcpm.Match(cmux.Any())
tlsConfig := tls.Config{
// Advertise that we support both http/1.1 and http2 for application level communication.
// By putting http/1.1 first, we ensure that HTTPS clients will use http/1.1, which is the only
// protocol our server supports for HTTPS clients. By including h2 in the list, we ensure that
// gRPC clients know we support http2 for their communication.
NextProtos: []string{"http/1.1", "h2"},
}
tlsConfig.GetCertificate = func(_ *tls.ClientHelloInfo) (*tls.Certificate, error) {
return server.settings.Certificate, nil
}
if server.TLSConfigCustomizer != nil {
server.TLSConfigCustomizer(&tlsConfig)
}
tlsl = tls.NewListener(tlsl, &tlsConfig)
// Now, we build another mux recursively to match HTTPS and gRPC.
tlsm = cmux.New(tlsl)
httpsL = tlsm.Match(cmux.HTTP1Fast("PATCH"))
grpcL = tlsm.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
}
// Start the muxed listeners for our servers
log.Infof("argocd %s serving on port %d (url: %s, tls: %v, namespace: %s, sso: %v)",
common.GetVersion(), server.ListenPort, server.settings.URL, server.useTLS(), server.Namespace, server.settings.IsSSOConfigured())
log.Infof("Enabled application namespace patterns: %s", server.allowedApplicationNamespacesAsString())
go func() { server.checkServeErr("grpcS", grpcS.Serve(grpcL)) }()
go func() { server.checkServeErr("httpS", httpS.Serve(httpL)) }()
if server.useTLS() {
go func() { server.checkServeErr("httpsS", httpsS.Serve(httpsL)) }()
go func() { server.checkServeErr("tlsm", tlsm.Serve()) }()
}
go server.watchSettings()
go server.rbacPolicyLoader(ctx)
go func() { server.checkServeErr("tcpm", tcpm.Serve()) }()
go func() { server.checkServeErr("metrics", metricsServ.Serve(listeners.Metrics)) }()
if !cache.WaitForCacheSync(ctx.Done(), server.projInformer.HasSynced, server.appInformer.HasSynced, server.clusterInformer.HasSynced) {
log.Fatal("Timed out waiting for project cache to sync")
}
shutdownFunc := func() {
log.Info("API Server shutdown initiated. Shutting down servers...")
server.available.Store(false)
shutdownCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
var wg gosync.WaitGroup
// Shutdown http server
wg.Go(func() {
err := httpS.Shutdown(shutdownCtx)
if err != nil {
log.Errorf("Error shutting down http server: %s", err)
}
})
if server.useTLS() {
// Shutdown https server
wg.Go(func() {
err := httpsS.Shutdown(shutdownCtx)
if err != nil {
log.Errorf("Error shutting down https server: %s", err)
}
})
}
// Shutdown gRPC server
wg.Go(func() {
grpcS.GracefulStop()
})
// Shutdown metrics server
wg.Go(func() {
err := metricsServ.Shutdown(shutdownCtx)
if err != nil {
log.Errorf("Error shutting down metrics server: %s", err)
}
})
if server.useTLS() {
// Shutdown tls server
wg.Go(func() {
tlsm.Close()
})
}
// Shutdown tcp server
wg.Go(func() {
tcpm.Close()
})
c := make(chan struct{})
// This goroutine will wait for all servers to conclude the shutdown
// process
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
log.Info("All servers were gracefully shutdown. Exiting...")
case <-shutdownCtx.Done():
log.Warn("Graceful shutdown timeout. Exiting...")
}
}
server.Shutdown = shutdownFunc
signal.Notify(server.stopCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
server.available.Store(true)
select {
case signal := <-server.stopCh:
log.Infof("API Server received signal: %s", signal.String())
gracefulRestartSignal := GracefulRestartSignal{}
if signal != gracefulRestartSignal {
server.terminateRequested.Store(true)
}
server.Shutdown()
case <-ctx.Done():
log.Infof("API Server: %s", ctx.Err())
server.terminateRequested.Store(true)
server.Shutdown()
}
}
func (server *ArgoCDServer) Initialized() bool {
return server.projInformer.HasSynced() && server.appInformer.HasSynced()
}
// TerminateRequested returns whether a shutdown was initiated by a signal or context cancel
// as opposed to a watch.
func (server *ArgoCDServer) TerminateRequested() bool {
return server.terminateRequested.Load()
}
// checkServeErr checks the error from a .Serve() call to decide if it was a graceful shutdown
func (server *ArgoCDServer) checkServeErr(name string, err error) {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Errorf("Error received from server %s: %v", name, err)
} else {
log.Infof("Graceful shutdown of %s initiated", name)
}
}
func checkOIDCConfigChange(currentOIDCConfig *settings_util.OIDCConfig, newArgoCDSettings *settings_util.ArgoCDSettings) bool {
newOIDCConfig := newArgoCDSettings.OIDCConfig()
if (currentOIDCConfig != nil && newOIDCConfig == nil) || (currentOIDCConfig == nil && newOIDCConfig != nil) {
return true
}
if currentOIDCConfig != nil && newOIDCConfig != nil {
if !reflect.DeepEqual(*currentOIDCConfig, *newOIDCConfig) {
return true
}
}
return false
}
// watchSettings watches the configmap and secret for any setting updates that would warrant a
// restart of the API server.
func (server *ArgoCDServer) watchSettings() {
updateCh := make(chan *settings_util.ArgoCDSettings, 1)
server.settingsMgr.Subscribe(updateCh)
prevURL := server.settings.URL
prevAdditionalURLs := server.settings.AdditionalURLs
prevOIDCConfig := server.settings.OIDCConfig()
prevDexCfgBytes, err := dexutil.GenerateDexConfigYAML(server.settings, server.DexTLSConfig == nil || server.DexTLSConfig.DisableTLS)
errorsutil.CheckError(err)
prevGitHubSecret := server.settings.GetWebhookGitHubSecret()
prevGitLabSecret := server.settings.GetWebhookGitLabSecret()
prevBitbucketUUID := server.settings.GetWebhookBitbucketUUID()
prevBitbucketServerSecret := server.settings.GetWebhookBitbucketServerSecret()
prevGogsSecret := server.settings.GetWebhookGogsSecret()
prevExtConfig := server.settings.ExtensionConfig
var prevCert, prevCertKey string
if server.settings.Certificate != nil && !server.Insecure {
prevCert, prevCertKey = tlsutil.EncodeX509KeyPairString(*server.settings.Certificate)
}
for {
newSettings := <-updateCh
server.settings = newSettings
newDexCfgBytes, err := dexutil.GenerateDexConfigYAML(server.settings, server.DexTLSConfig == nil || server.DexTLSConfig.DisableTLS)
errorsutil.CheckError(err)
if !bytes.Equal(newDexCfgBytes, prevDexCfgBytes) {
log.Infof("dex config modified. restarting")
break
}
if checkOIDCConfigChange(prevOIDCConfig, server.settings) {
log.Infof("oidc config modified. restarting")
break
}
if prevURL != server.settings.URL {
log.Infof("url modified. restarting")
break
}
if !reflect.DeepEqual(prevAdditionalURLs, server.settings.AdditionalURLs) {
log.Infof("additionalURLs modified. restarting")
break
}
if prevGitHubSecret != server.settings.GetWebhookGitHubSecret() {
log.Infof("github secret modified. restarting")
break
}
if prevGitLabSecret != server.settings.GetWebhookGitLabSecret() {
log.Infof("gitlab secret modified. restarting")
break
}
if prevBitbucketUUID != server.settings.GetWebhookBitbucketUUID() {
log.Infof("bitbucket uuid modified. restarting")
break
}
if prevBitbucketServerSecret != server.settings.GetWebhookBitbucketServerSecret() {
log.Infof("bitbucket server secret modified. restarting")
break
}
if prevGogsSecret != server.settings.GetWebhookGogsSecret() {
log.Infof("gogs secret modified. restarting")
break
}
if !reflect.DeepEqual(prevExtConfig, server.settings.ExtensionConfig) {
prevExtConfig = server.settings.ExtensionConfig
log.Infof("extensions configs modified. Updating proxy registry...")
err := server.extensionManager.UpdateExtensionRegistry(server.settings)
if err != nil {
log.Errorf("error updating extensions configs: %s", err)
} else {
log.Info("extensions configs updated successfully")
}
}
if !server.Insecure {
var newCert, newCertKey string
if server.settings.Certificate != nil {
newCert, newCertKey = tlsutil.EncodeX509KeyPairString(*server.settings.Certificate)
}
if newCert != prevCert || newCertKey != prevCertKey {
log.Infof("tls certificate modified. reloading certificate")
// No need to break out of this loop since TlsConfig.GetCertificate will automagically reload the cert.
}
}
}
log.Info("shutting down settings watch")
server.settingsMgr.Unsubscribe(updateCh)
close(updateCh)
// Triggers server restart
server.stopCh <- GracefulRestartSignal{}
}
func (server *ArgoCDServer) rbacPolicyLoader(ctx context.Context) {
err := server.enf.RunPolicyLoader(ctx, func(cm *corev1.ConfigMap) error {
var scopes []string
if scopesStr, ok := cm.Data[rbac.ConfigMapScopesKey]; scopesStr != "" && ok {
scopes = make([]string, 0)
err := yaml.Unmarshal([]byte(scopesStr), &scopes)
if err != nil {
return fmt.Errorf("error unmarshalling scopes: %w", err)
}
}
server.policyEnforcer.SetScopes(scopes)
return nil
})
errorsutil.CheckError(err)
}
func (server *ArgoCDServer) useTLS() bool {
if server.Insecure || server.settings.Certificate == nil {
return false
}
return true
}
func (server *ArgoCDServer) newGRPCServer(prometheusRegistry *prometheus.Registry) (*grpc.Server, application.AppResourceTreeFn) {
var serverMetricsOptions []grpc_prometheus.ServerMetricsOption
if enableGRPCTimeHistogram {
serverMetricsOptions = append(serverMetricsOptions, grpc_prometheus.WithServerHandlingTimeHistogram())
}
serverMetrics := grpc_prometheus.NewServerMetrics(serverMetricsOptions...)
prometheusRegistry.MustRegister(serverMetrics)
sOpts := []grpc.ServerOption{
// Set the both send and receive the bytes limit to be 100MB
// The proper way to achieve high performance is to have pagination
// while we work toward that, we can have high limit first
grpc.MaxRecvMsgSize(apiclient.MaxGRPCMessageSize),
grpc.MaxSendMsgSize(apiclient.MaxGRPCMessageSize),
grpc.ConnectionTimeout(300 * time.Second),
grpc.KeepaliveEnforcementPolicy(
keepalive.EnforcementPolicy{
MinTime: common.GetGRPCKeepAliveEnforcementMinimum(),
},
),
}
sensitiveMethods := map[string]bool{
"/cluster.ClusterService/Create": true,
"/cluster.ClusterService/Update": true,
"/session.SessionService/Create": true,
"/account.AccountService/UpdatePassword": true,
"/gpgkey.GPGKeyService/CreateGnuPGPublicKey": true,
"/repository.RepositoryService/Create": true,
"/repository.RepositoryService/Update": true,
"/repository.RepositoryService/CreateRepository": true,
"/repository.RepositoryService/UpdateRepository": true,
"/repository.RepositoryService/ValidateAccess": true,
"/repocreds.RepoCredsService/CreateRepositoryCredentials": true,
"/repocreds.RepoCredsService/UpdateRepositoryCredentials": true,
"/repository.RepositoryService/CreateWriteRepository": true,
"/repository.RepositoryService/UpdateWriteRepository": true,
"/repository.RepositoryService/ValidateWriteAccess": true,
"/repocreds.RepoCredsService/CreateWriteRepositoryCredentials": true,
"/repocreds.RepoCredsService/UpdateWriteRepositoryCredentials": true,
"/application.ApplicationService/PatchResource": true,
// Remove from logs both because the contents are sensitive and because they may be very large.
"/application.ApplicationService/GetManifestsWithFiles": true,
}
// NOTE: notice we do not configure the gRPC server here with TLS (e.g. grpc.Creds(creds))
// This is because TLS handshaking occurs in cmux handling
sOpts = append(sOpts, grpc.ChainStreamInterceptor(
logging.StreamServerInterceptor(grpc_util.InterceptorLogger(server.log)),
serverMetrics.StreamServerInterceptor(),
grpc_auth.StreamServerInterceptor(server.Authenticate),
grpc_util.UserAgentStreamServerInterceptor(common.ArgoCDUserAgentName, clientConstraint),
grpc_util.PayloadStreamServerInterceptor(server.log, true, func(_ context.Context, c interceptors.CallMeta) bool {
return !sensitiveMethods[c.FullMethod()]
}),
grpc_util.ErrorCodeK8sStreamServerInterceptor(),
grpc_util.ErrorCodeGitStreamServerInterceptor(),
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpc_util.LoggerRecoveryHandler(server.log))),
))
sOpts = append(sOpts, grpc.ChainUnaryInterceptor(
bug21955WorkaroundInterceptor,
logging.UnaryServerInterceptor(grpc_util.InterceptorLogger(server.log)),
serverMetrics.UnaryServerInterceptor(),
grpc_auth.UnaryServerInterceptor(server.Authenticate),
grpc_util.UserAgentUnaryServerInterceptor(common.ArgoCDUserAgentName, clientConstraint),
grpc_util.PayloadUnaryServerInterceptor(server.log, true, func(_ context.Context, c interceptors.CallMeta) bool {
return !sensitiveMethods[c.FullMethod()]
}),
grpc_util.ErrorCodeK8sUnaryServerInterceptor(),
grpc_util.ErrorCodeGitUnaryServerInterceptor(),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpc_util.LoggerRecoveryHandler(server.log))),
))
sOpts = append(sOpts, grpc.StatsHandler(otelgrpc.NewServerHandler()))
grpcS := grpc.NewServer(sOpts...)
healthService := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcS, healthService)
versionpkg.RegisterVersionServiceServer(grpcS, server.serviceSet.VersionService)
clusterpkg.RegisterClusterServiceServer(grpcS, server.serviceSet.ClusterService)
applicationpkg.RegisterApplicationServiceServer(grpcS, server.serviceSet.ApplicationService)
applicationsetpkg.RegisterApplicationSetServiceServer(grpcS, server.serviceSet.ApplicationSetService)
notificationpkg.RegisterNotificationServiceServer(grpcS, server.serviceSet.NotificationService)
repositorypkg.RegisterRepositoryServiceServer(grpcS, server.serviceSet.RepoService)
repocredspkg.RegisterRepoCredsServiceServer(grpcS, server.serviceSet.RepoCredsService)
sessionpkg.RegisterSessionServiceServer(grpcS, server.serviceSet.SessionService)
settingspkg.RegisterSettingsServiceServer(grpcS, server.serviceSet.SettingsService)
projectpkg.RegisterProjectServiceServer(grpcS, server.serviceSet.ProjectService)
accountpkg.RegisterAccountServiceServer(grpcS, server.serviceSet.AccountService)
certificatepkg.RegisterCertificateServiceServer(grpcS, server.serviceSet.CertificateService)
gpgkeypkg.RegisterGPGKeyServiceServer(grpcS, server.serviceSet.GpgkeyService)
// Register reflection service on gRPC server.
reflection.Register(grpcS)
serverMetrics.InitializeMetrics(grpcS)
errorsutil.CheckError(server.serviceSet.ProjectService.NormalizeProjs())
return grpcS, server.serviceSet.AppResourceTreeFn
}
type ArgoCDServiceSet struct {
ClusterService *cluster.Server
RepoService *repository.Server
RepoCredsService *repocreds.Server
SessionService *session.Server
ApplicationService applicationpkg.ApplicationServiceServer
AppResourceTreeFn application.AppResourceTreeFn
ApplicationSetService applicationsetpkg.ApplicationSetServiceServer
ProjectService *project.Server
SettingsService *settings.Server
AccountService *account.Server
NotificationService notificationpkg.NotificationServiceServer
CertificateService *certificate.Server
GpgkeyService *gpgkey.Server
VersionService *version.Server
}
func newArgoCDServiceSet(a *ArgoCDServer) *ArgoCDServiceSet {
kubectl := kubeutil.NewKubectl()
clusterService := cluster.NewServer(a.db, a.enf, a.Cache, kubectl)
repoService := repository.NewServer(a.RepoClientset, a.db, a.enf, a.Cache, a.appLister, a.projInformer, a.Namespace, a.settingsMgr, a.HydratorEnabled)
repoCredsService := repocreds.NewServer(a.db, a.enf)
var loginRateLimiter func() (utilio.Closer, error)
if maxConcurrentLoginRequestsCount > 0 {
loginRateLimiter = session.NewLoginRateLimiter(maxConcurrentLoginRequestsCount)
}
sessionService := session.NewServer(a.sessionMgr, a.settingsMgr, a, a.policyEnforcer, loginRateLimiter)
projectLock := sync.NewKeyLock()
applicationService, appResourceTreeFn := application.NewServer(
a.Namespace,
a.KubeClientset,
a.AppClientset,
a.appLister,
a.appInformer,
nil,
a.RepoClientset,
a.Cache,
kubectl,
a.db,
a.enf,
projectLock,
a.settingsMgr,
a.projInformer,
a.ApplicationNamespaces,
a.EnableK8sEvent,
a.SyncWithReplaceAllowed,
)
applicationSetService := applicationset.NewServer(
a.db,
a.KubeClientset,
a.DynamicClientset,
a.KubeControllerClientset,
a.enf,
a.RepoClientset,
a.AppClientset,
a.appsetInformer,
a.appsetLister,
a.Namespace,
projectLock,
a.ApplicationNamespaces,
a.GitSubmoduleEnabled,
a.EnableNewGitFileGlobbing,
a.ScmRootCAPath,
a.AllowedScmProviders,
a.EnableScmProviders,
a.EnableGitHubAPIMetrics,
a.EnableK8sEvent,
a.clusterInformer,
)
projectService := project.NewServer(a.Namespace, a.KubeClientset, a.AppClientset, a.enf, projectLock, a.sessionMgr, a.policyEnforcer, a.projInformer, a.settingsMgr, a.db, a.EnableK8sEvent)
appsInAnyNamespaceEnabled := len(a.ApplicationNamespaces) > 0
settingsService := settings.NewServer(a.settingsMgr, a.RepoClientset, a, a.DisableAuth, appsInAnyNamespaceEnabled, a.HydratorEnabled, a.SyncWithReplaceAllowed)
accountService := account.NewServer(a.sessionMgr, a.settingsMgr, a.enf)
notificationService := notification.NewServer(a.apiFactory)
certificateService := certificate.NewServer(a.db, a.enf)
gpgkeyService := gpgkey.NewServer(a.db, a.enf)
versionService := version.NewServer(a, func() (bool, error) {
if a.DisableAuth {
return true, nil
}
sett, err := a.settingsMgr.GetSettings()
if err != nil {
return false, err
}
return sett.AnonymousUserEnabled, err
})
return &ArgoCDServiceSet{
ClusterService: clusterService,
RepoService: repoService,
RepoCredsService: repoCredsService,
SessionService: sessionService,
ApplicationService: applicationService,
AppResourceTreeFn: appResourceTreeFn,
ApplicationSetService: applicationSetService,
ProjectService: projectService,
SettingsService: settingsService,
AccountService: accountService,
NotificationService: notificationService,
CertificateService: certificateService,
GpgkeyService: gpgkeyService,
VersionService: versionService,
}
}
// translateGrpcCookieHeader conditionally sets a cookie on the response.
func (server *ArgoCDServer) translateGrpcCookieHeader(ctx context.Context, w http.ResponseWriter, resp golang_proto.Message) error {
if sessionResp, ok := resp.(*sessionpkg.SessionResponse); ok {
token := sessionResp.Token
err := server.setTokenCookie(token, w)
if err != nil {
return fmt.Errorf("error setting token cookie from session response: %w", err)
}
} else if md, ok := runtime.ServerMetadataFromContext(ctx); ok {
renewToken := md.HeaderMD[renewTokenKey]
if len(renewToken) > 0 {
return server.setTokenCookie(renewToken[0], w)
}
}
return nil
}
func (server *ArgoCDServer) setTokenCookie(token string, w http.ResponseWriter) error {
return httputil.SetTokenCookie(token, server.BaseHRef, !server.Insecure, w)
}
func withRootPath(handler http.Handler, a *ArgoCDServer) http.Handler {
// If RootPath is empty, directly return the original handler
if a.RootPath == "" {
return handler
}
// get rid of slashes
root := strings.Trim(a.RootPath, "/")
mux := http.NewServeMux()
mux.Handle("/"+root+"/", http.StripPrefix("/"+root, handler))
healthz.ServeHealthCheck(mux, a.healthCheck)
return mux
}
func compressHandler(handler http.Handler) http.Handler {
compr := handlers.CompressHandler(handler)
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
if request.Header.Get("Accept") == "text/event-stream" {
handler.ServeHTTP(writer, request)
} else {
compr.ServeHTTP(writer, request)
}
})
}
// newHTTPServer returns the HTTP server to serve HTTP/HTTPS requests. This is implemented
// using grpc-gateway as a proxy to the gRPC server.
func (server *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandler http.Handler, appResourceTreeFn application.AppResourceTreeFn, conn *grpc.ClientConn, metricsReg HTTPMetricsRegistry) *http.Server {
endpoint := fmt.Sprintf("localhost:%d", port)
mux := http.NewServeMux()
httpS := http.Server{
Addr: endpoint,
Handler: &handlerSwitcher{
handler: mux,
urlToHandler: map[string]http.Handler{
"/api/badge": otelhttp.NewHandler(badge.NewHandler(server.AppClientset, server.settingsMgr, server.Namespace, server.ApplicationNamespaces), "server.ArgoCDServer/badge"),
common.LogoutEndpoint: otelhttp.NewHandler(logout.NewHandler(server.settingsMgr, server.sessionMgr, server.RootPath, server.BaseHRef), "server.ArgoCDServer/logout"),
},
contentTypeToHandler: map[string]http.Handler{
"application/grpc-web+proto": grpcWebHandler,
},
},
}
// HTTP 1.1+JSON Server
// grpc-ecosystem/grpc-gateway is used to proxy HTTP requests to the corresponding gRPC call
// NOTE: if a marshaller option is not supplied, grpc-gateway will default to the jsonpb from
// golang/protobuf. Which does not support types such as time.Time. gogo/protobuf does support
// time.Time, but does not support custom UnmarshalJSON() and MarshalJSON() methods. Therefore
// we use our own Marshaler
gwMuxOpts := runtime.WithMarshalerOption(runtime.MIMEWildcard, new(grpc_util.JSONMarshaler))
gwCookieOpts := runtime.WithForwardResponseOption(server.translateGrpcCookieHeader)
gwmux := runtime.NewServeMux(gwMuxOpts, gwCookieOpts)
var handler http.Handler = gwmux
if server.EnableGZip {
handler = compressHandler(handler)
}
// withTracingHandler is a middleware that extracts OpenTelemetry trace context from HTTP headers
// and injects it into the request context. This enables trace context propagation from HTTP clients
// to gRPC services, allowing for better distributed tracing across the ArgoCD server.
withTracingHandler := func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
propagator := otel.GetTextMapPropagator()
ctx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header))
h.ServeHTTP(w, r.WithContext(ctx))
})
}
handler = withTracingHandler(handler)
if len(server.ContentTypes) > 0 {
handler = enforceContentTypes(handler, server.ContentTypes)
} else {
log.WithField(common.SecurityField, common.SecurityHigh).Warnf("Content-Type enforcement is disabled, which may make your API vulnerable to CSRF attacks")
}
mux.Handle("/api/", handler)
terminalOpts := application.TerminalOptions{DisableAuth: server.DisableAuth, Enf: server.enf}
terminal := application.NewHandler(server.appLister, server.Namespace, server.ApplicationNamespaces, server.db, appResourceTreeFn, server.settings.ExecShells, server.sessionMgr, &terminalOpts).
WithFeatureFlagMiddleware(server.settingsMgr.GetSettings)
th := util_session.WithAuthMiddleware(server.DisableAuth, server.settings.IsSSOConfigured(), server.ssoClientApp, server.sessionMgr, terminal)
mux.Handle("/terminal", th)
// Proxy extension is currently an alpha feature and is disabled
// by default.
if server.EnableProxyExtension {
// API server won't panic if extensions fail to register. In
// this case an error log will be sent and no extension route
// will be added in mux.
registerExtensions(mux, server, metricsReg)
}
mustRegisterGWHandler(ctx, versionpkg.RegisterVersionServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, clusterpkg.RegisterClusterServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, applicationpkg.RegisterApplicationServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, applicationsetpkg.RegisterApplicationSetServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, notificationpkg.RegisterNotificationServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, repositorypkg.RegisterRepositoryServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, repocredspkg.RegisterRepoCredsServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, sessionpkg.RegisterSessionServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, settingspkg.RegisterSettingsServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, projectpkg.RegisterProjectServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, accountpkg.RegisterAccountServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, certificatepkg.RegisterCertificateServiceHandler, gwmux, conn)
mustRegisterGWHandler(ctx, gpgkeypkg.RegisterGPGKeyServiceHandler, gwmux, conn)
// Swagger UI
swagger.ServeSwaggerUI(mux, assets.SwaggerJSON, "/swagger-ui", server.RootPath)
healthz.ServeHealthCheck(mux, server.healthCheck)
// Dex reverse proxy and OAuth2 login/callback
server.registerDexHandlers(mux)
// Webhook handler for git events (Note: cache timeouts are hardcoded because API server does not write to cache and not really using them)
argoDB := db.NewDB(server.Namespace, server.settingsMgr, server.KubeClientset)
acdWebhookHandler := webhook.NewHandler(server.Namespace, server.ApplicationNamespaces, server.WebhookParallelism, server.AppClientset, server.appLister, server.settings, server.settingsMgr, server.RepoServerCache, server.Cache, argoDB, server.settingsMgr.GetMaxWebhookPayloadSize())
mux.HandleFunc("/api/webhook", acdWebhookHandler.Handler)
// Serve cli binaries directly from API server
registerDownloadHandlers(mux, "/download")
// Serve extensions
extensionsSharedPath := "/tmp/extensions/"
var extensionsHandler http.Handler = http.HandlerFunc(func(writer http.ResponseWriter, _ *http.Request) {
server.serveExtensions(extensionsSharedPath, writer)
})
if server.EnableGZip {
extensionsHandler = compressHandler(extensionsHandler)
}
mux.Handle("/extensions.js", extensionsHandler)
// Serve UI static assets
var assetsHandler http.Handler = http.HandlerFunc(server.newStaticAssetsHandler())
if server.EnableGZip {
assetsHandler = compressHandler(assetsHandler)
}
mux.Handle("/", assetsHandler)
return &httpS
}
func enforceContentTypes(handler http.Handler, types []string) http.Handler {
allowedTypes := map[string]bool{}
for _, t := range types {
allowedTypes[strings.ToLower(t)] = true
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet || allowedTypes[strings.ToLower(r.Header.Get("Content-Type"))] {
handler.ServeHTTP(w, r)
} else {
http.Error(w, "Invalid content type", http.StatusUnsupportedMediaType)
}
})
}
// registerExtensions will try to register all configured extensions
// in the given mux. If any error is returned while registering
// extensions handlers, no route will be added in the given mux.
func registerExtensions(mux *http.ServeMux, a *ArgoCDServer, metricsReg HTTPMetricsRegistry) {
a.log.Info("Registering extensions...")
extHandler := http.HandlerFunc(a.extensionManager.CallExtension())
authMiddleware := a.sessionMgr.AuthMiddlewareFunc(a.DisableAuth, a.settings.IsSSOConfigured(), a.ssoClientApp)
// auth middleware ensures that requests to all extensions are authenticated first
mux.Handle(extension.URLPrefix+"/", otelhttp.NewHandler(authMiddleware(extHandler), "server.ArgoCDServer/extensions"))
a.extensionManager.AddMetricsRegistry(metricsReg)
err := a.extensionManager.RegisterExtensions()
if err != nil {
a.log.Errorf("Error registering extensions: %s", err)
}
}
var extensionsPattern = regexp.MustCompile(`^extension(.*)\.js$`)
func (server *ArgoCDServer) serveExtensions(extensionsSharedPath string, w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/javascript")
err := filepath.Walk(extensionsSharedPath, func(filePath string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("failed to iterate files in '%s': %w", extensionsSharedPath, err)
}
if !files.IsSymlink(info) && !info.IsDir() && extensionsPattern.MatchString(info.Name()) {
processFile := func() error {
if _, err = fmt.Fprintf(w, "// source: %s/%s \n", filePath, info.Name()); err != nil {
return fmt.Errorf("failed to write to response: %w", err)
}
f, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open file '%s': %w", filePath, err)
}
defer utilio.Close(f)
if _, err := goio.Copy(w, f); err != nil {
return fmt.Errorf("failed to copy file '%s': %w", filePath, err)
}
return nil
}
if processFile() != nil {
return fmt.Errorf("failed to serve extension file '%s': %w", filePath, processFile())
}
}
return nil
})
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorf("Failed to walk extensions directory: %v", err)
http.Error(w, "Internal error", http.StatusInternalServerError)
return
}
}
// registerDexHandlers will register dex HTTP handlers
func (server *ArgoCDServer) registerDexHandlers(mux *http.ServeMux) {
if !server.settings.IsSSOConfigured() {
return
}
// Run dex OpenID Connect Identity Provider behind a reverse proxy (served at /api/dex)
mux.Handle(common.DexAPIEndpoint+"/", otelhttp.NewHandler(http.HandlerFunc(dexutil.NewDexHTTPReverseProxy(server.DexServerAddr, server.BaseHRef, server.DexTLSConfig)), "server.dex/Proxy"))
mux.Handle(common.LoginEndpoint, otelhttp.NewHandler(http.HandlerFunc(server.ssoClientApp.HandleLogin), "server.ClientApp/HandleLogin"))
mux.Handle(common.CallbackEndpoint, otelhttp.NewHandler(http.HandlerFunc(server.ssoClientApp.HandleCallback), "server.ClientApp/HandleCallback"))
}
// newRedirectServer returns an HTTP server which does a 307 redirect to the HTTPS server
func newRedirectServer(port int, rootPath string) *http.Server {
var addr string
if rootPath == "" {
addr = fmt.Sprintf("localhost:%d", port)
} else {
addr = fmt.Sprintf("localhost:%d/%s", port, strings.Trim(rootPath, "/"))
}
return &http.Server{
Addr: addr,
Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
target := "https://" + req.Host
if rootPath != "" {
root := strings.Trim(rootPath, "/")
prefix := "/" + root
// If the request path already starts with rootPath, no need to add rootPath again
if strings.HasPrefix(req.URL.Path, prefix) {
target += req.URL.Path
} else {
target += prefix + req.URL.Path
}
} else {
target += req.URL.Path
}
if req.URL.RawQuery != "" {
target += "?" + req.URL.RawQuery
}
http.Redirect(w, req, target, http.StatusTemporaryRedirect)
}),
}
}
// registerDownloadHandlers registers HTTP handlers to support downloads directly from the API server
// (e.g. argocd CLI)
func registerDownloadHandlers(mux *http.ServeMux, base string) {
linuxPath, err := exec.LookPath("argocd")
if err != nil {
log.Warnf("argocd not in PATH")
} else {
mux.HandleFunc(base+"/argocd-linux-"+go_runtime.GOARCH, func(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, linuxPath)
})
}
}
func (server *ArgoCDServer) getIndexData() ([]byte, error) {
server.indexDataInit.Do(func() {
data, err := ui.Embedded.ReadFile("dist/app/index.html")
if err != nil {
server.indexDataErr = err
return
}
if server.BaseHRef == "/" || server.BaseHRef == "" {
server.indexData = data
} else {
server.indexData = []byte(replaceBaseHRef(string(data), fmt.Sprintf(`<base href="/%s/">`, strings.Trim(server.BaseHRef, "/"))))
}
})
return server.indexData, server.indexDataErr
}
func (server *ArgoCDServer) uiAssetExists(filename string) bool {
f, err := server.staticAssets.Open(strings.Trim(filename, "/"))
if err != nil {
return false
}
defer utilio.Close(f)
stat, err := f.Stat()
if err != nil {
return false
}
return !stat.IsDir()
}
// newStaticAssetsHandler returns an HTTP handler to serve UI static assets
func (server *ArgoCDServer) newStaticAssetsHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
acceptHTML := false
for acceptType := range strings.SplitSeq(r.Header.Get("Accept"), ",") {
if acceptType == "text/html" || acceptType == "html" {
acceptHTML = true
break
}
}
fileRequest := r.URL.Path != "/index.html" && server.uiAssetExists(r.URL.Path)
// Set X-Frame-Options according to configuration
if server.XFrameOptions != "" {
w.Header().Set("X-Frame-Options", server.XFrameOptions)
}
// Set Content-Security-Policy according to configuration
if server.ContentSecurityPolicy != "" {
w.Header().Set("Content-Security-Policy", server.ContentSecurityPolicy)
}
w.Header().Set("X-XSS-Protection", "1")
// serve index.html for non file requests to support HTML5 History API
if acceptHTML && !fileRequest && (r.Method == http.MethodGet || r.Method == http.MethodHead) {
for k, v := range noCacheHeaders {
w.Header().Set(k, v)
}
data, err := server.getIndexData()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
modTime, err := time.Parse(common.GetVersion().BuildDate, time.RFC3339)
if err != nil {
modTime = time.Now()
}
http.ServeContent(w, r, "index.html", modTime, utilio.NewByteReadSeeker(data))
} else {
if isMainJsBundle(r.URL) {
cacheControl := "public, max-age=31536000, immutable"
if !fileRequest {
cacheControl = "no-cache"
}
w.Header().Set("Cache-Control", cacheControl)
}
http.FileServer(server.staticAssets).ServeHTTP(w, r)
}
}
}
var mainJsBundleRegex = regexp.MustCompile(`^main\.[0-9a-f]{20}\.js$`)
func isMainJsBundle(url *url.URL) bool {
filename := path.Base(url.Path)
return mainJsBundleRegex.MatchString(filename)
}
type registerFunc func(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error
// mustRegisterGWHandler is a convenience function to register a gateway handler
func mustRegisterGWHandler(ctx context.Context, register registerFunc, mux *runtime.ServeMux, conn *grpc.ClientConn) {
err := register(ctx, mux, conn)
if err != nil {
panic(err)
}
}
func replaceBaseHRef(data string, replaceWith string) string {
return baseHRefRegex.ReplaceAllString(data, replaceWith)
}
// Authenticate checks for the presence of a valid token when accessing server-side resources.
func (server *ArgoCDServer) Authenticate(ctx context.Context) (context.Context, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, "server.ArgoCDServer.Authenticate")
defer span.End()
if server.DisableAuth {
return ctx, nil
}
claims, newToken, claimsErr := server.getClaims(ctx)
if claims != nil {
// Add claims to the context to inspect for RBAC
//nolint:staticcheck
ctx = context.WithValue(ctx, "claims", claims)
if newToken != "" {
// Session tokens that are expiring soon should be regenerated if user stays active.
// The renewed token is stored in outgoing ServerMetadata. Metadata is available to grpc-gateway
// response forwarder that will translate it into Set-Cookie header.
if err := grpc.SendHeader(ctx, metadata.New(map[string]string{renewTokenKey: newToken})); err != nil {
log.Warnf("Failed to set %s header", renewTokenKey)
}
}
}
if claimsErr != nil {
//nolint:staticcheck
ctx = context.WithValue(ctx, util_session.AuthErrorCtxKey, claimsErr)
}
if claimsErr != nil {
argoCDSettings, err := server.settingsMgr.GetSettings()
if err != nil {
return ctx, status.Errorf(codes.Internal, "unable to load settings: %v", err)
}
if !argoCDSettings.AnonymousUserEnabled {
return ctx, claimsErr
}
//nolint:staticcheck
ctx = context.WithValue(ctx, "claims", "")
}
return ctx, nil
}
// getClaims extracts, validates and refreshes a JWT token from an incoming request context.
func (server *ArgoCDServer) getClaims(ctx context.Context) (jwt.Claims, string, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, "server.ArgoCDServer.getClaims")
defer span.End()
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
span.SetStatus(otel_codes.Error, ErrNoSession.Error())
return nil, "", ErrNoSession
}
tokenString := getToken(md)
if tokenString == "" {
span.SetStatus(otel_codes.Error, ErrNoSession.Error())
return nil, "", ErrNoSession
}
// A valid argocd-issued token is automatically refreshed here prior to expiration.
// OIDC tokens will be verified but will not be refreshed here.
claims, newToken, err := server.sessionMgr.VerifyToken(ctx, tokenString)
if err != nil {
span.SetStatus(otel_codes.Error, err.Error())
return claims, "", status.Errorf(codes.Unauthenticated, "invalid session: %v", err)
}
finalClaims := claims
if server.settings.IsSSOConfigured() {
updatedClaims, err := server.ssoClientApp.SetGroupsFromUserInfo(ctx, claims, util_session.SessionManagerClaimsIssuer)
if err != nil {
return claims, "", status.Errorf(codes.Unauthenticated, "invalid session: %v", err)
}
finalClaims = updatedClaims
// OIDC tokens are automatically refreshed here prior to expiration
refreshedToken, err := server.ssoClientApp.CheckAndRefreshToken(ctx, updatedClaims, server.settings.OIDCRefreshTokenThreshold)
if err != nil {
log.Errorf("error checking and refreshing token: %v", err)
}
if refreshedToken != "" && refreshedToken != tokenString {
newToken = refreshedToken
log.Infof("refreshed token for subject: %v", jwtutil.StringField(updatedClaims, "sub"))
}
}
return finalClaims, newToken, nil
}
// getToken extracts the token from gRPC metadata or cookie headers
func getToken(md metadata.MD) string {
// check the "token" metadata
{
tokens, ok := md[apiclient.MetaDataTokenKey]
if ok && len(tokens) > 0 {
return tokens[0]
}
}
// looks for the HTTP header `Authorization: Bearer ...`
// argocd prefers bearer token over cookie
for _, t := range md["authorization"] {
token := strings.TrimPrefix(t, "Bearer ")
if strings.HasPrefix(t, "Bearer ") && jwtutil.IsValid(token) {
return token
}
}
// check the HTTP cookie
for _, t := range md["grpcgateway-cookie"] {
header := http.Header{}
header.Add("Cookie", t)
request := http.Request{Header: header}
token, err := httputil.JoinCookies(common.AuthCookieName, request.Cookies())
if err == nil && jwtutil.IsValid(token) {
return token
}
}
return ""
}
type handlerSwitcher struct {
handler http.Handler
urlToHandler map[string]http.Handler
contentTypeToHandler map[string]http.Handler
}
func (s *handlerSwitcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if urlHandler, ok := s.urlToHandler[r.URL.Path]; ok {
urlHandler.ServeHTTP(w, r)
} else if contentHandler, ok := s.contentTypeToHandler[r.Header.Get("content-type")]; ok {
contentHandler.ServeHTTP(w, r)
} else {
s.handler.ServeHTTP(w, r)
}
}
// Workaround for https://github.com/golang/go/issues/21955 to support escaped URLs in URL path.
type bug21955Workaround struct {
handler http.Handler
}
var pathPatters = []*regexp.Regexp{
regexp.MustCompile(`/api/v1/clusters/[^/]+`),
regexp.MustCompile(`/api/v1/repositories/[^/]+`),
regexp.MustCompile(`/api/v1/repocreds/[^/]+`),
regexp.MustCompile(`/api/v1/repositories/[^/]+/apps`),
regexp.MustCompile(`/api/v1/repositories/[^/]+/apps/[^/]+`),
regexp.MustCompile(`/settings/clusters/[^/]+`),
}
func (bf *bug21955Workaround) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for _, pattern := range pathPatters {
if pattern.MatchString(r.URL.RawPath) {
r.URL.Path = r.URL.RawPath
break
}
}
bf.handler.ServeHTTP(w, r)
}
func bug21955WorkaroundInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
switch req := req.(type) {
case *repositorypkg.RepoQuery:
repo, err := url.QueryUnescape(req.Repo)
if err != nil {
return nil, err
}
req.Repo = repo
case *repositorypkg.RepoAppsQuery:
repo, err := url.QueryUnescape(req.Repo)
if err != nil {
return nil, err
}
req.Repo = repo
case *repositorypkg.RepoAppDetailsQuery:
repo, err := url.QueryUnescape(req.Source.RepoURL)
if err != nil {
return nil, err
}
req.Source.RepoURL = repo
case *repositorypkg.RepoUpdateRequest:
repo, err := url.QueryUnescape(req.Repo.Repo)
if err != nil {
return nil, err
}
req.Repo.Repo = repo
case *repocredspkg.RepoCredsQuery:
pattern, err := url.QueryUnescape(req.Url)
if err != nil {
return nil, err
}
req.Url = pattern
case *repocredspkg.RepoCredsDeleteRequest:
pattern, err := url.QueryUnescape(req.Url)
if err != nil {
return nil, err
}
req.Url = pattern
case *clusterpkg.ClusterQuery:
if req.Id != nil {
val, err := url.QueryUnescape(req.Id.Value)
if err != nil {
return nil, err
}
req.Id.Value = val
}
case *clusterpkg.ClusterUpdateRequest:
if req.Id != nil {
val, err := url.QueryUnescape(req.Id.Value)
if err != nil {
return nil, err
}
req.Id.Value = val
}
}
return handler(ctx, req)
}
// allowedApplicationNamespacesAsString returns a string containing comma-separated list
// of allowed application namespaces
func (server *ArgoCDServer) allowedApplicationNamespacesAsString() string {
ns := server.Namespace
if len(server.ApplicationNamespaces) > 0 {
ns += ", "
ns += strings.Join(server.ApplicationNamespaces, ", ")
}
return ns
}