diff --git a/internal/iterutil/iterutil.go b/internal/iterutil/iterutil.go new file mode 100644 index 0000000000..d7d9dc0d5f --- /dev/null +++ b/internal/iterutil/iterutil.go @@ -0,0 +1,32 @@ +// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16: +//go:build go1.23 + +package iterutil + +import ( + "iter" + "maps" +) + +// SameValues checks if a and b yield the same values, independent of order. +func SameValues[T comparable](a, b iter.Seq[T]) bool { + m, n := make(map[T]int), make(map[T]int) + for v := range a { + m[v]++ + } + for v := range b { + n[v]++ + } + return maps.Equal(m, n) +} + +// Deref adapts an iterator of pointers to an iterator of values. +func Deref[T any, P *T](s iter.Seq[P]) iter.Seq[T] { + return func(yield func(T) bool) { + for p := range s { + if !yield(*p) { + return + } + } + } +} diff --git a/internal/iterutil/iterutil_test.go b/internal/iterutil/iterutil_test.go new file mode 100644 index 0000000000..c5a4de26da --- /dev/null +++ b/internal/iterutil/iterutil_test.go @@ -0,0 +1,31 @@ +// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16: +//go:build go1.23 + +package iterutil + +import ( + "slices" + "testing" + + "gotest.tools/v3/assert" +) + +func TestSameValues(t *testing.T) { + a := []int{1, 2, 3, 4, 3} + b := []int{3, 4, 3, 2, 1} + c := []int{1, 2, 3, 4} + + assert.Check(t, SameValues(slices.Values(a), slices.Values(a))) + assert.Check(t, SameValues(slices.Values(c), slices.Values(c))) + assert.Check(t, SameValues(slices.Values(a), slices.Values(b))) + assert.Check(t, !SameValues(slices.Values(a), slices.Values(c))) +} + +func TestDeref(t *testing.T) { + a := make([]*int, 3) + for i := range a { + a[i] = &i + } + b := slices.Collect(Deref(slices.Values(a))) + assert.DeepEqual(t, b, []int{0, 1, 2}) +} diff --git a/libnetwork/agent.go b/libnetwork/agent.go index 880a12f01e..42ebf60ed1 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -1,3 +1,6 @@ +// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16: +//go:build go1.23 + package libnetwork //go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto @@ -7,10 +10,13 @@ import ( "encoding/json" "fmt" "net" + "net/netip" + "slices" "sort" "sync" "github.com/containerd/log" + "github.com/docker/docker/internal/iterutil" "github.com/docker/docker/libnetwork/cluster" "github.com/docker/docker/libnetwork/discoverapi" "github.com/docker/docker/libnetwork/driverapi" @@ -850,83 +856,139 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) { c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd) } -func (c *Controller) handleEpTableEvent(ev events.Event) { - var value []byte - event := ev.(networkdb.WatchEvent) - switch { - case event.IsCreate(), event.IsUpdate(): - value = event.Value - case event.IsDelete(): - value = event.Prev - default: - log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event) - return - } +type endpointEvent struct { + EndpointRecord + // Virtual IP of the service to which this endpoint belongs. + VirtualIP netip.Addr + // IP assigned to this endpoint. + EndpointIP netip.Addr +} +func unmarshalEndpointRecord(data []byte) (*endpointEvent, error) { var epRec EndpointRecord - err := proto.Unmarshal(value, &epRec) - if err != nil { - log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value") - return + if err := proto.Unmarshal(data, &epRec); err != nil { + return nil, fmt.Errorf("failed to unmarshal endpoint record: %w", err) } + vip, _ := netip.ParseAddr(epRec.VirtualIP) + eip, _ := netip.ParseAddr(epRec.EndpointIP) + + if epRec.Name == "" || !eip.IsValid() { + return nil, fmt.Errorf("invalid endpoint name/ip in service table event %s", data) + } + + return &endpointEvent{ + EndpointRecord: epRec, + VirtualIP: vip, + EndpointIP: eip, + }, nil +} + +// EquivalentTo returns true if ev is semantically equivalent to other. +func (ev *endpointEvent) EquivalentTo(other *endpointEvent) bool { + return ev.Name == other.Name && + ev.ServiceName == other.ServiceName && + ev.ServiceID == other.ServiceID && + ev.VirtualIP == other.VirtualIP && + ev.EndpointIP == other.EndpointIP && + ev.ServiceDisabled == other.ServiceDisabled && + iterutil.SameValues( + iterutil.Deref(slices.Values(ev.IngressPorts)), + iterutil.Deref(slices.Values(other.IngressPorts))) && + iterutil.SameValues(slices.Values(ev.Aliases), slices.Values(other.Aliases)) && + iterutil.SameValues(slices.Values(ev.TaskAliases), slices.Values(other.TaskAliases)) +} + +func (c *Controller) handleEpTableEvent(ev events.Event) { + event := ev.(networkdb.WatchEvent) nid := event.NetworkID eid := event.Key - containerName := epRec.Name - svcName := epRec.ServiceName - svcID := epRec.ServiceID - vip := net.ParseIP(epRec.VirtualIP) - ip := net.ParseIP(epRec.EndpointIP) - ingressPorts := epRec.IngressPorts - serviceAliases := epRec.Aliases - taskAliases := epRec.TaskAliases - logger := log.G(context.TODO()).WithFields(log.Fields{ - "evt": event, - "R": epRec, - }) - - if containerName == "" || ip == nil { - logger.Errorf("Invalid endpoint name/ip received while handling service table event %s", value) - return + var prev, epRec *endpointEvent + if event.Prev != nil { + var err error + prev, err = unmarshalEndpointRecord(event.Prev) + if err != nil { + log.G(context.TODO()).WithError(err).Error("error unmarshaling previous value from service table event") + return + } + } + if event.Value != nil { + var err error + epRec, err = unmarshalEndpointRecord(event.Value) + if err != nil { + log.G(context.TODO()).WithError(err).Error("error unmarshaling service table event") + return + } } + logger := log.G(context.TODO()).WithFields(log.Fields{ + "evt": event, + "R": epRec, + "prev": prev, + }) logger.Debug("handleEpTableEvent") - switch { - case event.IsCreate(), event.IsUpdate(): - if svcID != "" { + if prev != nil { + if epRec != nil && prev.EquivalentTo(epRec) { + // Avoid flapping if we would otherwise remove a service + // binding then immediately replace it with an equivalent one. + return + } + + if prev.ServiceID != "" { + // This is a remote task part of a service + if !prev.ServiceDisabled { + err := c.rmServiceBinding(prev.ServiceName, prev.ServiceID, nid, eid, + prev.Name, prev.VirtualIP.AsSlice(), prev.IngressPorts, + prev.Aliases, prev.TaskAliases, prev.EndpointIP.AsSlice(), + "handleEpTableEvent", true, true) + if err != nil { + logger.WithError(err).Error("failed removing service binding") + } + } + } else { + // This is a remote container simply attached to an attachable network + err := c.delContainerNameResolution(nid, eid, prev.Name, prev.TaskAliases, + prev.EndpointIP.AsSlice(), "handleEpTableEvent") + if err != nil { + logger.WithError(err).Errorf("failed removing container name resolution") + } + } + } + + if epRec != nil { + if epRec.ServiceID != "" { // This is a remote task part of a service if epRec.ServiceDisabled { - if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil { - logger.WithError(err).Error("failed disabling service binding") - return + // Don't double-remove a service binding + if prev == nil || prev.ServiceID != epRec.ServiceID || !prev.ServiceDisabled { + err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID, + nid, eid, epRec.Name, epRec.VirtualIP.AsSlice(), + epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases, + epRec.EndpointIP.AsSlice(), "handleEpTableEvent", true, false) + if err != nil { + logger.WithError(err).Error("failed disabling service binding") + return + } } } else { - if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil { + err := c.addServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid, + epRec.Name, epRec.VirtualIP.AsSlice(), epRec.IngressPorts, + epRec.Aliases, epRec.TaskAliases, epRec.EndpointIP.AsSlice(), + "handleEpTableEvent") + if err != nil { logger.WithError(err).Error("failed adding service binding") return } } } else { // This is a remote container simply attached to an attachable network - if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil { + err := c.addContainerNameResolution(nid, eid, epRec.Name, epRec.TaskAliases, + epRec.EndpointIP.AsSlice(), "handleEpTableEvent") + if err != nil { logger.WithError(err).Errorf("failed adding container name resolution") } } - - case event.IsDelete(): - if svcID != "" { - // This is a remote task part of a service - if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil { - logger.WithError(err).Error("failed removing service binding") - return - } - } else { - // This is a remote container simply attached to an attachable network - if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil { - logger.WithError(err).Errorf("failed removing container name resolution") - } - } } } diff --git a/libnetwork/agent_test.go b/libnetwork/agent_test.go new file mode 100644 index 0000000000..39db24c778 --- /dev/null +++ b/libnetwork/agent_test.go @@ -0,0 +1,93 @@ +// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16: +//go:build go1.23 + +package libnetwork + +import ( + "net/netip" + "slices" + "testing" + + "gotest.tools/v3/assert" +) + +func TestEndpointEvent_EquivalentTo(t *testing.T) { + assert.Check(t, (&endpointEvent{}).EquivalentTo(&endpointEvent{})) + + a := endpointEvent{ + EndpointRecord: EndpointRecord{ + Name: "foo", + ServiceName: "bar", + ServiceID: "baz", + IngressPorts: []*PortConfig{ + { + Protocol: ProtocolTCP, + TargetPort: 80, + }, + { + Name: "dns", + Protocol: ProtocolUDP, + TargetPort: 5353, + PublishedPort: 53, + }, + }, + }, + VirtualIP: netip.MustParseAddr("10.0.0.42"), + EndpointIP: netip.MustParseAddr("192.168.69.42"), + } + assert.Check(t, a.EquivalentTo(&a)) + + reflexiveEquiv := func(a, b *endpointEvent) bool { + t.Helper() + assert.Check(t, a.EquivalentTo(b) == b.EquivalentTo(a), "reflexive equivalence") + return a.EquivalentTo(b) + } + + b := a + b.ServiceDisabled = true + assert.Check(t, !reflexiveEquiv(&a, &b), "differing by ServiceDisabled") + + c := a + c.IngressPorts = slices.Clone(a.IngressPorts) + slices.Reverse(c.IngressPorts) + assert.Check(t, reflexiveEquiv(&a, &c), "IngressPorts order should not matter") + + d := a + d.IngressPorts = append(d.IngressPorts, a.IngressPorts[0]) + assert.Check(t, !reflexiveEquiv(&a, &d), "Differing number of copies of IngressPort entries should not be equivalent") + d.IngressPorts = a.IngressPorts[:1] + assert.Check(t, !reflexiveEquiv(&a, &d), "Removing an IngressPort entry should not be equivalent") + + e := a + e.Aliases = []string{"alias1", "alias2"} + assert.Check(t, !reflexiveEquiv(&a, &e), "Differing Aliases should not be equivalent") + + f := a + f.TaskAliases = []string{"taskalias1", "taskalias2"} + assert.Check(t, !reflexiveEquiv(&a, &f), "Adding TaskAliases should not be equivalent") + g := a + g.TaskAliases = []string{"taskalias2", "taskalias1"} + assert.Check(t, reflexiveEquiv(&f, &g), "TaskAliases order should not matter") + g.TaskAliases = g.TaskAliases[:1] + assert.Check(t, !reflexiveEquiv(&f, &g), "Differing number of TaskAliases should not be equivalent") + + h := a + h.EndpointIP = netip.MustParseAddr("192.168.69.43") + assert.Check(t, !reflexiveEquiv(&a, &h), "Differing EndpointIP should not be equivalent") + + i := a + i.VirtualIP = netip.MustParseAddr("10.0.0.69") + assert.Check(t, !reflexiveEquiv(&a, &i), "Differing VirtualIP should not be equivalent") + + j := a + j.ServiceID = "qux" + assert.Check(t, !reflexiveEquiv(&a, &j), "Differing ServiceID should not be equivalent") + + k := a + k.ServiceName = "quux" + assert.Check(t, !reflexiveEquiv(&a, &k), "Differing ServiceName should not be equivalent") + + l := a + l.Name = "aaaaa" + assert.Check(t, !reflexiveEquiv(&a, &l), "Differing Name should not be equivalent") +}