From f099e911bd99581bb0f6c0802cc5c10081c457b4 Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Fri, 11 Jul 2025 14:31:13 -0400 Subject: [PATCH] libnetwork: handle coalesced endpoint events The eventually-consistent nature of NetworkDB means we cannot depend on events being received in the same order that they were sent. Nor can we depend on receiving events for all intermediate states. It is possible for a series of entry UPDATEs, or a DELETE followed by a CREATE with the same key, to get coalesced into a single UPDATE event on the receiving node. Watchers of NetworkDB tables therefore need to be prepared to gracefully handle arbitrary UPDATEs of a key, including those where the new value may have nothing in common with the previous value. The libnetwork controller naively handled events for endpoint_table assuming that an endpoint leave followed by a rejoin of the same endpoint would always be expressed as a DELETE event followed by a CREATE. It would handle a coalesced UPDATE as a CREATE, adding a new service binding without removing the old one. This would have various side effects; for example, the "transient state" of multiple conflicting service bindings, in which more than one endpoint is assigned the same IP address, would never settle. Modify the libnetwork controller to handle an UPDATE by removing the previous service binding then adding the new one. 
Signed-off-by: Cory Snider (cherry picked from commit 4538a1de0a0eb33e55e5b103e46b27ba0a74a489) Signed-off-by: Cory Snider --- internal/iterutil/iterutil.go | 32 ++++++ internal/iterutil/iterutil_test.go | 31 ++++++ libnetwork/agent.go | 170 ++++++++++++++++++++--------- libnetwork/agent_test.go | 93 ++++++++++++++++ 4 files changed, 272 insertions(+), 54 deletions(-) create mode 100644 internal/iterutil/iterutil.go create mode 100644 internal/iterutil/iterutil_test.go create mode 100644 libnetwork/agent_test.go diff --git a/internal/iterutil/iterutil.go b/internal/iterutil/iterutil.go new file mode 100644 index 0000000000..d7d9dc0d5f --- /dev/null +++ b/internal/iterutil/iterutil.go @@ -0,0 +1,32 @@ +// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16: +//go:build go1.23 + +package iterutil + +import ( + "iter" + "maps" +) + +// SameValues checks if a and b yield the same values, independent of order. +func SameValues[T comparable](a, b iter.Seq[T]) bool { + m, n := make(map[T]int), make(map[T]int) + for v := range a { + m[v]++ + } + for v := range b { + n[v]++ + } + return maps.Equal(m, n) +} + +// Deref adapts an iterator of pointers to an iterator of values. 
+func Deref[T any, P *T](s iter.Seq[P]) iter.Seq[T] { + return func(yield func(T) bool) { + for p := range s { + if !yield(*p) { + return + } + } + } +} diff --git a/internal/iterutil/iterutil_test.go b/internal/iterutil/iterutil_test.go new file mode 100644 index 0000000000..c5a4de26da --- /dev/null +++ b/internal/iterutil/iterutil_test.go @@ -0,0 +1,31 @@ +// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16: +//go:build go1.23 + +package iterutil + +import ( + "slices" + "testing" + + "gotest.tools/v3/assert" +) + +func TestSameValues(t *testing.T) { + a := []int{1, 2, 3, 4, 3} + b := []int{3, 4, 3, 2, 1} + c := []int{1, 2, 3, 4} + + assert.Check(t, SameValues(slices.Values(a), slices.Values(a))) + assert.Check(t, SameValues(slices.Values(c), slices.Values(c))) + assert.Check(t, SameValues(slices.Values(a), slices.Values(b))) + assert.Check(t, !SameValues(slices.Values(a), slices.Values(c))) +} + +func TestDeref(t *testing.T) { + a := make([]*int, 3) + for i := range a { + a[i] = &i + } + b := slices.Collect(Deref(slices.Values(a))) + assert.DeepEqual(t, b, []int{0, 1, 2}) +} diff --git a/libnetwork/agent.go b/libnetwork/agent.go index 880a12f01e..42ebf60ed1 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -1,3 +1,6 @@ +// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16: +//go:build go1.23 + package libnetwork //go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. 
agent.proto @@ -7,10 +10,13 @@ import ( "encoding/json" "fmt" "net" + "net/netip" + "slices" "sort" "sync" "github.com/containerd/log" + "github.com/docker/docker/internal/iterutil" "github.com/docker/docker/libnetwork/cluster" "github.com/docker/docker/libnetwork/discoverapi" "github.com/docker/docker/libnetwork/driverapi" @@ -850,83 +856,139 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) { c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd) } -func (c *Controller) handleEpTableEvent(ev events.Event) { - var value []byte - event := ev.(networkdb.WatchEvent) - switch { - case event.IsCreate(), event.IsUpdate(): - value = event.Value - case event.IsDelete(): - value = event.Prev - default: - log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event) - return - } +type endpointEvent struct { + EndpointRecord + // Virtual IP of the service to which this endpoint belongs. + VirtualIP netip.Addr + // IP assigned to this endpoint. + EndpointIP netip.Addr +} +func unmarshalEndpointRecord(data []byte) (*endpointEvent, error) { var epRec EndpointRecord - err := proto.Unmarshal(value, &epRec) - if err != nil { - log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value") - return + if err := proto.Unmarshal(data, &epRec); err != nil { + return nil, fmt.Errorf("failed to unmarshal endpoint record: %w", err) } + vip, _ := netip.ParseAddr(epRec.VirtualIP) + eip, _ := netip.ParseAddr(epRec.EndpointIP) + + if epRec.Name == "" || !eip.IsValid() { + return nil, fmt.Errorf("invalid endpoint name/ip in service table event %s", data) + } + + return &endpointEvent{ + EndpointRecord: epRec, + VirtualIP: vip, + EndpointIP: eip, + }, nil +} + +// EquivalentTo returns true if ev is semantically equivalent to other. 
+func (ev *endpointEvent) EquivalentTo(other *endpointEvent) bool { + return ev.Name == other.Name && + ev.ServiceName == other.ServiceName && + ev.ServiceID == other.ServiceID && + ev.VirtualIP == other.VirtualIP && + ev.EndpointIP == other.EndpointIP && + ev.ServiceDisabled == other.ServiceDisabled && + iterutil.SameValues( + iterutil.Deref(slices.Values(ev.IngressPorts)), + iterutil.Deref(slices.Values(other.IngressPorts))) && + iterutil.SameValues(slices.Values(ev.Aliases), slices.Values(other.Aliases)) && + iterutil.SameValues(slices.Values(ev.TaskAliases), slices.Values(other.TaskAliases)) +} + +func (c *Controller) handleEpTableEvent(ev events.Event) { + event := ev.(networkdb.WatchEvent) nid := event.NetworkID eid := event.Key - containerName := epRec.Name - svcName := epRec.ServiceName - svcID := epRec.ServiceID - vip := net.ParseIP(epRec.VirtualIP) - ip := net.ParseIP(epRec.EndpointIP) - ingressPorts := epRec.IngressPorts - serviceAliases := epRec.Aliases - taskAliases := epRec.TaskAliases - logger := log.G(context.TODO()).WithFields(log.Fields{ - "evt": event, - "R": epRec, - }) - - if containerName == "" || ip == nil { - logger.Errorf("Invalid endpoint name/ip received while handling service table event %s", value) - return + var prev, epRec *endpointEvent + if event.Prev != nil { + var err error + prev, err = unmarshalEndpointRecord(event.Prev) + if err != nil { + log.G(context.TODO()).WithError(err).Error("error unmarshaling previous value from service table event") + return + } + } + if event.Value != nil { + var err error + epRec, err = unmarshalEndpointRecord(event.Value) + if err != nil { + log.G(context.TODO()).WithError(err).Error("error unmarshaling service table event") + return + } } + logger := log.G(context.TODO()).WithFields(log.Fields{ + "evt": event, + "R": epRec, + "prev": prev, + }) logger.Debug("handleEpTableEvent") - switch { - case event.IsCreate(), event.IsUpdate(): - if svcID != "" { + if prev != nil { + if epRec != nil && 
prev.EquivalentTo(epRec) { + // Avoid flapping if we would otherwise remove a service + // binding then immediately replace it with an equivalent one. + return + } + + if prev.ServiceID != "" { + // This is a remote task part of a service + if !prev.ServiceDisabled { + err := c.rmServiceBinding(prev.ServiceName, prev.ServiceID, nid, eid, + prev.Name, prev.VirtualIP.AsSlice(), prev.IngressPorts, + prev.Aliases, prev.TaskAliases, prev.EndpointIP.AsSlice(), + "handleEpTableEvent", true, true) + if err != nil { + logger.WithError(err).Error("failed removing service binding") + } + } + } else { + // This is a remote container simply attached to an attachable network + err := c.delContainerNameResolution(nid, eid, prev.Name, prev.TaskAliases, + prev.EndpointIP.AsSlice(), "handleEpTableEvent") + if err != nil { + logger.WithError(err).Errorf("failed removing container name resolution") + } + } + } + + if epRec != nil { + if epRec.ServiceID != "" { // This is a remote task part of a service if epRec.ServiceDisabled { - if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil { - logger.WithError(err).Error("failed disabling service binding") - return + // Don't double-remove a service binding + if prev == nil || prev.ServiceID != epRec.ServiceID || !prev.ServiceDisabled { + err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID, + nid, eid, epRec.Name, epRec.VirtualIP.AsSlice(), + epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases, + epRec.EndpointIP.AsSlice(), "handleEpTableEvent", true, false) + if err != nil { + logger.WithError(err).Error("failed disabling service binding") + return + } } } else { - if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil { + err := c.addServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid, + epRec.Name, 
epRec.VirtualIP.AsSlice(), epRec.IngressPorts, + epRec.Aliases, epRec.TaskAliases, epRec.EndpointIP.AsSlice(), + "handleEpTableEvent") + if err != nil { logger.WithError(err).Error("failed adding service binding") return } } } else { // This is a remote container simply attached to an attachable network - if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil { + err := c.addContainerNameResolution(nid, eid, epRec.Name, epRec.TaskAliases, + epRec.EndpointIP.AsSlice(), "handleEpTableEvent") + if err != nil { logger.WithError(err).Errorf("failed adding container name resolution") } } - - case event.IsDelete(): - if svcID != "" { - // This is a remote task part of a service - if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil { - logger.WithError(err).Error("failed removing service binding") - return - } - } else { - // This is a remote container simply attached to an attachable network - if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil { - logger.WithError(err).Errorf("failed removing container name resolution") - } - } } } diff --git a/libnetwork/agent_test.go b/libnetwork/agent_test.go new file mode 100644 index 0000000000..39db24c778 --- /dev/null +++ b/libnetwork/agent_test.go @@ -0,0 +1,93 @@ +// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16: +//go:build go1.23 + +package libnetwork + +import ( + "net/netip" + "slices" + "testing" + + "gotest.tools/v3/assert" +) + +func TestEndpointEvent_EquivalentTo(t *testing.T) { + assert.Check(t, (&endpointEvent{}).EquivalentTo(&endpointEvent{})) + + a := endpointEvent{ + EndpointRecord: EndpointRecord{ + Name: "foo", + ServiceName: "bar", + ServiceID: "baz", + IngressPorts: []*PortConfig{ + { + Protocol: 
ProtocolTCP, + TargetPort: 80, + }, + { + Name: "dns", + Protocol: ProtocolUDP, + TargetPort: 5353, + PublishedPort: 53, + }, + }, + }, + VirtualIP: netip.MustParseAddr("10.0.0.42"), + EndpointIP: netip.MustParseAddr("192.168.69.42"), + } + assert.Check(t, a.EquivalentTo(&a)) + + reflexiveEquiv := func(a, b *endpointEvent) bool { + t.Helper() + assert.Check(t, a.EquivalentTo(b) == b.EquivalentTo(a), "reflexive equivalence") + return a.EquivalentTo(b) + } + + b := a + b.ServiceDisabled = true + assert.Check(t, !reflexiveEquiv(&a, &b), "differing by ServiceDisabled") + + c := a + c.IngressPorts = slices.Clone(a.IngressPorts) + slices.Reverse(c.IngressPorts) + assert.Check(t, reflexiveEquiv(&a, &c), "IngressPorts order should not matter") + + d := a + d.IngressPorts = append(d.IngressPorts, a.IngressPorts[0]) + assert.Check(t, !reflexiveEquiv(&a, &d), "Differing number of copies of IngressPort entries should not be equivalent") + d.IngressPorts = a.IngressPorts[:1] + assert.Check(t, !reflexiveEquiv(&a, &d), "Removing an IngressPort entry should not be equivalent") + + e := a + e.Aliases = []string{"alias1", "alias2"} + assert.Check(t, !reflexiveEquiv(&a, &e), "Differing Aliases should not be equivalent") + + f := a + f.TaskAliases = []string{"taskalias1", "taskalias2"} + assert.Check(t, !reflexiveEquiv(&a, &f), "Adding TaskAliases should not be equivalent") + g := a + g.TaskAliases = []string{"taskalias2", "taskalias1"} + assert.Check(t, reflexiveEquiv(&f, &g), "TaskAliases order should not matter") + g.TaskAliases = g.TaskAliases[:1] + assert.Check(t, !reflexiveEquiv(&f, &g), "Differing number of TaskAliases should not be equivalent") + + h := a + h.EndpointIP = netip.MustParseAddr("192.168.69.43") + assert.Check(t, !reflexiveEquiv(&a, &h), "Differing EndpointIP should not be equivalent") + + i := a + i.VirtualIP = netip.MustParseAddr("10.0.0.69") + assert.Check(t, !reflexiveEquiv(&a, &i), "Differing VirtualIP should not be equivalent") + + j := a + j.ServiceID = 
"qux" + assert.Check(t, !reflexiveEquiv(&a, &j), "Differing ServiceID should not be equivalent") + + k := a + k.ServiceName = "quux" + assert.Check(t, !reflexiveEquiv(&a, &k), "Differing ServiceName should not be equivalent") + + l := a + l.Name = "aaaaa" + assert.Check(t, !reflexiveEquiv(&a, &l), "Differing Name should not be equivalent") +}