pkg/ioutils: move BytesPipe to container/streams/bytespipe

These types are only used internally in container/streams and have no
external consumers. move them to a subpackage of container/streams.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn
2025-01-09 16:20:22 +01:00
parent 00d1b92d0a
commit 15ce2914a5
6 changed files with 15 additions and 26 deletions

View File

@@ -1,51 +0,0 @@
package ioutils // import "github.com/docker/docker/pkg/ioutils"
import (
"errors"
"io"
)
var errBufferFull = errors.New("buffer is full")
type fixedBuffer struct {
buf []byte
pos int
lastRead int
}
func (b *fixedBuffer) Write(p []byte) (int, error) {
n := copy(b.buf[b.pos:cap(b.buf)], p)
b.pos += n
if n < len(p) {
if b.pos == cap(b.buf) {
return n, errBufferFull
}
return n, io.ErrShortWrite
}
return n, nil
}
func (b *fixedBuffer) Read(p []byte) (int, error) {
n := copy(p, b.buf[b.lastRead:b.pos])
b.lastRead += n
return n, nil
}
func (b *fixedBuffer) Len() int {
return b.pos - b.lastRead
}
func (b *fixedBuffer) Cap() int {
return cap(b.buf)
}
func (b *fixedBuffer) Reset() {
b.pos = 0
b.lastRead = 0
b.buf = b.buf[:0]
}
func (b *fixedBuffer) String() string {
return string(b.buf[b.lastRead:b.pos])
}

View File

@@ -1,153 +0,0 @@
package ioutils // import "github.com/docker/docker/pkg/ioutils"
import (
"bytes"
"testing"
)
func TestFixedBufferCap(t *testing.T) {
buf := &fixedBuffer{buf: make([]byte, 0, 5)}
n := buf.Cap()
if n != 5 {
t.Fatalf("expected buffer capacity to be 5 bytes, got %d", n)
}
}
func TestFixedBufferLen(t *testing.T) {
buf := &fixedBuffer{buf: make([]byte, 0, 10)}
_, _ = buf.Write([]byte("hello"))
l := buf.Len()
if l != 5 {
t.Fatalf("expected buffer length to be 5 bytes, got %d", l)
}
buf.Write([]byte("world"))
l = buf.Len()
if l != 10 {
t.Fatalf("expected buffer length to be 10 bytes, got %d", l)
}
// read 5 bytes
b := make([]byte, 5)
_, _ = buf.Read(b)
l = buf.Len()
if l != 5 {
t.Fatalf("expected buffer length to be 5 bytes, got %d", l)
}
n, err := buf.Write([]byte("i-wont-fit"))
if n != 0 {
t.Fatalf("expected no bytes to be written to buffer, got %d", n)
}
if err != errBufferFull {
t.Fatalf("expected errBufferFull, got %v", err)
}
l = buf.Len()
if l != 5 {
t.Fatalf("expected buffer length to still be 5 bytes, got %d", l)
}
buf.Reset()
l = buf.Len()
if l != 0 {
t.Fatalf("expected buffer length to still be 0 bytes, got %d", l)
}
}
func TestFixedBufferString(t *testing.T) {
buf := &fixedBuffer{buf: make([]byte, 0, 10)}
_, _ = buf.Write([]byte("hello"))
_, _ = buf.Write([]byte("world"))
out := buf.String()
if out != "helloworld" {
t.Fatalf(`expected output to be "helloworld", got %q`, out)
}
// read 5 bytes
b := make([]byte, 5)
_, _ = buf.Read(b)
// test that fixedBuffer.String() only returns the part that hasn't been read
out = buf.String()
if out != "world" {
t.Fatalf(`expected output to be "world", got %q`, out)
}
}
func TestFixedBufferWrite(t *testing.T) {
buf := &fixedBuffer{buf: make([]byte, 0, 64)}
n, err := buf.Write([]byte("hello"))
if err != nil {
t.Fatal(err)
}
if n != 5 {
t.Fatalf("expected 5 bytes written, got %d", n)
}
if string(buf.buf[:5]) != "hello" {
t.Fatalf(`expected "hello", got %q`, string(buf.buf[:5]))
}
n, err = buf.Write(bytes.Repeat([]byte{1}, 64))
if n != 59 {
t.Fatalf("expected 59 bytes written before buffer is full, got %d", n)
}
if err != errBufferFull {
t.Fatalf("expected errBufferFull, got %v - %v", err, buf.buf[:64])
}
}
func TestFixedBufferRead(t *testing.T) {
buf := &fixedBuffer{buf: make([]byte, 0, 64)}
if _, err := buf.Write([]byte("hello world")); err != nil {
t.Fatal(err)
}
b := make([]byte, 5)
n, err := buf.Read(b)
if err != nil {
t.Fatal(err)
}
if n != 5 {
t.Fatalf("expected 5 bytes read, got %d - %s", n, buf.String())
}
if string(b) != "hello" {
t.Fatalf(`expected "hello", got %q`, string(b))
}
n, err = buf.Read(b)
if err != nil {
t.Fatal(err)
}
if n != 5 {
t.Fatalf("expected 5 bytes read, got %d", n)
}
if string(b) != " worl" {
t.Fatalf(`expected " worl", got %s`, string(b))
}
b = b[:1]
n, err = buf.Read(b)
if err != nil {
t.Fatal(err)
}
if n != 1 {
t.Fatalf("expected 1 byte read, got %d - %s", n, buf.String())
}
if string(b) != "d" {
t.Fatalf(`expected "d", got %s`, string(b))
}
}

