mirror of
https://github.com/moby/moby.git
synced 2026-01-11 10:41:43 +00:00
Avoids extra allocations. Added in Go 1.24. Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
459 lines
14 KiB
Go
459 lines
14 KiB
Go
package containerd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd/v2/core/content"
|
|
c8dimages "github.com/containerd/containerd/v2/core/images"
|
|
"github.com/containerd/containerd/v2/core/remotes"
|
|
"github.com/containerd/containerd/v2/core/remotes/docker"
|
|
containerdlabels "github.com/containerd/containerd/v2/pkg/labels"
|
|
cerrdefs "github.com/containerd/errdefs"
|
|
"github.com/containerd/log"
|
|
"github.com/containerd/platforms"
|
|
"github.com/distribution/reference"
|
|
"github.com/moby/moby/api/types/auxprogress"
|
|
"github.com/moby/moby/api/types/events"
|
|
"github.com/moby/moby/api/types/registry"
|
|
"github.com/moby/moby/v2/daemon/internal/metrics"
|
|
"github.com/moby/moby/v2/daemon/internal/progress"
|
|
"github.com/moby/moby/v2/daemon/internal/streamformatter"
|
|
"github.com/moby/moby/v2/daemon/server/imagebackend"
|
|
"github.com/moby/moby/v2/errdefs"
|
|
"github.com/opencontainers/go-digest"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/pkg/errors"
|
|
"golang.org/x/sync/semaphore"
|
|
)
|
|
|
|
// PushImage initiates a push operation of the image pointed to by sourceRef.
|
|
// If reference is untagged, all tags from the reference repository are pushed.
|
|
// Image manifest (or index) is pushed as is, which will probably fail if you
|
|
// don't have all content referenced by the index.
|
|
// Cross-repo mounts will be attempted for non-existing blobs.
|
|
//
|
|
// It will also add distribution source labels to the pushed content
|
|
// pointing to the new target repository. This will allow subsequent pushes
|
|
// to perform cross-repo mounts of the shared content when pushing to a different
|
|
// repository on the same registry.
|
|
func (i *ImageService) PushImage(ctx context.Context, sourceRef reference.Named, options imagebackend.PushOptions) (retErr error) {
|
|
if len(options.Platforms) > 1 {
|
|
// TODO(thaJeztah): add support for pushing multiple platforms
|
|
return cerrdefs.ErrInvalidArgument.WithMessage("multiple platforms is not supported")
|
|
}
|
|
start := time.Now()
|
|
defer func() {
|
|
if retErr == nil {
|
|
metrics.ImageActions.WithValues("push").UpdateSince(start)
|
|
}
|
|
}()
|
|
var platform *ocispec.Platform
|
|
if len(options.Platforms) > 0 {
|
|
p := options.Platforms[0]
|
|
platform = &p
|
|
}
|
|
out := streamformatter.NewJSONProgressOutput(options.OutStream, false)
|
|
progress.Messagef(out, "", "The push refers to repository [%s]", sourceRef.Name())
|
|
|
|
if _, tagged := sourceRef.(reference.Tagged); !tagged {
|
|
if _, digested := sourceRef.(reference.Digested); !digested {
|
|
// Image is not tagged nor digested, that means all tags push was requested.
|
|
|
|
// Find all images with the same repository.
|
|
imgs, err := i.getAllImagesWithRepository(ctx, sourceRef)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(imgs) == 0 {
|
|
return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(sourceRef))
|
|
}
|
|
|
|
for _, img := range imgs {
|
|
named, err := reference.ParseNamed(img.Name)
|
|
if err != nil {
|
|
// This shouldn't happen, but log a warning just in case.
|
|
log.G(ctx).WithFields(log.Fields{
|
|
"image": img.Name,
|
|
"sourceRef": sourceRef,
|
|
}).Warn("refusing to push an invalid tag")
|
|
continue
|
|
}
|
|
|
|
if err := i.pushRef(ctx, named, platform, options.MetaHeaders, options.AuthConfig, out); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return i.pushRef(ctx, sourceRef, platform, options.MetaHeaders, options.AuthConfig, out)
|
|
}
|
|
|
|
func (i *ImageService) pushRef(ctx context.Context, targetRef reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, out progress.Output) (retErr error) {
|
|
leasedCtx, release, err := i.client.WithLease(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if err := release(context.WithoutCancel(leasedCtx)); err != nil {
|
|
log.G(ctx).WithField("image", targetRef).WithError(err).Warn("failed to release lease created for push")
|
|
}
|
|
}()
|
|
|
|
img, err := i.images.Get(ctx, targetRef.String())
|
|
if err != nil {
|
|
if cerrdefs.IsNotFound(err) {
|
|
return errdefs.NotFound(fmt.Errorf("tag does not exist: %s", reference.FamiliarString(targetRef)))
|
|
}
|
|
return errdefs.System(err)
|
|
}
|
|
|
|
target := img.Target
|
|
if platform != nil {
|
|
target, err = i.getPushDescriptor(ctx, img, platform)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
store := i.content
|
|
resolver, tracker := i.newResolverFromAuthConfig(ctx, authConfig, targetRef, metaHeaders)
|
|
pp := pushProgress{Tracker: tracker}
|
|
jobsQueue := newJobs()
|
|
finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{
|
|
&pp,
|
|
&pullProgress{showExists: false, store: store},
|
|
}))
|
|
defer func() {
|
|
finishProgress()
|
|
if retErr == nil {
|
|
if tagged, ok := targetRef.(reference.Tagged); ok {
|
|
progress.Messagef(out, "", "%s: digest: %s size: %d", tagged.Tag(), target.Digest, target.Size)
|
|
}
|
|
}
|
|
}()
|
|
|
|
var limiter *semaphore.Weighted // TODO: Respect max concurrent downloads/uploads
|
|
|
|
mountableBlobs, err := findMissingMountable(ctx, store, jobsQueue, target, targetRef, limiter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create a store which fakes the local existence of possibly mountable blobs.
|
|
// Otherwise they can't be pushed at all.
|
|
realStore := store
|
|
wrapped := wrapWithFakeMountableBlobs(store, mountableBlobs)
|
|
store = wrapped
|
|
addLayerJobs := c8dimages.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
if showBlobProgress(desc) {
|
|
jobsQueue.Add(desc)
|
|
}
|
|
|
|
return nil, nil
|
|
})
|
|
|
|
handlerWrapper := func(h c8dimages.Handler) c8dimages.Handler {
|
|
return c8dimages.Handlers(addLayerJobs, h)
|
|
}
|
|
|
|
push := func(ctx context.Context, desc ocispec.Descriptor) error {
|
|
ref := targetRef
|
|
|
|
if _, digested := ref.(reference.Digested); !digested {
|
|
// Annotate ref with digest to push only push tag for single digest
|
|
ref, err = reference.WithDigest(ref, target.Digest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
pusher, err := resolver.Pusher(ctx, ref.String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return remotes.PushContent(ctx, pusher, desc, store, limiter, platforms.All, handlerWrapper)
|
|
}
|
|
|
|
err = push(ctx, target)
|
|
if err != nil {
|
|
// If push failed because of a missing content, no specific platform was requested
|
|
// and the target is an index, select a platform-specific manifest to push instead.
|
|
if cerrdefs.IsNotFound(err) && c8dimages.IsIndexType(target.MediaType) && platform == nil {
|
|
var newTarget ocispec.Descriptor
|
|
newTarget, err = i.getPushDescriptor(ctx, img, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Retry only if the new push candidate is different from the previous one.
|
|
if newTarget.Digest != target.Digest {
|
|
orgTarget := target
|
|
target = newTarget
|
|
pp.TurnNotStartedIntoUnavailable()
|
|
err = push(ctx, target)
|
|
|
|
if err == nil {
|
|
progress.Aux(out, auxprogress.ManifestPushedInsteadOfIndex{
|
|
ManifestPushedInsteadOfIndex: true,
|
|
OriginalIndex: orgTarget,
|
|
SelectedManifest: newTarget,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
if !cerrdefs.IsNotFound(err) {
|
|
return translateRegistryError(ctx, err)
|
|
}
|
|
progress.Aux(out, auxprogress.ContentMissing{
|
|
ContentMissing: true,
|
|
Desc: target,
|
|
})
|
|
return errdefs.NotFound(err)
|
|
}
|
|
}
|
|
|
|
appendDistributionSourceLabel(ctx, realStore, targetRef, target)
|
|
|
|
i.LogImageEvent(ctx, reference.FamiliarString(targetRef), reference.FamiliarName(targetRef), events.ActionPush)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (i *ImageService) getPushDescriptor(ctx context.Context, img c8dimages.Image, platform *ocispec.Platform) (ocispec.Descriptor, error) {
|
|
pm := i.matchRequestedOrDefault(platforms.OnlyStrict, platform)
|
|
|
|
anyMissing := false
|
|
|
|
var bestMatchPlatform ocispec.Platform
|
|
var bestMatch *ImageManifest
|
|
var presentMatchingManifests []*ImageManifest
|
|
err := i.walkReachableImageManifests(ctx, img, func(im *ImageManifest) error {
|
|
available, err := im.CheckContentAvailable(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to determine availability of image manifest %s: %w", im.Target().Digest, err)
|
|
}
|
|
|
|
if !available {
|
|
anyMissing = true
|
|
return nil
|
|
}
|
|
|
|
if im.IsAttestation() {
|
|
return nil
|
|
}
|
|
|
|
imgPlatform, err := im.ImagePlatform(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to determine platform of image %s: %w", img.Name, err)
|
|
}
|
|
|
|
if !pm.Match(imgPlatform) {
|
|
return nil
|
|
}
|
|
|
|
presentMatchingManifests = append(presentMatchingManifests, im)
|
|
if bestMatch == nil || pm.Less(imgPlatform, bestMatchPlatform) {
|
|
bestMatchPlatform = imgPlatform
|
|
bestMatch = im
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return ocispec.Descriptor{}, errdefs.System(err)
|
|
}
|
|
|
|
switch len(presentMatchingManifests) {
|
|
case 0:
|
|
err := &errPlatformNotFound{imageRef: imageFamiliarName(img)}
|
|
if pm.Requested != nil {
|
|
err.wanted = *pm.Requested
|
|
}
|
|
return ocispec.Descriptor{}, err
|
|
case 1:
|
|
// Only one manifest is available AND matching the requested platform.
|
|
|
|
if platform != nil {
|
|
// Explicit platform was requested
|
|
return presentMatchingManifests[0].Target(), nil
|
|
}
|
|
|
|
// No specific platform was requested, but only one manifest is available.
|
|
if anyMissing {
|
|
return presentMatchingManifests[0].Target(), nil
|
|
}
|
|
|
|
// Index has only one manifest anyway, select the full index.
|
|
return img.Target, nil
|
|
default:
|
|
if platform == nil {
|
|
if !anyMissing {
|
|
// No specific platform requested, and all manifests are available, select the full index.
|
|
return img.Target, nil
|
|
}
|
|
|
|
// No specific platform requested and not all manifests are available.
|
|
// Select the manifest that matches the host platform the best.
|
|
if bestMatch != nil && i.hostPlatformMatcher().Match(bestMatchPlatform) {
|
|
return bestMatch.Target(), nil
|
|
}
|
|
|
|
return ocispec.Descriptor{}, errdefs.Conflict(errors.Errorf("multiple matching manifests found but no specific platform requested"))
|
|
}
|
|
|
|
return ocispec.Descriptor{}, errdefs.Conflict(errors.Errorf("multiple manifests found for platform %s", platforms.FormatAll(*platform)))
|
|
}
|
|
}
|
|
|
|
func appendDistributionSourceLabel(ctx context.Context, realStore content.Store, targetRef reference.Named, target ocispec.Descriptor) {
|
|
appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String())
|
|
if err != nil {
|
|
// This shouldn't happen at this point because the reference would have to be invalid
|
|
// and if it was, then it would error out earlier.
|
|
log.G(ctx).WithError(err).Warn("failed to create an handler that appends distribution source label to pushed content")
|
|
return
|
|
}
|
|
|
|
handler := presentChildrenHandler(realStore, appendSource)
|
|
if err := c8dimages.Dispatch(ctx, handler, nil, target); err != nil {
|
|
// Shouldn't happen, but even if it would fail, then make it only a warning
|
|
// because it doesn't affect the pushed data.
|
|
log.G(ctx).WithError(err).Warn("failed to append distribution source labels to pushed content")
|
|
}
|
|
}
|
|
|
|
// findMissingMountable will walk the target descriptor recursively and return
|
|
// missing contents with their distribution source which could potentially
|
|
// be cross-repo mounted.
|
|
func findMissingMountable(ctx context.Context, store content.Store, queue *jobs,
|
|
target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted,
|
|
) (map[digest.Digest]distributionSource, error) {
|
|
mountableBlobs := map[digest.Digest]distributionSource{}
|
|
var mutex sync.Mutex
|
|
|
|
sources, err := getDigestSources(ctx, store, target.Digest)
|
|
if err != nil {
|
|
if !cerrdefs.IsNotFound(err) {
|
|
return nil, err
|
|
}
|
|
log.G(ctx).WithField("target", target).Debug("distribution source label not found")
|
|
return mountableBlobs, nil
|
|
}
|
|
|
|
handler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
_, err := store.Info(ctx, desc.Digest)
|
|
if err != nil {
|
|
if !cerrdefs.IsNotFound(err) {
|
|
return nil, errdefs.System(errors.Wrapf(err, "failed to get metadata of content %s", desc.Digest.String()))
|
|
}
|
|
|
|
for _, source := range sources {
|
|
if canBeMounted(desc.MediaType, targetRef, source) {
|
|
mutex.Lock()
|
|
mountableBlobs[desc.Digest] = source
|
|
mutex.Unlock()
|
|
queue.Add(desc)
|
|
break
|
|
}
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
return c8dimages.Children(ctx, store, desc)
|
|
}
|
|
|
|
err = c8dimages.Dispatch(ctx, c8dimages.HandlerFunc(handler), limiter, target)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return mountableBlobs, nil
|
|
}
|
|
|
|
func getDigestSources(ctx context.Context, store content.Manager, digest digest.Digest) ([]distributionSource, error) {
|
|
info, err := store.Info(ctx, digest)
|
|
if err != nil {
|
|
if cerrdefs.IsNotFound(err) {
|
|
return nil, errdefs.NotFound(err)
|
|
}
|
|
return nil, errdefs.System(err)
|
|
}
|
|
|
|
sources := extractDistributionSources(info.Labels)
|
|
if sources == nil {
|
|
return nil, errdefs.NotFound(fmt.Errorf("label %q is not attached to %s", containerdlabels.LabelDistributionSource, digest.String()))
|
|
}
|
|
|
|
return sources, nil
|
|
}
|
|
|
|
func extractDistributionSources(labels map[string]string) []distributionSource {
|
|
var sources []distributionSource
|
|
|
|
// Check if this blob has a distributionSource label
|
|
// if yes, read it as source
|
|
for k, v := range labels {
|
|
if reg := strings.TrimPrefix(k, containerdlabels.LabelDistributionSource); reg != k {
|
|
for repo := range strings.SplitSeq(v, ",") {
|
|
ref, err := reference.ParseNamed(reg + "/" + repo)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
sources = append(sources, distributionSource{
|
|
registryRef: ref,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
return sources
|
|
}
|
|
|
|
type distributionSource struct {
|
|
registryRef reference.Named
|
|
}
|
|
|
|
// ToAnnotation returns key and value
|
|
func (source distributionSource) ToAnnotation() (string, string) {
|
|
domain := reference.Domain(source.registryRef)
|
|
v := reference.Path(source.registryRef)
|
|
return containerdlabels.LabelDistributionSource + domain, v
|
|
}
|
|
|
|
func (source distributionSource) GetReference(dgst digest.Digest) (reference.Named, error) {
|
|
return reference.WithDigest(source.registryRef, dgst)
|
|
}
|
|
|
|
// canBeMounted returns if the content with given media type can be cross-repo
|
|
// mounted when pushing it to a remote reference ref.
|
|
func canBeMounted(mediaType string, targetRef reference.Named, source distributionSource) bool {
|
|
if c8dimages.IsManifestType(mediaType) {
|
|
return false
|
|
}
|
|
if c8dimages.IsIndexType(mediaType) {
|
|
return false
|
|
}
|
|
|
|
reg := reference.Domain(targetRef)
|
|
// Remove :port suffix from domain
|
|
// containerd distribution source label doesn't store port
|
|
if portIdx := strings.LastIndex(reg, ":"); portIdx != -1 {
|
|
reg = reg[:portIdx]
|
|
}
|
|
|
|
// If the source registry is the same as the one we are pushing to
|
|
// then the cross-repo mount will work.
|
|
return reg == reference.Domain(source.registryRef)
|
|
}
|