mirror of
https://github.com/argoproj/argo-cd.git
synced 2026-02-20 01:28:45 +01:00
724 lines
22 KiB
Go
724 lines
22 KiB
Go
package oci
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"math"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"slices"
|
|
"strings"
|
|
"time"
|
|
|
|
securejoin "github.com/cyphar/filepath-securejoin"
|
|
imagev1 "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"oras.land/oras-go/v2/content/oci"
|
|
|
|
"github.com/argoproj/argo-cd/v3/util/versions"
|
|
|
|
"github.com/argoproj/pkg/sync"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/argoproj/argo-cd/v3/util/cache"
|
|
utilio "github.com/argoproj/argo-cd/v3/util/io"
|
|
"github.com/argoproj/argo-cd/v3/util/io/files"
|
|
"github.com/argoproj/argo-cd/v3/util/proxy"
|
|
|
|
"oras.land/oras-go/v2"
|
|
"oras.land/oras-go/v2/content/file"
|
|
"oras.land/oras-go/v2/registry/remote"
|
|
"oras.land/oras-go/v2/registry/remote/auth"
|
|
)
|
|
|
|
var (
|
|
globalLock = sync.NewKeyLock()
|
|
indexLock = sync.NewKeyLock()
|
|
)
|
|
|
|
const (
|
|
helmOCIConfigType = "application/vnd.cncf.helm.config.v1+json"
|
|
helmOCILayerType = "application/vnd.cncf.helm.chart.content.v1.tar+gzip"
|
|
)
|
|
|
|
var _ Client = &nativeOCIClient{}
|
|
|
|
type tagsCache interface {
|
|
SetOCITags(repo string, indexData []byte) error
|
|
GetOCITags(repo string, indexData *[]byte) error
|
|
}
|
|
|
|
// Client is a generic OCI client interface that provides methods for interacting with an OCI (Open Container Initiative) registry.
|
|
type Client interface {
|
|
// ResolveRevision resolves a tag, digest, or semantic version constraint to a concrete digest.
|
|
// If noCache is true, the resolution bypasses the local tags cache and queries the remote registry.
|
|
// If the revision is already a digest, it is returned as-is.
|
|
ResolveRevision(ctx context.Context, revision string, noCache bool) (string, error)
|
|
|
|
// DigestMetadata retrieves an OCI manifest for a given digest.
|
|
DigestMetadata(ctx context.Context, digest string) (*imagev1.Manifest, error)
|
|
|
|
// CleanCache is invoked on a hard-refresh or when the manifest cache has expired. This removes the OCI image from
|
|
// the cached path, which is looked up by the specified revision.
|
|
CleanCache(revision string) error
|
|
|
|
// Extract retrieves and unpacks the contents of an OCI image identified by the specified revision.
|
|
// If successful, the extracted contents are extracted to a randomized tempdir.
|
|
Extract(ctx context.Context, revision string) (string, utilio.Closer, error)
|
|
|
|
// TestRepo verifies the connectivity and accessibility of the repository.
|
|
TestRepo(ctx context.Context) (bool, error)
|
|
|
|
// GetTags retrieves the list of tags for the repository.
|
|
GetTags(ctx context.Context, noCache bool) ([]string, error)
|
|
}
|
|
|
|
type Creds struct {
|
|
Username string
|
|
Password string
|
|
CAPath string
|
|
CertData []byte
|
|
KeyData []byte
|
|
InsecureSkipVerify bool
|
|
InsecureHTTPOnly bool
|
|
}
|
|
|
|
type ClientOpts func(c *nativeOCIClient)
|
|
|
|
func WithIndexCache(indexCache tagsCache) ClientOpts {
|
|
return func(c *nativeOCIClient) {
|
|
c.tagsCache = indexCache
|
|
}
|
|
}
|
|
|
|
func WithImagePaths(repoCachePaths utilio.TempPaths) ClientOpts {
|
|
return func(c *nativeOCIClient) {
|
|
c.repoCachePaths = repoCachePaths
|
|
}
|
|
}
|
|
|
|
func WithManifestMaxExtractedSize(manifestMaxExtractedSize int64) ClientOpts {
|
|
return func(c *nativeOCIClient) {
|
|
c.manifestMaxExtractedSize = manifestMaxExtractedSize
|
|
}
|
|
}
|
|
|
|
func WithDisableManifestMaxExtractedSize(disableManifestMaxExtractedSize bool) ClientOpts {
|
|
return func(c *nativeOCIClient) {
|
|
c.disableManifestMaxExtractedSize = disableManifestMaxExtractedSize
|
|
}
|
|
}
|
|
|
|
func NewClient(repoURL string, creds Creds, proxy, noProxy string, layerMediaTypes []string, opts ...ClientOpts) (Client, error) {
|
|
return NewClientWithLock(repoURL, creds, globalLock, proxy, noProxy, layerMediaTypes, opts...)
|
|
}
|
|
|
|
func NewClientWithLock(repoURL string, creds Creds, repoLock sync.KeyLock, proxyURL, noProxy string, layerMediaTypes []string, opts ...ClientOpts) (Client, error) {
|
|
ociRepo := strings.TrimPrefix(repoURL, "oci://")
|
|
repo, err := remote.NewRepository(ociRepo)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize repository: %w", err)
|
|
}
|
|
|
|
repo.PlainHTTP = creds.InsecureHTTPOnly
|
|
|
|
var tlsConf *tls.Config
|
|
if !repo.PlainHTTP {
|
|
tlsConf, err = newTLSConfig(creds)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed setup tlsConfig: %w", err)
|
|
}
|
|
}
|
|
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: proxy.GetCallback(proxyURL, noProxy),
|
|
TLSClientConfig: tlsConf,
|
|
DisableKeepAlives: true,
|
|
},
|
|
/*
|
|
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
|
return errors.New("redirects are not allowed")
|
|
},
|
|
*/
|
|
}
|
|
repo.Client = &auth.Client{
|
|
Client: client,
|
|
Cache: nil,
|
|
Credential: auth.StaticCredential(repo.Reference.Registry, auth.Credential{
|
|
Username: creds.Username,
|
|
Password: creds.Password,
|
|
}),
|
|
}
|
|
|
|
parsed, err := url.Parse(repoURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse oci repo url: %w", err)
|
|
}
|
|
|
|
reg, err := remote.NewRegistry(parsed.Host)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to setup registry config: %w", err)
|
|
}
|
|
reg.PlainHTTP = repo.PlainHTTP
|
|
reg.Client = repo.Client
|
|
return newClientWithLock(ociRepo, repoLock, repo, func(ctx context.Context, last string) ([]string, error) {
|
|
var t []string
|
|
|
|
err := repo.Tags(ctx, last, func(tags []string) error {
|
|
t = append(t, tags...)
|
|
return nil
|
|
})
|
|
|
|
return t, err
|
|
}, reg.Ping, layerMediaTypes, opts...), nil
|
|
}
|
|
|
|
func newClientWithLock(repoURL string, repoLock sync.KeyLock, repo oras.ReadOnlyTarget, tagsFunc func(context.Context, string) ([]string, error), pingFunc func(ctx context.Context) error, layerMediaTypes []string, opts ...ClientOpts) Client {
|
|
c := &nativeOCIClient{
|
|
repoURL: repoURL,
|
|
repoLock: repoLock,
|
|
repo: repo,
|
|
tagsFunc: tagsFunc,
|
|
pingFunc: pingFunc,
|
|
allowedMediaTypes: layerMediaTypes,
|
|
}
|
|
for i := range opts {
|
|
opts[i](c)
|
|
}
|
|
return c
|
|
}
|
|
|
|
type EventHandlers struct {
|
|
OnExtract func(repo string) func()
|
|
OnResolveRevision func(repo string) func()
|
|
OnDigestMetadata func(repo string) func()
|
|
OnTestRepo func(repo string) func()
|
|
OnGetTags func(repo string) func()
|
|
OnExtractFail func(repo string) func(revision string)
|
|
OnResolveRevisionFail func(repo string) func(revision string)
|
|
OnDigestMetadataFail func(repo string) func(revision string)
|
|
OnTestRepoFail func(repo string) func()
|
|
OnGetTagsFail func(repo string) func()
|
|
}
|
|
|
|
// nativeOCIClient implements Client interface using oras-go
|
|
type nativeOCIClient struct {
|
|
EventHandlers
|
|
|
|
repoURL string
|
|
repo oras.ReadOnlyTarget
|
|
tagsFunc func(context.Context, string) ([]string, error)
|
|
repoLock sync.KeyLock
|
|
tagsCache tagsCache
|
|
repoCachePaths utilio.TempPaths
|
|
allowedMediaTypes []string
|
|
manifestMaxExtractedSize int64
|
|
disableManifestMaxExtractedSize bool
|
|
pingFunc func(ctx context.Context) error
|
|
}
|
|
|
|
// TestRepo verifies that the remote OCI repo can be connected to.
|
|
func (c *nativeOCIClient) TestRepo(ctx context.Context) (bool, error) {
|
|
inc := c.OnTestRepo(c.repoURL)
|
|
defer inc()
|
|
// Currently doesn't do anything in regard to measuring spans, but keep it consistent with OnTestRepo()
|
|
fail := c.OnTestRepoFail(c.repoURL)
|
|
err := c.pingFunc(ctx)
|
|
if err != nil {
|
|
fail()
|
|
}
|
|
return err == nil, err
|
|
}
|
|
|
|
func (c *nativeOCIClient) Extract(ctx context.Context, digest string) (string, utilio.Closer, error) {
|
|
inc := c.OnExtract(c.repoURL)
|
|
defer inc()
|
|
// Currently doesn't do anything in regard to measuring spans, but keep it consistent with OnExtract()
|
|
fail := c.OnExtractFail(c.repoURL)
|
|
extract, closer, err := c.extract(ctx, digest)
|
|
if err != nil {
|
|
fail(digest)
|
|
}
|
|
return extract, closer, err
|
|
}
|
|
|
|
func (c *nativeOCIClient) extract(ctx context.Context, digest string) (string, utilio.Closer, error) {
|
|
cachedPath, err := c.getCachedPath(digest)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("error getting oci path for digest %s: %w", digest, err)
|
|
}
|
|
|
|
c.repoLock.Lock(cachedPath)
|
|
defer c.repoLock.Unlock(cachedPath)
|
|
|
|
exists, err := fileExists(cachedPath)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
var isHelmChart bool
|
|
|
|
if !exists {
|
|
ociManifest, err := getOCIManifest(ctx, digest, c.repo)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
// Add a guard to defend against a ridiculous amount of layers. No idea what a good amount is, but normally we
|
|
// shouldn't expect more than 2-3 in most real world use cases.
|
|
if len(ociManifest.Layers) > 10 {
|
|
return "", nil, fmt.Errorf("expected no more than 10 oci layers, got %d", len(ociManifest.Layers))
|
|
}
|
|
|
|
isHelmChart = ociManifest.Config.MediaType == helmOCIConfigType
|
|
|
|
contentLayers := 0
|
|
|
|
// Strictly speaking we only allow for a single content layer. There are images which contains extra layers, such
|
|
// as provenance/attestation layers. Pending a better story to do this natively, we will skip such layers for now.
|
|
for _, layer := range ociManifest.Layers {
|
|
// For Helm charts, only look for the specific Helm chart content layer
|
|
if isHelmChart {
|
|
if isHelmOCI(layer.MediaType) {
|
|
if !slices.Contains(c.allowedMediaTypes, layer.MediaType) {
|
|
return "", nil, fmt.Errorf("oci layer media type %s is not in the list of allowed media types", layer.MediaType)
|
|
}
|
|
contentLayers++
|
|
}
|
|
} else if isContentLayer(layer.MediaType) {
|
|
if !slices.Contains(c.allowedMediaTypes, layer.MediaType) {
|
|
return "", nil, fmt.Errorf("oci layer media type %s is not in the list of allowed media types", layer.MediaType)
|
|
}
|
|
contentLayers++
|
|
}
|
|
}
|
|
|
|
if contentLayers != 1 {
|
|
return "", nil, fmt.Errorf("expected only a single oci content layer, got %d", contentLayers)
|
|
}
|
|
|
|
err = saveCompressedImageToPath(ctx, digest, c.repo, cachedPath)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("could not save oci digest %s: %w", digest, err)
|
|
}
|
|
}
|
|
|
|
maxSize := c.manifestMaxExtractedSize
|
|
if c.disableManifestMaxExtractedSize {
|
|
maxSize = math.MaxInt64
|
|
}
|
|
|
|
if !isHelmChart {
|
|
// Get the manifest to determine if it's a Helm chart for extraction
|
|
ociManifest, err := getOCIManifestFromCache(ctx, cachedPath, digest)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("error getting oci manifest for extraction: %w", err)
|
|
}
|
|
isHelmChart = ociManifest.Config.MediaType == helmOCIConfigType
|
|
}
|
|
manifestsDir, err := extractContentToManifestsDir(ctx, cachedPath, digest, maxSize, isHelmChart)
|
|
if err != nil {
|
|
return manifestsDir, nil, fmt.Errorf("cannot extract contents of oci image with revision %s: %w", digest, err)
|
|
}
|
|
|
|
return manifestsDir, utilio.NewCloser(func() error {
|
|
return os.RemoveAll(manifestsDir)
|
|
}), nil
|
|
}
|
|
|
|
func (c *nativeOCIClient) getCachedPath(version string) (string, error) {
|
|
keyData, err := json.Marshal(map[string]string{"url": c.repoURL, "version": version})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return c.repoCachePaths.GetPath(string(keyData))
|
|
}
|
|
|
|
func (c *nativeOCIClient) CleanCache(revision string) error {
|
|
cachePath, err := c.getCachedPath(revision)
|
|
if err != nil {
|
|
return fmt.Errorf("error cleaning oci path for revision %s: %w", revision, err)
|
|
}
|
|
return os.RemoveAll(cachePath)
|
|
}
|
|
|
|
// DigestMetadata extracts the OCI manifest for a given revision and returns it to the caller.
|
|
func (c *nativeOCIClient) DigestMetadata(ctx context.Context, digest string) (*imagev1.Manifest, error) {
|
|
inc := c.OnDigestMetadata(c.repoURL)
|
|
defer inc()
|
|
// Currently doesn't do anything in regard to measuring spans, but keep it consistent with OnDigestMetadata()
|
|
fail := c.OnDigestMetadataFail(c.repoURL)
|
|
metadata, err := c.digestMetadata(ctx, digest)
|
|
if err != nil {
|
|
fail(digest)
|
|
}
|
|
return metadata, err
|
|
}
|
|
|
|
func (c *nativeOCIClient) digestMetadata(ctx context.Context, digest string) (*imagev1.Manifest, error) {
|
|
path, err := c.getCachedPath(digest)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error fetching oci metadata path for digest %s: %w", digest, err)
|
|
}
|
|
return getOCIManifestFromCache(ctx, path, digest)
|
|
}
|
|
|
|
func (c *nativeOCIClient) ResolveRevision(ctx context.Context, revision string, noCache bool) (string, error) {
|
|
inc := c.OnResolveRevision(c.repoURL)
|
|
defer inc()
|
|
// Currently doesn't do anything in regard to measuring spans, but keep it consistent with OnResolveRevision()
|
|
fail := c.OnResolveRevisionFail(c.repoURL)
|
|
resolveRevision, err := c.resolveRevision(ctx, revision, noCache)
|
|
if err != nil {
|
|
fail(revision)
|
|
}
|
|
return resolveRevision, err
|
|
}
|
|
|
|
func (c *nativeOCIClient) resolveRevision(ctx context.Context, revision string, noCache bool) (string, error) {
|
|
digest, err := c.resolveDigest(ctx, revision) // Lookup explicit revision
|
|
if err != nil {
|
|
// If the revision is not a semver constraint, just return the error
|
|
if !versions.IsConstraint(revision) {
|
|
return digest, err
|
|
}
|
|
|
|
tags, err := c.GetTags(ctx, noCache)
|
|
if err != nil {
|
|
return "", fmt.Errorf("error fetching tags: %w", err)
|
|
}
|
|
|
|
// Look to see if revision is a semver constraint
|
|
version, err := versions.MaxVersion(revision, tags)
|
|
if err != nil {
|
|
return "", fmt.Errorf("no version for constraints: %w", err)
|
|
}
|
|
// Look up the digest for the resolved version
|
|
return c.resolveDigest(ctx, version)
|
|
}
|
|
|
|
return digest, nil
|
|
}
|
|
|
|
func (c *nativeOCIClient) GetTags(ctx context.Context, noCache bool) ([]string, error) {
|
|
inc := c.OnGetTags(c.repoURL)
|
|
defer inc()
|
|
// Currently doesn't do anything in regard to measuring spans, but keep it consistent with OnGetTags()
|
|
fail := c.OnGetTagsFail(c.repoURL)
|
|
tags, err := c.getTags(ctx, noCache)
|
|
if err != nil {
|
|
fail()
|
|
}
|
|
return tags, err
|
|
}
|
|
|
|
func (c *nativeOCIClient) getTags(ctx context.Context, noCache bool) ([]string, error) {
|
|
indexLock.Lock(c.repoURL)
|
|
defer indexLock.Unlock(c.repoURL)
|
|
|
|
var data []byte
|
|
if !noCache && c.tagsCache != nil {
|
|
if err := c.tagsCache.GetOCITags(c.repoURL, &data); err != nil && !errors.Is(err, cache.ErrCacheMiss) {
|
|
log.Warnf("Failed to load index cache for repo: %s: %s", c.repoLock, err)
|
|
}
|
|
}
|
|
|
|
var tags []string
|
|
if len(data) == 0 {
|
|
start := time.Now()
|
|
result, err := c.tagsFunc(ctx, "")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get tags: %w", err)
|
|
}
|
|
|
|
for _, tag := range result {
|
|
// By convention: Change underscore (_) back to plus (+) to get valid SemVer
|
|
convertedTag := strings.ReplaceAll(tag, "_", "+")
|
|
tags = append(tags, convertedTag)
|
|
}
|
|
|
|
log.WithFields(
|
|
log.Fields{"seconds": time.Since(start).Seconds(), "repo": c.repoURL},
|
|
).Info("took to get tags")
|
|
|
|
if c.tagsCache != nil {
|
|
if err := c.tagsCache.SetOCITags(c.repoURL, data); err != nil {
|
|
log.Warnf("Failed to store tags list cache for repo: %s: %s", c.repoURL, err)
|
|
}
|
|
}
|
|
} else if err := json.Unmarshal(data, &tags); err != nil {
|
|
return nil, fmt.Errorf("failed to decode tags: %w", err)
|
|
}
|
|
|
|
return tags, nil
|
|
}
|
|
|
|
// resolveDigest resolves a digest from a tag.
|
|
func (c *nativeOCIClient) resolveDigest(ctx context.Context, revision string) (string, error) {
|
|
descriptor, err := c.repo.Resolve(ctx, revision)
|
|
if err != nil {
|
|
return "", fmt.Errorf("cannot get digest for revision %s: %w", revision, err)
|
|
}
|
|
|
|
return descriptor.Digest.String(), nil
|
|
}
|
|
|
|
func newTLSConfig(creds Creds) (*tls.Config, error) {
|
|
tlsConfig := &tls.Config{InsecureSkipVerify: creds.InsecureSkipVerify}
|
|
|
|
if creds.CAPath != "" {
|
|
caData, err := os.ReadFile(creds.CAPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
caCertPool := x509.NewCertPool()
|
|
caCertPool.AppendCertsFromPEM(caData)
|
|
tlsConfig.RootCAs = caCertPool
|
|
}
|
|
|
|
// If a client cert & key is provided then configure TLS config accordingly.
|
|
if len(creds.CertData) > 0 && len(creds.KeyData) > 0 {
|
|
cert, err := tls.X509KeyPair(creds.CertData, creds.KeyData)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tlsConfig.Certificates = []tls.Certificate{cert}
|
|
}
|
|
//nolint:staticcheck
|
|
tlsConfig.BuildNameToCertificate()
|
|
|
|
return tlsConfig, nil
|
|
}
|
|
|
|
func fileExists(filePath string) (bool, error) {
|
|
if _, err := os.Stat(filePath); err != nil {
|
|
if os.IsNotExist(err) {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// TODO: A content layer could in theory be something that is not a compressed file, e.g a single yaml file or like.
|
|
// While IMO the utility in the context of Argo CD is limited, I'd at least like to make it known here and add an extensibility
|
|
// point for it in case we decide to loosen the current requirements.
|
|
func isContentLayer(mediaType string) bool {
|
|
return isCompressedLayer(mediaType)
|
|
}
|
|
|
|
func isCompressedLayer(mediaType string) bool {
|
|
// TODO: Is zstd something which is used in the wild? For now let's stick to these suffixes
|
|
return strings.HasSuffix(mediaType, "tar+gzip") || strings.HasSuffix(mediaType, "tar.gzip") || strings.HasSuffix(mediaType, "tar")
|
|
}
|
|
|
|
func createTarFile(from, to string) error {
|
|
f, err := os.Create(to)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err = files.Tar(from, nil, nil, f); err != nil {
|
|
_ = os.RemoveAll(to)
|
|
}
|
|
return f.Close()
|
|
}
|
|
|
|
// saveCompressedImageToPath downloads a remote OCI image on a given digest and stores it as a TAR file in cachedPath.
|
|
func saveCompressedImageToPath(ctx context.Context, digest string, repo oras.ReadOnlyTarget, cachedPath string) error {
|
|
tempDir, err := files.CreateTempDir(os.TempDir())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer os.RemoveAll(tempDir)
|
|
|
|
store, err := oci.New(tempDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Copy remote repo at the given digest to the scratch dir.
|
|
if _, err = oras.Copy(ctx, repo, digest, store, digest, oras.DefaultCopyOptions); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Remove redundant ingest folder; this is an artifact from the oras.Copy call above
|
|
err = os.RemoveAll(path.Join(tempDir, "ingest"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Save contents to tar file
|
|
return createTarFile(tempDir, cachedPath)
|
|
}
|
|
|
|
// extractContentToManifestsDir looks up a locally stored OCI image, and extracts the embedded compressed layer which contains
|
|
// K8s manifests to a temp dir.
|
|
func extractContentToManifestsDir(ctx context.Context, cachedPath, digest string, maxSize int64, isHelmChart bool) (string, error) {
|
|
manifestsDir, err := files.CreateTempDir(os.TempDir())
|
|
if err != nil {
|
|
return manifestsDir, err
|
|
}
|
|
|
|
ociReadOnlyStore, err := oci.NewFromTar(ctx, cachedPath)
|
|
if err != nil {
|
|
return manifestsDir, err
|
|
}
|
|
|
|
tempDir, err := files.CreateTempDir(os.TempDir())
|
|
if err != nil {
|
|
return manifestsDir, err
|
|
}
|
|
defer os.RemoveAll(tempDir)
|
|
|
|
fs, err := newCompressedLayerFileStore(manifestsDir, tempDir, maxSize, isHelmChart)
|
|
if err != nil {
|
|
return manifestsDir, err
|
|
}
|
|
defer fs.Close()
|
|
|
|
// copies the whole artifact to the tempdir, here compressedLayerFileStore.Push will be called
|
|
_, err = oras.Copy(ctx, ociReadOnlyStore, digest, fs, digest, oras.DefaultCopyOptions)
|
|
return manifestsDir, err
|
|
}
|
|
|
|
type compressedLayerExtracterStore struct {
|
|
*file.Store
|
|
dest string
|
|
maxSize int64
|
|
isHelmChart bool
|
|
}
|
|
|
|
func newCompressedLayerFileStore(dest, tempDir string, maxSize int64, isHelmChart bool) (*compressedLayerExtracterStore, error) {
|
|
f, err := file.New(tempDir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &compressedLayerExtracterStore{f, dest, maxSize, isHelmChart}, nil
|
|
}
|
|
|
|
func isHelmOCI(mediaType string) bool {
|
|
return mediaType == helmOCILayerType
|
|
}
|
|
|
|
// Push looks in all the layers of an OCI image. Once it finds a layer that is compressed, it extracts the layer to a tempDir
|
|
// and then renames the temp dir to the directory where the repo-server expects to find k8s manifests.
|
|
func (s *compressedLayerExtracterStore) Push(ctx context.Context, desc imagev1.Descriptor, content io.Reader) error {
|
|
// For Helm charts, only extract the Helm chart content layer, skip all other layers
|
|
if s.isHelmChart && !isHelmOCI(desc.MediaType) {
|
|
return s.Store.Push(ctx, desc, content)
|
|
}
|
|
|
|
if isContentLayer(desc.MediaType) {
|
|
srcDir, err := files.CreateTempDir(os.TempDir())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer os.RemoveAll(srcDir)
|
|
|
|
if strings.HasSuffix(desc.MediaType, "tar+gzip") || strings.HasSuffix(desc.MediaType, "tar.gzip") {
|
|
err = files.Untgz(srcDir, content, s.maxSize, false)
|
|
} else {
|
|
err = files.Untar(srcDir, content, s.maxSize, false)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("could not decompress layer: %w", err)
|
|
}
|
|
|
|
if isHelmOCI(desc.MediaType) {
|
|
infos, err := os.ReadDir(srcDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// For a Helm chart we expect a single directory
|
|
if len(infos) != 1 || !infos[0].IsDir() {
|
|
return fmt.Errorf("expected 1 directory, found %v", len(infos))
|
|
}
|
|
|
|
// For Helm charts, we will move the contents of the unpacked directory to the root of its final destination
|
|
srcDir, err = securejoin.SecureJoin(srcDir, infos[0].Name())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return filepath.WalkDir(srcDir, func(path string, d fs.DirEntry, _ error) error {
|
|
if path != srcDir {
|
|
// Calculate the relative path from srcDir
|
|
relPath, err := filepath.Rel(srcDir, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dstPath, err := securejoin.SecureJoin(s.dest, relPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Move the file by renaming it
|
|
if d.IsDir() {
|
|
info, err := d.Info()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return os.MkdirAll(dstPath, info.Mode())
|
|
}
|
|
|
|
return os.Rename(path, dstPath)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return s.Store.Push(ctx, desc, content)
|
|
}
|
|
|
|
func getOCIManifest(ctx context.Context, digest string, repo oras.ReadOnlyTarget) (*imagev1.Manifest, error) {
|
|
desc, err := repo.Resolve(ctx, digest)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error resolving oci repo from digest, %w", err)
|
|
}
|
|
|
|
rc, err := repo.Fetch(ctx, desc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error fetching oci manifest for digest %s: %w", digest, err)
|
|
}
|
|
|
|
manifest := imagev1.Manifest{}
|
|
decoder := json.NewDecoder(rc)
|
|
if err = decoder.Decode(&manifest); err != nil {
|
|
return nil, fmt.Errorf("error decoding oci manifest for digest %s: %w", digest, err)
|
|
}
|
|
|
|
return &manifest, nil
|
|
}
|
|
|
|
// getOCIManifestFromCache retrieves an OCI manifest from a cached tar file
|
|
func getOCIManifestFromCache(ctx context.Context, cachedPath, digest string) (*imagev1.Manifest, error) {
|
|
repo, err := oci.NewFromTar(ctx, cachedPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating oci store from cache: %w", err)
|
|
}
|
|
return getOCIManifest(ctx, digest, repo)
|
|
}
|
|
|
|
// WithEventHandlers sets the git client event handlers
|
|
func WithEventHandlers(handlers EventHandlers) ClientOpts {
|
|
return func(c *nativeOCIClient) {
|
|
c.EventHandlers = handlers
|
|
}
|
|
}
|