api/pkg/streamformatter: move to client and daemon/internal

Move the streamformatter package up into the client for a temporary
shared location between common clients like CLI and compose.

The streamformatter package is used by the daemon to write streams of
status and progress messages to API clients. It is completely out of
scope of the api module and not used outside the daemon. Remove the
unused rawSteamFormatter, whose purpose is to render the progress as a
TUI.

Co-authored-by: Cory Snider <csnider@mirantis.com>
Signed-off-by: Austin Vazquez <austin.vazquez@docker.com>
This commit is contained in:
Cory Snider
2025-10-09 17:36:24 -04:00
committed by Austin Vazquez
parent 8222a3f1d9
commit 6baf274fa3
25 changed files with 143 additions and 99 deletions

View File

@@ -19,7 +19,6 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/entitlements"
"github.com/moby/buildkit/util/tracing"
"github.com/moby/moby/api/pkg/streamformatter"
"github.com/moby/moby/api/types/build"
"github.com/moby/moby/api/types/container"
"github.com/moby/moby/api/types/network"
@@ -29,6 +28,7 @@ import (
"github.com/moby/moby/v2/daemon/internal/builder-next/exporter"
"github.com/moby/moby/v2/daemon/internal/builder-next/exporter/mobyexporter"
"github.com/moby/moby/v2/daemon/internal/builder-next/exporter/overrides"
"github.com/moby/moby/v2/daemon/internal/streamformatter"
"github.com/moby/moby/v2/daemon/internal/timestamp"
"github.com/moby/moby/v2/daemon/libnetwork"
"github.com/moby/moby/v2/daemon/pkg/opts"

View File

@@ -8,7 +8,7 @@ import (
"github.com/containerd/log"
"github.com/moby/moby/api/pkg/progress"
"github.com/moby/moby/api/pkg/streamformatter"
"github.com/moby/moby/v2/daemon/internal/streamformatter"
)
// WriteDistributionProgress is a helper for writing progress from chan to JSON

View File

@@ -18,11 +18,11 @@ import (
"github.com/moby/go-archive/chrootarchive"
"github.com/moby/go-archive/compression"
"github.com/moby/moby/api/pkg/progress"
"github.com/moby/moby/api/pkg/streamformatter"
"github.com/moby/moby/api/types/events"
"github.com/moby/moby/v2/daemon/internal/image"
"github.com/moby/moby/v2/daemon/internal/ioutils"
"github.com/moby/moby/v2/daemon/internal/layer"
"github.com/moby/moby/v2/daemon/internal/streamformatter"
"github.com/moby/moby/v2/daemon/internal/stringid"
"github.com/moby/sys/sequential"
"github.com/moby/sys/symlink"

View File

@@ -0,0 +1,164 @@
// Package streamformatter provides helper functions to format a stream.
package streamformatter
import (
"encoding/json"
"fmt"
"io"
"sync"
"github.com/moby/moby/api/pkg/progress"
"github.com/moby/moby/api/types/jsonstream"
)
// jsonMessage defines a message struct. It describes
// the created time, where it from, status, ID of the
// message. It's used for docker events.
//
// It is a reduced set of [jsonmessage.JSONMessage].
type jsonMessage struct {
Stream string `json:"stream,omitempty"`
Status string `json:"status,omitempty"`
Progress *jsonstream.Progress `json:"progressDetail,omitempty"`
ID string `json:"id,omitempty"`
Error *jsonstream.Error `json:"errorDetail,omitempty"`
Aux *json.RawMessage `json:"aux,omitempty"` // Aux contains out-of-band data, such as digests for push signing and image id after building.
// ErrorMessage contains errors encountered during the operation.
//
// Deprecated: this field is deprecated since docker v0.6.0 / API v1.4. Use [Error.Message] instead. This field will be omitted in a future release.
ErrorMessage string `json:"error,omitempty"` // deprecated
}
const streamNewline = "\r\n"
type jsonProgressFormatter struct{}
func appendNewline(source []byte) []byte {
return append(source, []byte(streamNewline)...)
}
// FormatStatus formats the specified objects according to the specified format (and id).
func FormatStatus(id, format string, a ...any) []byte {
str := fmt.Sprintf(format, a...)
b, err := json.Marshal(&jsonMessage{ID: id, Status: str})
if err != nil {
return FormatError(err)
}
return appendNewline(b)
}
// FormatError formats the error as a JSON object
func FormatError(err error) []byte {
jsonError, ok := err.(*jsonstream.Error)
if !ok {
jsonError = &jsonstream.Error{Message: err.Error()}
}
if b, err := json.Marshal(&jsonMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
return appendNewline(b)
}
return []byte(`{"error":"format error"}` + streamNewline)
}
func (sf *jsonProgressFormatter) formatStatus(id, format string, a ...any) []byte {
return FormatStatus(id, format, a...)
}
// formatProgress formats the progress information for a specified action.
func (sf *jsonProgressFormatter) formatProgress(id, action string, progress *jsonstream.Progress, aux any) []byte {
if progress == nil {
progress = &jsonstream.Progress{}
}
var auxJSON *json.RawMessage
if aux != nil {
auxJSONBytes, err := json.Marshal(aux)
if err != nil {
return nil
}
auxJSON = new(json.RawMessage)
*auxJSON = auxJSONBytes
}
b, err := json.Marshal(&jsonMessage{
Status: action,
Progress: progress,
ID: id,
Aux: auxJSON,
})
if err != nil {
return nil
}
return appendNewline(b)
}
// NewJSONProgressOutput returns a progress.Output that formats output
// using JSON objects
func NewJSONProgressOutput(out io.Writer, newLines bool) progress.Output {
return &progressOutput{sf: &jsonProgressFormatter{}, out: out, newLines: newLines}
}
type formatProgress interface {
formatStatus(id, format string, a ...any) []byte
formatProgress(id, action string, progress *jsonstream.Progress, aux any) []byte
}
type progressOutput struct {
sf formatProgress
out io.Writer
newLines bool
mu sync.Mutex
}
// WriteProgress formats progress information from a ProgressReader.
func (out *progressOutput) WriteProgress(prog progress.Progress) error {
var formatted []byte
if prog.Message != "" {
formatted = out.sf.formatStatus(prog.ID, prog.Message)
} else {
jsonProgress := jsonstream.Progress{
Current: prog.Current,
Total: prog.Total,
HideCounts: prog.HideCounts,
Units: prog.Units,
}
formatted = out.sf.formatProgress(prog.ID, prog.Action, &jsonProgress, prog.Aux)
}
out.mu.Lock()
defer out.mu.Unlock()
_, err := out.out.Write(formatted)
if err != nil {
return err
}
if out.newLines && prog.LastUpdate {
_, err = out.out.Write(out.sf.formatStatus("", ""))
return err
}
return nil
}
// AuxFormatter is a streamFormatter that writes aux progress messages
type AuxFormatter struct {
io.Writer
}
// Emit emits the given interface as an aux progress message
func (sf *AuxFormatter) Emit(id string, aux any) error {
auxJSONBytes, err := json.Marshal(aux)
if err != nil {
return err
}
auxJSON := new(json.RawMessage)
*auxJSON = auxJSONBytes
msgJSON, err := json.Marshal(&jsonMessage{ID: id, Aux: auxJSON})
if err != nil {
return err
}
msgJSON = appendNewline(msgJSON)
n, err := sf.Writer.Write(msgJSON)
if n != len(msgJSON) {
return io.ErrShortWrite
}
return err
}

View File

@@ -0,0 +1,89 @@
package streamformatter
import (
"bytes"
"encoding/json"
"errors"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/moby/moby/api/types/jsonstream"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
func TestFormatStatus(t *testing.T) {
res := FormatStatus("ID", "%s%d", "a", 1)
expected := `{"status":"a1","id":"ID"}` + streamNewline
assert.Check(t, is.Equal(expected, string(res)))
}
func TestFormatError(t *testing.T) {
res := FormatError(errors.New("Error for formatter"))
expected := `{"errorDetail":{"message":"Error for formatter"},"error":"Error for formatter"}` + "\r\n"
assert.Check(t, is.Equal(expected, string(res)))
}
func TestFormatJSONError(t *testing.T) {
err := &jsonstream.Error{Code: 50, Message: "Json error"}
res := FormatError(err)
expected := `{"errorDetail":{"code":50,"message":"Json error"},"error":"Json error"}` + streamNewline
assert.Check(t, is.Equal(expected, string(res)))
}
func TestJsonProgressFormatterFormatProgress(t *testing.T) {
sf := &jsonProgressFormatter{}
jsonProgress := &jsonstream.Progress{
Current: 15,
Total: 30,
Start: 1,
}
aux := "aux message"
res := sf.formatProgress("id", "action", jsonProgress, aux)
msg := &jsonMessage{}
assert.NilError(t, json.Unmarshal(res, msg))
rawAux := json.RawMessage(`"` + aux + `"`)
expected := &jsonMessage{
ID: "id",
Status: "action",
Aux: &rawAux,
Progress: jsonProgress,
}
assert.DeepEqual(t, msg, expected, cmpJSONMessageOpt())
}
func cmpJSONMessageOpt() cmp.Option {
progressMessagePath := func(path cmp.Path) bool {
return path.String() == "ProgressMessage"
}
return cmp.Options{
// Ignore deprecated property that is a derivative of Progress
cmp.FilterPath(progressMessagePath, cmp.Ignore()),
}
}
func TestJsonProgressFormatterFormatStatus(t *testing.T) {
sf := jsonProgressFormatter{}
res := sf.formatStatus("ID", "%s%d", "a", 1)
assert.Check(t, is.Equal(`{"status":"a1","id":"ID"}`+streamNewline, string(res)))
}
func TestNewJSONProgressOutput(t *testing.T) {
b := bytes.Buffer{}
b.Write(FormatStatus("id", "Downloading"))
_ = NewJSONProgressOutput(&b, false)
assert.Check(t, is.Equal(`{"status":"Downloading","id":"id"}`+streamNewline, b.String()))
}
func TestAuxFormatterEmit(t *testing.T) {
b := bytes.Buffer{}
aux := &AuxFormatter{Writer: &b}
sampleAux := &struct {
Data string
}{"Additional data"}
err := aux.Emit("", sampleAux)
assert.NilError(t, err)
assert.Check(t, is.Equal(`{"aux":{"Data":"Additional data"}}`+streamNewline, b.String()))
}

View File

@@ -0,0 +1,45 @@
package streamformatter
import (
"encoding/json"
"io"
)
type streamWriter struct {
io.Writer
lineFormat func([]byte) string
}
func (sw *streamWriter) Write(buf []byte) (int, error) {
formattedBuf := sw.format(buf)
n, err := sw.Writer.Write(formattedBuf)
if n != len(formattedBuf) {
return n, io.ErrShortWrite
}
return len(buf), err
}
func (sw *streamWriter) format(buf []byte) []byte {
msg := &jsonMessage{Stream: sw.lineFormat(buf)}
b, err := json.Marshal(msg)
if err != nil {
return FormatError(err)
}
return appendNewline(b)
}
// NewStdoutWriter returns a writer which formats the output as json message
// representing stdout lines
func NewStdoutWriter(out io.Writer) io.Writer {
return &streamWriter{Writer: out, lineFormat: func(buf []byte) string {
return string(buf)
}}
}
// NewStderrWriter returns a writer which formats the output as json message
// representing stderr lines
func NewStderrWriter(out io.Writer) io.Writer {
return &streamWriter{Writer: out, lineFormat: func(buf []byte) string {
return "\033[91m" + string(buf) + "\033[0m"
}}
}

View File

@@ -0,0 +1,35 @@
package streamformatter
import (
"bytes"
"testing"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
func TestStreamWriterStdout(t *testing.T) {
buffer := &bytes.Buffer{}
content := "content"
sw := NewStdoutWriter(buffer)
size, err := sw.Write([]byte(content))
assert.NilError(t, err)
assert.Check(t, is.Equal(len(content), size))
expected := `{"stream":"content"}` + streamNewline
assert.Check(t, is.Equal(expected, buffer.String()))
}
func TestStreamWriterStderr(t *testing.T) {
buffer := &bytes.Buffer{}
content := "content"
sw := NewStderrWriter(buffer)
size, err := sw.Write([]byte(content))
assert.NilError(t, err)
assert.Check(t, is.Equal(len(content), size))
expected := `{"stream":"\u001b[91mcontent\u001b[0m"}` + streamNewline
assert.Check(t, is.Equal(expected, buffer.String()))
}