mirror of https://github.com/moby/moby.git
The monitorDaemon() goroutine calls startContainerd() and then blocks on
<-daemonWaitCh to wait for it to exit. The startContainerd() function
would (re)initialize daemonWaitCh so that a restarted containerd could be
waited on. This implementation was race-free because startContainerd()
initialized daemonWaitCh synchronously, before returning. When the call
to start the managed containerd process was moved into the waiter
goroutine, the code that initializes the daemonWaitCh struct field was
moved into that goroutine as well. This introduced a race condition:
monitorDaemon() could read the field before the goroutine had written it.
Move the daemonWaitCh initialization so that it is guaranteed to happen
before the startContainerd() call returns.
Signed-off-by: Cory Snider <csnider@mirantis.com>
(cherry picked from commit dd20bf4862)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
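To make the race concrete, here is a minimal, self-contained sketch (not the moby code; the waiter type and the startRacy/startFixed names are made up for illustration) of the pattern the commit message describes: a monitor reads a channel field that the starter's goroutine writes. The racy variant would typically be flagged by `go run -race`; the fixed variant initializes the field before the goroutine is spawned, which is the guarantee the commit restores. The file below achieves the same happens-before differently, by initializing daemonWaitCh before sending on the unbuffered startedCh that startContainerd() receives from before returning.

// race_sketch.go: illustrative only, not part of the repository.
package main

import (
	"fmt"
	"os/exec"
	"time"
)

// waiter plays the role of the remote struct: waitCh corresponds to
// daemonWaitCh and is read by a monitor after start returns.
type waiter struct {
	waitCh chan struct{}
}

// startRacy mirrors the broken pattern: the shared waitCh field is written
// inside the goroutine with nothing ordering that write before startRacy
// returns, so a caller reading w.waitCh right after the call races with it.
func (w *waiter) startRacy(cmd *exec.Cmd) {
	go func() {
		if err := cmd.Start(); err != nil {
			return
		}
		w.waitCh = make(chan struct{}) // unsynchronized write: data race
		_ = cmd.Wait()
		close(w.waitCh)
	}()
}

// startFixed restores the original guarantee: waitCh is initialized before
// the goroutine is spawned, so the write happens before startFixed returns.
func (w *waiter) startFixed(cmd *exec.Cmd) {
	w.waitCh = make(chan struct{})
	go func() {
		if err := cmd.Start(); err != nil {
			close(w.waitCh)
			return
		}
		_ = cmd.Wait()
		close(w.waitCh)
	}()
}

func main() {
	w := &waiter{}
	// "sleep" stands in for the managed containerd process (Unix only).
	w.startFixed(exec.Command("sleep", "1"))
	select {
	case <-w.waitCh: // safe: the field was set before startFixed returned
		fmt.Println("child exited")
	case <-time.After(5 * time.Second):
		fmt.Println("timed out waiting for child")
	}
}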
350 lines · 8.4 KiB · Go
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"

import (
	"context"
	"os"
	"os/exec"
	"path/filepath"
	"runtime"
	"strings"
	"time"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/services/server/config"
	"github.com/containerd/containerd/sys"
	"github.com/docker/docker/pkg/pidfile"
	"github.com/docker/docker/pkg/process"
	"github.com/docker/docker/pkg/system"
	"github.com/pelletier/go-toml"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

const (
	maxConnectionRetryCount = 3
	healthCheckTimeout      = 3 * time.Second
	shutdownTimeout         = 15 * time.Second
	startupTimeout          = 15 * time.Second
	configFile              = "containerd.toml"
	binaryName              = "containerd"
	pidFile                 = "containerd.pid"
)

type remote struct {
	config.Config

	// configFile is the location where the generated containerd configuration
	// file is saved.
	configFile string

	daemonPid int
	pidFile   string
	logger    *logrus.Entry

	daemonWaitCh  chan struct{}
	daemonStartCh chan error
	daemonStopCh  chan struct{}

	stateDir string

	// oomScore adjusts the OOM score for the containerd process.
	oomScore int

	// logLevel overrides the containerd logging-level through the --log-level
	// command-line option.
	logLevel string
}

// Daemon represents a running containerd daemon
type Daemon interface {
	WaitTimeout(time.Duration) error
	Address() string
}

// DaemonOpt allows to configure parameters of container daemons
type DaemonOpt func(c *remote) error

// Start starts a containerd daemon and monitors it
func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Daemon, error) {
	r := &remote{
		stateDir: stateDir,
		Config: config.Config{
			Version: 2,
			Root:    filepath.Join(rootDir, "daemon"),
			State:   filepath.Join(stateDir, "daemon"),
		},
		configFile:    filepath.Join(stateDir, configFile),
		daemonPid:     -1,
		pidFile:       filepath.Join(stateDir, pidFile),
		logger:        logrus.WithField("module", "libcontainerd"),
		daemonStartCh: make(chan error, 1),
		daemonStopCh:  make(chan struct{}),
	}

	for _, opt := range opts {
		if err := opt(r); err != nil {
			return nil, err
		}
	}
	r.setDefaults()

	if err := system.MkdirAll(stateDir, 0700); err != nil {
		return nil, err
	}

	go r.monitorDaemon(ctx)

	timeout := time.NewTimer(startupTimeout)
	defer timeout.Stop()

	select {
	case <-timeout.C:
		return nil, errors.New("timeout waiting for containerd to start")
	case err := <-r.daemonStartCh:
		if err != nil {
			return nil, err
		}
	}

	return r, nil
}

func (r *remote) WaitTimeout(d time.Duration) error {
	timeout := time.NewTimer(d)
	defer timeout.Stop()

	select {
	case <-timeout.C:
		return errors.New("timeout waiting for containerd to stop")
	case <-r.daemonStopCh:
	}

	return nil
}

func (r *remote) Address() string {
	return r.GRPC.Address
}

func (r *remote) getContainerdConfig() (string, error) {
	f, err := os.OpenFile(r.configFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
	if err != nil {
		return "", errors.Wrapf(err, "failed to open containerd config file (%s)", r.configFile)
	}
	defer f.Close()

	if err := toml.NewEncoder(f).Encode(r); err != nil {
		return "", errors.Wrapf(err, "failed to write containerd config file (%s)", r.configFile)
	}
	return r.configFile, nil
}

func (r *remote) startContainerd() error {
	pid, err := pidfile.Read(r.pidFile)
	if err != nil && !errors.Is(err, os.ErrNotExist) {
		return err
	}

	if pid > 0 {
		r.daemonPid = pid
		r.logger.WithField("pid", pid).Infof("%s is still running", binaryName)
		return nil
	}

	cfgFile, err := r.getContainerdConfig()
	if err != nil {
		return err
	}
	args := []string{"--config", cfgFile}

	if r.logLevel != "" {
		args = append(args, "--log-level", r.logLevel)
	}

	cmd := exec.Command(binaryName, args...)
	// redirect containerd logs to docker logs
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	cmd.SysProcAttr = containerdSysProcAttr()
	// clear the NOTIFY_SOCKET from the env when starting containerd
	cmd.Env = nil
	for _, e := range os.Environ() {
		if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
			cmd.Env = append(cmd.Env, e)
		}
	}

	startedCh := make(chan error)
	go func() {
		// On Linux, when cmd.SysProcAttr.Pdeathsig is set,
		// the signal is sent to the subprocess when the creating thread
		// terminates. The runtime terminates a thread if a goroutine
		// exits while locked to it. Prevent the containerd process
		// from getting killed prematurely by ensuring that the thread
		// used to start it remains alive until it or the daemon process
		// exits. See https://go.dev/issue/27505 for more details.
		runtime.LockOSThread()
		defer runtime.UnlockOSThread()
		err := cmd.Start()
		if err != nil {
			startedCh <- err
			return
		}
		r.daemonWaitCh = make(chan struct{})
		startedCh <- nil

		// Reap our child when needed
		if err := cmd.Wait(); err != nil {
			r.logger.WithError(err).Errorf("containerd did not exit successfully")
		}
		close(r.daemonWaitCh)
	}()
	if err := <-startedCh; err != nil {
		return err
	}

	r.daemonPid = cmd.Process.Pid

	if err := r.adjustOOMScore(); err != nil {
		r.logger.WithError(err).Warn("failed to adjust OOM score")
	}

	err = pidfile.Write(r.pidFile, r.daemonPid)
	if err != nil {
		process.Kill(r.daemonPid)
		return errors.Wrap(err, "libcontainerd: failed to save daemon pid to disk")
	}

	r.logger.WithField("pid", r.daemonPid).WithField("address", r.Address()).Infof("started new %s process", binaryName)

	return nil
}

func (r *remote) adjustOOMScore() error {
	if r.oomScore == 0 || r.daemonPid <= 1 {
		// no score configured, or daemonPid contains an invalid PID (we don't
		// expect containerd to be running as PID 1 :)).
		return nil
	}
	if err := sys.SetOOMScore(r.daemonPid, r.oomScore); err != nil {
		return errors.Wrap(err, "failed to adjust OOM score for containerd process")
	}
	return nil
}

func (r *remote) monitorDaemon(ctx context.Context) {
	var (
		transientFailureCount = 0
		client                *containerd.Client
		err                   error
		delay                 time.Duration
		timer                 = time.NewTimer(0)
		started               bool
	)

	defer func() {
		if r.daemonPid != -1 {
			r.stopDaemon()
		}

		// cleanup some files
		_ = os.Remove(r.pidFile)

		r.platformCleanup()

		close(r.daemonStopCh)
		timer.Stop()
	}()

	// ensure no races on sending to timer.C even though there is a 0 duration.
	if !timer.Stop() {
		<-timer.C
	}

	for {
		timer.Reset(delay)

		select {
		case <-ctx.Done():
			r.logger.Info("stopping healthcheck following graceful shutdown")
			if client != nil {
				client.Close()
			}
			return
		case <-timer.C:
		}

		if r.daemonPid == -1 {
			if r.daemonWaitCh != nil {
				select {
				case <-ctx.Done():
					r.logger.Info("stopping containerd startup following graceful shutdown")
					return
				case <-r.daemonWaitCh:
				}
			}

			os.RemoveAll(r.GRPC.Address)
			if err := r.startContainerd(); err != nil {
				if !started {
					r.daemonStartCh <- err
					return
				}
				r.logger.WithError(err).Error("failed restarting containerd")
				delay = 50 * time.Millisecond
				continue
			}

			client, err = containerd.New(r.GRPC.Address, containerd.WithTimeout(60*time.Second))
			if err != nil {
				r.logger.WithError(err).Error("failed connecting to containerd")
				delay = 100 * time.Millisecond
				continue
			}
			r.logger.WithField("address", r.GRPC.Address).Debug("created containerd monitoring client")
		}

		if client != nil {
			tctx, cancel := context.WithTimeout(ctx, healthCheckTimeout)
			_, err := client.IsServing(tctx)
			cancel()
			if err == nil {
				if !started {
					close(r.daemonStartCh)
					started = true
				}

				transientFailureCount = 0

				select {
				case <-r.daemonWaitCh:
				case <-ctx.Done():
				}

				// Set a small delay in case there is a recurring failure (or bug in this code)
				// to ensure we don't end up in a super tight loop.
				delay = 500 * time.Millisecond
				continue
			}

			r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding")

			transientFailureCount++
			if transientFailureCount < maxConnectionRetryCount || process.Alive(r.daemonPid) {
				delay = time.Duration(transientFailureCount) * 200 * time.Millisecond
				continue
			}
			client.Close()
			client = nil
		}

		if process.Alive(r.daemonPid) {
			r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
			r.killDaemon()
		}

		r.daemonPid = -1
		delay = 0
		transientFailureCount = 0
	}
}