mirror of
https://github.com/moby/moby.git
synced 2026-01-11 10:41:43 +00:00
Merge pull request #50744 from dmcgowan/add-grpc-support
Natively support gRPC on the docker socket
This commit is contained in:
@@ -306,13 +306,23 @@ func (cli *daemonCLI) start(ctx context.Context) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Enable HTTP/1, HTTP/2 and h2c on the HTTP server. h2c won't be used for *tls.Conn listeners, and HTTP/2 won't be
|
||||
// used for non-TLS connections.
|
||||
var p http.Protocols
|
||||
p.SetHTTP1(true)
|
||||
p.SetHTTP2(true)
|
||||
p.SetUnencryptedHTTP2(true)
|
||||
|
||||
routers := buildRouters(routerOptions{
|
||||
features: d.Features,
|
||||
daemon: d,
|
||||
cluster: c,
|
||||
builder: b,
|
||||
})
|
||||
httpServer.Handler = apiServer.CreateMux(ctx, routers...)
|
||||
gs := newGRPCServer(ctx)
|
||||
b.backend.RegisterGRPC(gs)
|
||||
httpServer.Protocols = &p
|
||||
httpServer.Handler = newHTTPHandler(ctx, gs, apiServer.CreateMux(ctx, routers...))
|
||||
|
||||
go d.ProcessClusterNotifications(ctx, c.GetWatchStream())
|
||||
|
||||
@@ -816,6 +826,7 @@ func newAPIServerTLSConfig(cfg *config.Config) (*tls.Config, error) {
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "invalid TLS configuration")
|
||||
}
|
||||
tlsConfig.NextProtos = []string{"h2", "http/1.1"}
|
||||
}
|
||||
|
||||
return tlsConfig, nil
|
||||
|
||||
70
daemon/command/httphandler.go
Normal file
70
daemon/command/httphandler.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/containerd/containerd/v2/defaults"
|
||||
"github.com/containerd/log"
|
||||
"github.com/moby/buildkit/util/grpcerrors"
|
||||
"github.com/moby/buildkit/util/stack"
|
||||
"github.com/moby/buildkit/util/tracing"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/moby/moby/v2/daemon/internal/otelutil"
|
||||
)
|
||||
|
||||
type httpHandler struct {
|
||||
ctx context.Context
|
||||
grpcServer *grpc.Server
|
||||
apiServer http.Handler
|
||||
}
|
||||
|
||||
func newHTTPHandler(ctx context.Context, gs *grpc.Server, apiServer http.Handler) http.Handler {
|
||||
return &httpHandler{
|
||||
ctx: ctx,
|
||||
grpcServer: gs,
|
||||
apiServer: apiServer,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc") {
|
||||
h.grpcServer.ServeHTTP(w, r)
|
||||
} else {
|
||||
h.apiServer.ServeHTTP(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func newGRPCServer(ctx context.Context) *grpc.Server {
|
||||
tp, _ := otelutil.NewTracerProvider(ctx, false)
|
||||
return grpc.NewServer(
|
||||
grpc.StatsHandler(tracing.ServerStatsHandler(otelgrpc.WithTracerProvider(tp))),
|
||||
grpc.ChainUnaryInterceptor(unaryInterceptor, grpcerrors.UnaryServerInterceptor),
|
||||
grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor),
|
||||
grpc.MaxRecvMsgSize(defaults.DefaultMaxRecvMsgSize),
|
||||
grpc.MaxSendMsgSize(defaults.DefaultMaxSendMsgSize),
|
||||
)
|
||||
}
|
||||
|
||||
func unaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, _ error) {
|
||||
// This method is used by the clients to send their traces to buildkit so they can be included
|
||||
// in the daemon trace and stored in the build history record. This method can not be traced because
|
||||
// it would cause an infinite loop.
|
||||
if strings.HasSuffix(info.FullMethod, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
resp, err := handler(ctx, req)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error(info.FullMethod)
|
||||
if log.GetLevel() >= log.DebugLevel {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "%+v", stack.Formatter(grpcerrors.FromGRPC(err)))
|
||||
}
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
@@ -3,7 +3,6 @@ package build
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -34,15 +33,8 @@ func TestBuildkitHistoryTracePropagation(t *testing.T) {
|
||||
ctx := testutil.StartSpan(baseContext, t)
|
||||
|
||||
c := testEnv.APIClient()
|
||||
opts := []client.ClientOpt{
|
||||
client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) {
|
||||
return c.DialHijack(ctx, "/session", proto, meta)
|
||||
}),
|
||||
client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
|
||||
return c.DialHijack(ctx, "/grpc", "h2c", nil)
|
||||
}),
|
||||
}
|
||||
bc, err := client.New(ctx, "", opts...)
|
||||
bc, err := client.New(ctx, c.DaemonHost())
|
||||
|
||||
assert.NilError(t, err)
|
||||
defer bc.Close()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user