mirror of
https://github.com/moby/moby.git
synced 2026-01-11 18:51:37 +00:00
Merge pull request #50883 from thaJeztah/libc8d_cleanups
libcontainerd/remote: assorted cleanups
This commit is contained in:
@@ -149,16 +149,10 @@ func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spe
|
||||
|
||||
// NewTask creates a task for the specified containerd id
|
||||
func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
|
||||
var (
|
||||
checkpoint *types.Descriptor
|
||||
t containerd.Task
|
||||
rio cio.IO
|
||||
stdinCloseSync = make(chan containerd.Process, 1)
|
||||
)
|
||||
|
||||
ctx, span := otel.Tracer("").Start(ctx, "libcontainerd.remote.NewTask")
|
||||
defer span.End()
|
||||
|
||||
var checkpoint *types.Descriptor
|
||||
if checkpointDir != "" {
|
||||
// write checkpoint to the content store
|
||||
tar := archive.Diff(ctx, "", checkpointDir)
|
||||
@@ -167,8 +161,7 @@ func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin
|
||||
// remove the checkpoint when we're done
|
||||
defer func() {
|
||||
if checkpoint != nil {
|
||||
err := c.client.client.ContentStore().Delete(ctx, digest.Digest(checkpoint.Digest))
|
||||
if err != nil {
|
||||
if err := c.client.client.ContentStore().Delete(ctx, digest.Digest(checkpoint.Digest)); err != nil {
|
||||
c.client.logger.WithError(err).WithFields(log.Fields{
|
||||
"ref": checkpointDir,
|
||||
"digest": checkpoint.Digest,
|
||||
@@ -220,7 +213,9 @@ func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin
|
||||
taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level))
|
||||
}
|
||||
|
||||
t, err = c.c8dCtr.NewTask(ctx,
|
||||
var rio cio.IO
|
||||
stdinCloseSync := make(chan containerd.Process, 1)
|
||||
t, err := c.c8dCtr.NewTask(ctx,
|
||||
func(id string) (cio.IO, error) {
|
||||
fifos := newFIFOSet(bundle, id, withStdin, spec.Process.Terminal)
|
||||
|
||||
@@ -233,7 +228,7 @@ func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin
|
||||
close(stdinCloseSync)
|
||||
if rio != nil {
|
||||
rio.Cancel()
|
||||
rio.Close()
|
||||
_ = rio.Close()
|
||||
}
|
||||
return nil, pkgerrors.Wrap(wrapError(err), "failed to create task for container")
|
||||
}
|
||||
@@ -257,9 +252,8 @@ func (t *task) Start(ctx context.Context) error {
|
||||
// for the container main process, the stdin fifo will be created in Create not
|
||||
// the Start call. stdinCloseSync channel should be closed after Start exec
|
||||
// process.
|
||||
func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
|
||||
func (t *task) Exec(ctx context.Context, execID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Process, retErr error) {
|
||||
var (
|
||||
p containerd.Process
|
||||
rio cio.IO
|
||||
stdinCloseSync = make(chan containerd.Process, 1)
|
||||
)
|
||||
@@ -271,18 +265,17 @@ func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process,
|
||||
return nil, wrapError(err)
|
||||
}
|
||||
|
||||
fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
|
||||
fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], execID, withStdin, spec.Terminal)
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if rio != nil {
|
||||
rio.Cancel()
|
||||
rio.Close()
|
||||
}
|
||||
if retErr != nil && rio != nil {
|
||||
rio.Cancel()
|
||||
_ = rio.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
|
||||
p, err := t.Task.Exec(ctx, execID, spec, func(id string) (cio.IO, error) {
|
||||
var err error
|
||||
rio, err = t.ctr.createIO(fifos, stdinCloseSync, attachStdio)
|
||||
return rio, err
|
||||
})
|
||||
@@ -299,13 +292,19 @@ func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process,
|
||||
// the stdin of exec process will be created after p.Start in containerd
|
||||
defer func() { stdinCloseSync <- p }()
|
||||
|
||||
if err = p.Start(ctx); err != nil {
|
||||
// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
|
||||
// we are not waiting forever if containerd is unresponsive or to work around fifo cancelling issues in
|
||||
// older containerd-shim
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
|
||||
if err := p.Start(ctx); err != nil {
|
||||
// don't cancel cleanup if the context is cancelled, but add a timeout
|
||||
// to make sure we are not waiting forever if containerd is unresponsive
|
||||
// or to work around fifo cancelling issues in older containerd-shim.
|
||||
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 45*time.Second)
|
||||
defer cancel()
|
||||
p.Delete(ctx)
|
||||
if _, err := p.Delete(ctx); err != nil && !cerrdefs.IsNotFound(err) {
|
||||
log.G(ctx).WithFields(log.Fields{
|
||||
"error": err,
|
||||
"container": t.ID(),
|
||||
"execID": execID,
|
||||
}).Warn("Failed to delete exec process after failing to start")
|
||||
}
|
||||
return nil, wrapError(err)
|
||||
}
|
||||
return process{p}, nil
|
||||
@@ -494,22 +493,18 @@ func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) {
|
||||
// createIO creates the io to be used by a process
|
||||
// This needs to get a pointer to interface as upon closure the process may not have yet been registered
|
||||
func (c *container) createIO(fifos *cio.FIFOSet, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
|
||||
var (
|
||||
io *cio.DirectIO
|
||||
err error
|
||||
)
|
||||
io, err = c.client.newDirectIO(context.Background(), fifos)
|
||||
dio, err := c.client.newDirectIO(context.Background(), fifos)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if io.Stdin != nil {
|
||||
if dio.Stdin != nil {
|
||||
var (
|
||||
errs []error
|
||||
stdinOnce sync.Once
|
||||
)
|
||||
pipe := io.Stdin
|
||||
io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
|
||||
pipe := dio.Stdin
|
||||
dio.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
|
||||
stdinOnce.Do(func() {
|
||||
errs = append(errs, pipe.Close())
|
||||
|
||||
@@ -538,10 +533,10 @@ func (c *container) createIO(fifos *cio.FIFOSet, stdinCloseSync chan containerd.
|
||||
})
|
||||
}
|
||||
|
||||
rio, err := attachStdio(io)
|
||||
rio, err := attachStdio(dio)
|
||||
if err != nil {
|
||||
io.Cancel()
|
||||
io.Close()
|
||||
dio.Cancel()
|
||||
_ = dio.Close()
|
||||
}
|
||||
return rio, err
|
||||
}
|
||||
@@ -731,13 +726,11 @@ func (c *client) bundleDir(id string) string {
|
||||
}
|
||||
|
||||
func wrapError(err error) error {
|
||||
switch {
|
||||
case err == nil:
|
||||
return nil
|
||||
case cerrdefs.IsNotFound(err):
|
||||
return errdefs.NotFound(err)
|
||||
if err == nil || cerrdefs.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(thaJeztah): don't depend on string-matching errors and remove wrapError; https://github.com/moby/moby/issues/50882
|
||||
msg := err.Error()
|
||||
for _, s := range []string{"container does not exist", "not found", "no such container"} {
|
||||
if strings.Contains(msg, s) {
|
||||
|
||||
Reference in New Issue
Block a user