View File

@@ -1,193 +0,0 @@
package ioutils // import "github.com/docker/docker/pkg/ioutils"
import (
"errors"
"io"
"sync"
)
// maxCap is the highest capacity to use in byte slices that buffer data.
const maxCap = 1e6
// minCap is the lowest capacity to use in byte slices that buffer data
const minCap = 64
// blockThreshold is the minimum number of bytes in the buffer which will cause
// a write to BytesPipe to block when allocating a new slice.
const blockThreshold = 1e6
var (
// ErrClosed is returned when Write is called on a closed BytesPipe.
//
// Deprecated: this type is only used internally, and will be removed in the next release.
ErrClosed = errors.New("write to closed BytesPipe")
bufPools = make(map[int]*sync.Pool)
bufPoolsLock sync.Mutex
)
// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
// All written data may be read at most once. Also, BytesPipe allocates
// and releases new byte slices to adjust to current needs, so the buffer
// won't be overgrown after peak loads.
//
// Deprecated: this type is only used internally, and will be removed in the next release.
type BytesPipe struct {
mu sync.Mutex
wait *sync.Cond
buf []*fixedBuffer
bufLen int
closeErr error // error to return from next Read. set to nil if not closed.
readBlock bool // check read BytesPipe is Wait() or not
}
// NewBytesPipe creates new BytesPipe, initialized by specified slice.
// If buf is nil, then it will be initialized with slice which cap is 64.
// buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
//
// Deprecated: this function is only used internally, and will be removed in the next release.
func NewBytesPipe() *BytesPipe {
bp := &BytesPipe{}
bp.buf = append(bp.buf, getBuffer(minCap))
bp.wait = sync.NewCond(&bp.mu)
return bp
}
// Write writes p to BytesPipe.
// It can allocate new []byte slices in a process of writing.
func (bp *BytesPipe) Write(p []byte) (int, error) {
bp.mu.Lock()
defer bp.mu.Unlock()
written := 0
loop0:
for {
if bp.closeErr != nil {
return written, ErrClosed
}
if len(bp.buf) == 0 {
bp.buf = append(bp.buf, getBuffer(64))
}
// get the last buffer
b := bp.buf[len(bp.buf)-1]
n, err := b.Write(p)
written += n
bp.bufLen += n
// errBufferFull is an error we expect to get if the buffer is full
if err != nil && err != errBufferFull {
bp.wait.Broadcast()
return written, err
}
// if there was enough room to write all then break
if len(p) == n {
break
}
// more data: write to the next slice
p = p[n:]
// make sure the buffer doesn't grow too big from this write
for bp.bufLen >= blockThreshold {
if bp.readBlock {
bp.wait.Broadcast()
}
bp.wait.Wait()
if bp.closeErr != nil {
continue loop0
}
}
// add new byte slice to the buffers slice and continue writing
nextCap := b.Cap() * 2
if nextCap > maxCap {
nextCap = maxCap
}
bp.buf = append(bp.buf, getBuffer(nextCap))
}
bp.wait.Broadcast()
return written, nil
}
// CloseWithError causes further reads from a BytesPipe to return immediately.
func (bp *BytesPipe) CloseWithError(err error) error {
bp.mu.Lock()
if err != nil {
bp.closeErr = err
} else {
bp.closeErr = io.EOF
}
bp.wait.Broadcast()
bp.mu.Unlock()
return nil
}
// Close causes further reads from a BytesPipe to return immediately.
func (bp *BytesPipe) Close() error {
return bp.CloseWithError(nil)
}
// Read reads bytes from BytesPipe.
// Data could be read only once.
func (bp *BytesPipe) Read(p []byte) (n int, err error) {
bp.mu.Lock()
defer bp.mu.Unlock()
if bp.bufLen == 0 {
if bp.closeErr != nil {
return 0, bp.closeErr
}
bp.readBlock = true
bp.wait.Wait()
bp.readBlock = false
if bp.bufLen == 0 && bp.closeErr != nil {
return 0, bp.closeErr
}
}
for bp.bufLen > 0 {
b := bp.buf[0]
read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error
n += read
bp.bufLen -= read
if b.Len() == 0 {
// it's empty so return it to the pool and move to the next one
returnBuffer(b)
bp.buf[0] = nil
bp.buf = bp.buf[1:]
}
if len(p) == read {
break
}
p = p[read:]
}
bp.wait.Broadcast()
return
}
func returnBuffer(b *fixedBuffer) {
b.Reset()
bufPoolsLock.Lock()
pool := bufPools[b.Cap()]
bufPoolsLock.Unlock()
if pool != nil {
pool.Put(b)
}
}
func getBuffer(size int) *fixedBuffer {
bufPoolsLock.Lock()
pool, ok := bufPools[size]
if !ok {
pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }}
bufPools[size] = pool
}
bufPoolsLock.Unlock()
return pool.Get().(*fixedBuffer)
}

