Merge pull request #51156 from corhere/client-pkg-jsonmessage

client/pkg/jsonmessage: refactor in terms of `api/types/jsonstream`
This commit is contained in:
Paweł Gronowski
2025-10-30 19:33:21 +01:00
committed by GitHub
17 changed files with 110 additions and 188 deletions

View File

@@ -9,13 +9,13 @@ import (
cerrdefs "github.com/containerd/errdefs"
"github.com/distribution/reference"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/client/internal"
"github.com/moby/moby/client/pkg/jsonmessage"
)
type ImagePullResponse interface {
io.ReadCloser
JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error]
JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error]
Wait(ctx context.Context) error
}

View File

@@ -10,9 +10,9 @@ import (
"time"
cerrdefs "github.com/containerd/errdefs"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/api/types/registry"
"github.com/moby/moby/client/internal"
"github.com/moby/moby/client/pkg/jsonmessage"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
@@ -193,7 +193,7 @@ func TestImagePullResponse(t *testing.T) {
response := internal.NewJSONMessageStream(r)
ctx, cancel := context.WithCancel(t.Context())
messages := response.JSONMessages(ctx)
c := make(chan jsonmessage.JSONMessage)
c := make(chan jsonstream.Message)
go func() {
for message, err := range messages {
if err != nil {

View File

@@ -12,14 +12,14 @@ import (
cerrdefs "github.com/containerd/errdefs"
"github.com/distribution/reference"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/api/types/registry"
"github.com/moby/moby/client/internal"
"github.com/moby/moby/client/pkg/jsonmessage"
)
type ImagePushResponse interface {
io.ReadCloser
JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error]
JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error]
Wait(ctx context.Context) error
}

View File

@@ -8,7 +8,7 @@ import (
"iter"
"sync"
"github.com/moby/moby/client/pkg/jsonmessage"
"github.com/moby/moby/api/types/jsonstream"
)
func NewJSONMessageStream(rc io.ReadCloser) stream {
@@ -44,15 +44,15 @@ func (r stream) Close() error {
// JSONMessages decodes the response stream as a sequence of JSONMessages.
// if stream ends or context is cancelled, the underlying [io.Reader] is closed.
func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error] {
func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error] {
context.AfterFunc(ctx, func() {
_ = r.Close()
})
dec := json.NewDecoder(r)
return func(yield func(jsonmessage.JSONMessage, error) bool) {
return func(yield func(jsonstream.Message, error) bool) {
defer r.Close()
for {
var jm jsonmessage.JSONMessage
var jm jsonstream.Message
err := dec.Decode(&jm)
if errors.Is(err, io.EOF) {
break

View File

@@ -14,28 +14,14 @@ import (
"github.com/moby/term"
)
var timeNow = time.Now // For overriding in tests.
// RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
// ensure the formatted time isalways the same number of characters.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
// JSONProgress describes a progress message in a JSON stream.
type JSONProgress struct {
jsonstream.Progress
// terminalFd is the fd of the current terminal, if any. It is used
// to get the terminal width.
terminalFd uintptr
// nowFunc is used to override the current time in tests.
nowFunc func() time.Time
// winSize is used to override the terminal width in tests.
winSize int
}
func (p *JSONProgress) String() string {
func RenderTUIProgress(p jsonstream.Progress, width uint16) string {
var (
width = p.width()
pbBox string
numbersBox string
)
@@ -89,7 +75,7 @@ func (p *JSONProgress) String() string {
var timeLeftBox string
if width > 50 {
if p.Current > 0 && p.Start > 0 && percentage < 50 {
fromStart := p.now().Sub(time.Unix(p.Start, 0))
fromStart := timeNow().UTC().Sub(time.Unix(p.Start, 0))
perEntry := fromStart / time.Duration(p.Current)
left := time.Duration(p.Total-p.Current) * perEntry
timeLeftBox = " " + left.Round(time.Second).String()
@@ -98,40 +84,6 @@ func (p *JSONProgress) String() string {
return pbBox + numbersBox + timeLeftBox
}
// now returns the current time in UTC, but can be overridden in tests
// by setting JSONProgress.nowFunc to a custom function.
func (p *JSONProgress) now() time.Time {
if p.nowFunc != nil {
return p.nowFunc()
}
return time.Now().UTC()
}
// width returns the current terminal's width, but can be overridden
// in tests by setting JSONProgress.winSize to a non-zero value.
func (p *JSONProgress) width() int {
if p.winSize != 0 {
return p.winSize
}
ws, err := term.GetWinsize(p.terminalFd)
if err == nil {
return int(ws.Width)
}
return 200
}
// JSONMessage defines a message struct. It describes
// the created time, where it from, status, ID of the
// message. It's used for docker events.
type JSONMessage struct {
Stream string `json:"stream,omitempty"`
Status string `json:"status,omitempty"`
Progress *JSONProgress `json:"progressDetail,omitempty"`
ID string `json:"id,omitempty"`
Error *jsonstream.Error `json:"errorDetail,omitempty"`
Aux *json.RawMessage `json:"aux,omitempty"` // Aux contains out-of-band data, such as digests for push signing and image id after building.
}
// We can probably use [aec.EmptyBuilder] for managing the output, but
// currently we're doing it all manually, so defining some consts for
// the basics we use.
@@ -164,7 +116,7 @@ func cursorDown(out io.Writer, l uint) {
// Display prints the JSONMessage to out. If isTerminal is true, it erases
// the entire current line when displaying the progressbar. It returns an
// error if the [JSONMessage.Error] field is non-nil.
func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
func Display(jm jsonstream.Message, out io.Writer, isTerminal bool, width uint16) error {
if jm.Error != nil {
return jm.Error
}
@@ -173,14 +125,17 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
clearLine(out)
endl = "\r"
_, _ = fmt.Fprint(out, endl)
} else if jm.Progress != nil && jm.Progress.String() != "" { // disable progressbar in non-terminal
} else if jm.Progress != nil && (jm.Progress.Current > 0 || jm.Progress.Total > 0) { // disable progressbar in non-terminal
return nil
}
if jm.ID != "" {
_, _ = fmt.Fprintf(out, "%s: ", jm.ID)
}
if jm.Progress != nil && isTerminal {
_, _ = fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl)
if width == 0 {
width = 200
}
_, _ = fmt.Fprintf(out, "%s %s%s", jm.Status, RenderTUIProgress(*jm.Progress, width), endl)
} else if jm.Stream != "" {
_, _ = fmt.Fprintf(out, "%s%s", jm.Stream, endl)
} else {
@@ -189,16 +144,16 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
return nil
}
type JSONMessagesStream iter.Seq2[JSONMessage, error]
type JSONMessagesStream iter.Seq2[jsonstream.Message, error]
// DisplayJSONMessagesStream reads a JSON message stream from in, and writes
// each [JSONMessage] to out.
// see DisplayJSONMessages for details
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
dec := json.NewDecoder(in)
var f JSONMessagesStream = func(yield func(JSONMessage, error) bool) {
var f JSONMessagesStream = func(yield func(jsonstream.Message, error) bool) {
for {
var jm JSONMessage
var jm jsonstream.Message
err := dec.Decode(&jm)
if errors.Is(err, io.EOF) {
break
@@ -228,8 +183,15 @@ func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr,
// - auxCallback allows handling the [JSONMessage.Aux] field. It is
// called if a JSONMessage contains an Aux field, in which case
// DisplayJSONMessagesStream does not present the JSONMessage.
func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
ids := make(map[string]uint)
var width uint16 = 200
if isTerminal {
ws, err := term.GetWinsize(terminalFd)
if err == nil {
width = ws.Width
}
}
for jm, err := range messages {
var diff uint
@@ -244,9 +206,6 @@ func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd
continue
}
if jm.Progress != nil {
jm.Progress.terminalFd = terminalFd
}
if jm.ID != "" && jm.Progress != nil {
line, ok := ids[jm.ID]
if !ok {
@@ -274,7 +233,7 @@ func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd
// with multiple tags).
ids = make(map[string]uint)
}
err := jm.Display(out, isTerminal)
err := Display(jm, out, isTerminal, width)
if jm.ID != "" && isTerminal {
cursorDown(out, diff)
}

View File

@@ -32,27 +32,26 @@ func TestProgressString(t *testing.T) {
testcases := []struct {
name string
progress JSONProgress
progress jsonstream.Progress
expected expected
nowFunc func() time.Time
}{
{
name: "no progress",
},
{
name: "progress 1",
progress: JSONProgress{Progress: jsonstream.Progress{Current: 1}},
progress: jsonstream.Progress{Current: 1},
expected: shortAndLong(" 1B", " 1B"),
},
{
name: "some progress with a start time",
progress: JSONProgress{
Progress: jsonstream.Progress{
progress: jsonstream.Progress{
Current: 20,
Total: 100,
Start: start.Unix(),
},
nowFunc: timeAfter(time.Second),
},
expected: shortAndLong(
" 20B/100B 4s",
"[==========> ] 20B/100B 4s",
@@ -60,7 +59,7 @@ func TestProgressString(t *testing.T) {
},
{
name: "some progress without a start time",
progress: JSONProgress{Progress: jsonstream.Progress{Current: 50, Total: 100}},
progress: jsonstream.Progress{Current: 50, Total: 100},
expected: shortAndLong(
" 50B/100B",
"[=========================> ] 50B/100B",
@@ -68,7 +67,7 @@ func TestProgressString(t *testing.T) {
},
{
name: "current more than total is not negative gh#7136",
progress: JSONProgress{Progress: jsonstream.Progress{Current: 50, Total: 40}},
progress: jsonstream.Progress{Current: 50, Total: 40},
expected: shortAndLong(
" 50B",
"[==================================================>] 50B",
@@ -76,7 +75,7 @@ func TestProgressString(t *testing.T) {
},
{
name: "with units",
progress: JSONProgress{Progress: jsonstream.Progress{Current: 50, Total: 100, Units: "units"}},
progress: jsonstream.Progress{Current: 50, Total: 100, Units: "units"},
expected: shortAndLong(
"50/100 units",
"[=========================> ] 50/100 units",
@@ -84,7 +83,7 @@ func TestProgressString(t *testing.T) {
},
{
name: "current more than total with units is not negative ",
progress: JSONProgress{Progress: jsonstream.Progress{Current: 50, Total: 40, Units: "units"}},
progress: jsonstream.Progress{Current: 50, Total: 40, Units: "units"},
expected: shortAndLong(
"50 units",
"[==================================================>] 50 units",
@@ -92,7 +91,7 @@ func TestProgressString(t *testing.T) {
},
{
name: "hide counts",
progress: JSONProgress{Progress: jsonstream.Progress{Current: 50, Total: 100, HideCounts: true}},
progress: jsonstream.Progress{Current: 50, Total: 100, HideCounts: true},
expected: shortAndLong(
"",
"[=========================> ] ",
@@ -102,17 +101,19 @@ func TestProgressString(t *testing.T) {
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
testcase.progress.winSize = 100
assert.Equal(t, testcase.progress.String(), testcase.expected.short)
testcase.progress.winSize = 200
assert.Equal(t, testcase.progress.String(), testcase.expected.long)
if testcase.nowFunc != nil {
originalTimeNow := timeNow
timeNow = testcase.nowFunc
defer func() { timeNow = originalTimeNow }()
}
assert.Equal(t, RenderTUIProgress(testcase.progress, 100), testcase.expected.short)
assert.Equal(t, RenderTUIProgress(testcase.progress, 200), testcase.expected.long)
})
}
}
func TestJSONMessageDisplay(t *testing.T) {
messages := map[JSONMessage][]string{
messages := map[jsonstream.Message][]string{
// Empty
{}: {"\n", "\n"},
// Status
@@ -142,7 +143,7 @@ func TestJSONMessageDisplay(t *testing.T) {
{
Status: "status",
Stream: "",
Progress: &JSONProgress{Progress: jsonstream.Progress{Current: 1}},
Progress: &jsonstream.Progress{Current: 1},
}: {
"",
fmt.Sprintf("%c[2K\rstatus 1B\r", 27),
@@ -153,7 +154,7 @@ func TestJSONMessageDisplay(t *testing.T) {
for jsonMessage, expectedMessages := range messages {
// Without terminal
data := bytes.NewBuffer([]byte{})
if err := jsonMessage.Display(data, false); err != nil {
if err := Display(jsonMessage, data, false, 0); err != nil {
t.Fatal(err)
}
if data.String() != expectedMessages[0] {
@@ -161,7 +162,7 @@ func TestJSONMessageDisplay(t *testing.T) {
}
// With terminal
data = bytes.NewBuffer([]byte{})
if err := jsonMessage.Display(data, true); err != nil {
if err := Display(jsonMessage, data, true, 0); err != nil {
t.Fatal(err)
}
if data.String() != expectedMessages[1] {
@@ -173,15 +174,15 @@ func TestJSONMessageDisplay(t *testing.T) {
// Test JSONMessage with an Error. It returns an error with the given text, not the meaning of the HTTP code.
func TestJSONMessageDisplayWithJSONError(t *testing.T) {
data := bytes.NewBuffer([]byte{})
jsonMessage := JSONMessage{Error: &jsonstream.Error{Code: 404, Message: "Can't find it"}}
jsonMessage := jsonstream.Message{Error: &jsonstream.Error{Code: 404, Message: "Can't find it"}}
err := jsonMessage.Display(data, true)
err := Display(jsonMessage, data, true, 0)
if err == nil || err.Error() != "Can't find it" {
t.Fatalf("Expected a jsonstream.Error 404, got %q", err)
}
jsonMessage = JSONMessage{Error: &jsonstream.Error{Code: 401, Message: "Anything"}}
err = jsonMessage.Display(data, true)
jsonMessage = jsonstream.Message{Error: &jsonstream.Error{Code: 401, Message: "Anything"}}
err = Display(jsonMessage, data, true, 0)
assert.Check(t, is.Error(err, "Anything"))
}

View File

@@ -7,8 +7,8 @@ import (
"strings"
"testing"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/client"
"github.com/moby/moby/client/pkg/jsonmessage"
"github.com/moby/moby/v2/integration/internal/requirement"
"github.com/moby/moby/v2/internal/testutil"
"github.com/moby/moby/v2/internal/testutil/daemon"
@@ -23,7 +23,7 @@ func getCgroupFromBuildOutput(buildOutput io.Reader) (string, error) {
dec := json.NewDecoder(buildOutput)
for {
m := jsonmessage.JSONMessage{}
m := jsonstream.Message{}
err := dec.Decode(&m)
if err == io.EOF {
return "", nil

View File

@@ -18,8 +18,8 @@ import (
"github.com/moby/moby/api/types/build"
"github.com/moby/moby/api/types/events"
"github.com/moby/moby/api/types/image"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/client"
"github.com/moby/moby/client/pkg/jsonmessage"
"github.com/moby/moby/v2/internal/testutil"
"github.com/moby/moby/v2/internal/testutil/fakecontext"
"gotest.tools/v3/assert"
@@ -127,7 +127,7 @@ func buildContainerIdsFilter(buildOutput io.Reader) (client.Filters, error) {
dec := json.NewDecoder(buildOutput)
for {
m := jsonmessage.JSONMessage{}
m := jsonstream.Message{}
err := dec.Decode(&m)
if err == io.EOF {
return filter, nil
@@ -813,7 +813,7 @@ func readBuildImageIDs(t *testing.T, rd io.Reader) string {
t.Helper()
decoder := json.NewDecoder(rd)
for {
var jm jsonmessage.JSONMessage
var jm jsonstream.Message
if err := decoder.Decode(&jm); err != nil {
if err == io.EOF {
break

View File

@@ -15,6 +15,7 @@ import (
cerrdefs "github.com/containerd/errdefs"
"github.com/moby/go-archive"
"github.com/moby/moby/api/types/build"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/client"
"github.com/moby/moby/client/pkg/jsonmessage"
"github.com/moby/moby/v2/integration/internal/container"
@@ -214,7 +215,7 @@ func makeTestImage(ctx context.Context, t *testing.T) (imageID string) {
assert.NilError(t, err)
defer resp.Body.Close()
err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, func(msg jsonmessage.JSONMessage) {
err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, func(msg jsonstream.Message) {
var r build.Result
assert.NilError(t, json.Unmarshal(*msg.Aux, &r))
imageID = r.ID
@@ -289,7 +290,7 @@ func TestCopyFromContainer(t *testing.T) {
defer resp.Body.Close()
var imageID string
err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, func(msg jsonmessage.JSONMessage) {
err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, func(msg jsonstream.Message) {
var r build.Result
assert.NilError(t, json.Unmarshal(*msg.Aux, &r))
imageID = r.ID

View File

@@ -5,8 +5,8 @@ import (
"strings"
"testing"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/client"
"github.com/moby/moby/client/pkg/jsonmessage"
"github.com/moby/moby/v2/integration/internal/container"
"github.com/moby/moby/v2/internal/testutil"
"github.com/moby/moby/v2/internal/testutil/daemon"
@@ -39,7 +39,7 @@ func TestExportContainerAndImportImage(t *testing.T) {
// the image ID and match with the output from `docker images`.
dec := json.NewDecoder(importRes)
var jm jsonmessage.JSONMessage
var jm jsonstream.Message
err = dec.Decode(&jm)
assert.NilError(t, err)

View File

@@ -10,6 +10,7 @@ import (
"github.com/containerd/containerd/v2/pkg/protobuf/proto"
controlapi "github.com/moby/buildkit/api/services/control"
"github.com/moby/moby/api/types/build"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/client"
"github.com/moby/moby/client/pkg/jsonmessage"
"github.com/moby/moby/v2/internal/testutil/fakecontext"
@@ -36,7 +37,7 @@ func GetImageIDFromBody(t *testing.T, body io.Reader) string {
buf := bytes.NewBuffer(nil)
dec := json.NewDecoder(body)
for {
var jm jsonmessage.JSONMessage
var jm jsonstream.Message
err := dec.Decode(&jm)
if err == io.EOF {
break
@@ -48,7 +49,7 @@ func GetImageIDFromBody(t *testing.T, body io.Reader) string {
}
buf.Reset()
jm.Display(buf, false)
jsonmessage.Display(jm, buf, false, 0)
if buf.Len() == 0 {
continue
}
@@ -76,7 +77,7 @@ func GetImageIDFromBody(t *testing.T, body io.Reader) string {
return id
}
func processBuildkitAux(t *testing.T, jm *jsonmessage.JSONMessage, id *string) bool {
func processBuildkitAux(t *testing.T, jm *jsonstream.Message, id *string) bool {
if jm.ID == "moby.buildkit.trace" {
var dt []byte
if err := json.Unmarshal(*jm.Aux, &dt); err != nil {

View File

@@ -10,8 +10,8 @@ import (
"testing"
"github.com/moby/go-archive"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/client"
"github.com/moby/moby/client/pkg/jsonmessage"
"github.com/moby/moby/v2/internal/testutil/specialimage"
"gotest.tools/v3/assert"
)
@@ -46,7 +46,7 @@ func Load(ctx context.Context, t *testing.T, apiClient client.APIClient, imageFu
decoder := json.NewDecoder(bytes.NewReader(all))
for {
var msg jsonmessage.JSONMessage
var msg jsonstream.Message
err := decoder.Decode(&msg)
if errors.Is(err, io.EOF) {
break

View File

@@ -16,6 +16,7 @@ import (
c8dimages "github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes/docker"
"github.com/moby/moby/api/types"
"github.com/moby/moby/api/types/jsonstream"
registrytypes "github.com/moby/moby/api/types/registry"
"github.com/moby/moby/api/types/system"
"github.com/moby/moby/client"
@@ -142,7 +143,7 @@ func TestPluginInstall(t *testing.T) {
buf := &strings.Builder{}
assert.NilError(t, err)
var digest string
assert.NilError(t, jsonmessage.DisplayJSONMessagesStream(pushResult, buf, 0, false, func(j jsonmessage.JSONMessage) {
assert.NilError(t, jsonmessage.DisplayJSONMessagesStream(pushResult, buf, 0, false, func(j jsonstream.Message) {
if j.Aux != nil {
var r types.PushResult
assert.NilError(t, json.Unmarshal(*j.Aux, &r))

View File

@@ -9,13 +9,13 @@ import (
cerrdefs "github.com/containerd/errdefs"
"github.com/distribution/reference"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/client/internal"
"github.com/moby/moby/client/pkg/jsonmessage"
)
type ImagePullResponse interface {
io.ReadCloser
JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error]
JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error]
Wait(ctx context.Context) error
}

View File

@@ -12,14 +12,14 @@ import (
cerrdefs "github.com/containerd/errdefs"
"github.com/distribution/reference"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/api/types/registry"
"github.com/moby/moby/client/internal"
"github.com/moby/moby/client/pkg/jsonmessage"
)
type ImagePushResponse interface {
io.ReadCloser
JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error]
JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error]
Wait(ctx context.Context) error
}

View File

@@ -8,7 +8,7 @@ import (
"iter"
"sync"
"github.com/moby/moby/client/pkg/jsonmessage"
"github.com/moby/moby/api/types/jsonstream"
)
func NewJSONMessageStream(rc io.ReadCloser) stream {
@@ -44,15 +44,15 @@ func (r stream) Close() error {
// JSONMessages decodes the response stream as a sequence of JSONMessages.
// if stream ends or context is cancelled, the underlying [io.Reader] is closed.
func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error] {
func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error] {
context.AfterFunc(ctx, func() {
_ = r.Close()
})
dec := json.NewDecoder(r)
return func(yield func(jsonmessage.JSONMessage, error) bool) {
return func(yield func(jsonstream.Message, error) bool) {
defer r.Close()
for {
var jm jsonmessage.JSONMessage
var jm jsonstream.Message
err := dec.Decode(&jm)
if errors.Is(err, io.EOF) {
break

View File

@@ -14,28 +14,14 @@ import (
"github.com/moby/term"
)
var timeNow = time.Now // For overriding in tests.
// RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
// ensure the formatted time isalways the same number of characters.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
// JSONProgress describes a progress message in a JSON stream.
type JSONProgress struct {
jsonstream.Progress
// terminalFd is the fd of the current terminal, if any. It is used
// to get the terminal width.
terminalFd uintptr
// nowFunc is used to override the current time in tests.
nowFunc func() time.Time
// winSize is used to override the terminal width in tests.
winSize int
}
func (p *JSONProgress) String() string {
func RenderTUIProgress(p jsonstream.Progress, width uint16) string {
var (
width = p.width()
pbBox string
numbersBox string
)
@@ -89,7 +75,7 @@ func (p *JSONProgress) String() string {
var timeLeftBox string
if width > 50 {
if p.Current > 0 && p.Start > 0 && percentage < 50 {
fromStart := p.now().Sub(time.Unix(p.Start, 0))
fromStart := timeNow().UTC().Sub(time.Unix(p.Start, 0))
perEntry := fromStart / time.Duration(p.Current)
left := time.Duration(p.Total-p.Current) * perEntry
timeLeftBox = " " + left.Round(time.Second).String()
@@ -98,40 +84,6 @@ func (p *JSONProgress) String() string {
return pbBox + numbersBox + timeLeftBox
}
// now returns the current time in UTC, but can be overridden in tests
// by setting JSONProgress.nowFunc to a custom function.
func (p *JSONProgress) now() time.Time {
if p.nowFunc != nil {
return p.nowFunc()
}
return time.Now().UTC()
}
// width returns the current terminal's width, but can be overridden
// in tests by setting JSONProgress.winSize to a non-zero value.
func (p *JSONProgress) width() int {
if p.winSize != 0 {
return p.winSize
}
ws, err := term.GetWinsize(p.terminalFd)
if err == nil {
return int(ws.Width)
}
return 200
}
// JSONMessage defines a message struct. It describes
// the created time, where it from, status, ID of the
// message. It's used for docker events.
type JSONMessage struct {
Stream string `json:"stream,omitempty"`
Status string `json:"status,omitempty"`
Progress *JSONProgress `json:"progressDetail,omitempty"`
ID string `json:"id,omitempty"`
Error *jsonstream.Error `json:"errorDetail,omitempty"`
Aux *json.RawMessage `json:"aux,omitempty"` // Aux contains out-of-band data, such as digests for push signing and image id after building.
}
// We can probably use [aec.EmptyBuilder] for managing the output, but
// currently we're doing it all manually, so defining some consts for
// the basics we use.
@@ -164,7 +116,7 @@ func cursorDown(out io.Writer, l uint) {
// Display prints the JSONMessage to out. If isTerminal is true, it erases
// the entire current line when displaying the progressbar. It returns an
// error if the [JSONMessage.Error] field is non-nil.
func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
func Display(jm jsonstream.Message, out io.Writer, isTerminal bool, width uint16) error {
if jm.Error != nil {
return jm.Error
}
@@ -173,14 +125,17 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
clearLine(out)
endl = "\r"
_, _ = fmt.Fprint(out, endl)
} else if jm.Progress != nil && jm.Progress.String() != "" { // disable progressbar in non-terminal
} else if jm.Progress != nil && (jm.Progress.Current > 0 || jm.Progress.Total > 0) { // disable progressbar in non-terminal
return nil
}
if jm.ID != "" {
_, _ = fmt.Fprintf(out, "%s: ", jm.ID)
}
if jm.Progress != nil && isTerminal {
_, _ = fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl)
if width == 0 {
width = 200
}
_, _ = fmt.Fprintf(out, "%s %s%s", jm.Status, RenderTUIProgress(*jm.Progress, width), endl)
} else if jm.Stream != "" {
_, _ = fmt.Fprintf(out, "%s%s", jm.Stream, endl)
} else {
@@ -189,16 +144,16 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
return nil
}
type JSONMessagesStream iter.Seq2[JSONMessage, error]
type JSONMessagesStream iter.Seq2[jsonstream.Message, error]
// DisplayJSONMessagesStream reads a JSON message stream from in, and writes
// each [JSONMessage] to out.
// see DisplayJSONMessages for details
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
dec := json.NewDecoder(in)
var f JSONMessagesStream = func(yield func(JSONMessage, error) bool) {
var f JSONMessagesStream = func(yield func(jsonstream.Message, error) bool) {
for {
var jm JSONMessage
var jm jsonstream.Message
err := dec.Decode(&jm)
if errors.Is(err, io.EOF) {
break
@@ -228,8 +183,15 @@ func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr,
// - auxCallback allows handling the [JSONMessage.Aux] field. It is
// called if a JSONMessage contains an Aux field, in which case
// DisplayJSONMessagesStream does not present the JSONMessage.
func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
ids := make(map[string]uint)
var width uint16 = 200
if isTerminal {
ws, err := term.GetWinsize(terminalFd)
if err == nil {
width = ws.Width
}
}
for jm, err := range messages {
var diff uint
@@ -244,9 +206,6 @@ func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd
continue
}
if jm.Progress != nil {
jm.Progress.terminalFd = terminalFd
}
if jm.ID != "" && jm.Progress != nil {
line, ok := ids[jm.ID]
if !ok {
@@ -274,7 +233,7 @@ func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd
// with multiple tags).
ids = make(map[string]uint)
}
err := jm.Display(out, isTerminal)
err := Display(jm, out, isTerminal, width)
if jm.ID != "" && isTerminal {
cursorDown(out, diff)
}