Move pkg/streamformatter to api/pkg/streamformatter

Signed-off-by: Derek McGowan <derek@mcg.dev>
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Derek McGowan
2025-07-29 13:28:31 -07:00
committed by Sebastiaan van Stijn
parent 66862e14d1
commit d00ecdc479
21 changed files with 307 additions and 15 deletions

View File

@@ -1,247 +0,0 @@
// Package streamformatter provides helper functions to format a stream.
package streamformatter
import (
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/docker/go-units"
"github.com/moby/moby/api/pkg/progress"
"github.com/moby/moby/api/types/jsonstream"
)
// jsonMessage defines a message struct. It describes
// the created time, where it from, status, ID of the
// message. It's used for docker events.
//
// It is a reduced set of [jsonmessage.JSONMessage].
type jsonMessage struct {
Stream string `json:"stream,omitempty"`
Status string `json:"status,omitempty"`
Progress *jsonstream.Progress `json:"progressDetail,omitempty"`
ID string `json:"id,omitempty"`
Error *jsonstream.Error `json:"errorDetail,omitempty"`
Aux *json.RawMessage `json:"aux,omitempty"` // Aux contains out-of-band data, such as digests for push signing and image id after building.
// ErrorMessage contains errors encountered during the operation.
//
// Deprecated: this field is deprecated since docker v0.6.0 / API v1.4. Use [Error.Message] instead. This field will be omitted in a future release.
ErrorMessage string `json:"error,omitempty"` // deprecated
}
const streamNewline = "\r\n"
type jsonProgressFormatter struct{}
func appendNewline(source []byte) []byte {
return append(source, []byte(streamNewline)...)
}
// FormatStatus formats the specified objects according to the specified format (and id).
func FormatStatus(id, format string, a ...interface{}) []byte {
str := fmt.Sprintf(format, a...)
b, err := json.Marshal(&jsonMessage{ID: id, Status: str})
if err != nil {
return FormatError(err)
}
return appendNewline(b)
}
// FormatError formats the error as a JSON object
func FormatError(err error) []byte {
jsonError, ok := err.(*jsonstream.Error)
if !ok {
jsonError = &jsonstream.Error{Message: err.Error()}
}
if b, err := json.Marshal(&jsonMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
return appendNewline(b)
}
return []byte(`{"error":"format error"}` + streamNewline)
}
func (sf *jsonProgressFormatter) formatStatus(id, format string, a ...interface{}) []byte {
return FormatStatus(id, format, a...)
}
// formatProgress formats the progress information for a specified action.
func (sf *jsonProgressFormatter) formatProgress(id, action string, progress *jsonstream.Progress, aux interface{}) []byte {
if progress == nil {
progress = &jsonstream.Progress{}
}
var auxJSON *json.RawMessage
if aux != nil {
auxJSONBytes, err := json.Marshal(aux)
if err != nil {
return nil
}
auxJSON = new(json.RawMessage)
*auxJSON = auxJSONBytes
}
b, err := json.Marshal(&jsonMessage{
Status: action,
Progress: progress,
ID: id,
Aux: auxJSON,
})
if err != nil {
return nil
}
return appendNewline(b)
}
type rawProgressFormatter struct{}
func (sf *rawProgressFormatter) formatStatus(id, format string, a ...interface{}) []byte {
return []byte(fmt.Sprintf(format, a...) + streamNewline)
}
func rawProgressString(p *jsonstream.Progress) string {
if p == nil || (p.Current <= 0 && p.Total <= 0) {
return ""
}
if p.Total <= 0 {
switch p.Units {
case "":
return fmt.Sprintf("%8v", units.HumanSize(float64(p.Current)))
default:
return fmt.Sprintf("%d %s", p.Current, p.Units)
}
}
percentage := int(float64(p.Current)/float64(p.Total)*100) / 2
if percentage > 50 {
percentage = 50
}
numSpaces := 0
if 50-percentage > 0 {
numSpaces = 50 - percentage
}
pbBox := fmt.Sprintf("[%s>%s] ", strings.Repeat("=", percentage), strings.Repeat(" ", numSpaces))
var numbersBox string
switch {
case p.HideCounts:
case p.Units == "": // no units, use bytes
current := units.HumanSize(float64(p.Current))
total := units.HumanSize(float64(p.Total))
numbersBox = fmt.Sprintf("%8v/%v", current, total)
if p.Current > p.Total {
// remove total display if the reported current is wonky.
numbersBox = fmt.Sprintf("%8v", current)
}
default:
numbersBox = fmt.Sprintf("%d/%d %s", p.Current, p.Total, p.Units)
if p.Current > p.Total {
// remove total display if the reported current is wonky.
numbersBox = fmt.Sprintf("%d %s", p.Current, p.Units)
}
}
var timeLeftBox string
if p.Current > 0 && p.Start > 0 && percentage < 50 {
fromStart := time.Since(time.Unix(p.Start, 0))
perEntry := fromStart / time.Duration(p.Current)
left := time.Duration(p.Total-p.Current) * perEntry
timeLeftBox = " " + left.Round(time.Second).String()
}
return pbBox + numbersBox + timeLeftBox
}
func (sf *rawProgressFormatter) formatProgress(id, action string, progress *jsonstream.Progress, aux interface{}) []byte {
if progress == nil {
progress = &jsonstream.Progress{}
}
endl := "\r"
out := rawProgressString(progress)
if out == "" {
endl += "\n"
}
return []byte(action + " " + out + endl)
}
// NewProgressOutput returns a progress.Output object that can be passed to
// progress.NewProgressReader.
func NewProgressOutput(out io.Writer) progress.Output {
return &progressOutput{sf: &rawProgressFormatter{}, out: out, newLines: true}
}
// NewJSONProgressOutput returns a progress.Output that formats output
// using JSON objects
func NewJSONProgressOutput(out io.Writer, newLines bool) progress.Output {
return &progressOutput{sf: &jsonProgressFormatter{}, out: out, newLines: newLines}
}
type formatProgress interface {
formatStatus(id, format string, a ...interface{}) []byte
formatProgress(id, action string, progress *jsonstream.Progress, aux interface{}) []byte
}
type progressOutput struct {
sf formatProgress
out io.Writer
newLines bool
mu sync.Mutex
}
// WriteProgress formats progress information from a ProgressReader.
func (out *progressOutput) WriteProgress(prog progress.Progress) error {
var formatted []byte
if prog.Message != "" {
formatted = out.sf.formatStatus(prog.ID, prog.Message)
} else {
jsonProgress := jsonstream.Progress{
Current: prog.Current,
Total: prog.Total,
HideCounts: prog.HideCounts,
Units: prog.Units,
}
formatted = out.sf.formatProgress(prog.ID, prog.Action, &jsonProgress, prog.Aux)
}
out.mu.Lock()
defer out.mu.Unlock()
_, err := out.out.Write(formatted)
if err != nil {
return err
}
if out.newLines && prog.LastUpdate {
_, err = out.out.Write(out.sf.formatStatus("", ""))
return err
}
return nil
}
// AuxFormatter is a streamFormatter that writes aux progress messages
type AuxFormatter struct {
io.Writer
}
// Emit emits the given interface as an aux progress message
func (sf *AuxFormatter) Emit(id string, aux interface{}) error {
auxJSONBytes, err := json.Marshal(aux)
if err != nil {
return err
}
auxJSON := new(json.RawMessage)
*auxJSON = auxJSONBytes
msgJSON, err := json.Marshal(&jsonMessage{ID: id, Aux: auxJSON})
if err != nil {
return err
}
msgJSON = appendNewline(msgJSON)
n, err := sf.Writer.Write(msgJSON)
if n != len(msgJSON) {
return io.ErrShortWrite
}
return err
}

