diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 963f865bd1..f6df75329a 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -15,14 +15,15 @@ import ( "os/exec" "path/filepath" "runtime" + "runtime/debug" "strconv" "strings" + "sync/atomic" "syscall" "time" "github.com/containerd/log" "github.com/docker/docker/pkg/idtools" - "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/system" "github.com/klauspost/compress/zstd" @@ -215,11 +216,22 @@ func gzDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) { return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf) } -func wrapReadCloser(readBuf io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { - return ioutils.NewReadCloserWrapper(readBuf, func() error { - cancel() - return readBuf.Close() - }) +type readCloserWrapper struct { + io.Reader + closer func() error + closed atomic.Bool +} + +func (r *readCloserWrapper) Close() error { + if !r.closed.CompareAndSwap(false, true) { + log.G(context.TODO()).Error("subsequent attempt to close readCloserWrapper") + if log.GetLevel() >= log.DebugLevel { + log.G(context.TODO()).Errorf("stack trace: %s", string(debug.Stack())) + } + + return nil + } + return r.closer() } // DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive. @@ -237,11 +249,26 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) { return nil, err } + wrapReader := func(r io.Reader, cancel context.CancelFunc) io.ReadCloser { + return &readCloserWrapper{ + Reader: r, + closer: func() error { + if cancel != nil { + cancel() + } + if readCloser, ok := r.(io.ReadCloser); ok { + readCloser.Close() + } + p.Put(buf) + return nil + }, + } + } + compression := DetectCompression(bs) switch compression { case Uncompressed: - readBufWrapper := p.NewReadCloserWrapper(buf, buf) - return readBufWrapper, nil + return wrapReader(buf, nil), nil case Gzip: ctx, cancel := context.WithCancel(context.Background()) @@ -250,12 +277,10 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) { cancel() return nil, err } - readBufWrapper := p.NewReadCloserWrapper(buf, gzReader) - return wrapReadCloser(readBufWrapper, cancel), nil + return wrapReader(gzReader, cancel), nil case Bzip2: bz2Reader := bzip2.NewReader(buf) - readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader) - return readBufWrapper, nil + return wrapReader(bz2Reader, nil), nil case Xz: ctx, cancel := context.WithCancel(context.Background()) @@ -264,15 +289,13 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) { cancel() return nil, err } - readBufWrapper := p.NewReadCloserWrapper(buf, xzReader) - return wrapReadCloser(readBufWrapper, cancel), nil + return wrapReader(xzReader, cancel), nil case Zstd: zstdReader, err := zstd.NewReader(buf) if err != nil { return nil, err } - readBufWrapper := p.NewReadCloserWrapper(buf, zstdReader) - return readBufWrapper, nil + return wrapReader(zstdReader, nil), nil default: return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension()) } @@ -1424,11 +1447,14 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) { close(done) }() - return ioutils.NewReadCloserWrapper(pipeR, func() error { - // Close pipeR, and then wait for the command to complete before returning. We have to close pipeR first, as - // cmd.Wait waits for any non-file stdout/stderr/stdin to close. - err := pipeR.Close() - <-done - return err - }), nil + return &readCloserWrapper{ + Reader: pipeR, + closer: func() error { + // Close pipeR, and then wait for the command to complete before returning. We have to close pipeR first, as + // cmd.Wait waits for any non-file stdout/stderr/stdin to close. + err := pipeR.Close() + <-done + return err + }, + }, nil } diff --git a/pkg/archive/archive_test.go b/pkg/archive/archive_test.go index e11b9ea507..98198b07bb 100644 --- a/pkg/archive/archive_test.go +++ b/pkg/archive/archive_test.go @@ -17,7 +17,6 @@ import ( "time" "github.com/docker/docker/pkg/idtools" - "github.com/docker/docker/pkg/ioutils" "github.com/moby/sys/userns" "gotest.tools/v3/assert" is "gotest.tools/v3/assert/cmp" @@ -1443,30 +1442,27 @@ func TestDisablePigz(t *testing.T) { t.Setenv("MOBY_DISABLE_PIGZ", "true") r := testDecompressStream(t, "gz", "gzip -f") - // For the bufio pool - outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper) - // For the context canceller - contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) - assert.Equal(t, reflect.TypeOf(contextReaderCloserWrapper.Reader), reflect.TypeOf(&gzip.Reader{})) + // wrapped in closer to cancel contex and release buffer to pool + wrapper := r.(*readCloserWrapper) + + assert.Equal(t, reflect.TypeOf(wrapper.Reader), reflect.TypeOf(&gzip.Reader{})) } func TestPigz(t *testing.T) { r := testDecompressStream(t, "gz", "gzip -f") - // For the bufio pool - outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper) - // For the context canceller - contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) + // wrapper for buffered reader and context cancel + wrapper := r.(*readCloserWrapper) _, err := exec.LookPath("unpigz") if err == nil { t.Log("Tested whether Pigz is used, as it installed") // For the command wait wrapper - cmdWaitCloserWrapper := contextReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) + cmdWaitCloserWrapper := wrapper.Reader.(*readCloserWrapper) assert.Equal(t, reflect.TypeOf(cmdWaitCloserWrapper.Reader), reflect.TypeOf(&io.PipeReader{})) } else { t.Log("Tested whether Pigz is not used, as it not installed") - assert.Equal(t, reflect.TypeOf(contextReaderCloserWrapper.Reader), reflect.TypeOf(&gzip.Reader{})) + assert.Equal(t, reflect.TypeOf(wrapper.Reader), reflect.TypeOf(&gzip.Reader{})) } } diff --git a/pkg/archive/diff_test.go b/pkg/archive/diff_test.go index 5eae0def28..dc9b79013f 100644 --- a/pkg/archive/diff_test.go +++ b/pkg/archive/diff_test.go @@ -7,8 +7,6 @@ import ( "path/filepath" "reflect" "testing" - - "github.com/docker/docker/pkg/ioutils" ) func TestApplyLayerInvalidFilenames(t *testing.T) { @@ -337,11 +335,14 @@ func makeTestLayer(paths []string) (rc io.ReadCloser, err error) { if err != nil { return } - return ioutils.NewReadCloserWrapper(archive, func() error { - err := archive.Close() - os.RemoveAll(tmpDir) - return err - }), nil + return &readCloserWrapper{ + Reader: archive, + closer: func() error { + err := archive.Close() + os.RemoveAll(tmpDir) + return err + }, + }, nil } func readDirContents(root string) ([]string, error) {