diff --git a/cmd/docker-proxy/udp_proxy_linux.go b/cmd/docker-proxy/udp_proxy_linux.go
index 7dfc8c43ae..560d893d7e 100644
--- a/cmd/docker-proxy/udp_proxy_linux.go
+++ b/cmd/docker-proxy/udp_proxy_linux.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"log"
 	"net"
+	"os"
 	"sync"
 	"syscall"
 	"time"
@@ -49,7 +50,8 @@ type connTrackMap map[connTrackKey]*connTrackEntry
 // connTrackEntry wraps a UDP connection to provide thread-safe [net.Conn.Write]
 // and [net.Conn.Close] operations.
 type connTrackEntry struct {
-	conn *net.UDPConn
+	conn  *net.UDPConn
+	lastW time.Time // time of the most recent write recorded by Run; read via lastWrite
 	// This lock should be held before calling Write or Close on the wrapped
 	// net.UDPConn. Read can be called concurrently to these operations.
 	//
@@ -64,6 +66,12 @@ func newConnTrackEntry(conn *net.UDPConn) *connTrackEntry {
 	}
 }
 
+func (cte *connTrackEntry) lastWrite() time.Time {
+	cte.mu.Lock() // Run assigns lastW before releasing cte.mu, so read it under the same lock
+	defer cte.mu.Unlock()
+	return cte.lastW
+}
+
 // UDPProxy is proxy for which handles UDP datagrams. It implements the Proxy
 // interface to handle UDP traffic forwarding between the frontend and backend
 // addresses.
@@ -121,6 +129,15 @@ func (proxy *UDPProxy) replyLoop(cte *connTrackEntry, serverAddr net.IP, clientA
 				// expires:
 				goto again
 			}
+			// If the UDP connection is one-sided (i.e. the backend never sends
+			// replies), the connTrackEntry should not be GC'd until no writes
+			// happen for proxy.connTrackTimeout.
+			//
+			// Since the ReadDeadline is set to proxy.connTrackTimeout, in such a
+			// case the connTrackEntry is GC'd at most 2 * proxy.connTrackTimeout after the last write.
+			if errors.Is(err, os.ErrDeadlineExceeded) && time.Since(cte.lastWrite()) < proxy.connTrackTimeout {
+				continue
+			}
 			return
 		}
 		for i := 0; i != read; {
@@ -186,6 +203,7 @@ func (proxy *UDPProxy) Run() {
 				break
 			}
 			i += written
+			cte.lastW = time.Now() // lets replyLoop see that the entry was written to recently
 		}
 		cte.mu.Unlock()
 	}
diff --git a/cmd/docker-proxy/udp_proxy_linux_test.go b/cmd/docker-proxy/udp_proxy_linux_test.go
new file mode 100644
index 0000000000..e2f903d8b2
--- /dev/null
+++ b/cmd/docker-proxy/udp_proxy_linux_test.go
@@ -0,0 +1,78 @@
+package main
+
+import (
+	"net"
+	"testing"
+	"time"
+
+	"gotest.tools/v3/assert"
+)
+
+// TestUDPOneSided makes sure that the conntrack entry isn't GC'd if the
+// backend never writes to the UDP client.
+func TestUDPOneSided(t *testing.T) {
+	frontend, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
+	assert.NilError(t, err)
+	defer frontend.Close()
+
+	backend, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
+	assert.NilError(t, err)
+	defer backend.Close()
+
+	type udpMsg struct {
+		data  []byte
+		saddr *net.UDPAddr
+	}
+	msgs := make(chan udpMsg)
+	go func() {
+		for {
+			buf := make([]byte, 1024)
+			n, saddr, err := backend.ReadFromUDP(buf)
+			if err != nil {
+				return
+			}
+			msgs <- udpMsg{data: buf[:n], saddr: saddr}
+		}
+	}()
+
+	proxy, err := NewUDPProxy(frontend, backend.LocalAddr().(*net.UDPAddr), ip4)
+	assert.NilError(t, err)
+	defer proxy.Close()
+
+	const connTrackTimeout = 1 * time.Second
+	proxy.connTrackTimeout = connTrackTimeout
+
+	go func() {
+		proxy.Run()
+	}()
+
+	client, err := net.DialUDP("udp", nil, frontend.LocalAddr().(*net.UDPAddr))
+	assert.NilError(t, err)
+	defer client.Close()
+
+	var expSaddr *net.UDPAddr
+	for i := range 15 {
+		_, err = client.Write([]byte("hello"))
+		assert.NilError(t, err)
+		time.Sleep(100 * time.Millisecond) // 15 iterations x 100ms run longer than connTrackTimeout
+
+		msg := <-msgs
+		assert.Equal(t, string(msg.data), "hello")
+		if i == 0 {
+			expSaddr = msg.saddr
+		} else {
+			assert.Equal(t, msg.saddr.Port, expSaddr.Port) // same source port as the first datagram
+		}
+	}
+
+	// The conntrack entry is checked every connTrackTimeout, but the latest
+	// write might be less than connTrackTimeout ago. So we need to wait for
+	// at least twice the conntrack timeout to make sure the entry is GC'd.
+	time.Sleep(2 * connTrackTimeout)
+	_, err = client.Write([]byte("hello"))
+	assert.NilError(t, err)
+
+	msg := <-msgs
+	assert.Equal(t, string(msg.data), "hello")
+	assert.Check(t, msg.saddr.Port != expSaddr.Port) // expects a new source port once the old entry is gone
+}