From 4c6e571d38c869f76dcadf116cdee4df1f347253 Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Thu, 9 Oct 2025 19:45:01 -0400 Subject: [PATCH] api/pkg/stdcopy: move stdWriter to daemon/internal Clients have no need for muxing streams using our StdCopy wire format. Signed-off-by: Cory Snider --- api/pkg/stdcopy/stdcopy.go | 67 --------------- daemon/attach.go | 5 +- .../stdcopymux}/stdcopy_example_test.go | 9 +- .../internal/stdcopymux}/stdcopy_test.go | 48 ++++++----- daemon/internal/stdcopymux/writer.go | 82 +++++++++++++++++++ daemon/server/httputils/write_log_stream.go | 7 +- daemon/server/router/container/exec.go | 5 +- .../moby/moby/api/pkg/stdcopy/stdcopy.go | 67 --------------- 8 files changed, 123 insertions(+), 167 deletions(-) rename {api/pkg/stdcopy => daemon/internal/stdcopymux}/stdcopy_example_test.go (83%) rename {api/pkg/stdcopy => daemon/internal/stdcopymux}/stdcopy_test.go (85%) create mode 100644 daemon/internal/stdcopymux/writer.go diff --git a/api/pkg/stdcopy/stdcopy.go b/api/pkg/stdcopy/stdcopy.go index 3d52b1cdbb..948c6b6755 100644 --- a/api/pkg/stdcopy/stdcopy.go +++ b/api/pkg/stdcopy/stdcopy.go @@ -1,12 +1,10 @@ package stdcopy import ( - "bytes" "encoding/binary" "errors" "fmt" "io" - "sync" ) // StdType is the type of standard stream @@ -28,71 +26,6 @@ const ( startingBufLen = 32*1024 + stdWriterPrefixLen + 1 ) -var bufPool = &sync.Pool{New: func() any { return bytes.NewBuffer(nil) }} - -// stdWriter is wrapper of io.Writer with extra customized info. -type stdWriter struct { - io.Writer - prefix byte -} - -// Write sends the buffer to the underlying writer. -// It inserts the prefix header before the buffer, -// so [StdCopy] knows where to multiplex the output. -// -// It implements [io.Writer]. -func (w *stdWriter) Write(p []byte) (int, error) { - if w == nil || w.Writer == nil { - return 0, errors.New("writer not instantiated") - } - if p == nil { - return 0, nil - } - - header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix} - binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p))) - buf := bufPool.Get().(*bytes.Buffer) - buf.Write(header[:]) - buf.Write(p) - - n, err := w.Writer.Write(buf.Bytes()) - n -= stdWriterPrefixLen - if n < 0 { - n = 0 - } - - buf.Reset() - bufPool.Put(buf) - return n, err -} - -// NewStdWriter instantiates a new writer using a custom format to multiplex -// multiple streams to a single writer. All messages written using this writer -// are encapsulated using a custom format, and written to the underlying -// stream "w". -// -// Writers created through NewStdWriter allow for multiple write streams -// (e.g., stdout ([Stdout]) and stderr ([Stderr]) to be multiplexed into a -// single connection. "streamType" indicates the type of stream to encapsulate, -// commonly, [Stdout] or [Stderr]. The [Systemerr] stream can be used to -// include server-side errors in the stream. Information on this stream -// is returned as an error by [StdCopy] and terminates processing the -// stream. -// -// The [Stdin] stream is present for completeness and should generally -// NOT be used. It is output on [Stdout] when reading the stream with -// [StdCopy]. -// -// All streams must share the same underlying [io.Writer] to ensure proper -// multiplexing. Each call to NewStdWriter wraps that shared writer with -// a header indicating the target stream. -func NewStdWriter(w io.Writer, streamType StdType) io.Writer { - return &stdWriter{ - Writer: w, - prefix: byte(streamType), - } -} - // StdCopy is a modified version of [io.Copy] to de-multiplex messages // from "multiplexedSource" and copy them to destination streams // "destOut" and "destErr". diff --git a/daemon/attach.go b/daemon/attach.go index f445adff50..b4a2ad5235 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -10,6 +10,7 @@ import ( containertypes "github.com/moby/moby/api/types/container" "github.com/moby/moby/api/types/events" "github.com/moby/moby/v2/daemon/container" + "github.com/moby/moby/v2/daemon/internal/stdcopymux" "github.com/moby/moby/v2/daemon/internal/stream" "github.com/moby/moby/v2/daemon/logger" "github.com/moby/moby/v2/daemon/server/backend" @@ -74,8 +75,8 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, req *backend.Containe defer inStream.Close() if multiplexed { - errStream = stdcopy.NewStdWriter(errStream, stdcopy.Stderr) - outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) + errStream = stdcopymux.NewStdWriter(errStream, stdcopy.Stderr) + outStream = stdcopymux.NewStdWriter(outStream, stdcopy.Stdout) } if cfg.UseStdin { diff --git a/api/pkg/stdcopy/stdcopy_example_test.go b/daemon/internal/stdcopymux/stdcopy_example_test.go similarity index 83% rename from api/pkg/stdcopy/stdcopy_example_test.go rename to daemon/internal/stdcopymux/stdcopy_example_test.go index 9e3b4deae1..2b5ede79a7 100644 --- a/api/pkg/stdcopy/stdcopy_example_test.go +++ b/daemon/internal/stdcopymux/stdcopy_example_test.go @@ -1,4 +1,4 @@ -package stdcopy_test +package stdcopymux_test import ( "errors" @@ -8,6 +8,7 @@ import ( "time" "github.com/moby/moby/api/pkg/stdcopy" + "github.com/moby/moby/v2/daemon/internal/stdcopymux" ) func ExampleNewStdWriter() { @@ -25,9 +26,9 @@ func ExampleNewStdWriter() { }() // daemon writing to stdout, stderr, and systemErr. - stdout := stdcopy.NewStdWriter(muxStream, stdcopy.Stdout) - stderr := stdcopy.NewStdWriter(muxStream, stdcopy.Stderr) - systemErr := stdcopy.NewStdWriter(muxStream, stdcopy.Systemerr) + stdout := stdcopymux.NewStdWriter(muxStream, stdcopy.Stdout) + stderr := stdcopymux.NewStdWriter(muxStream, stdcopy.Stderr) + systemErr := stdcopymux.NewStdWriter(muxStream, stdcopy.Systemerr) for range 10 { _, _ = fmt.Fprintln(stdout, "hello from stdout") diff --git a/api/pkg/stdcopy/stdcopy_test.go b/daemon/internal/stdcopymux/stdcopy_test.go similarity index 85% rename from api/pkg/stdcopy/stdcopy_test.go rename to daemon/internal/stdcopymux/stdcopy_test.go index 14ea60ea53..b3ec459e79 100644 --- a/api/pkg/stdcopy/stdcopy_test.go +++ b/daemon/internal/stdcopymux/stdcopy_test.go @@ -1,4 +1,4 @@ -package stdcopy +package stdcopymux import ( "bytes" @@ -6,10 +6,14 @@ import ( "io" "strings" "testing" + + "github.com/moby/moby/api/pkg/stdcopy" ) +const startingBufLen = 32*1024 + 8 /* stdwriterPrefixLen */ + 1 + func TestNewStdWriter(t *testing.T) { - writer := NewStdWriter(io.Discard, Stdout) + writer := NewStdWriter(io.Discard, stdcopy.Stdout) if writer == nil { t.Fatalf("NewStdWriter with an invalid StdType should not return nil.") } @@ -18,7 +22,7 @@ func TestNewStdWriter(t *testing.T) { func TestWriteWithUninitializedStdWriter(t *testing.T) { writer := stdWriter{ Writer: nil, - prefix: byte(Stdout), + prefix: byte(stdcopy.Stdout), } n, err := writer.Write([]byte("Something here")) if n != 0 || err == nil { @@ -27,7 +31,7 @@ func TestWriteWithUninitializedStdWriter(t *testing.T) { } func TestWriteWithNilBytes(t *testing.T) { - writer := NewStdWriter(io.Discard, Stdout) + writer := NewStdWriter(io.Discard, stdcopy.Stdout) n, err := writer.Write(nil) if err != nil { t.Fatalf("Shouldn't have fail when given no data") @@ -38,7 +42,7 @@ func TestWriteWithNilBytes(t *testing.T) { } func TestWrite(t *testing.T) { - writer := NewStdWriter(io.Discard, Stdout) + writer := NewStdWriter(io.Discard, stdcopy.Stdout) data := []byte("Test StdWrite.Write") n, err := writer.Write(data) if err != nil { @@ -64,7 +68,7 @@ func TestWriteWithWriterError(t *testing.T) { writer := NewStdWriter(&errWriter{ n: stdWriterPrefixLen + expectedReturnedBytes, err: expectedError, - }, Stdout) + }, stdcopy.Stdout) data := []byte("This won't get written, sigh") n, err := writer.Write(data) if !errors.Is(err, expectedError) { @@ -77,7 +81,7 @@ func TestWriteWithWriterError(t *testing.T) { } func TestWriteDoesNotReturnNegativeWrittenBytes(t *testing.T) { - writer := NewStdWriter(&errWriter{n: -1}, Stdout) + writer := NewStdWriter(&errWriter{n: -1}, stdcopy.Stdout) data := []byte("This won't get written, sigh") actual, _ := writer.Write(data) if actual != 0 { @@ -87,12 +91,12 @@ func TestWriteDoesNotReturnNegativeWrittenBytes(t *testing.T) { func getSrcBuffer(stdOutBytes, stdErrBytes []byte) (*bytes.Buffer, error) { buffer := new(bytes.Buffer) - dstOut := NewStdWriter(buffer, Stdout) + dstOut := NewStdWriter(buffer, stdcopy.Stdout) _, err := dstOut.Write(stdOutBytes) if err != nil { return buffer, err } - dstErr := NewStdWriter(buffer, Stderr) + dstErr := NewStdWriter(buffer, stdcopy.Stderr) _, err = dstErr.Write(stdErrBytes) return buffer, err } @@ -104,7 +108,7 @@ func TestStdCopyWriteAndRead(t *testing.T) { if err != nil { t.Fatal(err) } - written, err := StdCopy(io.Discard, io.Discard, buffer) + written, err := stdcopy.StdCopy(io.Discard, io.Discard, buffer) if err != nil { t.Fatal(err) } @@ -135,7 +139,7 @@ func TestStdCopyReturnsErrorReadingHeader(t *testing.T) { reader := &customReader{ err: expectedError, } - written, err := StdCopy(io.Discard, io.Discard, reader) + written, err := stdcopy.StdCopy(io.Discard, io.Discard, reader) if written != 0 { t.Fatalf("Expected 0 bytes read, got %d", written) } @@ -158,7 +162,7 @@ func TestStdCopyReturnsErrorReadingFrame(t *testing.T) { err: expectedError, src: buffer, } - written, err := StdCopy(io.Discard, io.Discard, reader) + written, err := stdcopy.StdCopy(io.Discard, io.Discard, reader) if written != 0 { t.Fatalf("Expected 0 bytes read, got %d", written) } @@ -180,7 +184,7 @@ func TestStdCopyDetectsCorruptedFrame(t *testing.T) { err: io.EOF, src: buffer, } - written, err := StdCopy(io.Discard, io.Discard, reader) + written, err := stdcopy.StdCopy(io.Discard, io.Discard, reader) if written != startingBufLen { t.Fatalf("Expected %d bytes read, got %d", startingBufLen, written) } @@ -190,10 +194,10 @@ func TestStdCopyDetectsCorruptedFrame(t *testing.T) { } func TestStdCopyWithInvalidInputHeader(t *testing.T) { - dstOut := NewStdWriter(io.Discard, Stdout) - dstErr := NewStdWriter(io.Discard, Stderr) + dstOut := NewStdWriter(io.Discard, stdcopy.Stdout) + dstErr := NewStdWriter(io.Discard, stdcopy.Stderr) src := strings.NewReader("Invalid input") - _, err := StdCopy(dstOut, dstErr, src) + _, err := stdcopy.StdCopy(dstOut, dstErr, src) if err == nil { t.Fatal("StdCopy with invalid input header should fail.") } @@ -202,7 +206,7 @@ func TestStdCopyWithInvalidInputHeader(t *testing.T) { func TestStdCopyWithCorruptedPrefix(t *testing.T) { data := []byte{0x01, 0x02, 0x03} src := bytes.NewReader(data) - written, err := StdCopy(nil, nil, src) + written, err := stdcopy.StdCopy(nil, nil, src) if err != nil { t.Fatalf("StdCopy should not return an error with corrupted prefix.") } @@ -222,7 +226,7 @@ func TestStdCopyReturnsWriteErrors(t *testing.T) { dstOut := &errWriter{err: expectedError} - written, err := StdCopy(dstOut, io.Discard, buffer) + written, err := stdcopy.StdCopy(dstOut, io.Discard, buffer) if written != 0 { t.Fatalf("StdCopy should have written 0, but has written %d", written) } @@ -240,7 +244,7 @@ func TestStdCopyDetectsNotFullyWrittenFrames(t *testing.T) { } dstOut := &errWriter{n: startingBufLen - 10} - written, err := StdCopy(dstOut, io.Discard, buffer) + written, err := stdcopy.StdCopy(dstOut, io.Discard, buffer) if written != 0 { t.Fatalf("StdCopy should have return 0 written bytes, but returned %d", written) } @@ -261,7 +265,7 @@ func TestStdCopyReturnsErrorFromSystem(t *testing.T) { } // add in an error message on the Systemerr stream systemErrBytes := []byte(strings.Repeat("S", startingBufLen)) - systemWriter := NewStdWriter(buffer, Systemerr) + systemWriter := NewStdWriter(buffer, stdcopy.Systemerr) _, err = systemWriter.Write(systemErrBytes) if err != nil { t.Fatal(err) @@ -269,7 +273,7 @@ func TestStdCopyReturnsErrorFromSystem(t *testing.T) { // now copy and demux. we should expect an error containing the string we // wrote out - _, err = StdCopy(io.Discard, io.Discard, buffer) + _, err = stdcopy.StdCopy(io.Discard, io.Discard, buffer) if err == nil { t.Fatal("expected error, got none") } @@ -279,7 +283,7 @@ func TestStdCopyReturnsErrorFromSystem(t *testing.T) { } func BenchmarkWrite(b *testing.B) { - w := NewStdWriter(io.Discard, Stdout) + w := NewStdWriter(io.Discard, stdcopy.Stdout) data := []byte("Test line for testing stdwriter performance\n") data = bytes.Repeat(data, 100) b.SetBytes(int64(len(data))) diff --git a/daemon/internal/stdcopymux/writer.go b/daemon/internal/stdcopymux/writer.go new file mode 100644 index 0000000000..4482a63376 --- /dev/null +++ b/daemon/internal/stdcopymux/writer.go @@ -0,0 +1,82 @@ +package stdcopymux + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + "sync" + + "github.com/moby/moby/api/pkg/stdcopy" +) + +const ( + stdWriterPrefixLen = 8 + stdWriterFdIndex = 0 + stdWriterSizeIndex = 4 +) + +var bufPool = &sync.Pool{New: func() any { return bytes.NewBuffer(nil) }} + +// stdWriter is wrapper of io.Writer with extra customized info. +type stdWriter struct { + io.Writer + prefix byte +} + +// Write sends the buffer to the underlying writer. +// It inserts the prefix header before the buffer, +// so [StdCopy] knows where to multiplex the output. +// +// It implements [io.Writer]. +func (w *stdWriter) Write(p []byte) (int, error) { + if w == nil || w.Writer == nil { + return 0, errors.New("writer not instantiated") + } + if p == nil { + return 0, nil + } + + header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix} + binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p))) + buf := bufPool.Get().(*bytes.Buffer) + buf.Write(header[:]) + buf.Write(p) + + n, err := w.Writer.Write(buf.Bytes()) + n -= stdWriterPrefixLen + if n < 0 { + n = 0 + } + + buf.Reset() + bufPool.Put(buf) + return n, err +} + +// NewStdWriter instantiates a new writer using a custom format to multiplex +// multiple streams to a single writer. All messages written using this writer +// are encapsulated using a custom format, and written to the underlying +// stream "w". +// +// Writers created through NewStdWriter allow for multiple write streams +// (e.g., stdout ([Stdout]) and stderr ([Stderr]) to be multiplexed into a +// single connection. "streamType" indicates the type of stream to encapsulate, +// commonly, [Stdout] or [Stderr]. The [Systemerr] stream can be used to +// include server-side errors in the stream. Information on this stream +// is returned as an error by [StdCopy] and terminates processing the +// stream. +// +// The [Stdin] stream is present for completeness and should generally +// NOT be used. It is output on [Stdout] when reading the stream with +// [StdCopy]. +// +// All streams must share the same underlying [io.Writer] to ensure proper +// multiplexing. Each call to NewStdWriter wraps that shared writer with +// a header indicating the target stream. +func NewStdWriter(w io.Writer, streamType stdcopy.StdType) io.Writer { + return &stdWriter{ + Writer: w, + prefix: byte(streamType), + } +} diff --git a/daemon/server/httputils/write_log_stream.go b/daemon/server/httputils/write_log_stream.go index 2fa48ca2f4..f233817782 100644 --- a/daemon/server/httputils/write_log_stream.go +++ b/daemon/server/httputils/write_log_stream.go @@ -9,6 +9,7 @@ import ( "sort" "github.com/moby/moby/api/pkg/stdcopy" + "github.com/moby/moby/v2/daemon/internal/stdcopymux" "github.com/moby/moby/v2/daemon/server/backend" "github.com/moby/moby/v2/pkg/ioutils" ) @@ -33,9 +34,9 @@ func WriteLogStream(_ context.Context, w http.ResponseWriter, msgs <-chan *backe errStream := outStream sysErrStream := errStream if mux { - sysErrStream = stdcopy.NewStdWriter(outStream, stdcopy.Systemerr) - errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr) - outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) + sysErrStream = stdcopymux.NewStdWriter(outStream, stdcopy.Systemerr) + errStream = stdcopymux.NewStdWriter(outStream, stdcopy.Stderr) + outStream = stdcopymux.NewStdWriter(outStream, stdcopy.Stdout) } for { diff --git a/daemon/server/router/container/exec.go b/daemon/server/router/container/exec.go index b106c28910..8e9939a8bf 100644 --- a/daemon/server/router/container/exec.go +++ b/daemon/server/router/container/exec.go @@ -11,6 +11,7 @@ import ( "github.com/moby/moby/api/types" "github.com/moby/moby/api/types/container" "github.com/moby/moby/api/types/versions" + "github.com/moby/moby/v2/daemon/internal/stdcopymux" "github.com/moby/moby/v2/daemon/server/backend" "github.com/moby/moby/v2/daemon/server/httputils" "github.com/moby/moby/v2/errdefs" @@ -130,8 +131,8 @@ func (c *containerRouter) postContainerExecStart(ctx context.Context, w http.Res if options.Tty { stdout = outStream } else { - stderr = stdcopy.NewStdWriter(outStream, stdcopy.Stderr) - stdout = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) + stderr = stdcopymux.NewStdWriter(outStream, stdcopy.Stderr) + stdout = stdcopymux.NewStdWriter(outStream, stdcopy.Stdout) } } diff --git a/vendor/github.com/moby/moby/api/pkg/stdcopy/stdcopy.go b/vendor/github.com/moby/moby/api/pkg/stdcopy/stdcopy.go index 3d52b1cdbb..948c6b6755 100644 --- a/vendor/github.com/moby/moby/api/pkg/stdcopy/stdcopy.go +++ b/vendor/github.com/moby/moby/api/pkg/stdcopy/stdcopy.go @@ -1,12 +1,10 @@ package stdcopy import ( - "bytes" "encoding/binary" "errors" "fmt" "io" - "sync" ) // StdType is the type of standard stream @@ -28,71 +26,6 @@ const ( startingBufLen = 32*1024 + stdWriterPrefixLen + 1 ) -var bufPool = &sync.Pool{New: func() any { return bytes.NewBuffer(nil) }} - -// stdWriter is wrapper of io.Writer with extra customized info. -type stdWriter struct { - io.Writer - prefix byte -} - -// Write sends the buffer to the underlying writer. -// It inserts the prefix header before the buffer, -// so [StdCopy] knows where to multiplex the output. -// -// It implements [io.Writer]. -func (w *stdWriter) Write(p []byte) (int, error) { - if w == nil || w.Writer == nil { - return 0, errors.New("writer not instantiated") - } - if p == nil { - return 0, nil - } - - header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix} - binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p))) - buf := bufPool.Get().(*bytes.Buffer) - buf.Write(header[:]) - buf.Write(p) - - n, err := w.Writer.Write(buf.Bytes()) - n -= stdWriterPrefixLen - if n < 0 { - n = 0 - } - - buf.Reset() - bufPool.Put(buf) - return n, err -} - -// NewStdWriter instantiates a new writer using a custom format to multiplex -// multiple streams to a single writer. All messages written using this writer -// are encapsulated using a custom format, and written to the underlying -// stream "w". -// -// Writers created through NewStdWriter allow for multiple write streams -// (e.g., stdout ([Stdout]) and stderr ([Stderr]) to be multiplexed into a -// single connection. "streamType" indicates the type of stream to encapsulate, -// commonly, [Stdout] or [Stderr]. The [Systemerr] stream can be used to -// include server-side errors in the stream. Information on this stream -// is returned as an error by [StdCopy] and terminates processing the -// stream. -// -// The [Stdin] stream is present for completeness and should generally -// NOT be used. It is output on [Stdout] when reading the stream with -// [StdCopy]. -// -// All streams must share the same underlying [io.Writer] to ensure proper -// multiplexing. Each call to NewStdWriter wraps that shared writer with -// a header indicating the target stream. -func NewStdWriter(w io.Writer, streamType StdType) io.Writer { - return &stdWriter{ - Writer: w, - prefix: byte(streamType), - } -} - // StdCopy is a modified version of [io.Copy] to de-multiplex messages // from "multiplexedSource" and copy them to destination streams // "destOut" and "destErr".