View File

@@ -1,219 +0,0 @@
package ioutils // import "github.com/docker/docker/pkg/ioutils"
import (
"crypto/sha256"
"encoding/hex"
"math/rand"
"testing"
"time"
)
func TestBytesPipeRead(t *testing.T) {
buf := NewBytesPipe()
_, _ = buf.Write([]byte("12"))
_, _ = buf.Write([]byte("34"))
_, _ = buf.Write([]byte("56"))
_, _ = buf.Write([]byte("78"))
_, _ = buf.Write([]byte("90"))
rd := make([]byte, 4)
n, err := buf.Read(rd)
if err != nil {
t.Fatal(err)
}
if n != 4 {
t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4)
}
if string(rd) != "1234" {
t.Fatalf("Read %s, but must be %s", rd, "1234")
}
n, err = buf.Read(rd)
if err != nil {
t.Fatal(err)
}
if n != 4 {
t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4)
}
if string(rd) != "5678" {
t.Fatalf("Read %s, but must be %s", rd, "5679")
}
n, err = buf.Read(rd)
if err != nil {
t.Fatal(err)
}
if n != 2 {
t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 2)
}
if string(rd[:n]) != "90" {
t.Fatalf("Read %s, but must be %s", rd, "90")
}
}
func TestBytesPipeWrite(t *testing.T) {
buf := NewBytesPipe()
_, _ = buf.Write([]byte("12"))
_, _ = buf.Write([]byte("34"))
_, _ = buf.Write([]byte("56"))
_, _ = buf.Write([]byte("78"))
_, _ = buf.Write([]byte("90"))
if buf.buf[0].String() != "1234567890" {
t.Fatalf("Buffer %q, must be %q", buf.buf[0].String(), "1234567890")
}
}
// Regression test for #41941.
func TestBytesPipeDeadlock(t *testing.T) {
bp := NewBytesPipe()
bp.buf = []*fixedBuffer{getBuffer(blockThreshold)}
rd := make(chan error)
go func() {
n, err := bp.Read(make([]byte, 1))
t.Logf("Read n=%d, err=%v", n, err)
if n != 1 {
t.Errorf("short read: got %d, want 1", n)
}
rd <- err
}()
wr := make(chan error)
go func() {
const writeLen int = blockThreshold + 1
time.Sleep(time.Millisecond)
n, err := bp.Write(make([]byte, writeLen))
t.Logf("Write n=%d, err=%v", n, err)
if n != writeLen {
t.Errorf("short write: got %d, want %d", n, writeLen)
}
wr <- err
}()
timer := time.NewTimer(time.Second)
defer timer.Stop()
select {
case <-timer.C:
t.Fatal("deadlock! Neither Read() nor Write() returned.")
case rerr := <-rd:
if rerr != nil {
t.Fatal(rerr)
}
select {
case <-timer.C:
t.Fatal("deadlock! Write() did not return.")
case werr := <-wr:
if werr != nil {
t.Fatal(werr)
}
}
case werr := <-wr:
if werr != nil {
t.Fatal(werr)
}
select {
case <-timer.C:
t.Fatal("deadlock! Read() did not return.")
case rerr := <-rd:
if rerr != nil {
t.Fatal(rerr)
}
}
}
}
// Write and read in different speeds/chunk sizes and check valid data is read.
func TestBytesPipeWriteRandomChunks(t *testing.T) {
tests := []struct{ iterations, writesPerLoop, readsPerLoop int }{
{iterations: 100, writesPerLoop: 10, readsPerLoop: 1},
{iterations: 1000, writesPerLoop: 10, readsPerLoop: 5},
{iterations: 1000, writesPerLoop: 100},
{iterations: 1000, writesPerLoop: 5, readsPerLoop: 6},
{iterations: 10000, writesPerLoop: 50, readsPerLoop: 25},
}
testMessage := []byte("this is a random string for testing")
// random slice sizes to read and write
writeChunks := []int{25, 35, 15, 20}
readChunks := []int{5, 45, 20, 25}
for _, tc := range tests {
// first pass: write directly to hash
hash := sha256.New()
for i := 0; i < tc.iterations*tc.writesPerLoop; i++ {
if _, err := hash.Write(testMessage[:writeChunks[i%len(writeChunks)]]); err != nil {
t.Fatal(err)
}
}
expected := hex.EncodeToString(hash.Sum(nil))
// write/read through buffer
buf := NewBytesPipe()
hash.Reset()
done := make(chan struct{})
go func() {
// random delay before read starts
<-time.After(time.Duration(rand.Intn(10)) * time.Millisecond)
for i := 0; ; i++ {
p := make([]byte, readChunks[(tc.iterations*tc.readsPerLoop+i)%len(readChunks)])
n, _ := buf.Read(p)
if n == 0 {
break
}
hash.Write(p[:n])
}
close(done)
}()
for i := 0; i < tc.iterations; i++ {
for w := 0; w < tc.writesPerLoop; w++ {
buf.Write(testMessage[:writeChunks[(i*tc.writesPerLoop+w)%len(writeChunks)]])
}
}
_ = buf.Close()
<-done
actual := hex.EncodeToString(hash.Sum(nil))
if expected != actual {
t.Fatalf("BytesPipe returned invalid data. Expected checksum %v, got %v", expected, actual)
}
}
}
func BenchmarkBytesPipeWrite(b *testing.B) {
b.ReportAllocs()
testData := []byte("pretty short line, because why not?")
for i := 0; i < b.N; i++ {
readBuf := make([]byte, 1024)
buf := NewBytesPipe()
go func() {
var err error
for err == nil {
_, err = buf.Read(readBuf)
}
}()
for j := 0; j < 1000; j++ {
_, _ = buf.Write(testData)
}
_ = buf.Close()
}
}
func BenchmarkBytesPipeRead(b *testing.B) {
b.ReportAllocs()
rd := make([]byte, 512)
for i := 0; i < b.N; i++ {
b.StopTimer()
buf := NewBytesPipe()
for j := 0; j < 500; j++ {
_, _ = buf.Write(make([]byte, 1024))
}
b.StartTimer()
for j := 0; j < 1000; j++ {
if n, _ := buf.Read(rd); n != 512 {
b.Fatalf("Wrong number of bytes: %d", n)
}
}
}
}