negociate content-type used by /events API

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof
2025-09-11 12:49:33 +02:00
committed by Sebastiaan van Stijn
parent b26972f9f2
commit df506c107e
12 changed files with 228 additions and 15 deletions

View File

@@ -45,6 +45,8 @@ keywords: "API, Docker, rcli, REST, documentation"
on API version `v1.52` and up. Older API versions still accept this field, but
may take no effect, depending on the kernel version and OCI runtime in use.
* Removed the `KernelMemoryTCP` field from the `GET /info` endpoint.
* `GET /events` supports content-type negotiation and can produce either `application/x-ndjson`
(Newline delimited JSON object stream) or `application/json-seq` (RFC7464).
## v1.51 API changes

View File

@@ -10176,7 +10176,8 @@ paths:
operationId: "SystemEvents"
produces:
- "application/json"
- "application/x-ndjson"
- "application/json-seq"
responses:
200:
description: "no error"

View File

@@ -11,6 +11,15 @@ const (
// MediaTypeMultiplexedStream is vendor specific MIME-Type set for stdin/stdout/stderr multiplexed streams
MediaTypeMultiplexedStream = "application/vnd.docker.multiplexed-stream"
// MediaTypeJSON is the MIME-Type for JSON objects
MediaTypeJSON = "application/json"
// MediaTypeNDJson is the MIME-Type for Newline Delimited JSON objects streams
MediaTypeNDJSON = "application/x-ndjson"
// MediaTypeJsonSequence is the MIME-Type for JSON Text Sequences (RFC7464)
MediaTypeJSONSequence = "application/json-seq"
)
// Ping contains response of Engine API:

View File

@@ -0,0 +1,50 @@
package internal
import (
"encoding/json"
"io"
"slices"
"github.com/moby/moby/api/types"
)
const rs = 0x1E
type DecoderFn func(v any) error
// NewJSONStreamDecoder builds adequate DecoderFn to read json records formatted with specified content-type
func NewJSONStreamDecoder(r io.Reader, contentType string) DecoderFn {
switch contentType {
case types.MediaTypeJSONSequence:
return json.NewDecoder(NewRSFilterReader(r)).Decode
case types.MediaTypeJSON, types.MediaTypeNDJSON:
fallthrough
default:
return json.NewDecoder(r).Decode
}
}
// RSFilterReader wraps an io.Reader and filters out ASCII RS characters
type RSFilterReader struct {
reader io.Reader
buffer []byte
}
// NewRSFilterReader creates a new RSFilterReader that filters out RS characters
func NewRSFilterReader(r io.Reader) *RSFilterReader {
return &RSFilterReader{
reader: r,
buffer: make([]byte, 4096), // Internal buffer for reading chunks
}
}
// Read implements the io.Reader interface, filtering out RS characters
func (r *RSFilterReader) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
n, err = r.reader.Read(p)
filtered := slices.DeleteFunc(p[:n], func(b byte) bool { return b == rs })
return len(filtered), err
}

View File

@@ -0,0 +1,29 @@
package internal
import (
"fmt"
"strings"
"testing"
"github.com/moby/moby/api/types"
"gotest.tools/v3/assert"
)
func Test_JsonSeqDecoder(t *testing.T) {
separator := string(rune(rs))
lf := "\n"
input := fmt.Sprintf(`%s{"hello":"world"}%s%s{ "hello": "again" }%s`, separator, lf, separator, lf)
decoder := NewJSONStreamDecoder(strings.NewReader(input), types.MediaTypeJSONSequence)
type Hello struct {
Hello string `json:"hello"`
}
var hello Hello
err := decoder(&hello)
assert.NilError(t, err)
assert.Equal(t, "world", hello.Hello)
var again Hello
err = decoder(&again)
assert.NilError(t, err)
assert.Equal(t, "again", again.Hello)
}

View File

