Files
moby/daemon/containerd/migration/migration.go
Derek McGowan 632de98f75 Enable containerd snapshotters by default
Signed-off-by: Derek McGowan <derek@mcg.dev>
2025-08-08 12:07:41 -07:00

318 lines
8.5 KiB
Go

package migration
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/leases"
"github.com/containerd/containerd/v2/core/mount"
"github.com/containerd/containerd/v2/core/snapshots"
"github.com/containerd/containerd/v2/pkg/archive/compression"
"github.com/containerd/continuity/fs"
cerrdefs "github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/moby/moby/v2/daemon/internal/image"
"github.com/moby/moby/v2/daemon/internal/layer"
refstore "github.com/moby/moby/v2/daemon/internal/refstore"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
)
type LayerMigrator struct {
layers layer.Store
refs refstore.Store
dis image.Store
leases leases.Manager
content content.Store
cis images.Store
}
type Config struct {
ImageCount int
LayerStore layer.Store
ReferenceStore refstore.Store
DockerImageStore image.Store
Leases leases.Manager
Content content.Store
ImageStore images.Store
}
func NewLayerMigrator(config Config) *LayerMigrator {
return &LayerMigrator{
layers: config.LayerStore,
refs: config.ReferenceStore,
dis: config.DockerImageStore,
leases: config.Leases,
content: config.Content,
cis: config.ImageStore,
}
}
// MigrateTocontainerd migrates containers from overlay2 to overlayfs or vfs to native
func (lm *LayerMigrator) MigrateTocontainerd(ctx context.Context, snKey string, sn snapshots.Snapshotter) error {
if sn == nil {
return fmt.Errorf("no snapshotter to migrate to: %w", cerrdefs.ErrNotImplemented)
}
switch driver := lm.layers.DriverName(); driver {
case "overlay2":
case "vfs":
default:
return fmt.Errorf("%q not supported for migration: %w", driver, cerrdefs.ErrNotImplemented)
}
var (
// Zstd makes migration 10x faster
// TODO: make configurable
layerMediaType = ocispec.MediaTypeImageLayerZstd
layerCompression = compression.Zstd
)
l, err := lm.leases.Create(ctx, leases.WithRandomID(), leases.WithExpiration(24*time.Hour))
if err != nil {
return err
}
defer func() {
lm.leases.Delete(ctx, l)
}()
ctx = leases.WithLease(ctx, l.ID)
for imgID, img := range lm.dis.Heads() {
diffids := img.RootFS.DiffIDs
if len(diffids) == 0 {
continue
}
var (
parent string
manifest = ocispec.Manifest{
MediaType: ocispec.MediaTypeImageManifest,
Versioned: specs.Versioned{
SchemaVersion: 2,
},
Layers: make([]ocispec.Descriptor, len(diffids)),
}
ml sync.Mutex
eg, egctx = errgroup.WithContext(ctx)
)
for i := range diffids {
chainID := identity.ChainID(diffids[:i+1])
l, err := lm.layers.Get(chainID)
if err != nil {
return fmt.Errorf("failed to get layer [%d] %q: %w", i, chainID, err)
}
layerIndex := i
eg.Go(func() error {
ctx := egctx
t1 := time.Now()
ts, err := l.TarStream()
if err != nil {
return err
}
desc := ocispec.Descriptor{
MediaType: layerMediaType,
}
cw, err := lm.content.Writer(ctx,
content.WithRef(fmt.Sprintf("ingest-%s", chainID)),
content.WithDescriptor(desc))
if err != nil {
return fmt.Errorf("failed to get content writer: %w", err)
}
dgstr := digest.Canonical.Digester()
cs, _ := compression.CompressStream(io.MultiWriter(cw, dgstr.Hash()), layerCompression)
_, err = io.Copy(cs, ts)
if err != nil {
return fmt.Errorf("failed to copy to compressed stream: %w", err)
}
cs.Close()
status, err := cw.Status()
if err != nil {
return err
}
desc.Size = status.Offset
desc.Digest = dgstr.Digest()
if err := cw.Commit(ctx, desc.Size, desc.Digest); err != nil && !cerrdefs.IsAlreadyExists(err) {
return err
}
log.G(ctx).WithFields(log.Fields{
"t": time.Since(t1),
"size": desc.Size,
"digest": desc.Digest,
}).Debug("Converted layer to content tar")
ml.Lock()
manifest.Layers[layerIndex] = desc
ml.Unlock()
return nil
})
metadata, err := l.Metadata()
if err != nil {
return err
}
src, ok := metadata["UpperDir"]
if !ok {
src, ok = metadata["SourceDir"]
if !ok {
log.G(ctx).WithField("metadata", metadata).WithField("driver", lm.layers.DriverName()).Debug("no source directory metadata")
return fmt.Errorf("graphdriver not supported: %w", cerrdefs.ErrNotImplemented)
}
}
log.G(ctx).WithField("metadata", metadata).Debugf("migrating %s from %s", chainID, src)
active := fmt.Sprintf("migration-%s", chainID)
key := chainID.String()
snapshotLabels := map[string]string{
"containerd.io/snapshot.ref": key,
}
mounts, err := sn.Prepare(ctx, active, parent, snapshots.WithLabels(snapshotLabels))
parent = key
if err != nil {
if cerrdefs.IsAlreadyExists(err) {
continue
}
return err
}
dst, err := extractSource(mounts)
if err != nil {
return err
}
t1 := time.Now()
if err := fs.CopyDir(dst, src); err != nil {
return err
}
log.G(ctx).WithFields(log.Fields{
"t": time.Since(t1),
"key": key,
}).Debug("Copied layer to snapshot")
if err := sn.Commit(ctx, key, active); err != nil && !cerrdefs.IsAlreadyExists(err) {
return err
}
}
configBytes := img.RawJSON()
digest.FromBytes(configBytes)
manifest.Config = ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageConfig,
Digest: digest.FromBytes(configBytes),
Size: int64(len(configBytes)),
}
configLabels := map[string]string{
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", snKey): parent,
}
if err = content.WriteBlob(ctx, lm.content, "config"+manifest.Config.Digest.String(), bytes.NewReader(configBytes), manifest.Config, content.WithLabels(configLabels)); err != nil && !cerrdefs.IsAlreadyExists(err) {
return err
}
if err := eg.Wait(); err != nil {
return err
}
manifestBytes, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return err
}
manifestDesc := ocispec.Descriptor{
MediaType: manifest.MediaType,
Digest: digest.FromBytes(manifestBytes),
Size: int64(len(manifestBytes)),
}
manifestLabels := map[string]string{
"containerd.io/gc.ref.content.config": manifest.Config.Digest.String(),
}
for i := range manifest.Layers {
manifestLabels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = manifest.Layers[i].Digest.String()
}
if err = content.WriteBlob(ctx, lm.content, "manifest"+manifestDesc.Digest.String(), bytes.NewReader(manifestBytes), manifestDesc, content.WithLabels(manifestLabels)); err != nil && !cerrdefs.IsAlreadyExists(err) {
return err
}
childrenHandler := images.ChildrenHandler(lm.content)
childrenHandler = images.SetChildrenMappedLabels(lm.content, childrenHandler, nil)
if err = images.Walk(ctx, childrenHandler, manifestDesc); err != nil {
return err
}
var added bool
for _, named := range lm.refs.References(digest.Digest(imgID)) {
img := images.Image{
Name: named.String(),
Target: manifestDesc,
// TODO: Any labels?
}
img, err = lm.cis.Create(ctx, img)
if err != nil && !cerrdefs.IsAlreadyExists(err) {
return err
} else if err != nil {
log.G(ctx).Infof("Tag already exists: %s", named)
continue
}
log.G(ctx).Infof("Migrated image %s to %s", img.Name, img.Target.Digest)
added = true
}
if !added {
img := images.Image{
Name: "moby-dangling@" + manifestDesc.Digest.String(),
Target: manifestDesc,
// TODO: Any labels?
}
img, err = lm.cis.Create(ctx, img)
if err != nil && !cerrdefs.IsAlreadyExists(err) {
return err
} else if err == nil {
log.G(ctx).Infof("Migrated image %s to %s", img.Name, img.Target.Digest)
}
}
}
return nil
}
func extractSource(mounts []mount.Mount) (string, error) {
if len(mounts) != 1 {
return "", fmt.Errorf("cannot support snapshotters with multiple mount sources: %w", cerrdefs.ErrNotImplemented)
}
switch mounts[0].Type {
case "bind":
return mounts[0].Source, nil
case "overlay":
for _, option := range mounts[0].Options {
if strings.HasPrefix(option, "upperdir=") {
return option[9:], nil
}
}
default:
return "", fmt.Errorf("mount type %q not supported: %w", mounts[0].Type, cerrdefs.ErrNotImplemented)
}
return "", fmt.Errorf("mount is missing upper option: %w", cerrdefs.ErrNotImplemented)
}