diff --git a/client/internal/jsonmessages.go b/client/internal/jsonmessages.go index ebbb5faa39..03afc4e80a 100644 --- a/client/internal/jsonmessages.go +++ b/client/internal/jsonmessages.go @@ -45,12 +45,15 @@ func (r stream) Close() error { // JSONMessages decodes the response stream as a sequence of JSONMessages. // if stream ends or context is cancelled, the underlying [io.Reader] is closed. func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error] { - context.AfterFunc(ctx, func() { + stop := context.AfterFunc(ctx, func() { _ = r.Close() }) dec := json.NewDecoder(r) return func(yield func(jsonstream.Message, error) bool) { - defer r.Close() + defer func() { + stop() // unregister AfterFunc + r.Close() + }() for { var jm jsonstream.Message err := dec.Decode(&jm) diff --git a/client/utils.go b/client/utils.go index f2ba4744c4..4415e0dc5a 100644 --- a/client/utils.go +++ b/client/utils.go @@ -136,14 +136,19 @@ func newCancelReadCloser(ctx context.Context, rc io.ReadCloser) io.ReadCloser { rc: rc, close: sync.OnceValue(rc.Close), } - context.AfterFunc(ctx, func() { _ = crc.Close() }) + crc.stop = context.AfterFunc(ctx, func() { _ = crc.Close() }) return crc } type cancelReadCloser struct { rc io.ReadCloser close func() error + stop func() bool } func (c *cancelReadCloser) Read(p []byte) (int, error) { return c.rc.Read(p) } -func (c *cancelReadCloser) Close() error { return c.close() } + +func (c *cancelReadCloser) Close() error { + c.stop() // unregister AfterFunc + return c.close() +} diff --git a/vendor/github.com/moby/moby/client/internal/jsonmessages.go b/vendor/github.com/moby/moby/client/internal/jsonmessages.go index ebbb5faa39..03afc4e80a 100644 --- a/vendor/github.com/moby/moby/client/internal/jsonmessages.go +++ b/vendor/github.com/moby/moby/client/internal/jsonmessages.go @@ -45,12 +45,15 @@ func (r stream) Close() error { // JSONMessages decodes the response stream as a sequence of JSONMessages. // if stream ends or context is cancelled, the underlying [io.Reader] is closed. func (r stream) JSONMessages(ctx context.Context) iter.Seq2[jsonstream.Message, error] { - context.AfterFunc(ctx, func() { + stop := context.AfterFunc(ctx, func() { _ = r.Close() }) dec := json.NewDecoder(r) return func(yield func(jsonstream.Message, error) bool) { - defer r.Close() + defer func() { + stop() // unregister AfterFunc + r.Close() + }() for { var jm jsonstream.Message err := dec.Decode(&jm) diff --git a/vendor/github.com/moby/moby/client/utils.go b/vendor/github.com/moby/moby/client/utils.go index f2ba4744c4..4415e0dc5a 100644 --- a/vendor/github.com/moby/moby/client/utils.go +++ b/vendor/github.com/moby/moby/client/utils.go @@ -136,14 +136,19 @@ func newCancelReadCloser(ctx context.Context, rc io.ReadCloser) io.ReadCloser { rc: rc, close: sync.OnceValue(rc.Close), } - context.AfterFunc(ctx, func() { _ = crc.Close() }) + crc.stop = context.AfterFunc(ctx, func() { _ = crc.Close() }) return crc } type cancelReadCloser struct { rc io.ReadCloser close func() error + stop func() bool } func (c *cancelReadCloser) Read(p []byte) (int, error) { return c.rc.Read(p) } -func (c *cancelReadCloser) Close() error { return c.close() } + +func (c *cancelReadCloser) Close() error { + c.stop() // unregister AfterFunc + return c.close() +}