Files
argo-cd/util/cache/redis.go
Papapetrou Patroklos b2df60414c chore: bumps go redis client 9.17.2 (#25643)
Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Patroklos Papapetrou <ppapapetrou76@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-17 09:25:04 +02:00

235 lines
5.6 KiB
Go

package cache
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"time"
utilio "github.com/argoproj/argo-cd/v3/util/io"
rediscache "github.com/go-redis/cache/v9"
"github.com/redis/go-redis/v9"
)
// RedisCompressionType names the compression scheme applied to values stored in Redis.
type RedisCompressionType string

// Compression schemes understood by the Redis cache.
var (
	// RedisCompressionNone leaves cached values uncompressed.
	RedisCompressionNone RedisCompressionType = "none"
	// RedisCompressionGZip gzip-compresses cached values.
	RedisCompressionGZip RedisCompressionType = "gzip"
)

// CompressionTypeFromString maps s onto one of the known compression types.
// The match is exact (case-sensitive, no trimming). An unrecognized value yields
// an empty type together with a descriptive error.
func CompressionTypeFromString(s string) (RedisCompressionType, error) {
	for _, known := range []RedisCompressionType{RedisCompressionNone, RedisCompressionGZip} {
		if s == string(known) {
			return known, nil
		}
	}
	return "", fmt.Errorf("unknown compression type: %s", s)
}
// NewRedisCache builds a CacheClient backed by the given Redis client.
//
// expiration is the TTL applied to items that do not carry their own, and
// compressionType selects how stored values are encoded and how keys are suffixed.
func NewRedisCache(client *redis.Client, expiration time.Duration, compressionType RedisCompressionType) CacheClient {
	backend := rediscache.New(&rediscache.Options{Redis: client})

	rc := new(redisCache)
	rc.client = client
	rc.expiration = expiration
	rc.cache = backend
	rc.redisCompressionType = compressionType
	return rc
}
// Compile-time proof that *redisCache satisfies the CacheClient contract.
var _ CacheClient = (*redisCache)(nil)

// redisCache is the Redis-backed implementation of CacheClient.
type redisCache struct {
	// expiration is the TTL used by Set when an item does not specify one.
	expiration time.Duration
	// client is the raw Redis client, used for rename and pub/sub operations.
	client *redis.Client
	// cache is the go-redis/cache wrapper used for get/set/delete of values.
	cache *rediscache.Cache
	// redisCompressionType controls value encoding and the key suffix.
	redisCompressionType RedisCompressionType
}
// getKey translates a logical cache key into the key actually stored in Redis.
// With gzip compression the key gets a ".gz" suffix; otherwise it is returned
// unchanged.
func (r *redisCache) getKey(key string) string {
	if r.redisCompressionType == RedisCompressionGZip {
		return key + ".gz"
	}
	return key
}
// marshal JSON-encodes obj and, when gzip compression is configured, compresses
// the encoded stream. It returns the resulting bytes or the first error hit
// while encoding, flushing or closing the writer.
func (r *redisCache) marshal(obj any) ([]byte, error) {
	var out bytes.Buffer

	if r.redisCompressionType != RedisCompressionGZip {
		// Plain path: the encoder writes straight into the buffer.
		if err := json.NewEncoder(&out).Encode(obj); err != nil {
			return nil, err
		}
		return out.Bytes(), nil
	}

	zw := gzip.NewWriter(&out)
	if err := json.NewEncoder(zw).Encode(obj); err != nil {
		return nil, err
	}
	// Flush and then Close, in that order, so the compressed stream is written
	// out completely before the buffer is read.
	if err := zw.Flush(); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}
