Files
Tonis Tiigi 3874ca5984 vendor: update buildkit to v0.26.0-rc2
Currently requires replace rule for swarmkit etcd.

BuildKit itself doesn't use etcd but version gets bumped
via unused dependency.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2025-11-11 23:15:07 +01:00

666 lines
19 KiB
Go

package worker
import (
"context"
"fmt"
"io"
nethttp "net/http"
"time"
"github.com/containerd/containerd/v2/core/content"
c8dimages "github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/pkg/gc"
"github.com/containerd/containerd/v2/pkg/rootfs"
cerrdefs "github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/platforms"
"github.com/moby/buildkit/cache"
cacheconfig "github.com/moby/buildkit/cache/config"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb/sourceresolver"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/exporter"
localexporter "github.com/moby/buildkit/exporter/local"
tarexporter "github.com/moby/buildkit/exporter/tar"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver/cdidevices"
"github.com/moby/buildkit/solver/llbsolver/mounts"
"github.com/moby/buildkit/solver/llbsolver/ops"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/source/containerimage"
"github.com/moby/buildkit/source/git"
"github.com/moby/buildkit/source/http"
"github.com/moby/buildkit/source/local"
"github.com/moby/buildkit/util/archutil"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/version"
imageadapter "github.com/moby/moby/v2/daemon/internal/builder-next/adapters/containerimage"
mobyexporter "github.com/moby/moby/v2/daemon/internal/builder-next/exporter"
"github.com/moby/moby/v2/daemon/internal/builder-next/worker/mod"
distmetadata "github.com/moby/moby/v2/daemon/internal/distribution/metadata"
"github.com/moby/moby/v2/daemon/internal/distribution/xfer"
"github.com/moby/moby/v2/daemon/internal/layer"
pkgprogress "github.com/moby/moby/v2/daemon/internal/progress"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)
func init() {
if v := mod.Version("github.com/moby/buildkit"); v != "" {
version.Version = v
}
}
const labelCreatedAt = "buildkit/createdat"
// LayerAccess provides access to a moby layer from a snapshot
type LayerAccess interface {
GetDiffIDs(ctx context.Context, key string) ([]layer.DiffID, error)
EnsureLayer(ctx context.Context, key string) ([]layer.DiffID, error)
}
// Opt defines a structure for creating a worker.
type Opt struct {
ID string
Labels map[string]string
GCPolicy []client.PruneInfo
Executor executor.Executor
Snapshotter snapshot.Snapshotter
ContentStore *containerdsnapshot.Store
CacheManager cache.Manager
LeaseManager *leaseutil.Manager
GarbageCollect func(context.Context) (gc.Stats, error)
ImageSource *imageadapter.Source
DownloadManager *xfer.LayerDownloadManager
V2MetadataService distmetadata.V2MetadataService
Transport nethttp.RoundTripper
Exporter exporter.Exporter
Layers LayerAccess
Platforms []ocispec.Platform
CDIManager *cdidevices.Manager
}
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
// TODO: s/Worker/OpWorker/g ?
type Worker struct {
Opt
SourceManager *source.Manager
GitSource *git.Source
HTTPSource *http.Source
}
var _ interface {
GetRemotes(context.Context, cache.ImmutableRef, bool, cacheconfig.RefConfig, bool, session.Group) ([]*solver.Remote, error)
} = &Worker{}
// NewWorker instantiates a local worker
func NewWorker(opt Opt) (*Worker, error) {
sm, err := source.NewManager()
if err != nil {
return nil, err
}
cm := opt.CacheManager
sm.Register(opt.ImageSource)
gs, err := git.NewSource(git.Opt{
CacheAccessor: cm,
})
if err == nil {
sm.Register(gs)
} else {
log.G(context.TODO()).Warnf("Could not register builder git source: %s", err)
}
hs, err := http.NewSource(http.Opt{
CacheAccessor: cm,
Transport: opt.Transport,
})
if err == nil {
sm.Register(hs)
} else {
log.G(context.TODO()).Warnf("Could not register builder http source: %s", err)
}
ss, err := local.NewSource(local.Opt{
CacheAccessor: cm,
})
if err == nil {
sm.Register(ss)
} else {
log.G(context.TODO()).Warnf("Could not register builder local source: %s", err)
}
return &Worker{
Opt: opt,
SourceManager: sm,
GitSource: gs,
HTTPSource: hs,
}, nil
}
// ID returns worker ID
func (w *Worker) ID() string {
return w.Opt.ID
}
// Labels returns map of all worker labels
func (w *Worker) Labels() map[string]string {
return w.Opt.Labels
}
// Platforms returns one or more platforms supported by the image.
func (w *Worker) Platforms(noCache bool) []ocispec.Platform {
if noCache {
w.Opt.Platforms = mergePlatforms(w.Opt.Platforms, archutil.SupportedPlatforms(noCache))
}
if len(w.Opt.Platforms) == 0 {
return []ocispec.Platform{platforms.DefaultSpec()}
}
return w.Opt.Platforms
}
// mergePlatforms merges the defined platforms with the supported platforms
// and returns a new slice of platforms. It ensures no duplicates.
func mergePlatforms(defined, supported []ocispec.Platform) []ocispec.Platform {
result := []ocispec.Platform{}
matchers := make([]platforms.MatchComparer, len(defined))
for i, p := range defined {
result = append(result, p)
matchers[i] = platforms.Only(p)
}
for _, p := range supported {
exists := false
for _, m := range matchers {
if m.Match(p) {
exists = true
break
}
}
if !exists {
result = append(result, p)
}
}
return result
}
// GCPolicy returns automatic GC Policy
func (w *Worker) GCPolicy() []client.PruneInfo {
return w.Opt.GCPolicy
}
// BuildkitVersion returns BuildKit version
func (w *Worker) BuildkitVersion() client.BuildkitVersion {
return client.BuildkitVersion{
Package: version.Package,
Version: version.Version + "-moby",
Revision: version.Revision,
}
}
func (w *Worker) GarbageCollect(ctx context.Context) error {
if w.Opt.GarbageCollect == nil {
return nil
}
_, err := w.Opt.GarbageCollect(ctx)
return err
}
// Close closes the worker and releases all resources
func (w *Worker) Close() error {
return nil
}
// ContentStore returns the wrapped content store
func (w *Worker) ContentStore() *containerdsnapshot.Store {
return w.Opt.ContentStore
}
// LeaseManager returns the wrapped lease manager
func (w *Worker) LeaseManager() *leaseutil.Manager {
return w.Opt.LeaseManager
}
// LoadRef loads a reference by ID
func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.ImmutableRef, error) {
var opts []cache.RefOption
if hidden {
opts = append(opts, cache.NoUpdateLastUsed)
}
if id == "" {
// results can have nil refs if they are optimized out to be equal to scratch,
// i.e. Diff(A,A) == scratch
return nil, nil
}
return w.CacheManager().Get(ctx, id, nil, opts...)
}
func (w *Worker) ResolveSourceMetadata(ctx context.Context, op *pb.SourceOp, opt sourceresolver.Opt, sm *session.Manager, jobCtx solver.JobContext) (*sourceresolver.MetaResponse, error) {
if opt.SourcePolicies != nil {
return nil, errors.New("source policies can not be set for worker")
}
var p *ocispec.Platform
if imgOpt := opt.ImageOpt; imgOpt != nil && imgOpt.Platform != nil {
p = imgOpt.Platform
} else if ociOpt := opt.OCILayoutOpt; ociOpt != nil && ociOpt.Platform != nil {
p = ociOpt.Platform
}
var platform *pb.Platform
if p != nil {
platform = &pb.Platform{
Architecture: p.Architecture,
OS: p.OS,
Variant: p.Variant,
OSVersion: p.OSVersion,
}
}
id, err := w.SourceManager.Identifier(&pb.Op_Source{Source: op}, platform)
if err != nil {
return nil, err
}
switch idt := id.(type) {
case *containerimage.ImageIdentifier:
if opt.ImageOpt == nil {
opt.ImageOpt = &sourceresolver.ResolveImageOpt{}
}
if p != nil {
opt.ImageOpt.Platform = p
}
if opt.ImageOpt.AttestationChain {
return nil, errors.Errorf("resolving image attestation chain requires containerd image store support in dockerd")
}
dgst, config, err := w.ImageSource.ResolveImageConfig(ctx, idt.Reference.String(), opt, sm, jobCtx)
if err != nil {
return nil, err
}
if opt.ImageOpt.NoConfig {
config = nil
}
return &sourceresolver.MetaResponse{
Op: op,
Image: &sourceresolver.ResolveImageResponse{
Digest: dgst,
Config: config,
},
}, nil
case *git.GitIdentifier:
if w.GitSource == nil {
return nil, errors.New("git source is not supported")
}
mdOpt := git.MetadataOpts{}
if opt.GitOpt != nil {
mdOpt.ReturnObject = opt.GitOpt.ReturnObject
}
md, err := w.GitSource.ResolveMetadata(ctx, idt, sm, jobCtx, mdOpt)
if err != nil {
return nil, err
}
return &sourceresolver.MetaResponse{
Op: op,
Git: &sourceresolver.ResolveGitResponse{
Checksum: md.Checksum,
Ref: md.Ref,
CommitChecksum: md.CommitChecksum,
CommitObject: md.CommitObject,
TagObject: md.TagObject,
},
}, nil
case *http.HTTPIdentifier:
if w.HTTPSource == nil {
return nil, errors.New("http source is not supported")
}
md, err := w.HTTPSource.ResolveMetadata(ctx, idt, sm, jobCtx)
if err != nil {
return nil, err
}
return &sourceresolver.MetaResponse{
Op: op,
HTTP: &sourceresolver.ResolveHTTPResponse{
Digest: md.Digest,
Filename: md.Filename,
LastModified: md.LastModified,
},
}, nil
}
return &sourceresolver.MetaResponse{
Op: op,
}, nil
}
// ResolveOp converts a LLB vertex into a LLB operation
func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error) {
if baseOp, ok := v.Sys().(*pb.Op); ok {
// TODO do we need to pass a value here? Where should it come from? https://github.com/moby/buildkit/commit/b3cf7c43cfefdfd7a945002c0e76b54e346ab6cf
var parallelism *semaphore.Weighted
switch op := baseOp.Op.(type) {
case *pb.Op_Source:
return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, parallelism, sm, w)
case *pb.Op_Exec:
return ops.NewExecOp(v, op, baseOp.Platform, w.CacheManager(), parallelism, sm, w.Executor(), w)
case *pb.Op_File:
return ops.NewFileOp(v, op, w.CacheManager(), parallelism, w)
case *pb.Op_Build:
return ops.NewBuildOp(v, op, s, w)
case *pb.Op_Merge:
return ops.NewMergeOp(v, op, w)
case *pb.Op_Diff:
return ops.NewDiffOp(v, op, w)
}
}
return nil, errors.Errorf("could not resolve %v", v)
}
// ResolveImageConfig returns image config for an image
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt sourceresolver.Opt, sm *session.Manager, jobCtx solver.JobContext) (digest.Digest, []byte, error) {
return w.ImageSource.ResolveImageConfig(ctx, ref, opt, sm, jobCtx)
}
// DiskUsage returns disk usage report
func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
return w.CacheManager().DiskUsage(ctx, opt)
}
// Prune deletes reclaimable build cache
func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo, info ...client.PruneInfo) error {
return w.CacheManager().Prune(ctx, ch, info...)
}
// Exporter returns exporter by name
func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter, error) {
switch name {
case mobyexporter.Moby:
return w.Opt.Exporter, nil
case client.ExporterLocal:
return localexporter.New(localexporter.Opt{
SessionManager: sm,
})
case client.ExporterTar:
return tarexporter.New(tarexporter.Opt{
SessionManager: sm,
})
default:
return nil, errors.Errorf("exporter %q could not be found", name)
}
}
// GetRemotes returns the remote snapshot references given a local reference
func (w *Worker) GetRemotes(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool, _ cacheconfig.RefConfig, all bool, s session.Group) ([]*solver.Remote, error) {
if ref == nil {
return nil, nil
}
var diffIDs []layer.DiffID
var err error
if !createIfNeeded {
diffIDs, err = w.Layers.GetDiffIDs(ctx, ref.ID())
if err != nil {
return nil, err
}
} else {
if err := ref.Finalize(ctx); err != nil {
return nil, err
}
if err := ref.Extract(ctx, s); err != nil {
return nil, err
}
diffIDs, err = w.Layers.EnsureLayer(ctx, ref.ID())
if err != nil {
return nil, err
}
}
descriptors := make([]ocispec.Descriptor, len(diffIDs))
for i, dgst := range diffIDs {
descriptors[i] = ocispec.Descriptor{
MediaType: c8dimages.MediaTypeDockerSchema2Layer,
Digest: dgst,
Size: -1,
}
}
return []*solver.Remote{{
Descriptors: descriptors,
Provider: &emptyProvider{},
}}, nil
}
// PruneCacheMounts removes the current cache snapshots for specified IDs
func (w *Worker) PruneCacheMounts(ctx context.Context, ids map[string]bool) error {
mu := mounts.CacheMountsLocker()
mu.Lock()
defer mu.Unlock()
for id, nested := range ids {
mds, err := mounts.SearchCacheDir(ctx, w.CacheManager(), id, nested)
if err != nil {
return err
}
for _, md := range mds {
if err := md.SetCachePolicyDefault(); err != nil {
return err
}
if err := md.ClearCacheDirIndex(); err != nil {
return err
}
// if ref is unused try to clean it up right away by releasing it
if mref, err := w.CacheManager().GetMutable(ctx, md.ID()); err == nil {
go mref.Release(context.TODO())
}
}
}
mounts.ClearActiveCacheMounts()
return nil
}
func (w *Worker) getRef(ctx context.Context, diffIDs []layer.DiffID, opts ...cache.RefOption) (cache.ImmutableRef, error) {
var parent cache.ImmutableRef
if len(diffIDs) > 1 {
var err error
parent, err = w.getRef(ctx, diffIDs[:len(diffIDs)-1], opts...)
if err != nil {
return nil, err
}
defer parent.Release(context.TODO())
}
return w.CacheManager().GetByBlob(context.TODO(), ocispec.Descriptor{
Annotations: map[string]string{
"containerd.io/uncompressed": diffIDs[len(diffIDs)-1].String(),
},
}, parent, opts...)
}
// FromRemote converts a remote snapshot reference to a local one
func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) {
rootfs, err := getLayers(ctx, remote.Descriptors)
if err != nil {
return nil, err
}
layers := make([]xfer.DownloadDescriptor, 0, len(rootfs))
for _, l := range rootfs {
// ongoing.add(desc)
layers = append(layers, &layerDescriptor{
desc: l.Blob,
diffID: l.Diff.Digest,
provider: remote.Provider,
w: w,
pctx: ctx,
})
}
defer func() {
for _, l := range rootfs {
w.ContentStore().Delete(context.TODO(), l.Blob.Digest)
}
}()
rootFS, release, err := w.DownloadManager.Download(ctx, layers, &discardProgress{})
if err != nil {
return nil, err
}
defer release()
if len(rootFS.DiffIDs) != len(layers) {
return nil, errors.Errorf("invalid layer count mismatch %d vs %d", len(rootFS.DiffIDs), len(layers))
}
for i := range rootFS.DiffIDs {
tm := time.Now()
if tmstr, ok := remote.Descriptors[i].Annotations[labelCreatedAt]; ok {
if err := (&tm).UnmarshalText([]byte(tmstr)); err != nil {
return nil, err
}
}
descr := fmt.Sprintf("imported %s", remote.Descriptors[i].Digest)
if v, ok := remote.Descriptors[i].Annotations["buildkit/description"]; ok {
descr = v
}
ref, err := w.getRef(ctx, rootFS.DiffIDs[:i+1], cache.WithDescription(descr), cache.WithCreationTime(tm))
if err != nil {
return nil, err
}
if i == len(remote.Descriptors)-1 {
return ref, nil
}
defer ref.Release(context.TODO())
}
return nil, errors.Errorf("unreachable")
}
// Executor returns executor.Executor for running processes
func (w *Worker) Executor() executor.Executor {
return w.Opt.Executor
}
// CacheManager returns cache.Manager for accessing local storage
func (w *Worker) CacheManager() cache.Manager {
return w.Opt.CacheManager
}
func (w *Worker) CDIManager() *cdidevices.Manager {
return w.Opt.CDIManager
}
type discardProgress struct{}
func (*discardProgress) WriteProgress(_ pkgprogress.Progress) error {
return nil
}
// Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
type layerDescriptor struct {
provider content.Provider
desc ocispec.Descriptor
diffID layer.DiffID
// ref ctdreference.Spec
w *Worker
pctx context.Context
}
func (ld *layerDescriptor) Key() string {
return "v2:" + ld.desc.Digest.String()
}
func (ld *layerDescriptor) ID() string {
return ld.desc.Digest.String()
}
func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
return ld.diffID, nil
}
func (ld *layerDescriptor) Download(ctx context.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) {
done := oneOffProgress(ld.pctx, fmt.Sprintf("pulling %s", ld.desc.Digest))
// TODO should this write output to progressOutput? Or use something similar to loggerFromContext()? see https://github.com/moby/buildkit/commit/aa29e7729464f3c2a773e27795e584023c751cb8
discardLogs := func(_ []byte) {}
if err := contentutil.Copy(ctx, ld.w.ContentStore(), ld.provider, ld.desc, "", discardLogs); err != nil {
return nil, 0, done(err)
}
_ = done(nil)
ra, err := ld.w.ContentStore().ReaderAt(ctx, ld.desc)
if err != nil {
return nil, 0, err
}
return io.NopCloser(content.NewReader(ra)), ld.desc.Size, nil
}
func (ld *layerDescriptor) Close() {
// ld.is.ContentStore().Delete(context.TODO(), ld.desc.Digest)
}
func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
// Cache mapping from this layer's DiffID to the blobsum
ld.w.V2MetadataService.Add(diffID, distmetadata.V2Metadata{Digest: ld.desc.Digest})
}
func getLayers(ctx context.Context, descs []ocispec.Descriptor) ([]rootfs.Layer, error) {
layers := make([]rootfs.Layer, len(descs))
for i, desc := range descs {
diffIDStr := desc.Annotations["containerd.io/uncompressed"]
if diffIDStr == "" {
return nil, errors.Errorf("%s missing uncompressed digest", desc.Digest)
}
diffID, err := digest.Parse(diffIDStr)
if err != nil {
return nil, err
}
layers[i].Diff = ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageLayer,
Digest: diffID,
}
layers[i].Blob = ocispec.Descriptor{
MediaType: desc.MediaType,
Digest: desc.Digest,
Size: desc.Size,
}
}
return layers, nil
}
func oneOffProgress(ctx context.Context, id string) func(err error) error {
pw, _, _ := progress.NewFromContext(ctx)
s := time.Now()
st := progress.Status{
Started: &s,
}
_ = pw.Write(id, st)
return func(err error) error {
// TODO: set error on status
c := time.Now()
st.Completed = &c
_ = pw.Write(id, st)
_ = pw.Close()
return err
}
}
type emptyProvider struct{}
func (p *emptyProvider) ReaderAt(ctx context.Context, dec ocispec.Descriptor) (content.ReaderAt, error) {
return nil, errors.Errorf("ReaderAt not implemented for empty provider")
}
func (p *emptyProvider) Info(ctx context.Context, d digest.Digest) (content.Info, error) {
return content.Info{}, errors.Wrapf(cerrdefs.ErrNotImplemented, "Info not implemented for empty provider")
}