diff --git a/client/client_interfaces.go b/client/client_interfaces.go index a2da96536a..9d2f4ea2ed 100644 --- a/client/client_interfaces.go +++ b/client/client_interfaces.go @@ -113,7 +113,7 @@ type ImageAPIClient interface { ImageImport(ctx context.Context, source ImageImportSource, ref string, options ImageImportOptions) (io.ReadCloser, error) ImageList(ctx context.Context, options ImageListOptions) ([]image.Summary, error) - ImagePull(ctx context.Context, ref string, options ImagePullOptions) (io.ReadCloser, error) + ImagePull(ctx context.Context, ref string, options ImagePullOptions) (ImagePullResponse, error) ImagePush(ctx context.Context, ref string, options ImagePushOptions) (io.ReadCloser, error) ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]image.DeleteResponse, error) ImageSearch(ctx context.Context, term string, options ImageSearchOptions) ([]registry.SearchResult, error) diff --git a/client/image_pull.go b/client/image_pull.go index 6cefaa9479..ffe27a365e 100644 --- a/client/image_pull.go +++ b/client/image_pull.go @@ -2,19 +2,84 @@ package client import ( "context" + "encoding/json" + "errors" "io" + "iter" "net/url" "strings" + "sync" cerrdefs "github.com/containerd/errdefs" "github.com/distribution/reference" + "github.com/moby/moby/client/pkg/jsonmessage" ) +func newImagePullResponse(rc io.ReadCloser) ImagePullResponse { + return ImagePullResponse{ + rc: rc, + close: &sync.Once{}, + } +} + +type ImagePullResponse struct { + rc io.ReadCloser + close *sync.Once +} + +// Read implements io.ReadCloser +func (r ImagePullResponse) Read(p []byte) (n int, err error) { + return r.rc.Read(p) +} + +// Close implements io.ReadCloser +func (r ImagePullResponse) Close() error { + if r.close == nil { + return nil + } + var err error + r.close.Do(func() { + if r.rc != nil { + err = r.rc.Close() + } + }) + return err +} + +// JSONMessages decodes the response stream as a sequence of JSONMessages. +// if stream ends or context is cancelled, the underlying [io.Reader] is closed. +func (r ImagePullResponse) JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error] { + context.AfterFunc(ctx, func() { + r.Close() + }) + dec := json.NewDecoder(r) + return func(yield func(jsonmessage.JSONMessage, error) bool) { + defer r.Close() + for { + var jm jsonmessage.JSONMessage + err := dec.Decode(&jm) + if errors.Is(err, io.EOF) { + break + } + if ctx.Err() != nil { + yield(jm, ctx.Err()) + return + } + if !yield(jm, err) { + return + } + } + } +} + // ImagePull requests the docker host to pull an image from a remote registry. // It executes the privileged function if the operation is unauthorized // and it tries one more time. -// It's up to the caller to handle the [io.ReadCloser] and close it. -func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePullOptions) (io.ReadCloser, error) { +// Callers can use [ImagePullResponse.JSONMessages] to monitor pull progress as +// a sequence of JSONMessages, [ImagePullResponse.Close] does not need to be +// called in this case. Or, use the [io.Reader] interface and call +// [ImagePullResponse.Close] after processing. +func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePullOptions) (ImagePullResponse, error) { // FIXME(vdemeester): there is currently used in a few way in docker/docker // - if not in trusted content, ref is used to pass the whole reference, and tag is empty // - if in trusted content, ref is used to pass the reference name, and tag for the digest @@ -23,7 +88,7 @@ func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePu ref, err := reference.ParseNormalizedNamed(refStr) if err != nil { - return nil, err + return ImagePullResponse{}, err } query := url.Values{} @@ -40,9 +105,10 @@ func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePu resp, err = cli.tryImageCreate(ctx, query, options.PrivilegeFunc) } if err != nil { - return nil, err + return ImagePullResponse{}, err } - return resp.Body, nil + + return newImagePullResponse(resp.Body), nil } // getAPITagFromNamedRef returns a tag from the specified reference. diff --git a/client/image_pull_opts.go b/client/image_pull_opts.go index 618540902c..3f1042a888 100644 --- a/client/image_pull_opts.go +++ b/client/image_pull_opts.go @@ -1,6 +1,8 @@ package client -import "context" +import ( + "context" +) // ImagePullOptions holds information to pull images. type ImagePullOptions struct { diff --git a/client/image_pull_test.go b/client/image_pull_test.go index 7878c15fad..7f3955cc36 100644 --- a/client/image_pull_test.go +++ b/client/image_pull_test.go @@ -8,9 +8,11 @@ import ( "io" "net/http" "testing" + "time" cerrdefs "github.com/containerd/errdefs" "github.com/moby/moby/api/types/registry" + "github.com/moby/moby/client/pkg/jsonmessage" "gotest.tools/v3/assert" is "gotest.tools/v3/assert/cmp" ) @@ -194,3 +196,44 @@ func TestImagePullWithoutErrors(t *testing.T) { }) } } + +func TestImagePullResponse(t *testing.T) { + r, w := io.Pipe() + response := newImagePullResponse(r) + ctx, cancel := context.WithCancel(context.TODO()) + messages := response.JSONMessages(ctx) + c := make(chan jsonmessage.JSONMessage) + go func() { + for message, err := range messages { + if err != nil { + close(c) + break + } + c <- message + } + }() + + // Check we receive message sent to json stream + w.Write([]byte(`{"id":"test"}`)) + tiemout, _ := context.WithTimeout(context.TODO(), 100*time.Millisecond) + select { + case message := <-c: + assert.Equal(t, message.ID, "test") + case <-tiemout.Done(): + t.Fatal("expected message not received") + } + + // Check context cancelation + cancel() + tiemout, _ = context.WithTimeout(context.TODO(), 100*time.Millisecond) + select { + case _, ok := <-c: + assert.Check(t, !ok) + case <-tiemout.Done(): + t.Fatal("expected message not received") + } + + // Check Close can be ran twice without error + assert.NilError(t, response.Close()) + assert.NilError(t, response.Close()) +} diff --git a/client/pkg/jsonmessage/jsonmessage.go b/client/pkg/jsonmessage/jsonmessage.go index ff4b40a666..3820bcbaff 100644 --- a/client/pkg/jsonmessage/jsonmessage.go +++ b/client/pkg/jsonmessage/jsonmessage.go @@ -2,8 +2,10 @@ package jsonmessage import ( "encoding/json" + "errors" "fmt" "io" + "iter" "strings" "time" @@ -187,9 +189,32 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error { return nil } +type JSONMessagesStream iter.Seq2[JSONMessage, error] + // DisplayJSONMessagesStream reads a JSON message stream from in, and writes -// each [JSONMessage] to out. It returns an error if an invalid JSONMessage -// is received, or if a JSONMessage containers a non-zero [JSONMessage.Error]. +// each [JSONMessage] to out. +// see DisplayJSONMessages for details +func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error { + var dec = json.NewDecoder(in) + var f JSONMessagesStream = func(yield func(JSONMessage, error) bool) { + for { + var jm JSONMessage + err := dec.Decode(&jm) + if errors.Is(err, io.EOF) { + break + } + if !yield(jm, err) { + return + } + } + } + + return DisplayJSONMessages(f, out, terminalFd, isTerminal, auxCallback) +} + +// DisplayJSONMessages writes each [JSONMessage] from stream to out. +// It returns an error if an invalid JSONMessage is received, or if +// a JSONMessage containers a non-zero [JSONMessage.Error]. // // Presentation of the JSONMessage depends on whether a terminal is attached, // and on the terminal width. Progress bars ([JSONProgress]) are suppressed @@ -203,19 +228,12 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error { // - auxCallback allows handling the [JSONMessage.Aux] field. It is // called if a JSONMessage contains an Aux field, in which case // DisplayJSONMessagesStream does not present the JSONMessage. -func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error { - var ( - dec = json.NewDecoder(in) - ids = make(map[string]uint) - ) +func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error { + var ids = make(map[string]uint) - for { + for jm, err := range messages { var diff uint - var jm JSONMessage - if err := dec.Decode(&jm); err != nil { - if err == io.EOF { - break - } + if err != nil { return err } diff --git a/integration/image/pull_test.go b/integration/image/pull_test.go index d3fada4d71..49a4732cbd 100644 --- a/integration/image/pull_test.go +++ b/integration/image/pull_test.go @@ -153,9 +153,7 @@ func TestImagePullStoredDigestForOtherRepo(t *testing.T) { // Now, pull a totally different repo with a the same digest rdr, err = apiClient.ImagePull(ctx, path.Join(registry.DefaultURL, "other:image@"+desc.Digest.String()), client.ImagePullOptions{}) - if rdr != nil { - assert.Check(t, rdr.Close()) - } + assert.Check(t, rdr.Close()) assert.Assert(t, err != nil, "Expected error, got none: %v", err) assert.Assert(t, cerrdefs.IsNotFound(err), err) assert.Check(t, is.ErrorType(err, cerrdefs.IsNotFound)) diff --git a/vendor/github.com/moby/moby/client/client_interfaces.go b/vendor/github.com/moby/moby/client/client_interfaces.go index a2da96536a..9d2f4ea2ed 100644 --- a/vendor/github.com/moby/moby/client/client_interfaces.go +++ b/vendor/github.com/moby/moby/client/client_interfaces.go @@ -113,7 +113,7 @@ type ImageAPIClient interface { ImageImport(ctx context.Context, source ImageImportSource, ref string, options ImageImportOptions) (io.ReadCloser, error) ImageList(ctx context.Context, options ImageListOptions) ([]image.Summary, error) - ImagePull(ctx context.Context, ref string, options ImagePullOptions) (io.ReadCloser, error) + ImagePull(ctx context.Context, ref string, options ImagePullOptions) (ImagePullResponse, error) ImagePush(ctx context.Context, ref string, options ImagePushOptions) (io.ReadCloser, error) ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]image.DeleteResponse, error) ImageSearch(ctx context.Context, term string, options ImageSearchOptions) ([]registry.SearchResult, error) diff --git a/vendor/github.com/moby/moby/client/image_pull.go b/vendor/github.com/moby/moby/client/image_pull.go index 6cefaa9479..ffe27a365e 100644 --- a/vendor/github.com/moby/moby/client/image_pull.go +++ b/vendor/github.com/moby/moby/client/image_pull.go @@ -2,19 +2,84 @@ package client import ( "context" + "encoding/json" + "errors" "io" + "iter" "net/url" "strings" + "sync" cerrdefs "github.com/containerd/errdefs" "github.com/distribution/reference" + "github.com/moby/moby/client/pkg/jsonmessage" ) +func newImagePullResponse(rc io.ReadCloser) ImagePullResponse { + return ImagePullResponse{ + rc: rc, + close: &sync.Once{}, + } +} + +type ImagePullResponse struct { + rc io.ReadCloser + close *sync.Once +} + +// Read implements io.ReadCloser +func (r ImagePullResponse) Read(p []byte) (n int, err error) { + return r.rc.Read(p) +} + +// Close implements io.ReadCloser +func (r ImagePullResponse) Close() error { + if r.close == nil { + return nil + } + var err error + r.close.Do(func() { + if r.rc != nil { + err = r.rc.Close() + } + }) + return err +} + +// JSONMessages decodes the response stream as a sequence of JSONMessages. +// if stream ends or context is cancelled, the underlying [io.Reader] is closed. +func (r ImagePullResponse) JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error] { + context.AfterFunc(ctx, func() { + r.Close() + }) + dec := json.NewDecoder(r) + return func(yield func(jsonmessage.JSONMessage, error) bool) { + defer r.Close() + for { + var jm jsonmessage.JSONMessage + err := dec.Decode(&jm) + if errors.Is(err, io.EOF) { + break + } + if ctx.Err() != nil { + yield(jm, ctx.Err()) + return + } + if !yield(jm, err) { + return + } + } + } +} + // ImagePull requests the docker host to pull an image from a remote registry. // It executes the privileged function if the operation is unauthorized // and it tries one more time. -// It's up to the caller to handle the [io.ReadCloser] and close it. -func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePullOptions) (io.ReadCloser, error) { +// Callers can use [ImagePullResponse.JSONMessages] to monitor pull progress as +// a sequence of JSONMessages, [ImagePullResponse.Close] does not need to be +// called in this case. Or, use the [io.Reader] interface and call +// [ImagePullResponse.Close] after processing. +func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePullOptions) (ImagePullResponse, error) { // FIXME(vdemeester): there is currently used in a few way in docker/docker // - if not in trusted content, ref is used to pass the whole reference, and tag is empty // - if in trusted content, ref is used to pass the reference name, and tag for the digest @@ -23,7 +88,7 @@ func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePu ref, err := reference.ParseNormalizedNamed(refStr) if err != nil { - return nil, err + return ImagePullResponse{}, err } query := url.Values{} @@ -40,9 +105,10 @@ func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePu resp, err = cli.tryImageCreate(ctx, query, options.PrivilegeFunc) } if err != nil { - return nil, err + return ImagePullResponse{}, err } - return resp.Body, nil + + return newImagePullResponse(resp.Body), nil } // getAPITagFromNamedRef returns a tag from the specified reference. diff --git a/vendor/github.com/moby/moby/client/image_pull_opts.go b/vendor/github.com/moby/moby/client/image_pull_opts.go index 618540902c..3f1042a888 100644 --- a/vendor/github.com/moby/moby/client/image_pull_opts.go +++ b/vendor/github.com/moby/moby/client/image_pull_opts.go @@ -1,6 +1,8 @@ package client -import "context" +import ( + "context" +) // ImagePullOptions holds information to pull images. type ImagePullOptions struct { diff --git a/vendor/github.com/moby/moby/client/pkg/jsonmessage/jsonmessage.go b/vendor/github.com/moby/moby/client/pkg/jsonmessage/jsonmessage.go index ff4b40a666..3820bcbaff 100644 --- a/vendor/github.com/moby/moby/client/pkg/jsonmessage/jsonmessage.go +++ b/vendor/github.com/moby/moby/client/pkg/jsonmessage/jsonmessage.go @@ -2,8 +2,10 @@ package jsonmessage import ( "encoding/json" + "errors" "fmt" "io" + "iter" "strings" "time" @@ -187,9 +189,32 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error { return nil } +type JSONMessagesStream iter.Seq2[JSONMessage, error] + // DisplayJSONMessagesStream reads a JSON message stream from in, and writes -// each [JSONMessage] to out. It returns an error if an invalid JSONMessage -// is received, or if a JSONMessage containers a non-zero [JSONMessage.Error]. +// each [JSONMessage] to out. +// see DisplayJSONMessages for details +func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error { + var dec = json.NewDecoder(in) + var f JSONMessagesStream = func(yield func(JSONMessage, error) bool) { + for { + var jm JSONMessage + err := dec.Decode(&jm) + if errors.Is(err, io.EOF) { + break + } + if !yield(jm, err) { + return + } + } + } + + return DisplayJSONMessages(f, out, terminalFd, isTerminal, auxCallback) +} + +// DisplayJSONMessages writes each [JSONMessage] from stream to out. +// It returns an error if an invalid JSONMessage is received, or if +// a JSONMessage containers a non-zero [JSONMessage.Error]. // // Presentation of the JSONMessage depends on whether a terminal is attached, // and on the terminal width. Progress bars ([JSONProgress]) are suppressed @@ -203,19 +228,12 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error { // - auxCallback allows handling the [JSONMessage.Aux] field. It is // called if a JSONMessage contains an Aux field, in which case // DisplayJSONMessagesStream does not present the JSONMessage. -func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error { - var ( - dec = json.NewDecoder(in) - ids = make(map[string]uint) - ) +func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error { + var ids = make(map[string]uint) - for { + for jm, err := range messages { var diff uint - var jm JSONMessage - if err := dec.Decode(&jm); err != nil { - if err == io.EOF { - break - } + if err != nil { return err }