Remove use of pools in archive

Signed-off-by: Derek McGowan <derek@mcg.dev>
Derek McGowan
2024-12-16 23:10:26 -08:00
parent a72026acbb
commit a93a079cb4
5 changed files with 28 additions and 40 deletions

View File

@@ -25,7 +25,6 @@ import (
"github.com/containerd/log"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/pools"
"github.com/klauspost/compress/zstd"
"github.com/moby/patternmatcher"
"github.com/moby/sys/sequential"
@@ -235,8 +234,7 @@ func (r *readCloserWrapper) Close() error {
 // DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
 func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
-    p := pools.BufioReader32KPool
-    buf := p.Get(archive)
+    buf := bufio.NewReaderSize(archive, 32*1024)
     bs, err := buf.Peek(10)
     if err != nil && err != io.EOF {
         // Note: we'll ignore any io.EOF error because there are some odd
@@ -258,7 +256,6 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
         if readCloser, ok := r.(io.ReadCloser); ok {
             readCloser.Close()
         }
-        p.Put(buf)
         return nil
     },
 }
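
With the pool gone, the bufio.Reader is simply left to the garbage collector, which is why the Close callback above shrinks to just closing the underlying reader. A minimal, self-contained sketch (not the moby code itself; detectGzip is a hypothetical helper) of the peek-based sniffing that DecompressStream performs on the buffered stream:

    // Sniffing a stream's compression format with a buffered reader.
    package main

    import (
        "bufio"
        "bytes"
        "fmt"
        "io"
    )

    // detectGzip peeks at the leading bytes without consuming them, so the
    // full stream (magic bytes included) remains readable afterwards.
    func detectGzip(r io.Reader) (*bufio.Reader, bool, error) {
        buf := bufio.NewReaderSize(r, 32*1024)
        bs, err := buf.Peek(2)
        if err != nil && err != io.EOF {
            return nil, false, err
        }
        return buf, len(bs) == 2 && bs[0] == 0x1f && bs[1] == 0x8b, nil
    }

    func main() {
        buf, isGzip, err := detectGzip(bytes.NewReader([]byte{0x1f, 0x8b, 0x08}))
        if err != nil {
            panic(err)
        }
        fmt.Println(isGzip) // true
        _ = buf             // buf still yields the magic bytes on Read
    }

Peek returns buffered bytes without consuming them, so whichever decompressor is chosen still sees the magic bytes.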
@@ -300,18 +297,19 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
     }
 }
 
+type nopWriteCloser struct {
+    io.Writer
+}
+
+func (nopWriteCloser) Close() error { return nil }
+
 // CompressStream compresses the dest with specified compression algorithm.
 func CompressStream(dest io.Writer, compression Compression) (io.WriteCloser, error) {
-    p := pools.BufioWriter32KPool
-    buf := p.Get(dest)
     switch compression {
     case Uncompressed:
-        writeBufWrapper := p.NewWriteCloserWrapper(buf, buf)
-        return writeBufWrapper, nil
+        return nopWriteCloser{dest}, nil
     case Gzip:
-        gzWriter := gzip.NewWriter(dest)
-        writeBufWrapper := p.NewWriteCloserWrapper(buf, gzWriter)
-        return writeBufWrapper, nil
+        return gzip.NewWriter(dest), nil
     case Bzip2, Xz:
         // archive/bzip2 does not support writing, and there is no xz support at all
         // However, this is not a problem as docker only currently generates gzipped tars
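
The Uncompressed branch previously went through a pooled bufio.Writer mainly so the return value satisfied io.WriteCloser; the new nopWriteCloser does the same with zero buffering. The standard library has io.NopCloser for readers but no writer counterpart, hence the small local type. A runnable sketch of the same embedding trick (names match the diff; the main function is illustrative):

    package main

    import (
        "io"
        "os"
    )

    type nopWriteCloser struct {
        io.Writer // Write is promoted from the embedded writer
    }

    func (nopWriteCloser) Close() error { return nil }

    func main() {
        var wc io.WriteCloser = nopWriteCloser{os.Stdout}
        io.WriteString(wc, "hello\n")
        wc.Close() // no-op; os.Stdout stays open
    }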
@@ -382,7 +380,7 @@ func ReplaceFileTarWrapper(inputTarStream io.ReadCloser, mods map[string]TarModi
             pipeWriter.CloseWithError(err)
             return
         }
-        if _, err := pools.Copy(tarWriter, tarReader); err != nil {
+        if _, err := copyWithBuffer(tarWriter, tarReader); err != nil {
             pipeWriter.CloseWithError(err)
             return
         }
@@ -529,7 +527,6 @@ type tarWhiteoutConverter interface {
 type tarAppender struct {
     TarWriter *tar.Writer
-    Buffer    *bufio.Writer
 
     // for hardlink mapping
     SeenFiles map[uint64]string
@@ -547,7 +544,6 @@ func newTarAppender(idMapping idtools.IdentityMapping, writer io.Writer, chownOp
     return &tarAppender{
         SeenFiles:       make(map[uint64]string),
         TarWriter:       tar.NewWriter(writer),
-        Buffer:          pools.BufioWriter32KPool.Get(nil),
         IdentityMapping: idMapping,
         ChownOpts:       chownOpts,
     }
@@ -665,17 +661,11 @@ func (ta *tarAppender) addTarFile(path, name string) error {
             return err
         }
 
-        ta.Buffer.Reset(ta.TarWriter)
-        defer ta.Buffer.Reset(nil)
-        _, err = pools.Copy(ta.Buffer, file)
+        _, err = copyWithBuffer(ta.TarWriter, file)
         file.Close()
         if err != nil {
             return err
         }
-        err = ta.Buffer.Flush()
-        if err != nil {
-            return err
-        }
     }
 
     return nil
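
Dropping the staging bufio.Writer also removes the Flush bookkeeping and its error path: data written to a bufio.Writer is not durable until flushed, as this small demonstration (illustrative only, not from the diff) shows:

    package main

    import (
        "bufio"
        "bytes"
        "fmt"
    )

    func main() {
        var sink bytes.Buffer
        bw := bufio.NewWriterSize(&sink, 32*1024)
        bw.WriteString("payload smaller than the buffer")

        fmt.Println(sink.Len()) // 0: still sitting in bw's buffer
        bw.Flush()
        fmt.Println(sink.Len()) // 31: now written through
    }

With copyWithBuffer writing straight into the tar.Writer, there is no pending buffer left to forget.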
@@ -718,7 +708,7 @@ func createTarFile(path, extractDir string, hdr *tar.Header, reader io.Reader, o
         if err != nil {
             return err
         }
-        if _, err := pools.Copy(file, reader); err != nil {
+        if _, err := copyWithBuffer(file, reader); err != nil {
             file.Close()
             return err
         }
@@ -929,9 +919,6 @@ func (t *Tarballer) Do() {
         }
     }()
 
-    // this buffer is needed for the duration of this piped stream
-    defer pools.BufioWriter32KPool.Put(ta.Buffer)
-
     // In general we log errors here but ignore them because
     // during e.g. a diff operation the container can continue
     // mutating the filesystem and we can see transient errors
@@ -1087,8 +1074,6 @@ func (t *Tarballer) Do() {
 // Unpack unpacks the decompressedArchive to dest with options.
 func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error {
     tr := tar.NewReader(decompressedArchive)
-    trBuf := pools.BufioReader32KPool.Get(nil)
-    defer pools.BufioReader32KPool.Put(trBuf)
 
     var dirs []*tar.Header
     whiteoutConverter := getWhiteoutConverter(options.WhiteoutFormat)
@@ -1165,7 +1150,6 @@ loop:
                 }
             }
         }
 
-        trBuf.Reset(tr)
         if err := remapIDs(options.IDMap, hdr); err != nil {
             return err
@@ -1181,7 +1165,7 @@ loop:
             }
         }
 
-        if err := createTarFile(path, dest, hdr, trBuf, options); err != nil {
+        if err := createTarFile(path, dest, hdr, tr, options); err != nil {
             return err
         }
@@ -1384,7 +1368,7 @@ func (archiver *Archiver) CopyFileWithTar(src, dst string) (err error) {
     if err := tw.WriteHeader(hdr); err != nil {
         return err
     }
-    if _, err := pools.Copy(tw, srcF); err != nil {
+    if _, err := copyWithBuffer(tw, srcF); err != nil {
         return err
     }
     return nil

View File

@@ -693,7 +693,7 @@ func tarUntar(t *testing.T, origin string, options *TarOptions) ([]Change, error
     defer archive.Close()
 
     buf := make([]byte, 10)
-    if _, err := archive.Read(buf); err != nil {
+    if _, err := io.ReadFull(archive, buf); err != nil {
         return nil, err
     }
     wrap := io.MultiReader(bytes.NewReader(buf), archive)
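
This test fix matters once the pooled 32 KiB reader is gone: io.Reader.Read may legitimately return fewer bytes than the buffer holds, while io.ReadFull loops until the buffer is full or the stream errors. A short illustration using testing/iotest (not from the diff):

    package main

    import (
        "fmt"
        "io"
        "strings"
        "testing/iotest"
    )

    func main() {
        buf := make([]byte, 10)

        // OneByteReader returns at most 1 byte per Read call.
        r := iotest.OneByteReader(strings.NewReader("0123456789"))
        n, _ := r.Read(buf)
        fmt.Println(n) // 1: a single Read is not enough

        r = iotest.OneByteReader(strings.NewReader("0123456789"))
        n, err := io.ReadFull(r, buf)
        fmt.Println(n, err) // 10 <nil>
    }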

View File

@@ -15,7 +15,6 @@ import (
"github.com/containerd/log"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/pools"
)
// ChangeType represents the change type.
@@ -389,9 +388,6 @@ func ExportChanges(dir string, changes []Change, idMap idtools.IdentityMapping)
     go func() {
         ta := newTarAppender(idMap, writer, nil)
 
-        // this buffer is needed for the duration of this piped stream
-        defer pools.BufioWriter32KPool.Put(ta.Buffer)
-
         sort.Sort(changesByPath(changes))
 
         // In general we log errors here but ignore them because

View File

@@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"github.com/containerd/log"
)
@@ -20,6 +21,17 @@ var (
     ErrInvalidCopySource = errors.New("invalid copy source content")
 )
 
+var copyPool = sync.Pool{
+    New: func() interface{} { s := make([]byte, 32*1024); return &s },
+}
+
+func copyWithBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
+    buf := copyPool.Get().(*[]byte)
+    written, err = io.CopyBuffer(dst, src, *buf)
+    copyPool.Put(buf)
+    return
+}
+
 // PreserveTrailingDotOrSeparator returns the given cleaned path (after
 // processing using any utility functions from the path or filepath stdlib
 // packages) and appends a trailing `/.` or `/` if its corresponding original
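
copyWithBuffer is the replacement for pools.Copy at every call site in this commit. Two details are worth noting: the pool stores *[]byte rather than []byte because putting a slice header into an interface{} allocates, while a pointer does not (the staticcheck SA6002 rule), and io.CopyBuffer reuses the supplied buffer instead of allocating a fresh 32 KiB one per copy. A self-contained sketch of the same pattern (copyBuffered and bufPool are illustrative names, not from the diff):

    package main

    import (
        "bytes"
        "fmt"
        "io"
        "strings"
        "sync"
    )

    // bufPool stores *[]byte: a pointer fits in an interface word without
    // allocating; a slice header does not (staticcheck SA6002).
    var bufPool = sync.Pool{
        New: func() interface{} { s := make([]byte, 32*1024); return &s },
    }

    func copyBuffered(dst io.Writer, src io.Reader) (int64, error) {
        buf := bufPool.Get().(*[]byte)
        defer bufPool.Put(buf)
        // io.CopyBuffer transfers through buf, unless src implements
        // io.WriterTo or dst implements io.ReaderFrom (then buf is unused).
        return io.CopyBuffer(dst, src, *buf)
    }

    func main() {
        var out bytes.Buffer
        // The anonymous wrappers hide bytes.Buffer's ReadFrom and
        // strings.Reader's WriteTo so the pooled buffer is actually used.
        src := struct{ io.Reader }{strings.NewReader("hello")}
        dst := struct{ io.Writer }{&out}
        n, err := copyBuffered(dst, src)
        fmt.Println(n, err, out.String()) // 5 <nil> hello
    }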

View File

@@ -11,7 +11,6 @@ import (
"strings"
"github.com/containerd/log"
"github.com/docker/docker/pkg/pools"
)
// UnpackLayer unpack `layer` to a `dest`. The stream `layer` can be
@@ -19,8 +18,6 @@ import (
 // Returns the size in bytes of the contents of the layer.
 func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64, err error) {
     tr := tar.NewReader(layer)
-    trBuf := pools.BufioReader32KPool.Get(tr)
-    defer pools.BufioReader32KPool.Put(trBuf)
 
     var dirs []*tar.Header
     unpackedPaths := make(map[string]struct{})
@@ -159,8 +156,7 @@ func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64,
             }
         }
 
-        trBuf.Reset(tr)
-        srcData := io.Reader(trBuf)
+        srcData := io.Reader(tr)
         srcHdr := hdr
 
         // Hard links into /.wh..wh.plnk don't work, as we don't extract that directory, so
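
The trBuf removal in UnpackLayer (and in Unpack above) follows the same reasoning throughout the commit: a tar.Reader is already a plain io.Reader scoped to the current entry, and the copy to the filesystem is buffered by copyWithBuffer anyway, so the extra bufio layer only added a memcpy plus per-entry Reset bookkeeping. A small self-contained sketch (illustrative; io.Copy stands in for the unexported helper):

    package main

    import (
        "archive/tar"
        "bytes"
        "fmt"
        "io"
    )

    func main() {
        // Build a one-file tar in memory.
        var archive bytes.Buffer
        tw := tar.NewWriter(&archive)
        tw.WriteHeader(&tar.Header{Name: "a.txt", Mode: 0o600, Size: 5})
        tw.Write([]byte("hello"))
        tw.Close()

        tr := tar.NewReader(&archive)
        hdr, _ := tr.Next()

        // No intermediate bufio.Reader: copy entry bytes straight out of tr.
        var out bytes.Buffer
        n, err := io.Copy(&out, tr)
        fmt.Println(hdr.Name, n, err, out.String()) // a.txt 5 <nil> hello
    }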