// unmarshal is the inverse of marshal: it optionally gunzips data and then
// JSON-decodes it into obj. A gzip header error is returned as-is; a decoding
// error is wrapped with context.
func (r *redisCache) unmarshal(data []byte, obj any) error {
	var src io.Reader = bytes.NewReader(data)

	if r.redisCompressionType == RedisCompressionGZip {
		zr, err := gzip.NewReader(src)
		if err != nil {
			return err
		}
		src = zr
	}

	decodeErr := json.NewDecoder(src).Decode(obj)
	if decodeErr == nil {
		return nil
	}
	return fmt.Errorf("failed to decode cached data: %w", decodeErr)
}
// Rename moves the value stored under oldKey to newKey. Both keys are mapped
// through getKey first. The duration argument is ignored by this implementation.
// Redis' "ERR no such key" reply is reported as ErrCacheMiss; any other error is
// returned unchanged.
func (r *redisCache) Rename(oldKey string, newKey string, _ time.Duration) error {
	from, to := r.getKey(oldKey), r.getKey(newKey)

	if err := r.client.Rename(context.TODO(), from, to).Err(); err != nil {
		if err.Error() == "ERR no such key" {
			return ErrCacheMiss
		}
		return err
	}
	return nil
}
// Set serializes item.Object and stores it under the item's (mapped) key.
// The cache-wide default expiration is used when the item's own expiration is
// zero, and DisableOverwrite is passed through as SetNX.
func (r *redisCache) Set(item *Item) error {
	ttl := r.expiration
	if own := item.CacheActionOpts.Expiration; own != 0 {
		ttl = own
	}

	payload, err := r.marshal(item.Object)
	if err != nil {
		return err
	}

	entry := &rediscache.Item{
		Key:   r.getKey(item.Key),
		Value: payload,
		TTL:   ttl,
		SetNX: item.CacheActionOpts.DisableOverwrite,
	}
	return r.cache.Set(entry)
}
// Get loads the value stored under key and decodes it into obj.
// A miss reported by the underlying cache is translated to this package's
// ErrCacheMiss; other lookup errors are returned unchanged.
func (r *redisCache) Get(key string, obj any) error {
	var raw []byte

	switch err := r.cache.Get(context.TODO(), r.getKey(key), &raw); {
	case err == nil:
		return r.unmarshal(raw, obj)
	case errors.Is(err, rediscache.ErrCacheMiss):
		return ErrCacheMiss
	default:
		return err
	}
}
// Delete removes the entry stored under key (after mapping it through getKey)
// and returns whatever error the underlying cache reports.
func (r *redisCache) Delete(key string) error {
	storedKey := r.getKey(key)
	return r.cache.Delete(context.TODO(), storedKey)
}
// OnUpdated subscribes to the Redis pub/sub channel named key and invokes
// callback once for every message received, until ctx is cancelled or callback
// fails. The channel name is the raw key (not mapped through getKey), matching
// what NotifyUpdated publishes to.
//
// It returns nil when ctx is cancelled, the callback's error when callback
// fails, and an error when the subscription's message channel is closed while
// ctx is still active.
func (r *redisCache) OnUpdated(ctx context.Context, key string, callback func() error) error {
	pubsub := r.client.Subscribe(ctx, key)
	defer utilio.Close(pubsub)

	ch := pubsub.Channel()
	for {
		select {
		case <-ctx.Done():
			return nil
		case _, ok := <-ch:
			if !ok {
				// A closed channel is always ready to receive, so without this
				// check the loop would spin and call callback continuously.
				if ctx.Err() != nil {
					// Cancellation raced with the close; treat it as a normal stop.
					return nil
				}
				return fmt.Errorf("redis subscription to %q closed unexpectedly", key)
			}
			if err := callback(); err != nil {
				return err
			}
		}
	}
}
// NotifyUpdated publishes an empty message on the pub/sub channel named key,
// which wakes up subscribers waiting in OnUpdated for the same key.
func (r *redisCache) NotifyUpdated(key string) error {
	published := r.client.Publish(context.TODO(), key, "")
	return published.Err()
}
// MetricsRegistry is the sink that the Redis hook in this file reports request metrics to.
type MetricsRegistry interface {
// IncRedisRequest records one Redis request; failed tells whether it ended in an error.
// The hook in this file does not count a redis.Nil result as a failure.
IncRedisRequest(failed bool)
// ObserveRedisRequestDuration records how long one Redis request took.
ObserveRedisRequestDuration(duration time.Duration)
}
// redisHook is a go-redis hook that reports per-command metrics to a MetricsRegistry.
type redisHook struct {
// registry receives the request counts and duration observations.
registry MetricsRegistry
}
// ignoredRedisCommandNames lists commands that go-redis may issue during connection setup / bookkeeping
// and which we don't want to count as application-level requests in metrics.
// Keys are lowercase because shouldIgnoreRedisCmd lowercases the command name before the lookup.
var ignoredRedisCommandNames = map[string]struct{}{
"hello": {},
"client": {},
// Optional: enable these if we also want to exclude other setup/noise commands.
// "auth": {},
// "select": {},
// "ping": {},
}
// shouldIgnoreRedisCmd reports whether cmd is one of the setup/bookkeeping
// commands that are excluded from metrics. The command name is trimmed and
// lowercased before it is looked up in ignoredRedisCommandNames.
func shouldIgnoreRedisCmd(cmd redis.Cmder) bool {
	normalized := strings.TrimSpace(cmd.Name())
	normalized = strings.ToLower(normalized)

	_, ignored := ignoredRedisCommandNames[normalized]
	return ignored
}
// DialHook wraps next in a pass-through dialer: the connection attempt is
// delegated unchanged and no metrics are recorded for it.
func (rh *redisHook) DialHook(next redis.DialHook) redis.DialHook {
	passThrough := func(ctx context.Context, network, addr string) (net.Conn, error) {
		return next(ctx, network, addr)
	}
	return passThrough
}
// ProcessHook wraps next so that every non-ignored command is counted and timed.
// A command counts as failed when it returns an error other than redis.Nil.
// The wrapped command's error is always returned unchanged.
func (rh *redisHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
	return func(ctx context.Context, cmd redis.Cmder) error {
		began := time.Now()
		err := next(ctx, cmd)

		if !shouldIgnoreRedisCmd(cmd) {
			failed := err != nil && !errors.Is(err, redis.Nil)
			rh.registry.IncRedisRequest(failed)
			rh.registry.ObserveRedisRequestDuration(time.Since(began))
		}
		return err
	}
}
// ProcessPipelineHook returns nil instead of wrapping the supplied hook, so this type
// records no metrics for pipelined commands.
// NOTE(review): presumably go-redis treats a nil return as "leave the hook chain unchanged" — confirm against the client version in use.
func (redisHook) ProcessPipelineHook(_ redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return nil
}
// CollectMetrics installs a hook on client that pushes request metrics into
// registry. When lock is non-nil it is held for writing while the hook is
// added; the same lock should be shared by every function that can add or
// process a Redis hook.
func CollectMetrics(client *redis.Client, registry MetricsRegistry, lock *sync.RWMutex) {
	hook := &redisHook{registry: registry}

	if lock == nil {
		client.AddHook(hook)
		return
	}

	lock.Lock()
	defer lock.Unlock()
	client.AddHook(hook)
}