cmd/docker-proxy: pass open listener to proxy impl

In preparation for the daemon passing a listen fd, add command line
option -use-listen-fd to indicate that the fd is present (as fd 4).

If the new flag isn't given, open the listener as normal.

Refactor the TCP and UDP proxies to be constructed with an existing
TCPListener or UDPConn, respectively. Lift the responsibilty of opening
the listener to the entrypoint. Per the Single Responsibility Principle,
this structure affords changing how the listener is created without
having to touch the proxy implementations.

Co-authored-by: Cory Snider <csnider@mirantis.com>
Signed-off-by: Rob Murray <rob.murray@docker.com>
This commit is contained in:
Rob Murray
2024-07-08 13:50:22 +01:00
parent e0c7a839a8
commit ba2f3c0a98
6 changed files with 274 additions and 130 deletions

View File

@@ -1,9 +1,9 @@
package main
import (
"errors"
"flag"
"fmt"
"log"
"net"
"os"
"os/signal"
@@ -13,11 +13,27 @@ import (
"github.com/ishidawataru/sctp"
)
func main() {
f := os.NewFile(3, "signal-parent")
host, container := parseFlags()
// The caller is expected to pass-in open file descriptors ...
const (
// Pipe for reporting status, as a string. "0\n" if the proxy
// started normally. "1\n<error message>" otherwise.
parentPipeFd uintptr = 3 + iota
// If -use-listen-fd=true, a listening socket ready to accept TCP
// connections or receive UDP. (Without that option on the command
// line, the listener needs to be opened by docker-proxy, for
// compatibility with older docker daemons. In this case fd 4
// may belong to the Go runtime.)
listenSockFd
)
p, err := NewProxy(host, container)
func main() {
config := parseFlags()
p, err := newProxy(config)
if config.ListenSock != nil {
config.ListenSock.Close()
}
f := os.NewFile(parentPipeFd, "signal-parent")
if err != nil {
fmt.Fprintf(f, "1\n%s", err)
f.Close()
@@ -31,41 +47,114 @@ func main() {
p.Run()
}
func newProxy(config ProxyConfig) (p Proxy, err error) {
ipv := ipv4
if config.HostIP.To4() == nil {
ipv = ipv6
}
switch config.Proto {
case "tcp":
var listener *net.TCPListener
if config.ListenSock == nil {
// Fall back to HostIP:HostPort if no socket on fd 4, for compatibility with older daemons.
hostAddr := &net.TCPAddr{IP: config.HostIP, Port: config.HostPort}
listener, err = net.ListenTCP("tcp"+string(ipv), hostAddr)
if err != nil {
return nil, fmt.Errorf("failed to listen on %s: %w", hostAddr, err)
}
} else {
l, err := net.FileListener(config.ListenSock)
if err != nil {
return nil, err
}
var ok bool
listener, ok = l.(*net.TCPListener)
if !ok {
return nil, fmt.Errorf("unexpected socket type for listener fd: %s", l.Addr().Network())
}
}
container := &net.TCPAddr{IP: config.ContainerIP, Port: config.ContainerPort}
p, err = NewTCPProxy(listener, container)
case "udp":
var listener *net.UDPConn
if config.ListenSock == nil {
// Fall back to HostIP:HostPort if no socket on fd 4, for compatibility with older daemons.
hostAddr := &net.UDPAddr{IP: config.HostIP, Port: config.HostPort}
listener, err = net.ListenUDP("udp"+string(ipv), hostAddr)
if err != nil {
return nil, fmt.Errorf("failed to listen on %s: %w", hostAddr, err)
}
} else {
l, err := net.FilePacketConn(config.ListenSock)
if err != nil {
return nil, err
}
var ok bool
listener, ok = l.(*net.UDPConn)
if !ok {
return nil, fmt.Errorf("unexpected socket type for listener fd: %s", l.LocalAddr().Network())
}
}
container := &net.UDPAddr{IP: config.ContainerIP, Port: config.ContainerPort}
p, err = NewUDPProxy(listener, container)
case "sctp":
var listener *sctp.SCTPListener
if config.ListenSock != nil {
// There's no way to construct an SCTPListener from a file descriptor at the moment.
// If a socket has been passed in, it's probably from a newer daemon using a version
// of the sctp module that does allow it.
return nil, errors.New("cannot use supplied SCTP socket, check the latest docker-proxy is in your $PATH")
}
hostAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: config.HostIP}}, Port: config.HostPort}
container := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: config.ContainerIP}}, Port: config.ContainerPort}
listener, err = sctp.ListenSCTP("sctp"+string(ipv), hostAddr)
if err != nil {
return nil, fmt.Errorf("failed to listen on %s: %w", hostAddr, err)
}
p, err = NewSCTPProxy(listener, container)
default:
return nil, fmt.Errorf("unsupported protocol %s", config.Proto)
}
return p, err
}
type ProxyConfig struct {
Proto string
HostIP, ContainerIP net.IP
HostPort, ContainerPort int
ListenSock *os.File
}
// parseFlags parses the flags passed on reexec to create the TCP/UDP/SCTP
// net.Addrs to map the host and container ports.
func parseFlags() (host net.Addr, container net.Addr) {
func parseFlags() ProxyConfig {
var (
proto = flag.String("proto", "tcp", "proxy protocol")
hostIP = flag.String("host-ip", "", "host ip")
hostPort = flag.Int("host-port", -1, "host port")
containerIP = flag.String("container-ip", "", "container ip")
containerPort = flag.Int("container-port", -1, "container port")
printVer = flag.Bool("v", false, "print version information and quit")
printVersion = flag.Bool("version", false, "print version information and quit")
config ProxyConfig
useListenFd bool
printVer bool
)
flag.StringVar(&config.Proto, "proto", "tcp", "proxy protocol")
flag.TextVar(&config.HostIP, "host-ip", net.IPv4zero, "host ip")
flag.IntVar(&config.HostPort, "host-port", -1, "host port")
flag.TextVar(&config.ContainerIP, "container-ip", net.IPv4zero, "container ip")
flag.IntVar(&config.ContainerPort, "container-port", -1, "container port")
flag.BoolVar(&useListenFd, "use-listen-fd", false, "use a supplied listen fd")
flag.BoolVar(&printVer, "v", false, "print version information and quit")
flag.BoolVar(&printVer, "version", false, "print version information and quit")
flag.Parse()
if *printVer || *printVersion {
if printVer {
fmt.Printf("docker-proxy (commit %s) version %s\n", dockerversion.GitCommit, dockerversion.Version)
os.Exit(0)
}
switch *proto {
case "tcp":
host = &net.TCPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
container = &net.TCPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
case "udp":
host = &net.UDPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
container = &net.UDPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
case "sctp":
host = &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.ParseIP(*hostIP)}}, Port: *hostPort}
container = &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.ParseIP(*containerIP)}}, Port: *containerPort}
default:
log.Fatalf("unsupported protocol %s", *proto)
if useListenFd {
config.ListenSock = os.NewFile(listenSockFd, "listen-sock")
}
return host, container
return config
}
func handleStopSignals(p Proxy) {

View File

@@ -1,3 +1,5 @@
//go:build !windows
package main
import (
@@ -5,13 +7,13 @@ import (
"fmt"
"io"
"net"
"runtime"
"os"
"strings"
"testing"
"time"
"github.com/ishidawataru/sctp"
"gotest.tools/v3/skip"
"gotest.tools/v3/assert"
)
var (
@@ -40,6 +42,8 @@ type UDPEchoServer struct {
testCtx *testing.T
}
const hopefullyFreePort = 25587
func NewEchoServer(t *testing.T, proto, address string, opts EchoServerOptions) EchoServer {
var server EchoServer
if !strings.HasPrefix(proto, "tcp") && opts.TCPHalfClose {
@@ -128,7 +132,31 @@ func (server *UDPEchoServer) Run() {
func (server *UDPEchoServer) LocalAddr() net.Addr { return server.conn.LocalAddr() }
func (server *UDPEchoServer) Close() { server.conn.Close() }
func tcpListener(t *testing.T, nw string, addr *net.TCPAddr) (*os.File, *net.TCPAddr) {
t.Helper()
l, err := net.ListenTCP(nw, addr)
assert.NilError(t, err)
osFile, err := l.File()
assert.NilError(t, err)
tcpAddr := l.Addr().(*net.TCPAddr)
err = l.Close()
assert.NilError(t, err)
return osFile, tcpAddr
}
func udpListener(t *testing.T, nw string, addr *net.UDPAddr) (*os.File, *net.UDPAddr) {
t.Helper()
l, err := net.ListenUDP(nw, addr)
assert.NilError(t, err)
osFile, err := l.File()
assert.NilError(t, err)
err = l.Close()
assert.NilError(t, err)
return osFile, l.LocalAddr().(*net.UDPAddr)
}
func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string, halfClose bool) {
t.Helper()
defer proxy.Close()
go proxy.Run()
var client net.Conn
@@ -167,98 +195,169 @@ func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string, halfClose
}
}
func testProxy(t *testing.T, proto string, proxy Proxy, halfClose bool) {
testProxyAt(t, proto, proxy, proxy.FrontendAddr().String(), halfClose)
}
func testTCP4Proxy(t *testing.T, halfClose bool) {
func testTCP4Proxy(t *testing.T, halfClose bool, hostPort int) {
t.Helper()
backend := NewEchoServer(t, "tcp", "127.0.0.1:0", EchoServerOptions{TCPHalfClose: halfClose})
defer backend.Close()
backend.Run()
backendAddr := backend.LocalAddr().(*net.TCPAddr)
var listener *os.File
frontendAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
if hostPort == 0 {
listener, frontendAddr = tcpListener(t, "tcp4", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
} else {
frontendAddr.Port = hostPort
}
config := ProxyConfig{
Proto: "tcp",
HostIP: frontendAddr.IP,
HostPort: frontendAddr.Port,
ContainerIP: backendAddr.IP,
ContainerPort: backendAddr.Port,
ListenSock: listener,
}
proxy, err := newProxy(config)
if err != nil {
t.Fatal(err)
}
testProxy(t, "tcp", proxy, halfClose)
testProxyAt(t, "tcp", proxy, frontendAddr.String(), halfClose)
}
func TestTCP4Proxy(t *testing.T) {
testTCP4Proxy(t, false)
testTCP4Proxy(t, false, 0)
}
func TestTCP4ProxyNoListener(t *testing.T) {
testTCP4Proxy(t, false, hopefullyFreePort)
}
func TestTCP4ProxyHalfClose(t *testing.T) {
testTCP4Proxy(t, true)
testTCP4Proxy(t, true, 0)
}
func TestTCP6Proxy(t *testing.T) {
t.Skip("Need to start CI docker with --ipv6")
backend := NewEchoServer(t, "tcp", "[::1]:0", EchoServerOptions{})
defer backend.Close()
backend.Run()
frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
backendAddr := backend.LocalAddr().(*net.TCPAddr)
listener, frontendAddr := tcpListener(t, "tcp6", &net.TCPAddr{IP: net.IPv6loopback, Port: 0})
config := ProxyConfig{
Proto: "tcp",
HostIP: frontendAddr.IP,
HostPort: frontendAddr.Port,
ContainerIP: backendAddr.IP,
ContainerPort: backendAddr.Port,
ListenSock: listener,
}
proxy, err := newProxy(config)
if err != nil {
t.Fatal(err)
}
testProxy(t, "tcp", proxy, false)
testProxyAt(t, "tcp", proxy, frontendAddr.String(), false)
}
func TestTCPDualStackProxy(t *testing.T) {
// If I understand `godoc -src net favoriteAddrFamily` (used by the
// net.Listen* functions) correctly this should work, but it doesn't.
t.Skip("No support for dual stack yet")
backend := NewEchoServer(t, "tcp", "[::1]:0", EchoServerOptions{})
defer backend.Close()
backend.Run()
frontendAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 0}
proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
backendAddr := backend.LocalAddr().(*net.TCPAddr)
listener, frontendAddr := tcpListener(t, "tcp", &net.TCPAddr{IP: net.IPv6zero, Port: 0})
config := ProxyConfig{
Proto: "tcp",
HostIP: frontendAddr.IP,
HostPort: frontendAddr.Port,
ContainerIP: backendAddr.IP,
ContainerPort: backendAddr.Port,
ListenSock: listener,
}
proxy, err := newProxy(config)
if err != nil {
t.Fatal(err)
}
ipv4ProxyAddr := &net.TCPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: proxy.FrontendAddr().(*net.TCPAddr).Port,
Port: frontendAddr.Port,
}
testProxyAt(t, "tcp", proxy, ipv4ProxyAddr.String(), false)
}
func TestUDP4Proxy(t *testing.T) {
func testUDP4Proxy(t *testing.T, hostPort int) {
t.Helper()
backend := NewEchoServer(t, "udp", "127.0.0.1:0", EchoServerOptions{})
defer backend.Close()
backend.Run()
var listener *os.File
frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
if hostPort == 0 {
listener, frontendAddr = udpListener(t, "udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
} else {
frontendAddr.Port = hostPort
}
backendAddr := backend.LocalAddr().(*net.UDPAddr)
config := ProxyConfig{
Proto: "udp",
HostIP: frontendAddr.IP,
HostPort: frontendAddr.Port,
ContainerIP: backendAddr.IP,
ContainerPort: backendAddr.Port,
ListenSock: listener,
}
proxy, err := newProxy(config)
if err != nil {
t.Fatal(err)
}
testProxy(t, "udp", proxy, false)
testProxyAt(t, "udp", proxy, frontendAddr.String(), false)
}
func TestUDP4Proxy(t *testing.T) {
testUDP4Proxy(t, 0)
}
func TestUDP4ProxyNoListener(t *testing.T) {
testUDP4Proxy(t, hopefullyFreePort)
}
func TestUDP6Proxy(t *testing.T) {
t.Skip("Need to start CI docker with --ipv6")
backend := NewEchoServer(t, "udp", "[::1]:0", EchoServerOptions{})
defer backend.Close()
backend.Run()
frontendAddr := &net.UDPAddr{IP: net.IPv6loopback, Port: 0}
proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
listener, frontendAddr := udpListener(t, "udp6", &net.UDPAddr{IP: net.IPv6loopback, Port: 0})
backendAddr := backend.LocalAddr().(*net.UDPAddr)
config := ProxyConfig{
Proto: "udp",
HostIP: frontendAddr.IP,
HostPort: frontendAddr.Port,
ContainerIP: backendAddr.IP,
ContainerPort: backendAddr.Port,
ListenSock: listener,
}
proxy, err := newProxy(config)
if err != nil {
t.Fatal(err)
}
testProxy(t, "udp", proxy, false)
testProxyAt(t, "udp", proxy, frontendAddr.String(), false)
}
func TestUDPWriteError(t *testing.T) {
frontendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
// Hopefully, this port will be free: */
backendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 25587}
proxy, err := NewProxy(frontendAddr, backendAddr)
backendAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: hopefullyFreePort}
listener, frontendAddr := udpListener(t, "udp4", frontendAddr)
config := ProxyConfig{
Proto: "udp",
HostIP: frontendAddr.IP,
HostPort: frontendAddr.Port,
ContainerIP: backendAddr.IP,
ContainerPort: backendAddr.Port,
ListenSock: listener,
}
proxy, err := newProxy(config)
if err != nil {
t.Fatal(err)
}
defer proxy.Close()
go proxy.Run()
client, err := net.Dial("udp", "127.0.0.1:25587")
client, err := net.Dial("udp", frontendAddr.String())
if err != nil {
t.Fatalf("Can't connect to the proxy: %v", err)
}
@@ -266,7 +365,7 @@ func TestUDPWriteError(t *testing.T) {
// Make sure the proxy doesn't stop when there is no actual backend:
client.Write(testBuf)
client.Write(testBuf)
backend := NewEchoServer(t, "udp", "127.0.0.1:25587", EchoServerOptions{})
backend := NewEchoServer(t, "udp", backendAddr.String(), EchoServerOptions{})
defer backend.Close()
backend.Run()
client.SetDeadline(time.Now().Add(10 * time.Second))
@@ -282,31 +381,36 @@ func TestUDPWriteError(t *testing.T) {
}
}
func TestSCTP4Proxy(t *testing.T) {
skip.If(t, runtime.GOOS == "windows", "sctp is not supported on windows")
func TestSCTP4ProxyNoListener(t *testing.T) {
backend := NewEchoServer(t, "sctp", "127.0.0.1:0", EchoServerOptions{})
defer backend.Close()
backend.Run()
frontendAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.IPv4(127, 0, 0, 1)}}, Port: 0}
proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
if err != nil {
t.Fatal(err)
backendAddr := backend.LocalAddr().(*sctp.SCTPAddr)
config := ProxyConfig{
Proto: "sctp",
HostIP: net.IPv4(127, 0, 0, 1),
HostPort: hopefullyFreePort,
ContainerIP: backendAddr.IPAddrs[0].IP,
ContainerPort: backendAddr.Port,
}
testProxy(t, "sctp", proxy, false)
proxy, err := newProxy(config)
assert.NilError(t, err)
testProxyAt(t, "sctp", proxy, fmt.Sprintf("%s:%d", config.HostIP, config.HostPort), false)
}
func TestSCTP6Proxy(t *testing.T) {
t.Skip("Need to start CI docker with --ipv6")
skip.If(t, runtime.GOOS == "windows", "sctp is not supported on windows")
func TestSCTP6ProxyNoListener(t *testing.T) {
backend := NewEchoServer(t, "sctp", "[::1]:0", EchoServerOptions{})
defer backend.Close()
backend.Run()
frontendAddr := &sctp.SCTPAddr{IPAddrs: []net.IPAddr{{IP: net.IPv6loopback}}, Port: 0}
proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
if err != nil {
t.Fatal(err)
backendAddr := backend.LocalAddr().(*sctp.SCTPAddr)
config := ProxyConfig{
Proto: "sctp",
HostIP: net.IPv6loopback,
HostPort: hopefullyFreePort,
ContainerIP: backendAddr.IPAddrs[0].IP,
ContainerPort: backendAddr.Port,
}
testProxy(t, "sctp", proxy, false)
proxy, err := newProxy(config)
assert.NilError(t, err)
testProxyAt(t, "sctp", proxy, fmt.Sprintf("[%s]:%d", config.HostIP, config.HostPort), false)
}

View File

@@ -2,11 +2,7 @@
// and UDP.
package main
import (
"net"
"github.com/ishidawataru/sctp"
)
import "net"
// ipVersion refers to IP version - v4 or v6
type ipVersion string
@@ -34,17 +30,3 @@ type Proxy interface {
// BackendAddr returns the proxied address.
BackendAddr() net.Addr
}
// NewProxy creates a Proxy according to the specified frontendAddr and backendAddr.
func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
switch frontendAddr.(type) {
case *net.UDPAddr:
return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr))
case *net.TCPAddr:
return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr))
case *sctp.SCTPAddr:
return NewSCTPProxy(frontendAddr.(*sctp.SCTPAddr), backendAddr.(*sctp.SCTPAddr))
default:
panic("Unsupported protocol")
}
}

View File

@@ -18,18 +18,7 @@ type SCTPProxy struct {
}
// NewSCTPProxy creates a new SCTPProxy.
func NewSCTPProxy(frontendAddr, backendAddr *sctp.SCTPAddr) (*SCTPProxy, error) {
// detect version of hostIP to bind only to correct version
ipVersion := ipv4
if frontendAddr.IPAddrs[0].IP.To4() == nil {
ipVersion = ipv6
}
listener, err := sctp.ListenSCTP("sctp"+string(ipVersion), frontendAddr)
if err != nil {
return nil, err
}
// If the port in frontendAddr was 0 then ListenSCTP will have a picked
// a port to listen on, hence the call to Addr to get that actual port:
func NewSCTPProxy(listener *sctp.SCTPListener, backendAddr *sctp.SCTPAddr) (*SCTPProxy, error) {
return &SCTPProxy{
listener: listener,
frontendAddr: listener.Addr().(*sctp.SCTPAddr),

View File

@@ -16,18 +16,7 @@ type TCPProxy struct {
}
// NewTCPProxy creates a new TCPProxy.
func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
// detect version of hostIP to bind only to correct version
ipVersion := ipv4
if frontendAddr.IP.To4() == nil {
ipVersion = ipv6
}
listener, err := net.ListenTCP("tcp"+string(ipVersion), frontendAddr)
if err != nil {
return nil, err
}
// If the port in frontendAddr was 0 then ListenTCP will have a picked
// a port to listen on, hence the call to Addr to get that actual port:
func NewTCPProxy(listener *net.TCPListener, backendAddr *net.TCPAddr) (*TCPProxy, error) {
return &TCPProxy{
listener: listener,
frontendAddr: listener.Addr().(*net.TCPAddr),

View File

@@ -54,16 +54,7 @@ type UDPProxy struct {
}
// NewUDPProxy creates a new UDPProxy.
func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) {
// detect version of hostIP to bind only to correct version
ipVersion := ipv4
if frontendAddr.IP.To4() == nil {
ipVersion = ipv6
}
listener, err := net.ListenUDP("udp"+string(ipVersion), frontendAddr)
if err != nil {
return nil, err
}
func NewUDPProxy(listener *net.UDPConn, backendAddr *net.UDPAddr) (*UDPProxy, error) {
return &UDPProxy{
listener: listener,
frontendAddr: listener.LocalAddr().(*net.UDPAddr),