diff --git a/api/docs/CHANGELOG.md b/api/docs/CHANGELOG.md index 4512117bc5..f2312ff5d2 100644 --- a/api/docs/CHANGELOG.md +++ b/api/docs/CHANGELOG.md @@ -45,6 +45,8 @@ keywords: "API, Docker, rcli, REST, documentation" on API version `v1.52` and up. Older API versions still accept this field, but may take no effect, depending on the kernel version and OCI runtime in use. * Removed the `KernelMemoryTCP` field from the `GET /info` endpoint. +* `GET /events` supports content-type negotiation and can produce either `application/x-ndjson` + (Newline delimited JSON object stream) or `application/json-seq` (RFC7464). ## v1.51 API changes diff --git a/api/swagger.yaml b/api/swagger.yaml index 4664b3fca6..9d5c72514e 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -10235,7 +10235,8 @@ paths: operationId: "SystemEvents" produces: - - "application/json" + - "application/x-ndjson" + - "application/json-seq" responses: 200: description: "no error" diff --git a/api/types/types.go b/api/types/types.go index 3d00f354b6..32fbcc639f 100644 --- a/api/types/types.go +++ b/api/types/types.go @@ -11,6 +11,15 @@ const ( // MediaTypeMultiplexedStream is vendor specific MIME-Type set for stdin/stdout/stderr multiplexed streams MediaTypeMultiplexedStream = "application/vnd.docker.multiplexed-stream" + + // MediaTypeJSON is the MIME-Type for JSON objects + MediaTypeJSON = "application/json" + + // MediaTypeNDJson is the MIME-Type for Newline Delimited JSON objects streams + MediaTypeNDJSON = "application/x-ndjson" + + // MediaTypeJsonSequence is the MIME-Type for JSON Text Sequences (RFC7464) + MediaTypeJSONSequence = "application/json-seq" ) // Ping contains response of Engine API: diff --git a/client/internal/json-stream.go b/client/internal/json-stream.go new file mode 100644 index 0000000000..552978f9a1 --- /dev/null +++ b/client/internal/json-stream.go @@ -0,0 +1,50 @@ +package internal + +import ( + "encoding/json" + "io" + "slices" + + "github.com/moby/moby/api/types" +) + +const rs = 0x1E + +type DecoderFn func(v any) error + +// NewJSONStreamDecoder builds adequate DecoderFn to read json records formatted with specified content-type +func NewJSONStreamDecoder(r io.Reader, contentType string) DecoderFn { + switch contentType { + case types.MediaTypeJSONSequence: + return json.NewDecoder(NewRSFilterReader(r)).Decode + case types.MediaTypeJSON, types.MediaTypeNDJSON: + fallthrough + default: + return json.NewDecoder(r).Decode + } +} + +// RSFilterReader wraps an io.Reader and filters out ASCII RS characters +type RSFilterReader struct { + reader io.Reader + buffer []byte +} + +// NewRSFilterReader creates a new RSFilterReader that filters out RS characters +func NewRSFilterReader(r io.Reader) *RSFilterReader { + return &RSFilterReader{ + reader: r, + buffer: make([]byte, 4096), // Internal buffer for reading chunks + } +} + +// Read implements the io.Reader interface, filtering out RS characters +func (r *RSFilterReader) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, nil + } + + n, err = r.reader.Read(p) + filtered := slices.DeleteFunc(p[:n], func(b byte) bool { return b == rs }) + return len(filtered), err +} diff --git a/client/internal/json-stream_test.go b/client/internal/json-stream_test.go new file mode 100644 index 0000000000..59174634f3 --- /dev/null +++ b/client/internal/json-stream_test.go @@ -0,0 +1,29 @@ +package internal + +import ( + "fmt" + "strings" + "testing" + + "github.com/moby/moby/api/types" + "gotest.tools/v3/assert" +) + +func Test_JsonSeqDecoder(t *testing.T) { + separator := string(rune(rs)) + lf := "\n" + input := fmt.Sprintf(`%s{"hello":"world"}%s%s{ "hello": "again" }%s`, separator, lf, separator, lf) + decoder := NewJSONStreamDecoder(strings.NewReader(input), types.MediaTypeJSONSequence) + type Hello struct { + Hello string `json:"hello"` + } + var hello Hello + err := decoder(&hello) + assert.NilError(t, err) + assert.Equal(t, "world", hello.Hello) + + var again Hello + err = decoder(&again) + assert.NilError(t, err) + assert.Equal(t, "again", again.Hello) +} diff --git a/client/system_events.go b/client/system_events.go index 93b12cdefd..bee7402b59 100644 --- a/client/system_events.go +++ b/client/system_events.go @@ -2,12 +2,14 @@ package client import ( "context" - "encoding/json" + "net/http" "net/url" "time" + "github.com/moby/moby/api/types" "github.com/moby/moby/api/types/events" "github.com/moby/moby/api/types/filters" + "github.com/moby/moby/client/internal" "github.com/moby/moby/client/internal/timestamp" ) @@ -37,7 +39,10 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha return } - resp, err := cli.get(ctx, "/events", query, nil) + headers := http.Header{} + headers.Add("Accept", types.MediaTypeJSONSequence) + headers.Add("Accept", types.MediaTypeNDJSON) + resp, err := cli.get(ctx, "/events", query, headers) if err != nil { close(started) errs <- err @@ -45,7 +50,8 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha } defer resp.Body.Close() - decoder := json.NewDecoder(resp.Body) + contentType := resp.Header.Get("Content-Type") + decoder := internal.NewJSONStreamDecoder(resp.Body, contentType) close(started) for { @@ -55,7 +61,7 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha return default: var event events.Message - if err := decoder.Decode(&event); err != nil { + if err := decoder(&event); err != nil { errs <- err return } diff --git a/daemon/server/httputils/json-seq.go b/daemon/server/httputils/json-seq.go new file mode 100644 index 0000000000..624247b2d4 --- /dev/null +++ b/daemon/server/httputils/json-seq.go @@ -0,0 +1,44 @@ +package httputils + +import ( + "encoding/json" + "io" + + "github.com/moby/moby/api/types" +) + +const rs = 0x1E + +type EncoderFn func(any) error + +// NewJSONStreamEncoder builds adequate EncoderFn to write json records using selected content-type formalism +func NewJSONStreamEncoder(w io.Writer, contentType string) EncoderFn { + jsonEncoder := json.NewEncoder(w) + switch contentType { + case types.MediaTypeJSONSequence: + jseq := &jsonSeq{ + w: w, + json: jsonEncoder, + } + return jseq.Encode + case types.MediaTypeNDJSON, types.MediaTypeJSON: + fallthrough + default: + return jsonEncoder.Encode + } +} + +type jsonSeq struct { + w io.Writer + json *json.Encoder +} + +// Encode prefixes every written record with an ASCII record separator. +func (js *jsonSeq) Encode(record any) error { + _, err := js.w.Write([]byte{rs}) + if err != nil { + return err + } + // JSON-seq also requires a LF character, bu json.Encoder already adds one + return js.json.Encode(record) +} diff --git a/daemon/server/router/system/system_routes.go b/daemon/server/router/system/system_routes.go index 6384c4f26f..3307ce59f5 100644 --- a/daemon/server/router/system/system_routes.go +++ b/daemon/server/router/system/system_routes.go @@ -8,7 +8,9 @@ import ( "time" "github.com/containerd/log" + "github.com/golang/gddo/httputil" "github.com/moby/moby/api/pkg/authconfig" + "github.com/moby/moby/api/types" buildtypes "github.com/moby/moby/api/types/build" "github.com/moby/moby/api/types/events" "github.com/moby/moby/api/types/filters" @@ -296,13 +298,17 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r * return err } - w.Header().Set("Content-Type", "application/json") + contentType := httputil.NegotiateContentType(r, []string{ + types.MediaTypeNDJSON, + types.MediaTypeJSONSequence, + }, types.MediaTypeJSON) // output isn't actually JSON but API used to this content-type + w.Header().Set("Content-Type", contentType) w.WriteHeader(http.StatusOK) output := ioutils.NewWriteFlusher(w) defer output.Close() output.Flush() - enc := json.NewEncoder(output) + encode := httputils.NewJSONStreamEncoder(output, contentType) buffered, l := s.backend.SubscribeToEvents(since, until, ef) defer s.backend.UnsubscribeFromEvents(l) @@ -325,12 +331,12 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r * continue } if includeLegacyFields { - if err := enc.Encode(backFillLegacy(&ev)); err != nil { + if err := encode(backFillLegacy(&ev)); err != nil { return err } continue } - if err := enc.Encode(ev); err != nil { + if err := encode(ev); err != nil { return err } } @@ -351,12 +357,12 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r * continue } if includeLegacyFields { - if err := enc.Encode(backFillLegacy(&jev)); err != nil { + if err := encode(backFillLegacy(&jev)); err != nil { return err } continue } - if err := enc.Encode(jev); err != nil { + if err := encode(jev); err != nil { return err } case <-timeout: diff --git a/vendor/github.com/moby/moby/api/types/types.go b/vendor/github.com/moby/moby/api/types/types.go index 3d00f354b6..32fbcc639f 100644 --- a/vendor/github.com/moby/moby/api/types/types.go +++ b/vendor/github.com/moby/moby/api/types/types.go @@ -11,6 +11,15 @@ const ( // MediaTypeMultiplexedStream is vendor specific MIME-Type set for stdin/stdout/stderr multiplexed streams MediaTypeMultiplexedStream = "application/vnd.docker.multiplexed-stream" + + // MediaTypeJSON is the MIME-Type for JSON objects + MediaTypeJSON = "application/json" + + // MediaTypeNDJson is the MIME-Type for Newline Delimited JSON objects streams + MediaTypeNDJSON = "application/x-ndjson" + + // MediaTypeJsonSequence is the MIME-Type for JSON Text Sequences (RFC7464) + MediaTypeJSONSequence = "application/json-seq" ) // Ping contains response of Engine API: diff --git a/vendor/github.com/moby/moby/client/internal/json-stream.go b/vendor/github.com/moby/moby/client/internal/json-stream.go new file mode 100644 index 0000000000..552978f9a1 --- /dev/null +++ b/vendor/github.com/moby/moby/client/internal/json-stream.go @@ -0,0 +1,50 @@ +package internal + +import ( + "encoding/json" + "io" + "slices" + + "github.com/moby/moby/api/types" +) + +const rs = 0x1E + +type DecoderFn func(v any) error + +// NewJSONStreamDecoder builds adequate DecoderFn to read json records formatted with specified content-type +func NewJSONStreamDecoder(r io.Reader, contentType string) DecoderFn { + switch contentType { + case types.MediaTypeJSONSequence: + return json.NewDecoder(NewRSFilterReader(r)).Decode + case types.MediaTypeJSON, types.MediaTypeNDJSON: + fallthrough + default: + return json.NewDecoder(r).Decode + } +} + +// RSFilterReader wraps an io.Reader and filters out ASCII RS characters +type RSFilterReader struct { + reader io.Reader + buffer []byte +} + +// NewRSFilterReader creates a new RSFilterReader that filters out RS characters +func NewRSFilterReader(r io.Reader) *RSFilterReader { + return &RSFilterReader{ + reader: r, + buffer: make([]byte, 4096), // Internal buffer for reading chunks + } +} + +// Read implements the io.Reader interface, filtering out RS characters +func (r *RSFilterReader) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, nil + } + + n, err = r.reader.Read(p) + filtered := slices.DeleteFunc(p[:n], func(b byte) bool { return b == rs }) + return len(filtered), err +} diff --git a/vendor/github.com/moby/moby/client/system_events.go b/vendor/github.com/moby/moby/client/system_events.go index 93b12cdefd..bee7402b59 100644 --- a/vendor/github.com/moby/moby/client/system_events.go +++ b/vendor/github.com/moby/moby/client/system_events.go @@ -2,12 +2,14 @@ package client import ( "context" - "encoding/json" + "net/http" "net/url" "time" + "github.com/moby/moby/api/types" "github.com/moby/moby/api/types/events" "github.com/moby/moby/api/types/filters" + "github.com/moby/moby/client/internal" "github.com/moby/moby/client/internal/timestamp" ) @@ -37,7 +39,10 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha return } - resp, err := cli.get(ctx, "/events", query, nil) + headers := http.Header{} + headers.Add("Accept", types.MediaTypeJSONSequence) + headers.Add("Accept", types.MediaTypeNDJSON) + resp, err := cli.get(ctx, "/events", query, headers) if err != nil { close(started) errs <- err @@ -45,7 +50,8 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha } defer resp.Body.Close() - decoder := json.NewDecoder(resp.Body) + contentType := resp.Header.Get("Content-Type") + decoder := internal.NewJSONStreamDecoder(resp.Body, contentType) close(started) for { @@ -55,7 +61,7 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha return default: var event events.Message - if err := decoder.Decode(&event); err != nil { + if err := decoder(&event); err != nil { errs <- err return } diff --git a/vendor/modules.txt b/vendor/modules.txt index 83626381a7..6e0ecc5b7a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -966,6 +966,7 @@ github.com/moby/moby/api/types/volume # github.com/moby/moby/client v0.1.0-beta.0 => ./client ## explicit; go 1.23.0 github.com/moby/moby/client +github.com/moby/moby/client/internal github.com/moby/moby/client/internal/timestamp github.com/moby/moby/client/pkg/jsonmessage github.com/moby/moby/client/pkg/stringid