@@ -2,12 +2,14 @@ package client
import (
"context"
"encoding/json"
"net/http"
"net/url"
"time"
"github.com/moby/moby/api/types"
"github.com/moby/moby/api/types/events"
"github.com/moby/moby/api/types/filters"
"github.com/moby/moby/client/internal"
"github.com/moby/moby/client/internal/timestamp"
)
@@ -37,7 +39,10 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
return
}
resp, err := cli.get(ctx, "/events", query, nil)
headers := http.Header{}
headers.Add("Accept", types.MediaTypeJSONSequence)
headers.Add("Accept", types.MediaTypeNDJSON)
resp, err := cli.get(ctx, "/events", query, headers)
if err != nil {
close(started)
errs <- err
@@ -45,7 +50,8 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
}
defer resp.Body.Close()
decoder := json.NewDecoder(resp.Body)
contentType := resp.Header.Get("Content-Type")
decoder := internal.NewJSONStreamDecoder(resp.Body, contentType)
close(started)
for {
@@ -55,7 +61,7 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
return
default:
var event events.Message
if err := decoder.Decode(&event); err != nil {
if err := decoder(&event); err != nil {
errs <- err
return
}

View File

@@ -0,0 +1,44 @@
package httputils
import (
"encoding/json"
"io"
"github.com/moby/moby/api/types"
)
const rs = 0x1E
type EncoderFn func(any) error
// NewJSONStreamEncoder builds adequate EncoderFn to write json records using selected content-type formalism
func NewJSONStreamEncoder(w io.Writer, contentType string) EncoderFn {
jsonEncoder := json.NewEncoder(w)
switch contentType {
case types.MediaTypeJSONSequence:
jseq := &jsonSeq{
w: w,
json: jsonEncoder,
}
return jseq.Encode
case types.MediaTypeNDJSON, types.MediaTypeJSON:
fallthrough
default:
return jsonEncoder.Encode
}
}
type jsonSeq struct {
w io.Writer
json *json.Encoder
}
// Encode prefixes every written record with an ASCII record separator.
func (js *jsonSeq) Encode(record any) error {
_, err := js.w.Write([]byte{rs})
if err != nil {
return err
}
// JSON-seq also requires a LF character, bu json.Encoder already adds one
return js.json.Encode(record)
}

View File

@@ -8,7 +8,9 @@ import (
"time"
"github.com/containerd/log"
"github.com/golang/gddo/httputil"
"github.com/moby/moby/api/pkg/authconfig"
"github.com/moby/moby/api/types"
buildtypes "github.com/moby/moby/api/types/build"
"github.com/moby/moby/api/types/events"
"github.com/moby/moby/api/types/filters"
@@ -296,13 +298,17 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
return err
}
w.Header().Set("Content-Type", "application/json")
contentType := httputil.NegotiateContentType(r, []string{
types.MediaTypeNDJSON,
types.MediaTypeJSONSequence,
}, types.MediaTypeJSON) // output isn't actually JSON but API used to this content-type
w.Header().Set("Content-Type", contentType)
w.WriteHeader(http.StatusOK)
output := ioutils.NewWriteFlusher(w)
defer output.Close()
output.Flush()
enc := json.NewEncoder(output)
encode := httputils.NewJSONStreamEncoder(output, contentType)
buffered, l := s.backend.SubscribeToEvents(since, until, ef)
defer s.backend.UnsubscribeFromEvents(l)
@@ -325,12 +331,12 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
continue
}
if includeLegacyFields {
if err := enc.Encode(backFillLegacy(&ev)); err != nil {
if err := encode(backFillLegacy(&ev)); err != nil {
return err
}
continue
}
if err := enc.Encode(ev); err != nil {
if err := encode(ev); err != nil {
return err
}
}
@@ -351,12 +357,12 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
continue
}
if includeLegacyFields {
if err := enc.Encode(backFillLegacy(&jev)); err != nil {
if err := encode(backFillLegacy(&jev)); err != nil {
return err
}
continue
}
if err := enc.Encode(jev); err != nil {
if err := encode(jev); err != nil {
return err
}
case <-timeout:

View File

@@ -11,6 +11,15 @@ const (
// MediaTypeMultiplexedStream is vendor specific MIME-Type set for stdin/stdout/stderr multiplexed streams
MediaTypeMultiplexedStream = "application/vnd.docker.multiplexed-stream"
// MediaTypeJSON is the MIME-Type for JSON objects
MediaTypeJSON = "application/json"
// MediaTypeNDJson is the MIME-Type for Newline Delimited JSON objects streams
MediaTypeNDJSON = "application/x-ndjson"
// MediaTypeJsonSequence is the MIME-Type for JSON Text Sequences (RFC7464)
MediaTypeJSONSequence = "application/json-seq"
)
// Ping contains response of Engine API:

View File

@@ -0,0 +1,50 @@
package internal
import (
"encoding/json"
"io"
"slices"
"github.com/moby/moby/api/types"
)
const rs = 0x1E
type DecoderFn func(v any) error
// NewJSONStreamDecoder builds adequate DecoderFn to read json records formatted with specified content-type
func NewJSONStreamDecoder(r io.Reader, contentType string) DecoderFn {
switch contentType {
case types.MediaTypeJSONSequence:
return json.NewDecoder(NewRSFilterReader(r)).Decode
case types.MediaTypeJSON, types.MediaTypeNDJSON:
fallthrough
default:
return json.NewDecoder(r).Decode
}
}
// RSFilterReader wraps an io.Reader and filters out ASCII RS characters
type RSFilterReader struct {
reader io.Reader
buffer []byte
}
// NewRSFilterReader creates a new RSFilterReader that filters out RS characters
func NewRSFilterReader(r io.Reader) *RSFilterReader {
return &RSFilterReader{
reader: r,
buffer: make([]byte, 4096), // Internal buffer for reading chunks
}
}
// Read implements the io.Reader interface, filtering out RS characters
func (r *RSFilterReader) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
n, err = r.reader.Read(p)
filtered := slices.DeleteFunc(p[:n], func(b byte) bool { return b == rs })
return len(filtered), err
}

View File

@@ -2,12 +2,14 @@ package client
import (
"context"
"encoding/json"
"net/http"
"net/url"
"time"
"github.com/moby/moby/api/types"
"github.com/moby/moby/api/types/events"
"github.com/moby/moby/api/types/filters"
"github.com/moby/moby/client/internal"
"github.com/moby/moby/client/internal/timestamp"
)
@@ -37,7 +39,10 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
return
}
resp, err := cli.get(ctx, "/events", query, nil)
headers := http.Header{}
headers.Add("Accept", types.MediaTypeJSONSequence)
headers.Add("Accept", types.MediaTypeNDJSON)
resp, err := cli.get(ctx, "/events", query, headers)
if err != nil {
close(started)
errs <- err
@@ -45,7 +50,8 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
}
defer resp.Body.Close()
decoder := json.NewDecoder(resp.Body)
contentType := resp.Header.Get("Content-Type")
decoder := internal.NewJSONStreamDecoder(resp.Body, contentType)
close(started)
for {
@@ -55,7 +61,7 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
return
default:
var event events.Message
if err := decoder.Decode(&event); err != nil {
if err := decoder(&event); err != nil {
errs <- err
return
}

1
vendor/modules.txt vendored
View File

@@ -966,6 +966,7 @@ github.com/moby/moby/api/types/volume
# github.com/moby/moby/client v0.1.0-beta.0 => ./client
## explicit; go 1.23.0
github.com/moby/moby/client
github.com/moby/moby/client/internal
github.com/moby/moby/client/internal/timestamp
github.com/moby/moby/client/pkg/jsonmessage
github.com/moby/moby/client/pkg/stringid