mirror of
https://github.com/moby/moby.git
synced 2026-01-11 18:51:37 +00:00
Merge pull request #50953 from ndeloof/ndjson
fix content-type declared by /events API
This commit is contained in:
@@ -45,6 +45,8 @@ keywords: "API, Docker, rcli, REST, documentation"
|
||||
on API version `v1.52` and up. Older API versions still accept this field, but
|
||||
may take no effect, depending on the kernel version and OCI runtime in use.
|
||||
* Removed the `KernelMemoryTCP` field from the `GET /info` endpoint.
|
||||
* `GET /events` supports content-type negotiation and can produce either `application/x-ndjson`
|
||||
(Newline delimited JSON object stream) or `application/json-seq` (RFC7464).
|
||||
|
||||
## v1.51 API changes
|
||||
|
||||
|
||||
@@ -10235,7 +10235,8 @@ paths:
|
||||
|
||||
operationId: "SystemEvents"
|
||||
produces:
|
||||
- "application/json"
|
||||
- "application/x-ndjson"
|
||||
- "application/json-seq"
|
||||
responses:
|
||||
200:
|
||||
description: "no error"
|
||||
|
||||
@@ -11,6 +11,15 @@ const (
|
||||
|
||||
// MediaTypeMultiplexedStream is vendor specific MIME-Type set for stdin/stdout/stderr multiplexed streams
|
||||
MediaTypeMultiplexedStream = "application/vnd.docker.multiplexed-stream"
|
||||
|
||||
// MediaTypeJSON is the MIME-Type for JSON objects
|
||||
MediaTypeJSON = "application/json"
|
||||
|
||||
// MediaTypeNDJson is the MIME-Type for Newline Delimited JSON objects streams
|
||||
MediaTypeNDJSON = "application/x-ndjson"
|
||||
|
||||
// MediaTypeJsonSequence is the MIME-Type for JSON Text Sequences (RFC7464)
|
||||
MediaTypeJSONSequence = "application/json-seq"
|
||||
)
|
||||
|
||||
// Ping contains response of Engine API:
|
||||
|
||||
50
client/internal/json-stream.go
Normal file
50
client/internal/json-stream.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"slices"
|
||||
|
||||
"github.com/moby/moby/api/types"
|
||||
)
|
||||
|
||||
const rs = 0x1E
|
||||
|
||||
type DecoderFn func(v any) error
|
||||
|
||||
// NewJSONStreamDecoder builds adequate DecoderFn to read json records formatted with specified content-type
|
||||
func NewJSONStreamDecoder(r io.Reader, contentType string) DecoderFn {
|
||||
switch contentType {
|
||||
case types.MediaTypeJSONSequence:
|
||||
return json.NewDecoder(NewRSFilterReader(r)).Decode
|
||||
case types.MediaTypeJSON, types.MediaTypeNDJSON:
|
||||
fallthrough
|
||||
default:
|
||||
return json.NewDecoder(r).Decode
|
||||
}
|
||||
}
|
||||
|
||||
// RSFilterReader wraps an io.Reader and filters out ASCII RS characters
|
||||
type RSFilterReader struct {
|
||||
reader io.Reader
|
||||
buffer []byte
|
||||
}
|
||||
|
||||
// NewRSFilterReader creates a new RSFilterReader that filters out RS characters
|
||||
func NewRSFilterReader(r io.Reader) *RSFilterReader {
|
||||
return &RSFilterReader{
|
||||
reader: r,
|
||||
buffer: make([]byte, 4096), // Internal buffer for reading chunks
|
||||
}
|
||||
}
|
||||
|
||||
// Read implements the io.Reader interface, filtering out RS characters
|
||||
func (r *RSFilterReader) Read(p []byte) (n int, err error) {
|
||||
if len(p) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
n, err = r.reader.Read(p)
|
||||
filtered := slices.DeleteFunc(p[:n], func(b byte) bool { return b == rs })
|
||||
return len(filtered), err
|
||||
}
|
||||
29
client/internal/json-stream_test.go
Normal file
29
client/internal/json-stream_test.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/moby/moby/api/types"
|
||||
"gotest.tools/v3/assert"
|
||||
)
|
||||
|
||||
func Test_JsonSeqDecoder(t *testing.T) {
|
||||
separator := string(rune(rs))
|
||||
lf := "\n"
|
||||
input := fmt.Sprintf(`%s{"hello":"world"}%s%s{ "hello": "again" }%s`, separator, lf, separator, lf)
|
||||
decoder := NewJSONStreamDecoder(strings.NewReader(input), types.MediaTypeJSONSequence)
|
||||
type Hello struct {
|
||||
Hello string `json:"hello"`
|
||||
}
|
||||
var hello Hello
|
||||
err := decoder(&hello)
|
||||
assert.NilError(t, err)
|
||||
assert.Equal(t, "world", hello.Hello)
|
||||
|
||||
var again Hello
|
||||
err = decoder(&again)
|
||||
assert.NilError(t, err)
|
||||
assert.Equal(t, "again", again.Hello)
|
||||
}
|
||||
@@ -2,12 +2,14 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/moby/moby/api/types"
|
||||
"github.com/moby/moby/api/types/events"
|
||||
"github.com/moby/moby/api/types/filters"
|
||||
"github.com/moby/moby/client/internal"
|
||||
"github.com/moby/moby/client/internal/timestamp"
|
||||
)
|
||||
|
||||
@@ -37,7 +39,10 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := cli.get(ctx, "/events", query, nil)
|
||||
headers := http.Header{}
|
||||
headers.Add("Accept", types.MediaTypeJSONSequence)
|
||||
headers.Add("Accept", types.MediaTypeNDJSON)
|
||||
resp, err := cli.get(ctx, "/events", query, headers)
|
||||
if err != nil {
|
||||
close(started)
|
||||
errs <- err
|
||||
@@ -45,7 +50,8 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
contentType := resp.Header.Get("Content-Type")
|
||||
decoder := internal.NewJSONStreamDecoder(resp.Body, contentType)
|
||||
|
||||
close(started)
|
||||
for {
|
||||
@@ -55,7 +61,7 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
|
||||
return
|
||||
default:
|
||||
var event events.Message
|
||||
if err := decoder.Decode(&event); err != nil {
|
||||
if err := decoder(&event); err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
|
||||
44
daemon/server/httputils/json-seq.go
Normal file
44
daemon/server/httputils/json-seq.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package httputils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"github.com/moby/moby/api/types"
|
||||
)
|
||||
|
||||
const rs = 0x1E
|
||||
|
||||
type EncoderFn func(any) error
|
||||
|
||||
// NewJSONStreamEncoder builds adequate EncoderFn to write json records using selected content-type formalism
|
||||
func NewJSONStreamEncoder(w io.Writer, contentType string) EncoderFn {
|
||||
jsonEncoder := json.NewEncoder(w)
|
||||
switch contentType {
|
||||
case types.MediaTypeJSONSequence:
|
||||
jseq := &jsonSeq{
|
||||
w: w,
|
||||
json: jsonEncoder,
|
||||
}
|
||||
return jseq.Encode
|
||||
case types.MediaTypeNDJSON, types.MediaTypeJSON:
|
||||
fallthrough
|
||||
default:
|
||||
return jsonEncoder.Encode
|
||||
}
|
||||
}
|
||||
|
||||
type jsonSeq struct {
|
||||
w io.Writer
|
||||
json *json.Encoder
|
||||
}
|
||||
|
||||
// Encode prefixes every written record with an ASCII record separator.
|
||||
func (js *jsonSeq) Encode(record any) error {
|
||||
_, err := js.w.Write([]byte{rs})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// JSON-seq also requires a LF character, bu json.Encoder already adds one
|
||||
return js.json.Encode(record)
|
||||
}
|
||||
@@ -8,7 +8,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/containerd/log"
|
||||
"github.com/golang/gddo/httputil"
|
||||
"github.com/moby/moby/api/pkg/authconfig"
|
||||
"github.com/moby/moby/api/types"
|
||||
buildtypes "github.com/moby/moby/api/types/build"
|
||||
"github.com/moby/moby/api/types/events"
|
||||
"github.com/moby/moby/api/types/filters"
|
||||
@@ -296,13 +298,17 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
|
||||
return err
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
contentType := httputil.NegotiateContentType(r, []string{
|
||||
types.MediaTypeNDJSON,
|
||||
types.MediaTypeJSONSequence,
|
||||
}, types.MediaTypeJSON) // output isn't actually JSON but API used to this content-type
|
||||
w.Header().Set("Content-Type", contentType)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
output := ioutils.NewWriteFlusher(w)
|
||||
defer output.Close()
|
||||
output.Flush()
|
||||
|
||||
enc := json.NewEncoder(output)
|
||||
encode := httputils.NewJSONStreamEncoder(output, contentType)
|
||||
|
||||
buffered, l := s.backend.SubscribeToEvents(since, until, ef)
|
||||
defer s.backend.UnsubscribeFromEvents(l)
|
||||
@@ -325,12 +331,12 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
|
||||
continue
|
||||
}
|
||||
if includeLegacyFields {
|
||||
if err := enc.Encode(backFillLegacy(&ev)); err != nil {
|
||||
if err := encode(backFillLegacy(&ev)); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := enc.Encode(ev); err != nil {
|
||||
if err := encode(ev); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -351,12 +357,12 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
|
||||
continue
|
||||
}
|
||||
if includeLegacyFields {
|
||||
if err := enc.Encode(backFillLegacy(&jev)); err != nil {
|
||||
if err := encode(backFillLegacy(&jev)); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := enc.Encode(jev); err != nil {
|
||||
if err := encode(jev); err != nil {
|
||||
return err
|
||||
}
|
||||
case <-timeout:
|
||||
|
||||
9
vendor/github.com/moby/moby/api/types/types.go
generated
vendored
9
vendor/github.com/moby/moby/api/types/types.go
generated
vendored
@@ -11,6 +11,15 @@ const (
|
||||
|
||||
// MediaTypeMultiplexedStream is vendor specific MIME-Type set for stdin/stdout/stderr multiplexed streams
|
||||
MediaTypeMultiplexedStream = "application/vnd.docker.multiplexed-stream"
|
||||
|
||||
// MediaTypeJSON is the MIME-Type for JSON objects
|
||||
MediaTypeJSON = "application/json"
|
||||
|
||||
// MediaTypeNDJson is the MIME-Type for Newline Delimited JSON objects streams
|
||||
MediaTypeNDJSON = "application/x-ndjson"
|
||||
|
||||
// MediaTypeJsonSequence is the MIME-Type for JSON Text Sequences (RFC7464)
|
||||
MediaTypeJSONSequence = "application/json-seq"
|
||||
)
|
||||
|
||||
// Ping contains response of Engine API:
|
||||
|
||||
50
vendor/github.com/moby/moby/client/internal/json-stream.go
generated
vendored
Normal file
50
vendor/github.com/moby/moby/client/internal/json-stream.go
generated
vendored
Normal file
@@ -0,0 +1,50 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"slices"
|
||||
|
||||
"github.com/moby/moby/api/types"
|
||||
)
|
||||
|
||||
const rs = 0x1E
|
||||
|
||||
type DecoderFn func(v any) error
|
||||
|
||||
// NewJSONStreamDecoder builds adequate DecoderFn to read json records formatted with specified content-type
|
||||
func NewJSONStreamDecoder(r io.Reader, contentType string) DecoderFn {
|
||||
switch contentType {
|
||||
case types.MediaTypeJSONSequence:
|
||||
return json.NewDecoder(NewRSFilterReader(r)).Decode
|
||||
case types.MediaTypeJSON, types.MediaTypeNDJSON:
|
||||
fallthrough
|
||||
default:
|
||||
return json.NewDecoder(r).Decode
|
||||
}
|
||||
}
|
||||
|
||||
// RSFilterReader wraps an io.Reader and filters out ASCII RS characters
|
||||
type RSFilterReader struct {
|
||||
reader io.Reader
|
||||
buffer []byte
|
||||
}
|
||||
|
||||
// NewRSFilterReader creates a new RSFilterReader that filters out RS characters
|
||||
func NewRSFilterReader(r io.Reader) *RSFilterReader {
|
||||
return &RSFilterReader{
|
||||
reader: r,
|
||||
buffer: make([]byte, 4096), // Internal buffer for reading chunks
|
||||
}
|
||||
}
|
||||
|
||||
// Read implements the io.Reader interface, filtering out RS characters
|
||||
func (r *RSFilterReader) Read(p []byte) (n int, err error) {
|
||||
if len(p) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
n, err = r.reader.Read(p)
|
||||
filtered := slices.DeleteFunc(p[:n], func(b byte) bool { return b == rs })
|
||||
return len(filtered), err
|
||||
}
|
||||
14
vendor/github.com/moby/moby/client/system_events.go
generated
vendored
14
vendor/github.com/moby/moby/client/system_events.go
generated
vendored
@@ -2,12 +2,14 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/moby/moby/api/types"
|
||||
"github.com/moby/moby/api/types/events"
|
||||
"github.com/moby/moby/api/types/filters"
|
||||
"github.com/moby/moby/client/internal"
|
||||
"github.com/moby/moby/client/internal/timestamp"
|
||||
)
|
||||
|
||||
@@ -37,7 +39,10 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := cli.get(ctx, "/events", query, nil)
|
||||
headers := http.Header{}
|
||||
headers.Add("Accept", types.MediaTypeJSONSequence)
|
||||
headers.Add("Accept", types.MediaTypeNDJSON)
|
||||
resp, err := cli.get(ctx, "/events", query, headers)
|
||||
if err != nil {
|
||||
close(started)
|
||||
errs <- err
|
||||
@@ -45,7 +50,8 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
contentType := resp.Header.Get("Content-Type")
|
||||
decoder := internal.NewJSONStreamDecoder(resp.Body, contentType)
|
||||
|
||||
close(started)
|
||||
for {
|
||||
@@ -55,7 +61,7 @@ func (cli *Client) Events(ctx context.Context, options EventsListOptions) (<-cha
|
||||
return
|
||||
default:
|
||||
var event events.Message
|
||||
if err := decoder.Decode(&event); err != nil {
|
||||
if err := decoder(&event); err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
|
||||
1
vendor/modules.txt
vendored
1
vendor/modules.txt
vendored
@@ -966,6 +966,7 @@ github.com/moby/moby/api/types/volume
|
||||
# github.com/moby/moby/client v0.1.0-beta.0 => ./client
|
||||
## explicit; go 1.23.0
|
||||
github.com/moby/moby/client
|
||||
github.com/moby/moby/client/internal
|
||||
github.com/moby/moby/client/internal/timestamp
|
||||
github.com/moby/moby/client/pkg/jsonmessage
|
||||
github.com/moby/moby/client/pkg/stringid
|
||||
|
||||
Reference in New Issue
Block a user