Merge pull request #48009 from dmcgowan/containerd-migration

Update containerd to default storage and add support for migration
Merged by Austin Vazquez on 2025-08-11 13:33:14 -07:00 (committed by GitHub)
19 changed files with 806 additions and 123 deletions


@@ -10,6 +10,7 @@ import (
"crypto/sha256"
"encoding/binary"
"fmt"
"maps"
"net"
"net/netip"
"os"
@@ -28,6 +29,7 @@ import (
"github.com/containerd/log"
"github.com/distribution/reference"
dist "github.com/docker/distribution"
"github.com/docker/go-units"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/tracing"
"github.com/moby/locker"
@@ -37,12 +39,27 @@ import (
registrytypes "github.com/moby/moby/api/types/registry"
"github.com/moby/moby/api/types/swarm"
volumetypes "github.com/moby/moby/api/types/volume"
"github.com/moby/sys/user"
"github.com/moby/sys/userns"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
"resenje.org/singleflight"
"tags.cncf.io/container-device-interface/pkg/cdi"
"github.com/moby/moby/v2/daemon/builder"
executorpkg "github.com/moby/moby/v2/daemon/cluster/executor"
"github.com/moby/moby/v2/daemon/config"
"github.com/moby/moby/v2/daemon/container"
ctrd "github.com/moby/moby/v2/daemon/containerd"
"github.com/moby/moby/v2/daemon/containerd/migration"
"github.com/moby/moby/v2/daemon/events"
"github.com/moby/moby/v2/daemon/graphdriver"
_ "github.com/moby/moby/v2/daemon/graphdriver/register" // register graph drivers
"github.com/moby/moby/v2/daemon/images"
"github.com/moby/moby/v2/daemon/internal/distribution"
@@ -72,18 +89,6 @@ import (
"github.com/moby/moby/v2/pkg/authorization"
"github.com/moby/moby/v2/pkg/plugingetter"
"github.com/moby/moby/v2/pkg/sysinfo"
"github.com/moby/sys/user"
"github.com/moby/sys/userns"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
"resenje.org/singleflight"
"tags.cncf.io/container-device-interface/pkg/cdi"
)
type configStore struct {
@@ -203,15 +208,15 @@ func (daemon *Daemon) UsesSnapshotter() bool {
return daemon.usesSnapshotter
}
func (daemon *Daemon) restore(cfg *configStore) error {
func (daemon *Daemon) loadContainers(ctx context.Context) (map[string]map[string]*container.Container, error) {
var mapLock sync.Mutex
containers := make(map[string]*container.Container)
driverContainers := make(map[string]map[string]*container.Container)
log.G(context.TODO()).Info("Loading containers: start.")
log.G(ctx).Info("Loading containers: start.")
dir, err := os.ReadDir(daemon.repository)
if err != nil {
return err
return nil, err
}
// parallelLimit is the maximum number of parallel startup jobs that we
@@ -232,36 +237,46 @@ func (daemon *Daemon) restore(cfg *configStore) error {
_ = sem.Acquire(context.Background(), 1)
defer sem.Release(1)
logger := log.G(context.TODO()).WithField("container", id)
logger := log.G(ctx).WithField("container", id)
c, err := daemon.load(id)
if err != nil {
logger.WithError(err).Error("failed to load container")
return
}
if c.Driver != daemon.imageService.StorageDriver() {
// Ignore the container if it wasn't created with the current storage-driver
logger.Debugf("not restoring container because it was created with another storage driver (%s)", c.Driver)
return
}
rwlayer, err := daemon.imageService.GetLayerByID(c.ID)
if err != nil {
logger.WithError(err).Error("failed to load container mount")
return
}
c.RWLayer = rwlayer
logger.WithFields(log.Fields{
"running": c.IsRunning(),
"paused": c.IsPaused(),
}).Debug("loaded container")
mapLock.Lock()
containers[c.ID] = c
if containers, ok := driverContainers[c.Driver]; !ok {
driverContainers[c.Driver] = map[string]*container.Container{
c.ID: c,
}
} else {
containers[c.ID] = c
}
mapLock.Unlock()
}(v.Name())
}
group.Wait()
return driverContainers, nil
}
func (daemon *Daemon) restore(ctx context.Context, cfg *configStore, containers map[string]*container.Container) error {
var mapLock sync.Mutex
log.G(ctx).Info("Restoring containers: start.")
// parallelLimit is the maximum number of parallel startup jobs that we
// allow (this is the limit used for all startup semaphores). The multiplier
// (128) was chosen after some fairly significant benchmarking -- don't change
// it unless you've tested it significantly (this value is adjusted if
// RLIMIT_NOFILE is small to avoid EMFILE).
parallelLimit := adjustParallelLimit(len(containers), 128*runtime.NumCPU())
// Re-used for all parallel startup jobs.
var group sync.WaitGroup
sem := semaphore.NewWeighted(int64(parallelLimit))
removeContainers := make(map[string]*container.Container)
restartContainers := make(map[*container.Container]chan struct{})
activeSandboxes := make(map[string]any)
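The parallelLimit comment above describes the bounded-parallelism pattern that both loadContainers and restore rely on. A minimal, self-contained sketch of that pattern (illustrative names only, not daemon code), using golang.org/x/sync/semaphore:

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	ids := []string{"c1", "c2", "c3", "c4"}

	// Cap parallelism the way the daemon does: a multiple of NumCPU,
	// clamped elsewhere against RLIMIT_NOFILE (omitted in this sketch).
	parallelLimit := 128 * runtime.NumCPU()
	sem := semaphore.NewWeighted(int64(parallelLimit))

	var group sync.WaitGroup
	for _, id := range ids {
		group.Add(1)
		go func(id string) {
			defer group.Done()
			// Block until a slot frees up; Acquire only errors if ctx is done.
			_ = sem.Acquire(context.Background(), 1)
			defer sem.Release(1)
			fmt.Println("starting container", id)
		}(id)
	}
	group.Wait()
}
```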
@@ -273,17 +288,28 @@ func (daemon *Daemon) restore(cfg *configStore) error {
_ = sem.Acquire(context.Background(), 1)
defer sem.Release(1)
logger := log.G(context.TODO()).WithField("container", c.ID)
logger := log.G(ctx).WithField("container", c.ID)
rwlayer, err := daemon.imageService.GetLayerByID(c.ID)
if err != nil {
logger.WithError(err).Error("failed to load container mount")
return
}
c.RWLayer = rwlayer
logger.WithFields(log.Fields{
"running": c.IsRunning(),
"paused": c.IsPaused(),
}).Debug("loaded container")
if err := daemon.registerName(c); err != nil {
logger.WithError(err).Errorf("failed to register container name: %s", c.Name)
log.G(ctx).WithError(err).Errorf("failed to register container name: %s", c.Name)
mapLock.Lock()
delete(containers, c.ID)
mapLock.Unlock()
return
}
if err := daemon.register(context.TODO(), c); err != nil {
logger.WithError(err).Error("failed to register container")
log.G(ctx).WithError(err).Error("failed to register container")
mapLock.Lock()
delete(containers, c.ID)
mapLock.Unlock()
@@ -300,7 +326,7 @@ func (daemon *Daemon) restore(cfg *configStore) error {
_ = sem.Acquire(context.Background(), 1)
defer sem.Release(1)
baseLogger := log.G(context.TODO()).WithField("container", c.ID)
baseLogger := log.G(ctx).WithField("container", c.ID)
if c.HostConfig != nil {
// Migrate containers that don't have the default ("no") restart-policy set.
@@ -524,7 +550,7 @@ func (daemon *Daemon) restore(cfg *configStore) error {
//
// Note that we cannot initialize the network controller earlier, as it
// needs to know if there's active sandboxes (running containers).
if err = daemon.initNetworkController(&cfg.Config, activeSandboxes); err != nil {
if err := daemon.initNetworkController(&cfg.Config, activeSandboxes); err != nil {
return fmt.Errorf("Error initializing network controller: %v", err)
}
@@ -553,7 +579,7 @@ func (daemon *Daemon) restore(cfg *configStore) error {
_ = sem.Acquire(context.Background(), 1)
if err := daemon.registerLinks(c, c.HostConfig); err != nil {
log.G(context.TODO()).WithField("container", c.ID).WithError(err).Error("failed to register link for container")
log.G(ctx).WithField("container", c.ID).WithError(err).Error("failed to register link for container")
}
sem.Release(1)
@@ -567,7 +593,7 @@ func (daemon *Daemon) restore(cfg *configStore) error {
go func(c *container.Container, chNotify chan struct{}) {
_ = sem.Acquire(context.Background(), 1)
logger := log.G(context.TODO()).WithField("container", c.ID)
logger := log.G(ctx).WithField("container", c.ID)
logger.Debug("starting container")
@@ -607,7 +633,7 @@ func (daemon *Daemon) restore(cfg *configStore) error {
_ = sem.Acquire(context.Background(), 1)
if err := daemon.containerRm(&cfg.Config, cid, &backend.ContainerRmConfig{ForceRemove: true, RemoveVolume: true}); err != nil {
log.G(context.TODO()).WithField("container", cid).WithError(err).Error("failed to remove container")
log.G(ctx).WithField("container", cid).WithError(err).Error("failed to remove container")
}
sem.Release(1)
@@ -638,7 +664,7 @@ func (daemon *Daemon) restore(cfg *configStore) error {
_ = sem.Acquire(context.Background(), 1)
if err := daemon.prepareMountPoints(c); err != nil {
log.G(context.TODO()).WithField("container", c.ID).WithError(err).Error("failed to prepare mountpoints for container")
log.G(ctx).WithField("container", c.ID).WithError(err).Error("failed to prepare mountpoints for container")
}
sem.Release(1)
@@ -647,7 +673,7 @@ func (daemon *Daemon) restore(cfg *configStore) error {
}
group.Wait()
log.G(context.TODO()).Info("Loading containers: done.")
log.G(ctx).Info("Loading containers: done.")
return nil
}
@@ -833,11 +859,39 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
}
d.configStore.Store(cfgStore)
// TEST_INTEGRATION_USE_SNAPSHOTTER is used for integration tests only.
if os.Getenv("TEST_INTEGRATION_USE_SNAPSHOTTER") != "" {
d.usesSnapshotter = true
} else {
d.usesSnapshotter = config.Features["containerd-snapshotter"]
migrationThreshold := int64(-1)
isGraphDriver := func(driver string) (bool, error) {
return graphdriver.IsRegistered(driver), nil
}
if enabled, ok := config.Features["containerd-snapshotter"]; (ok && !enabled) || os.Getenv("TEST_INTEGRATION_USE_GRAPHDRIVER") != "" {
isGraphDriver = func(driver string) (bool, error) {
if driver == "" || graphdriver.IsRegistered(driver) {
return true, nil
}
return false, fmt.Errorf("graphdriver is explicitly enabled but %q is not registered, %v %v", driver, config.Features, os.Getenv("TEST_INTEGRATION_USE_GRAPHDRIVER"))
}
}
if config.Features["containerd-migration"] {
if ts := os.Getenv("DOCKER_MIGRATE_SNAPSHOTTER_THRESHOLD"); ts != "" {
v, err := units.FromHumanSize(ts)
if err == nil {
migrationThreshold = v
} else {
log.G(ctx).WithError(err).WithField("size", ts).Warn("Invalid migration threshold value, defaulting to 0")
migrationThreshold = 0
}
} else {
migrationThreshold = 0
}
if migrationThreshold > 0 {
log.G(ctx).WithField("max_size", migrationThreshold).Info("(Experimental) Migration to containerd is enabled, driver will be switched to snapshotter after migration is complete")
} else {
log.G(ctx).WithField("env", os.Environ()).Info("Migration to containerd is enabled, driver will be switched to snapshotter if there are no images or containers")
}
}
if config.Features["containerd-snapshotter"] {
log.G(ctx).Warn(`"containerd-snapshotter" is now the default and no longer needed to be set`)
}
// Ensure the daemon is properly shutdown if there is a failure during
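The threshold above is parsed with github.com/docker/go-units, which reads human sizes using decimal (SI) multipliers, so "1GB" means 1,000,000,000 bytes rather than 1<<30. A small sketch of the same parse-with-fallback behavior, assuming only the DOCKER_MIGRATE_SNAPSHOTTER_THRESHOLD variable shown in the hunk:

```go
package main

import (
	"fmt"
	"os"

	units "github.com/docker/go-units"
)

// migrationThresholdFromEnv mirrors the hunk above: an unset or invalid
// value falls back to 0, meaning "migrate only if there is nothing to move".
func migrationThresholdFromEnv() int64 {
	ts := os.Getenv("DOCKER_MIGRATE_SNAPSHOTTER_THRESHOLD")
	if ts == "" {
		return 0
	}
	v, err := units.FromHumanSize(ts)
	if err != nil {
		return 0 // the daemon logs a warning here before defaulting
	}
	return v
}

func main() {
	for _, s := range []string{"512MB", "1GB", "32k"} {
		v, _ := units.FromHumanSize(s)
		fmt.Printf("%s -> %d bytes\n", s, v)
	}
	fmt.Println("threshold:", migrationThresholdFromEnv())
}
```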
@@ -1034,50 +1088,49 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
d.linkIndex = newLinkIndex()
// On Windows we don't support the environment variable, or a user supplied graphdriver
containers, err := d.loadContainers(ctx)
if err != nil {
return nil, err
}
// On Windows we don't support the environment variable or a user-supplied graphdriver,
// but both are allowed when using snapshotters.
// Unix platforms, however, run a single graphdriver for all containers. It can be
// set through an environment variable or a daemon start parameter, or chosen during
// layerstore initialization via the driver priority order, for example.
driverName := os.Getenv("DOCKER_DRIVER")
if isWindows && d.UsesSnapshotter() {
// Containerd WCOW snapshotter
driverName = "windows"
} else if isWindows {
// Docker WCOW graphdriver
driverName = "windowsfilter"
driverName := os.Getenv("DOCKER_GRAPHDRIVER")
if isWindows {
if driverName == "" {
driverName = cfgStore.GraphDriver
}
switch driverName {
case "windows":
// Docker WCOW snapshotters
case "windowsfilter":
// Docker WCOW graphdriver
case "":
// Use graph driver but enable migration
driverName = "windowsfilter"
if os.Getenv("TEST_INTEGRATION_USE_GRAPHDRIVER") == "" {
// Don't force migration if graph driver is explicit
migrationThreshold = 0
}
default:
log.G(ctx).Infof("Using non-default snapshotter %s", driverName)
}
} else if driverName != "" {
log.G(ctx).Infof("Setting the storage driver from the $DOCKER_DRIVER environment variable (%s)", driverName)
log.G(ctx).Infof("Setting the storage driver from the $DOCKER_GRAPHDRIVER environment variable (%s)", driverName)
} else {
driverName = cfgStore.GraphDriver
}
if d.UsesSnapshotter() {
if os.Getenv("TEST_INTEGRATION_USE_SNAPSHOTTER") != "" {
log.G(ctx).Warn("Enabling containerd snapshotter through the $TEST_INTEGRATION_USE_SNAPSHOTTER environment variable. This should only be used for testing.")
}
log.G(ctx).Info("Starting daemon with containerd snapshotter integration enabled")
// FIXME(thaJeztah): implement automatic snapshotter-selection similar to graph-driver selection; see https://github.com/moby/moby/issues/44076
if driverName == "" {
driverName = defaults.DefaultSnapshotter
}
// Configure and validate the kernels security support. Note this is a Linux/FreeBSD
// operation only, so it is safe to pass *just* the runtime OS graphdriver.
if err := configureKernelSecuritySupport(&cfgStore.Config, driverName); err != nil {
return nil, err
}
d.imageService = ctrd.NewService(ctrd.ImageServiceConfig{
Client: d.containerdClient,
Containers: d.containers,
Snapshotter: driverName,
RegistryHosts: d.RegistryHosts,
Registry: d.registryService,
EventsService: d.EventsService,
IDMapping: idMapping,
RefCountMounter: snapshotter.NewMounter(config.Root, driverName, idMapping),
})
} else {
var migrationConfig migration.Config
tryGraphDriver, err := isGraphDriver(driverName)
if err != nil {
return nil, err
}
if tryGraphDriver {
layerStore, err := layer.NewStoreFromOptions(layer.StoreOptions{
Root: cfgStore.Root,
GraphDriver: driverName,
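Condensed for reference, the driver-name resolution order this hunk implements; resolveDriverName is an illustrative helper, not daemon API:

```go
package main

import "fmt"

// resolveDriverName sketches the order above: $DOCKER_GRAPHDRIVER wins,
// then the daemon config's graph-driver setting, then the platform default.
// An empty result on Unix lets the layer store pick by priority order, and
// an empty result on Windows selects the graphdriver-with-migration path.
func resolveDriverName(envDriver, cfgDriver string, isWindows bool) string {
	name := envDriver
	if name == "" {
		name = cfgDriver
	}
	if isWindows && name == "" {
		name = "windowsfilter"
	}
	return name
}

func main() {
	fmt.Println(resolveDriverName("", "overlay2", false)) // overlay2
	fmt.Println(resolveDriverName("", "", true))          // windowsfilter
}
```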
@@ -1154,14 +1207,129 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
}
}
// TODO: imageStore, distributionMetadataStore, and ReferenceStore are only
// used above to run migration. They could be initialized in ImageService
// if migration is called from daemon/images. layerStore might move as well.
d.imageService = images.NewImageService(imgSvcConfig)
// If no containers are present, check whether the image service can be migrated
if drv := layerStore.DriverName(); len(containers[drv]) == 0 && migrationThreshold >= 0 {
switch drv {
case "overlay2":
driverName = "overlayfs"
case "windowsfilter":
driverName = "windows"
case "vfs":
driverName = "native"
default:
migrationThreshold = -1
log.G(ctx).Infof("Not migrating to containerd snapshotter, no migration defined for graph driver %q", drv)
}
log.G(ctx).Debugf("Max Concurrent Downloads: %d", imgSvcConfig.MaxConcurrentDownloads)
log.G(ctx).Debugf("Max Concurrent Uploads: %d", imgSvcConfig.MaxConcurrentUploads)
log.G(ctx).Debugf("Max Download Attempts: %d", imgSvcConfig.MaxDownloadAttempts)
var totalSize int64
ic := imgSvcConfig.ImageStore.Len()
if migrationThreshold >= 0 && ic > 0 {
for _, img := range imgSvcConfig.ImageStore.Map() {
if layerID := img.RootFS.ChainID(); layerID != "" {
l, err := imgSvcConfig.LayerStore.Get(layerID)
if err != nil {
if errors.Is(err, layer.ErrLayerDoesNotExist) {
continue
}
return nil, err
}
// Only count layer sizes when totaling against the maximum size
totalSize += l.Size()
layer.ReleaseAndLog(imgSvcConfig.LayerStore, l)
}
}
}
if totalSize <= migrationThreshold {
log.G(ctx).WithField("total", totalSize).Infof("Enabling containerd snapshotter because migration set with no containers and %d images in graph driver", ic)
migrationConfig = migration.Config{
ImageCount: ic,
LayerStore: imgSvcConfig.LayerStore,
DockerImageStore: imgSvcConfig.ImageStore,
ReferenceStore: imgSvcConfig.ReferenceStore,
}
} else if migrationThreshold >= 0 {
log.G(ctx).WithField("total", totalSize).Warnf("Not migrating to containerd snapshotter because still have %d images in graph driver", ic)
d.imageService = images.NewImageService(ctx, imgSvcConfig)
} else {
d.imageService = images.NewImageService(ctx, imgSvcConfig)
}
} else {
log.G(ctx).Debugf("Not attempting migration with %d containers and %d image threshold", len(containers[drv]), migrationThreshold)
d.imageService = images.NewImageService(ctx, imgSvcConfig)
}
}
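The branch above boils down to a graph-driver-to-snapshotter name mapping plus a size gate. A condensed sketch; snapshotterFor and shouldMigrate are hypothetical helpers, not daemon API:

```go
package main

import "fmt"

// snapshotterFor maps a graph driver to its containerd snapshotter
// counterpart, matching the switch above; unmapped drivers cannot migrate.
func snapshotterFor(graphDriver string) (string, bool) {
	switch graphDriver {
	case "overlay2":
		return "overlayfs", true
	case "windowsfilter":
		return "windows", true
	case "vfs":
		return "native", true
	default:
		return "", false
	}
}

// shouldMigrate mirrors the gate above: no containers on the driver, a
// non-negative threshold, and a summed layer size within that threshold.
func shouldMigrate(containerCount int, totalLayerSize, threshold int64) bool {
	return containerCount == 0 && threshold >= 0 && totalLayerSize <= threshold
}

func main() {
	if s, ok := snapshotterFor("overlay2"); ok && shouldMigrate(0, 0, 0) {
		fmt.Println("migrate to", s) // migrate to overlayfs
	}
}
```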
if d.imageService == nil {
log.G(ctx).Info("Starting daemon with containerd snapshotter integration enabled")
resp, err := d.containerdClient.IntrospectionService().Plugins(ctx, `type=="io.containerd.snapshotter.v1"`)
if err != nil {
return nil, fmt.Errorf("failed to get containerd plugins: %w", err)
}
if resp == nil || len(resp.Plugins) == 0 {
return nil, fmt.Errorf("failed to get containerd plugins response: %w", cerrdefs.ErrUnavailable)
}
availableDrivers := map[string]struct{}{}
for _, p := range resp.Plugins {
if p == nil || p.Type != "io.containerd.snapshotter.v1" {
continue
}
if p.InitErr == nil {
availableDrivers[p.ID] = struct{}{}
} else if (p.ID == driverName) || (driverName == "" && p.ID == defaults.DefaultSnapshotter) {
log.G(ctx).WithField("message", p.InitErr.Message).Warn("Preferred snapshotter not available in containerd")
}
}
if driverName == "" {
if _, ok := availableDrivers[defaults.DefaultSnapshotter]; ok {
driverName = defaults.DefaultSnapshotter
} else if _, ok := availableDrivers["native"]; ok {
driverName = "native"
} else {
log.G(ctx).WithField("available", maps.Keys(availableDrivers)).Debug("Preferred snapshotter not available in containerd")
return nil, fmt.Errorf("snapshotter selection failed, no drivers available: %w", cerrdefs.ErrUnavailable)
}
} else if _, ok := availableDrivers[driverName]; !ok {
return nil, fmt.Errorf("configured driver %q not available: %w", driverName, cerrdefs.ErrUnavailable)
}
// Configure and validate the kernels security support. Note this is a Linux/FreeBSD
// operation only, so it is safe to pass *just* the runtime OS graphdriver.
if err := configureKernelSecuritySupport(&cfgStore.Config, driverName); err != nil {
return nil, err
}
d.usesSnapshotter = true
d.imageService = ctrd.NewService(ctrd.ImageServiceConfig{
Client: d.containerdClient,
Containers: d.containers,
Snapshotter: driverName,
RegistryHosts: d.RegistryHosts,
Registry: d.registryService,
EventsService: d.EventsService,
IDMapping: idMapping,
RefCountMounter: snapshotter.NewMounter(config.Root, driverName, idMapping),
})
if migrationConfig.ImageCount > 0 {
if d.imageService.CountImages(ctx) > 0 {
log.G(ctx).WithField("image_count", migrationConfig.ImageCount).Warnf("Images not migrated because images already exist in containerd %q", migrationConfig.LayerStore.DriverName())
} else {
migrationConfig.Leases = d.containerdClient.LeasesService()
migrationConfig.Content = d.containerdClient.ContentStore()
migrationConfig.ImageStore = d.containerdClient.ImageService()
m := migration.NewLayerMigrator(migrationConfig)
err := m.MigrateTocontainerd(ctx, driverName, d.containerdClient.SnapshotService(driverName))
if err != nil {
log.G(ctx).WithError(err).Errorf("Failed to migrate images to containerd, images in graph driver %q are no longer visible", migrationConfig.LayerStore.DriverName())
} else {
log.G(ctx).WithField("image_count", migrationConfig.ImageCount).Infof("Successfully migrated images from %q to containerd", migrationConfig.LayerStore.DriverName())
}
}
}
}
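The snapshotter discovery above can be exercised standalone. A sketch assuming the containerd v2 Go client (the variadic Plugins call in the hunk suggests that API) and the default socket path:

```go
package main

import (
	"context"
	"fmt"
	"log"

	containerd "github.com/containerd/containerd/v2/client"
)

func main() {
	cli, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Same filter expression as the hunk above: only snapshotter plugins.
	resp, err := cli.IntrospectionService().Plugins(context.Background(),
		`type=="io.containerd.snapshotter.v1"`)
	if err != nil {
		log.Fatal(err)
	}
	for _, p := range resp.Plugins {
		if p.InitErr != nil {
			continue // skip snapshotters that failed to initialize
		}
		fmt.Println("available snapshotter:", p.ID)
	}
}
```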
go d.execCommandGC()
@@ -1171,9 +1339,22 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
return nil, err
}
if err := d.restore(cfgStore); err != nil {
driverContainers, ok := containers[driverName]
// Log containers that were not loaded because they were created with another storage driver
if (!ok && len(containers) > 0) || len(containers) > 1 {
for driver, all := range containers {
if driver == driverName {
continue
}
for id := range all {
log.G(ctx).WithField("container", id).Debugf("not restoring container because it was created with another storage driver (%s)", driver)
}
}
}
if err := d.restore(ctx, cfgStore, driverContainers); err != nil {
return nil, err
}
// Wait for migration to complete
close(d.startupDone)
info, err := d.SystemInfo(ctx)
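Finally, the bookkeeping in the hunk above in miniature: containers are grouped by the storage driver that created them, only the group matching the active driver is restored, and the rest are logged and skipped. A runnable sketch with simplified types:

```go
package main

import "fmt"

func main() {
	// Containers keyed by the storage driver that created them, then by ID,
	// as loadContainers builds them above (values simplified to bool here).
	byDriver := map[string]map[string]bool{
		"overlay2": {"c1": true, "c2": true},
		"vfs":      {"c3": true},
	}
	active := "overlay2"

	for driver, group := range byDriver {
		if driver == active {
			continue
		}
		for id := range group {
			fmt.Printf("not restoring %s: created with another storage driver (%s)\n", id, driver)
		}
	}
	fmt.Println("restoring", len(byDriver[active]), "containers")
}
```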