View File

@@ -1,110 +0,0 @@
package streamformatter
import (
"bytes"
"encoding/json"
"errors"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/moby/moby/api/types/jsonstream"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
func TestRawProgressFormatterFormatStatus(t *testing.T) {
sf := rawProgressFormatter{}
res := sf.formatStatus("ID", "%s%d", "a", 1)
assert.Check(t, is.Equal("a1\r\n", string(res)))
}
func TestRawProgressFormatterFormatProgress(t *testing.T) {
sf := rawProgressFormatter{}
jsonProgress := &jsonstream.Progress{
Current: 15,
Total: 30,
Start: 1,
}
res := sf.formatProgress("id", "action", jsonProgress, nil)
out := string(res)
assert.Check(t, strings.HasPrefix(out, "action [===="))
assert.Check(t, is.Contains(out, "15B/30B"))
assert.Check(t, strings.HasSuffix(out, "\r"))
}
func TestFormatStatus(t *testing.T) {
res := FormatStatus("ID", "%s%d", "a", 1)
expected := `{"status":"a1","id":"ID"}` + streamNewline
assert.Check(t, is.Equal(expected, string(res)))
}
func TestFormatError(t *testing.T) {
res := FormatError(errors.New("Error for formatter"))
expected := `{"errorDetail":{"message":"Error for formatter"},"error":"Error for formatter"}` + "\r\n"
assert.Check(t, is.Equal(expected, string(res)))
}
func TestFormatJSONError(t *testing.T) {
err := &jsonstream.Error{Code: 50, Message: "Json error"}
res := FormatError(err)
expected := `{"errorDetail":{"code":50,"message":"Json error"},"error":"Json error"}` + streamNewline
assert.Check(t, is.Equal(expected, string(res)))
}
func TestJsonProgressFormatterFormatProgress(t *testing.T) {
sf := &jsonProgressFormatter{}
jsonProgress := &jsonstream.Progress{
Current: 15,
Total: 30,
Start: 1,
}
aux := "aux message"
res := sf.formatProgress("id", "action", jsonProgress, aux)
msg := &jsonMessage{}
assert.NilError(t, json.Unmarshal(res, msg))
rawAux := json.RawMessage(`"` + aux + `"`)
expected := &jsonMessage{
ID: "id",
Status: "action",
Aux: &rawAux,
Progress: jsonProgress,
}
assert.DeepEqual(t, msg, expected, cmpJSONMessageOpt())
}
func cmpJSONMessageOpt() cmp.Option {
progressMessagePath := func(path cmp.Path) bool {
return path.String() == "ProgressMessage"
}
return cmp.Options{
// Ignore deprecated property that is a derivative of Progress
cmp.FilterPath(progressMessagePath, cmp.Ignore()),
}
}
func TestJsonProgressFormatterFormatStatus(t *testing.T) {
sf := jsonProgressFormatter{}
res := sf.formatStatus("ID", "%s%d", "a", 1)
assert.Check(t, is.Equal(`{"status":"a1","id":"ID"}`+streamNewline, string(res)))
}
func TestNewJSONProgressOutput(t *testing.T) {
b := bytes.Buffer{}
b.Write(FormatStatus("id", "Downloading"))
_ = NewJSONProgressOutput(&b, false)
assert.Check(t, is.Equal(`{"status":"Downloading","id":"id"}`+streamNewline, b.String()))
}
func TestAuxFormatterEmit(t *testing.T) {
b := bytes.Buffer{}
aux := &AuxFormatter{Writer: &b}
sampleAux := &struct {
Data string
}{"Additional data"}
err := aux.Emit("", sampleAux)
assert.NilError(t, err)
assert.Check(t, is.Equal(`{"aux":{"Data":"Additional data"}}`+streamNewline, b.String()))
}

View File

@@ -1,45 +0,0 @@
package streamformatter
import (
"encoding/json"
"io"
)
type streamWriter struct {
io.Writer
lineFormat func([]byte) string
}
func (sw *streamWriter) Write(buf []byte) (int, error) {
formattedBuf := sw.format(buf)
n, err := sw.Writer.Write(formattedBuf)
if n != len(formattedBuf) {
return n, io.ErrShortWrite
}
return len(buf), err
}
func (sw *streamWriter) format(buf []byte) []byte {
msg := &jsonMessage{Stream: sw.lineFormat(buf)}
b, err := json.Marshal(msg)
if err != nil {
return FormatError(err)
}
return appendNewline(b)
}
// NewStdoutWriter returns a writer which formats the output as json message
// representing stdout lines
func NewStdoutWriter(out io.Writer) io.Writer {
return &streamWriter{Writer: out, lineFormat: func(buf []byte) string {
return string(buf)
}}
}
// NewStderrWriter returns a writer which formats the output as json message
// representing stderr lines
func NewStderrWriter(out io.Writer) io.Writer {
return &streamWriter{Writer: out, lineFormat: func(buf []byte) string {
return "\033[91m" + string(buf) + "\033[0m"
}}
}

View File

@@ -1,35 +0,0 @@
package streamformatter
import (
"bytes"
"testing"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
func TestStreamWriterStdout(t *testing.T) {
buffer := &bytes.Buffer{}
content := "content"
sw := NewStdoutWriter(buffer)
size, err := sw.Write([]byte(content))
assert.NilError(t, err)
assert.Check(t, is.Equal(len(content), size))
expected := `{"stream":"content"}` + streamNewline
assert.Check(t, is.Equal(expected, buffer.String()))
}
func TestStreamWriterStderr(t *testing.T) {
buffer := &bytes.Buffer{}
content := "content"
sw := NewStderrWriter(buffer)
size, err := sw.Write([]byte(content))
assert.NilError(t, err)
assert.Check(t, is.Equal(len(content), size))
expected := `{"stream":"\u001b[91mcontent\u001b[0m"}` + streamNewline
assert.Check(t, is.Equal(expected, buffer.String()))
}