tarexport: Plumb ctx, add OTEL spans, handle cancellation

Pass `context.Context` through `tarexport.Load` and `tarexport.Save`.
Create OTEL spans for the most time consuming operations.

Also, handle context cancellations to actually end saving/loading when
the operation is cancelled - before this PR the daemon would still be
performing the operation even though the user already cancelled it.

Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
This commit is contained in:
Paweł Gronowski
2024-03-25 14:31:57 +01:00
parent 4554d871d7
commit ad0f263eb5
6 changed files with 195 additions and 21 deletions

View File

@@ -16,7 +16,7 @@ import (
// outStream is the writer which the images are written to.
func (i *ImageService) ExportImage(ctx context.Context, names []string, outStream io.Writer) error {
imageExporter := tarexport.NewTarExporter(i.imageStore, i.layerStore, i.referenceStore, i)
return imageExporter.Save(names, outStream)
return imageExporter.Save(ctx, names, outStream)
}
func (i *ImageService) PerformWithBaseFS(ctx context.Context, c *container.Container, fn func(root string) error) error {
@@ -46,5 +46,5 @@ func (i *ImageService) PerformWithBaseFS(ctx context.Context, c *container.Conta
// ball containing images and metadata.
func (i *ImageService) LoadImage(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) error {
imageExporter := tarexport.NewTarExporter(i.imageStore, i.layerStore, i.referenceStore, i)
return imageExporter.Load(inTar, outStream, quiet)
return imageExporter.Load(ctx, inTar, outStream, quiet)
}

View File

@@ -1,6 +1,7 @@
package image // import "github.com/docker/docker/image"
import (
"context"
"encoding/json"
"errors"
"io"
@@ -279,9 +280,9 @@ func NewHistory(author, comment, createdBy string, isEmptyLayer bool) History {
// Exporter provides interface for loading and saving images
type Exporter interface {
Load(io.ReadCloser, io.Writer, bool) error
Load(context.Context, io.ReadCloser, io.Writer, bool) error
// TODO: Load(net.Context, io.ReadCloser, <- chan StatusMessage) error
Save([]string, io.Writer) error
Save(context.Context, []string, io.Writer) error
}
// NewFromJSON creates an Image configuration from json.

View File

@@ -11,12 +11,14 @@ import (
"reflect"
"runtime"
"github.com/containerd/containerd/tracing"
"github.com/containerd/log"
"github.com/distribution/reference"
"github.com/docker/distribution"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/image"
v1 "github.com/docker/docker/image/v1"
"github.com/docker/docker/internal/ioutils"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/chrootarchive"
@@ -28,7 +30,13 @@ import (
"github.com/opencontainers/go-digest"
)
func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) error {
func (l *tarexporter) Load(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) (outErr error) {
ctx, span := tracing.StartSpan(ctx, "tarexport.Load")
defer span.End()
defer func() {
span.SetStatus(outErr)
}()
var progressOutput progress.Output
if !quiet {
progressOutput = streamformatter.NewJSONProgressOutput(outStream, false)
@@ -41,9 +49,10 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
}
defer os.RemoveAll(tmpDir)
if err := chrootarchive.Untar(inTar, tmpDir, nil); err != nil {
if err := untar(ctx, inTar, tmpDir); err != nil {
return err
}
// read manifest, if no file then load in legacy mode
manifestPath, err := safePath(tmpDir, manifestFileName)
if err != nil {
@@ -72,6 +81,11 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
var imageRefCount int
for _, m := range manifest {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
configPath, err := safePath(tmpDir, m.Config)
if err != nil {
return err
@@ -95,6 +109,11 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
}
for i, diffID := range img.RootFS.DiffIDs {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
layerPath, err := safePath(tmpDir, m.Layers[i])
if err != nil {
return err
@@ -103,7 +122,7 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
r.Append(diffID)
newLayer, err := l.lss.Get(r.ChainID())
if err != nil {
newLayer, err = l.loadLayer(layerPath, rootFS, diffID.String(), m.LayerSources[diffID], progressOutput)
newLayer, err = l.loadLayer(ctx, layerPath, rootFS, diffID.String(), m.LayerSources[diffID], progressOutput)
if err != nil {
return err
}
@@ -155,6 +174,15 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
return nil
}
func untar(ctx context.Context, inTar io.ReadCloser, tmpDir string) error {
_, trace := tracing.StartSpan(ctx, "chrootarchive.Untar")
defer trace.End()
err := chrootarchive.Untar(ioutils.NewCtxReader(ctx, inTar), tmpDir, nil)
trace.SetStatus(err)
return err
}
func (l *tarexporter) setParentID(id, parentID image.ID) error {
img, err := l.is.Get(id)
if err != nil {
@@ -170,7 +198,14 @@ func (l *tarexporter) setParentID(id, parentID image.ID) error {
return l.is.SetParent(id, parentID)
}
func (l *tarexporter) loadLayer(filename string, rootFS image.RootFS, id string, foreignSrc distribution.Descriptor, progressOutput progress.Output) (layer.Layer, error) {
func (l *tarexporter) loadLayer(ctx context.Context, filename string, rootFS image.RootFS, id string, foreignSrc distribution.Descriptor, progressOutput progress.Output) (_ layer.Layer, outErr error) {
ctx, span := tracing.StartSpan(ctx, "loadLayer")
span.SetAttributes(tracing.Attribute("image.id", id))
defer span.End()
defer func() {
span.SetStatus(outErr)
}()
// We use sequential file access to avoid depleting the standby list on Windows.
// On Linux, this equates to a regular os.Open.
rawTar, err := sequential.Open(filename)
@@ -193,7 +228,7 @@ func (l *tarexporter) loadLayer(filename string, rootFS image.RootFS, id string,
r = rawTar
}
inflatedLayerData, err := archive.DecompressStream(r)
inflatedLayerData, err := archive.DecompressStream(ioutils.NewCtxReader(ctx, r))
if err != nil {
return nil, err
}
@@ -332,7 +367,7 @@ func (l *tarexporter) legacyLoadImage(oldID, sourceDir string, loadedMap map[str
if err != nil {
return err
}
newLayer, err := l.loadLayer(layerPath, *rootFS, oldID, distribution.Descriptor{}, progressOutput)
newLayer, err := l.loadLayer(context.TODO(), layerPath, *rootFS, oldID, distribution.Descriptor{}, progressOutput)
if err != nil {
return err
}

View File

@@ -11,12 +11,14 @@ import (
"time"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/tracing"
"github.com/containerd/log"
"github.com/distribution/reference"
"github.com/docker/distribution"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/image"
v1 "github.com/docker/docker/image/v1"
"github.com/docker/docker/internal/ioutils"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/system"
@@ -42,20 +44,20 @@ type saveSession struct {
savedConfigs map[string]struct{}
}
func (l *tarexporter) Save(names []string, outStream io.Writer) error {
images, err := l.parseNames(names)
func (l *tarexporter) Save(ctx context.Context, names []string, outStream io.Writer) error {
images, err := l.parseNames(ctx, names)
if err != nil {
return err
}
// Release all the image top layer references
defer l.releaseLayerReferences(images)
return (&saveSession{tarexporter: l, images: images}).save(outStream)
return (&saveSession{tarexporter: l, images: images}).save(ctx, outStream)
}
// parseNames will parse the image names to a map which contains image.ID to *imageDescriptor.
// Each imageDescriptor holds an image top layer reference named 'layerRef'. It is taken here, should be released later.
func (l *tarexporter) parseNames(names []string) (desc map[image.ID]*imageDescriptor, rErr error) {
func (l *tarexporter) parseNames(ctx context.Context, names []string) (desc map[image.ID]*imageDescriptor, rErr error) {
imgDescr := make(map[image.ID]*imageDescriptor)
defer func() {
if rErr != nil {
@@ -92,6 +94,12 @@ func (l *tarexporter) parseNames(names []string) (desc map[image.ID]*imageDescri
}
for _, name := range names {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ref, err := reference.ParseAnyReference(name)
if err != nil {
return nil, err
@@ -179,7 +187,7 @@ func (l *tarexporter) releaseLayerReferences(imgDescr map[image.ID]*imageDescrip
return nil
}
func (s *saveSession) save(outStream io.Writer) error {
func (s *saveSession) save(ctx context.Context, outStream io.Writer) error {
s.savedConfigs = make(map[string]struct{})
s.savedLayers = make(map[layer.DiffID]distribution.Descriptor)
@@ -199,7 +207,13 @@ func (s *saveSession) save(outStream io.Writer) error {
var manifestDescriptors []ocispec.Descriptor
for id, imageDescr := range s.images {
foreignSrcs, err := s.saveImage(id)
select {
case <-ctx.Done():
return ctx.Err()
default:
}
foreignSrcs, err := s.saveImage(ctx, id)
if err != nil {
return err
}
@@ -370,17 +384,34 @@ func (s *saveSession) save(outStream io.Writer) error {
return errors.Wrap(err, "error writing oci index file")
}
return s.writeTar(ctx, tempDir, outStream)
}
func (s *saveSession) writeTar(ctx context.Context, tempDir string, outStream io.Writer) error {
ctx, span := tracing.StartSpan(ctx, "writeTar")
defer span.End()
fs, err := archive.Tar(tempDir, archive.Uncompressed)
if err != nil {
span.SetStatus(err)
return err
}
defer fs.Close()
_, err = io.Copy(outStream, fs)
_, err = ioutils.CopyCtx(ctx, outStream, fs)
span.SetStatus(err)
return err
}
func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Descriptor, error) {
func (s *saveSession) saveImage(ctx context.Context, id image.ID) (_ map[layer.DiffID]distribution.Descriptor, outErr error) {
ctx, span := tracing.StartSpan(ctx, "saveImage")
span.SetAttributes(tracing.Attribute("image.id", id.String()))
defer span.End()
defer func() {
span.SetStatus(outErr)
}()
img := s.images[id].image
if len(img.RootFS.DiffIDs) == 0 {
return nil, fmt.Errorf("empty export - not implemented")
@@ -390,6 +421,11 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc
var layers []layer.DiffID
var foreignSrcs map[layer.DiffID]distribution.Descriptor
for i, diffID := range img.RootFS.DiffIDs {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
v1ImgCreated := time.Unix(0, 0)
v1Img := image.V1Image{
// This is for backward compatibility used for
@@ -412,7 +448,7 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc
}
v1Img.OS = img.OS
src, err := s.saveConfigAndLayer(rootFS.ChainID(), v1Img, img.Created)
src, err := s.saveConfigAndLayer(ctx, rootFS.ChainID(), v1Img, img.Created)
if err != nil {
return nil, err
}
@@ -457,7 +493,17 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc
return foreignSrcs, nil
}
func (s *saveSession) saveConfigAndLayer(id layer.ChainID, legacyImg image.V1Image, createdTime *time.Time) (distribution.Descriptor, error) {
func (s *saveSession) saveConfigAndLayer(ctx context.Context, id layer.ChainID, legacyImg image.V1Image, createdTime *time.Time) (_ distribution.Descriptor, outErr error) {
ctx, span := tracing.StartSpan(ctx, "saveConfigAndLayer")
span.SetAttributes(
tracing.Attribute("layer.id", id.String()),
tracing.Attribute("image.id", legacyImg.ID),
)
defer span.End()
defer func() {
span.SetStatus(outErr)
}()
outDir := filepath.Join(s.outDir, ocispec.ImageBlobsDir)
if _, ok := s.savedConfigs[legacyImg.ID]; !ok {
@@ -512,7 +558,7 @@ func (s *saveSession) saveConfigAndLayer(id layer.ChainID, legacyImg image.V1Ima
digester := digest.Canonical.Digester()
digestedArch := io.TeeReader(arch, digester.Hash())
tarSize, err := io.Copy(tarFile, digestedArch)
tarSize, err := ioutils.CopyCtx(ctx, tarFile, digestedArch)
if err != nil {
return distribution.Descriptor{}, err
}

57
internal/ioutils/copy.go Normal file
View File

@@ -0,0 +1,57 @@
package ioutils
import (
"context"
"io"
)
// CopyCtx copies from src to dst until either EOF is reached on src or a context is cancelled.
// The writer is not closed when the context is cancelled.
//
// After CopyCtx exits due to context cancellation, the goroutine that performed
// the copy may still be running if either the reader or writer blocks.
func CopyCtx(ctx context.Context, dst io.Writer, src io.Reader) (n int64, err error) {
copyDone := make(chan struct{})
src = &readerCtx{ctx: ctx, r: src}
go func() {
n, err = io.Copy(dst, src)
close(copyDone)
}()
select {
case <-ctx.Done():
return -1, ctx.Err()
case <-copyDone:
}
return n, err
}
type readerCtx struct {
ctx context.Context
r io.Reader
}
// NewCtxReader wraps the given reader with a reader that doesn't proceed with
// reading if the context is done.
//
// Note: Read will still block if the underlying reader blocks.
func NewCtxReader(ctx context.Context, r io.Reader) io.Reader {
return &readerCtx{ctx: ctx, r: r}
}
func (r *readerCtx) Read(p []byte) (n int, err error) {
if err := r.ctx.Err(); err != nil {
return 0, err
}
n, outErr := r.r.Read(p)
if err := r.ctx.Err(); err != nil {
return 0, err
}
return n, outErr
}

View File

@@ -0,0 +1,35 @@
package ioutils
import (
"bytes"
"context"
"testing"
"time"
)
type blockingReader struct{}
func (r blockingReader) Read(p []byte) (int, error) {
time.Sleep(time.Second)
return 0, nil
}
func TestCopyCtx(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*5)
defer cancel()
dst := new(bytes.Buffer)
finished := make(chan struct{})
go func() {
CopyCtx(ctx, dst, blockingReader{})
close(finished)
}()
select {
case <-finished:
case <-time.After(time.Millisecond * 100):
t.Fatal("CopyCtx did not return after context was cancelled")
}
}