Add containerd migration to daemon startup

Add layer migration on startup
Use image size threshold rather than image count
Add daemon integration test
Add test for migrating to containerd snapshotters
Add vfs migration
Add tar export for containerd migration
Add containerd migration test with save and load

Signed-off-by: Derek McGowan <derek@mcg.dev>
Author: Derek McGowan <derek@mcg.dev>
Date:   2024-10-09 15:19:32 -07:00
Commit: 9f5f4f5a42 (parent 4b97831992)

4 changed files with 645 additions and 51 deletions
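
The migration is opt-in through the daemon's feature gates (see the NewDaemon hunk below, which reads config.Features["containerd-migration"]). A minimal sketch of enabling it, assuming the standard /etc/docker/daemon.json location:

{
  "features": {
    "containerd-migration": true
  }
}

With the flag unset, the daemon falls back to the existing "containerd-snapshotter" feature check.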

daemon/daemon.go

@@ -10,6 +10,7 @@ import (
"crypto/sha256"
"encoding/binary"
"fmt"
"math"
"net"
"net/netip"
"os"
@@ -42,6 +43,7 @@ import (
"github.com/moby/moby/v2/daemon/config"
"github.com/moby/moby/v2/daemon/container"
ctrd "github.com/moby/moby/v2/daemon/containerd"
"github.com/moby/moby/v2/daemon/containerd/migration"
"github.com/moby/moby/v2/daemon/events"
_ "github.com/moby/moby/v2/daemon/graphdriver/register" // register graph drivers
"github.com/moby/moby/v2/daemon/images"
@@ -202,15 +204,15 @@ func (daemon *Daemon) UsesSnapshotter() bool {
     return daemon.usesSnapshotter
 }

-func (daemon *Daemon) restore(cfg *configStore) error {
+func (daemon *Daemon) loadContainers() (map[string]map[string]*container.Container, error) {
     var mapLock sync.Mutex
-    containers := make(map[string]*container.Container)
+    driverContainers := make(map[string]map[string]*container.Container)

     log.G(context.TODO()).Info("Loading containers: start.")

     dir, err := os.ReadDir(daemon.repository)
     if err != nil {
-        return err
+        return nil, err
     }

     // parallelLimit is the maximum number of parallel startup jobs that we
@@ -238,29 +240,39 @@ func (daemon *Daemon) restore(cfg *configStore) error {
logger.WithError(err).Error("failed to load container")
return
}
if c.Driver != daemon.imageService.StorageDriver() {
// Ignore the container if it wasn't created with the current storage-driver
logger.Debugf("not restoring container because it was created with another storage driver (%s)", c.Driver)
return
}
rwlayer, err := daemon.imageService.GetLayerByID(c.ID)
if err != nil {
logger.WithError(err).Error("failed to load container mount")
return
}
c.RWLayer = rwlayer
logger.WithFields(log.Fields{
"running": c.IsRunning(),
"paused": c.IsPaused(),
}).Debug("loaded container")
mapLock.Lock()
containers[c.ID] = c
if containers, ok := driverContainers[c.Driver]; !ok {
driverContainers[c.Driver] = map[string]*container.Container{
c.ID: c,
}
} else {
containers[c.ID] = c
}
mapLock.Unlock()
}(v.Name())
}
group.Wait()
return driverContainers, nil
}
func (daemon *Daemon) restore(cfg *configStore, containers map[string]*container.Container) error {
var mapLock sync.Mutex
log.G(context.TODO()).Info("Restoring containers: start.")
// parallelLimit is the maximum number of parallel startup jobs that we
// allow (this is the limited used for all startup semaphores). The multipler
// (128) was chosen after some fairly significant benchmarking -- don't change
// it unless you've tested it significantly (this value is adjusted if
// RLIMIT_NOFILE is small to avoid EMFILE).
parallelLimit := adjustParallelLimit(len(containers), 128*runtime.NumCPU())
// Re-used for all parallel startup jobs.
var group sync.WaitGroup
sem := semaphore.NewWeighted(int64(parallelLimit))
removeContainers := make(map[string]*container.Container)
restartContainers := make(map[*container.Container]chan struct{})
activeSandboxes := make(map[string]any)
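
As a reading aid: loadContainers now returns containers grouped by the storage driver recorded in each container's on-disk config, instead of filtering to the active driver up front. A sketch of the returned shape (container IDs and variables hypothetical):

// Outer key: storage driver (c.Driver); inner key: container ID.
containers := map[string]map[string]*container.Container{
    "overlay2": {"4f66ad9067b5...": c1, "9200c8e86b66...": c2},
    "vfs":      {"1bbc6e3a2332...": c3},
}

Only the entry for the active driver is later passed to restore; containers under other drivers are logged and skipped (see the final hunk).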
@@ -274,6 +286,17 @@ func (daemon *Daemon) restore(cfg *configStore) error {
             logger := log.G(context.TODO()).WithField("container", c.ID)

+            rwlayer, err := daemon.imageService.GetLayerByID(c.ID)
+            if err != nil {
+                logger.WithError(err).Error("failed to load container mount")
+                return
+            }
+            c.RWLayer = rwlayer
+
+            logger.WithFields(log.Fields{
+                "running": c.IsRunning(),
+                "paused":  c.IsPaused(),
+            }).Debug("loaded container")
+
             if err := daemon.registerName(c); err != nil {
                 logger.WithError(err).Errorf("failed to register container name: %s", c.Name)
                 mapLock.Lock()
@@ -523,7 +546,7 @@ func (daemon *Daemon) restore(cfg *configStore) error {
     //
     // Note that we cannot initialize the network controller earlier, as it
     // needs to know if there's active sandboxes (running containers).
-    if err = daemon.initNetworkController(&cfg.Config, activeSandboxes); err != nil {
+    if err := daemon.initNetworkController(&cfg.Config, activeSandboxes); err != nil {
         return fmt.Errorf("Error initializing network controller: %v", err)
     }
@@ -833,10 +856,17 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
     d.configStore.Store(cfgStore)

     // TEST_INTEGRATION_USE_SNAPSHOTTER is used for integration tests only.
+    migrationThreshold := int64(-1)
     if os.Getenv("TEST_INTEGRATION_USE_SNAPSHOTTER") != "" {
         d.usesSnapshotter = true
     } else {
-        d.usesSnapshotter = config.Features["containerd-snapshotter"]
+        log.G(ctx).WithField("features", config.Features).Debug("Checking features for migration")
+        if config.Features["containerd-migration"] {
+            // TODO: Allow setting the threshold
+            migrationThreshold = math.MaxInt64
+        } else {
+            d.usesSnapshotter = config.Features["containerd-snapshotter"]
+        }
     }

     // Ensure the daemon is properly shutdown if there is a failure during
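
The threshold introduced here gates the migration attempt later in NewDaemon: a negative value disables it, and with the feature flag on it is currently math.MaxInt64, so total image size never blocks the migration (the TODO notes that a configurable threshold is planned). A condensed sketch of the decision; shouldMigrate is a hypothetical helper, the diff inlines this logic:

// shouldMigrate condenses the checks spread through NewDaemon below:
// a negative threshold disables migration, any container created with
// the graph driver blocks it, and the summed (unshared) image size
// must not exceed the threshold.
func shouldMigrate(threshold, totalImageSize int64, driverContainers int) bool {
    return threshold >= 0 && driverContainers == 0 && totalImageSize <= threshold
}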
@@ -1033,12 +1063,17 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
     d.linkIndex = newLinkIndex()

+    containers, err := d.loadContainers()
+    if err != nil {
+        return nil, err
+    }
+
     // On Windows we don't support the environment variable, or a user supplied graphdriver
     // Unix platforms however run a single graphdriver for all containers, and it can
     // be set through an environment variable, a daemon start parameter, or chosen through
     // initialization of the layerstore through driver priority order for example.
     driverName := os.Getenv("DOCKER_DRIVER")
-    if isWindows && d.UsesSnapshotter() {
+    if isWindows && d.usesSnapshotter {
         // Containerd WCOW snapshotter
         driverName = "windows"
     } else if isWindows {
@@ -1050,33 +1085,8 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
         driverName = cfgStore.GraphDriver
     }

-    if d.UsesSnapshotter() {
-        if os.Getenv("TEST_INTEGRATION_USE_SNAPSHOTTER") != "" {
-            log.G(ctx).Warn("Enabling containerd snapshotter through the $TEST_INTEGRATION_USE_SNAPSHOTTER environment variable. This should only be used for testing.")
-        }
-        log.G(ctx).Info("Starting daemon with containerd snapshotter integration enabled")
-
-        // FIXME(thaJeztah): implement automatic snapshotter-selection similar to graph-driver selection; see https://github.com/moby/moby/issues/44076
-        if driverName == "" {
-            driverName = defaults.DefaultSnapshotter
-        }
-
-        // Configure and validate the kernels security support. Note this is a Linux/FreeBSD
-        // operation only, so it is safe to pass *just* the runtime OS graphdriver.
-        if err := configureKernelSecuritySupport(&cfgStore.Config, driverName); err != nil {
-            return nil, err
-        }
-        d.imageService = ctrd.NewService(ctrd.ImageServiceConfig{
-            Client:          d.containerdClient,
-            Containers:      d.containers,
-            Snapshotter:     driverName,
-            RegistryHosts:   d.RegistryHosts,
-            Registry:        d.registryService,
-            EventsService:   d.EventsService,
-            IDMapping:       idMapping,
-            RefCountMounter: snapshotter.NewMounter(config.Root, driverName, idMapping),
-        })
-    } else {
+    var migrationConfig migration.Config
+    if !d.usesSnapshotter {
         layerStore, err := layer.NewStoreFromOptions(layer.StoreOptions{
             Root:        cfgStore.Root,
             GraphDriver: driverName,
@@ -1161,6 +1171,93 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
log.G(ctx).Debugf("Max Concurrent Downloads: %d", imgSvcConfig.MaxConcurrentDownloads)
log.G(ctx).Debugf("Max Concurrent Uploads: %d", imgSvcConfig.MaxConcurrentUploads)
log.G(ctx).Debugf("Max Download Attempts: %d", imgSvcConfig.MaxDownloadAttempts)
// If no containers are running, check whether can migrate image service
if drv := d.imageService.StorageDriver(); len(containers[drv]) == 0 && migrationThreshold >= 0 {
switch drv {
case "overlay2":
driverName = "overlayfs"
case "windowsfilter":
driverName = "windows"
migrationThreshold = 0
case "vfs":
driverName = "native"
default:
migrationThreshold = -1
log.G(ctx).Infof("Not migrating to containerd snapshotter, no migration defined for graph driver %q", drv)
}
var totalSize int64
ic := d.imageService.CountImages(ctx)
if migrationThreshold >= 0 && ic > 0 {
sum, err := d.imageService.Images(ctx, imagetypes.ListOptions{All: true})
if err != nil {
return nil, err
}
for _, s := range sum {
// Just add the size, don't consider shared size since this
// represents a maximum size
totalSize += s.Size
}
}
if totalSize <= migrationThreshold {
log.G(ctx).WithField("total", totalSize).Infof("Enabling containerd snapshotter because migration set with no containers and %d images in graph driver", ic)
d.usesSnapshotter = true
migrationConfig.LayerStore = imgSvcConfig.LayerStore
migrationConfig.DockerImageStore = imgSvcConfig.ImageStore
migrationConfig.ReferenceStore = imgSvcConfig.ReferenceStore
} else if migrationThreshold >= 0 {
log.G(ctx).Warnf("Not migrating to containerd snapshotter because still have %d images in graph driver", ic)
}
} else {
log.G(ctx).Debugf("Not attempting migration with %d containers and %d image threshold", len(containers[d.imageService.StorageDriver()]), migrationThreshold)
}
}
if d.usesSnapshotter {
if os.Getenv("TEST_INTEGRATION_USE_SNAPSHOTTER") != "" {
log.G(ctx).Warn("Enabling containerd snapshotter through the $TEST_INTEGRATION_USE_SNAPSHOTTER environment variable. This should only be used for testing.")
}
log.G(ctx).Info("Starting daemon with containerd snapshotter integration enabled")
// FIXME(thaJeztah): implement automatic snapshotter-selection similar to graph-driver selection; see https://github.com/moby/moby/issues/44076
if driverName == "" {
driverName = defaults.DefaultSnapshotter
}
// Configure and validate the kernels security support. Note this is a Linux/FreeBSD
// operation only, so it is safe to pass *just* the runtime OS graphdriver.
if err := configureKernelSecuritySupport(&cfgStore.Config, driverName); err != nil {
return nil, err
}
oldImageService := d.imageService
d.imageService = ctrd.NewService(ctrd.ImageServiceConfig{
Client: d.containerdClient,
Containers: d.containers,
Snapshotter: driverName,
RegistryHosts: d.RegistryHosts,
Registry: d.registryService,
EventsService: d.EventsService,
IDMapping: idMapping,
RefCountMounter: snapshotter.NewMounter(config.Root, driverName, idMapping),
})
if oldImageService != nil {
if count := oldImageService.CountImages(ctx); count > 0 {
migrationConfig.Leases = d.containerdClient.LeasesService()
migrationConfig.Content = d.containerdClient.ContentStore()
migrationConfig.ImageStore = d.containerdClient.ImageService()
m := migration.NewLayerMigrator(migrationConfig)
err := m.MigrateTocontainerd(ctx, driverName, d.containerdClient.SnapshotService(driverName))
if err != nil {
log.G(ctx).WithError(err).Errorf("Failed to migrate images to containerd, images in graph driver %q are no longer visible", oldImageService.StorageDriver())
} else {
log.G(ctx).WithField("image_count", count).Infof("Successfully migrated images from %q to containerd", oldImageService.StorageDriver())
}
}
}
}
go d.execCommandGC()
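
Pulling the pieces of this hunk together, the migration wiring amounts to the following sequence (a sketch using only names that appear in the diff; client stands in for d.containerdClient, and error handling is elided):

// The graph-driver side of migration.Config is populated before the
// image service is swapped; the containerd side comes from the client.
cfg := migration.Config{
    LayerStore:       imgSvcConfig.LayerStore,     // graph-driver layer store
    DockerImageStore: imgSvcConfig.ImageStore,     // legacy image metadata
    ReferenceStore:   imgSvcConfig.ReferenceStore, // image tags/references
    Leases:           client.LeasesService(),
    Content:          client.ContentStore(),
    ImageStore:       client.ImageService(),
}
m := migration.NewLayerMigrator(cfg)
// driverName is the snapshotter chosen by the switch above
// ("overlayfs", "windows", or "native").
err := m.MigrateTocontainerd(ctx, driverName, client.SnapshotService(driverName))

Note the failure mode spelled out in the error log: if migration fails, the daemon keeps running on the containerd snapshotter, and images left behind in the old graph driver are no longer visible.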
@@ -1169,9 +1266,22 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
         return nil, err
     }

-    if err := d.restore(cfgStore); err != nil {
+    driverContainers, ok := containers[driverName]
+
+    // Log containers that were not loaded because they were created with another driver
+    if (!ok && len(containers) > 0) || len(containers) > 1 {
+        for driver, all := range containers {
+            if driver == driverName {
+                continue
+            }
+            for id := range all {
+                log.G(ctx).WithField("container", id).Debugf("not restoring container because it was created with another storage driver (%s)", driver)
+            }
+        }
+    }
+
+    if err := d.restore(cfgStore, driverContainers); err != nil {
         return nil, err
     }

+    // Wait for migration to complete
     close(d.startupDone)

     info, err := d.SystemInfo(ctx)
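
For reference, the graph-driver to snapshotter mapping implied by the switch in NewDaemon, summarized as a map (illustrative, not code from the commit):

// Drivers with a defined migration path; any other graph driver
// (e.g. btrfs, zfs) resets migrationThreshold to -1 and skips migration.
var snapshotterFor = map[string]string{
    "overlay2":      "overlayfs",
    "windowsfilter": "windows", // threshold forced to 0: only an empty image store migrates
    "vfs":           "native",
}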