libnetwork: handle coalesced endpoint events

The eventually-consistent nature of NetworkDB means we cannot depend on
events being received in the same order that they were sent. Nor can we
depend on receiving events for all intermediate states. It is possible
for a series of entry UPDATEs, or a DELETE followed by a CREATE with the
same key, to get coalesced into a single UPDATE event on the receiving
node. Watchers of NetworkDB tables therefore need to be prepared to
gracefully handle arbitrary UPDATEs of a key, including those where the
new value may have nothing in common with the previous value.

The libnetwork controller naively handled events for endpoint_table
assuming that an endpoint leave followed by a rejoin of the same
endpoint would always be expressed as a DELETE event followed by a
CREATE. It would handle a coalesced UPDATE as a CREATE, adding a new
service binding without removing the old one. This would
have various side effects, such as having the "transient state" of
having multiple conflicting service bindings where more than one
endpoint is assigned an IP address never settling.

Modify the libnetwork controller to handle an UPDATE by removing the
previous service binding then adding the new one.

Signed-off-by: Cory Snider <csnider@mirantis.com>
(cherry picked from commit 4538a1de0a)
Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider
2025-07-11 14:31:13 -04:00
parent bace1b8a3b
commit f099e911bd
4 changed files with 272 additions and 54 deletions

View File

@@ -0,0 +1,32 @@
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
//go:build go1.23
package iterutil
import (
"iter"
"maps"
)
// SameValues checks if a and b yield the same values, independent of order.
func SameValues[T comparable](a, b iter.Seq[T]) bool {
m, n := make(map[T]int), make(map[T]int)
for v := range a {
m[v]++
}
for v := range b {
n[v]++
}
return maps.Equal(m, n)
}
// Deref adapts an iterator of pointers to an iterator of values.
func Deref[T any, P *T](s iter.Seq[P]) iter.Seq[T] {
return func(yield func(T) bool) {
for p := range s {
if !yield(*p) {
return
}
}
}
}

View File

@@ -0,0 +1,31 @@
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
//go:build go1.23
package iterutil
import (
"slices"
"testing"
"gotest.tools/v3/assert"
)
func TestSameValues(t *testing.T) {
a := []int{1, 2, 3, 4, 3}
b := []int{3, 4, 3, 2, 1}
c := []int{1, 2, 3, 4}
assert.Check(t, SameValues(slices.Values(a), slices.Values(a)))
assert.Check(t, SameValues(slices.Values(c), slices.Values(c)))
assert.Check(t, SameValues(slices.Values(a), slices.Values(b)))
assert.Check(t, !SameValues(slices.Values(a), slices.Values(c)))
}
func TestDeref(t *testing.T) {
a := make([]*int, 3)
for i := range a {
a[i] = &i
}
b := slices.Collect(Deref(slices.Values(a)))
assert.DeepEqual(t, b, []int{0, 1, 2})
}

View File

@@ -1,3 +1,6 @@
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
//go:build go1.23
package libnetwork
//go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto
@@ -7,10 +10,13 @@ import (
"encoding/json"
"fmt"
"net"
"net/netip"
"slices"
"sort"
"sync"
"github.com/containerd/log"
"github.com/docker/docker/internal/iterutil"
"github.com/docker/docker/libnetwork/cluster"
"github.com/docker/docker/libnetwork/discoverapi"
"github.com/docker/docker/libnetwork/driverapi"
@@ -850,83 +856,139 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
}
func (c *Controller) handleEpTableEvent(ev events.Event) {
var value []byte
event := ev.(networkdb.WatchEvent)
switch {
case event.IsCreate(), event.IsUpdate():
value = event.Value
case event.IsDelete():
value = event.Prev
default:
log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
return
}
type endpointEvent struct {
EndpointRecord
// Virtual IP of the service to which this endpoint belongs.
VirtualIP netip.Addr
// IP assigned to this endpoint.
EndpointIP netip.Addr
}
func unmarshalEndpointRecord(data []byte) (*endpointEvent, error) {
var epRec EndpointRecord
err := proto.Unmarshal(value, &epRec)
if err != nil {
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value")
return
if err := proto.Unmarshal(data, &epRec); err != nil {
return nil, fmt.Errorf("failed to unmarshal endpoint record: %w", err)
}
vip, _ := netip.ParseAddr(epRec.VirtualIP)
eip, _ := netip.ParseAddr(epRec.EndpointIP)
if epRec.Name == "" || !eip.IsValid() {
return nil, fmt.Errorf("invalid endpoint name/ip in service table event %s", data)
}
return &endpointEvent{
EndpointRecord: epRec,
VirtualIP: vip,
EndpointIP: eip,
}, nil
}
// EquivalentTo returns true if ev is semantically equivalent to other.
func (ev *endpointEvent) EquivalentTo(other *endpointEvent) bool {
return ev.Name == other.Name &&
ev.ServiceName == other.ServiceName &&
ev.ServiceID == other.ServiceID &&
ev.VirtualIP == other.VirtualIP &&
ev.EndpointIP == other.EndpointIP &&
ev.ServiceDisabled == other.ServiceDisabled &&
iterutil.SameValues(
iterutil.Deref(slices.Values(ev.IngressPorts)),
iterutil.Deref(slices.Values(other.IngressPorts))) &&
iterutil.SameValues(slices.Values(ev.Aliases), slices.Values(other.Aliases)) &&
iterutil.SameValues(slices.Values(ev.TaskAliases), slices.Values(other.TaskAliases))
}
func (c *Controller) handleEpTableEvent(ev events.Event) {
event := ev.(networkdb.WatchEvent)
nid := event.NetworkID
eid := event.Key
containerName := epRec.Name
svcName := epRec.ServiceName
svcID := epRec.ServiceID
vip := net.ParseIP(epRec.VirtualIP)
ip := net.ParseIP(epRec.EndpointIP)
ingressPorts := epRec.IngressPorts
serviceAliases := epRec.Aliases
taskAliases := epRec.TaskAliases
logger := log.G(context.TODO()).WithFields(log.Fields{
"evt": event,
"R": epRec,
})
if containerName == "" || ip == nil {
logger.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
return
var prev, epRec *endpointEvent
if event.Prev != nil {
var err error
prev, err = unmarshalEndpointRecord(event.Prev)
if err != nil {
log.G(context.TODO()).WithError(err).Error("error unmarshaling previous value from service table event")
return
}
}
if event.Value != nil {
var err error
epRec, err = unmarshalEndpointRecord(event.Value)
if err != nil {
log.G(context.TODO()).WithError(err).Error("error unmarshaling service table event")
return
}
}
logger := log.G(context.TODO()).WithFields(log.Fields{
"evt": event,
"R": epRec,
"prev": prev,
})
logger.Debug("handleEpTableEvent")
switch {
case event.IsCreate(), event.IsUpdate():
if svcID != "" {
if prev != nil {
if epRec != nil && prev.EquivalentTo(epRec) {
// Avoid flapping if we would otherwise remove a service
// binding then immediately replace it with an equivalent one.
return
}
if prev.ServiceID != "" {
// This is a remote task part of a service
if !prev.ServiceDisabled {
err := c.rmServiceBinding(prev.ServiceName, prev.ServiceID, nid, eid,
prev.Name, prev.VirtualIP.AsSlice(), prev.IngressPorts,
prev.Aliases, prev.TaskAliases, prev.EndpointIP.AsSlice(),
"handleEpTableEvent", true, true)
if err != nil {
logger.WithError(err).Error("failed removing service binding")
}
}
} else {
// This is a remote container simply attached to an attachable network
err := c.delContainerNameResolution(nid, eid, prev.Name, prev.TaskAliases,
prev.EndpointIP.AsSlice(), "handleEpTableEvent")
if err != nil {
logger.WithError(err).Errorf("failed removing container name resolution")
}
}
}
if epRec != nil {
if epRec.ServiceID != "" {
// This is a remote task part of a service
if epRec.ServiceDisabled {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
logger.WithError(err).Error("failed disabling service binding")
return
// Don't double-remove a service binding
if prev == nil || prev.ServiceID != epRec.ServiceID || !prev.ServiceDisabled {
err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID,
nid, eid, epRec.Name, epRec.VirtualIP.AsSlice(),
epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases,
epRec.EndpointIP.AsSlice(), "handleEpTableEvent", true, false)
if err != nil {
logger.WithError(err).Error("failed disabling service binding")
return
}
}
} else {
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
err := c.addServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid,
epRec.Name, epRec.VirtualIP.AsSlice(), epRec.IngressPorts,
epRec.Aliases, epRec.TaskAliases, epRec.EndpointIP.AsSlice(),
"handleEpTableEvent")
if err != nil {
logger.WithError(err).Error("failed adding service binding")
return
}
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
err := c.addContainerNameResolution(nid, eid, epRec.Name, epRec.TaskAliases,
epRec.EndpointIP.AsSlice(), "handleEpTableEvent")
if err != nil {
logger.WithError(err).Errorf("failed adding container name resolution")
}
}
case event.IsDelete():
if svcID != "" {
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
logger.WithError(err).Error("failed removing service binding")
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logger.WithError(err).Errorf("failed removing container name resolution")
}
}
}
}

93
libnetwork/agent_test.go Normal file
View File

@@ -0,0 +1,93 @@
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
//go:build go1.23
package libnetwork
import (
"net/netip"
"slices"
"testing"
"gotest.tools/v3/assert"
)
func TestEndpointEvent_EquivalentTo(t *testing.T) {
assert.Check(t, (&endpointEvent{}).EquivalentTo(&endpointEvent{}))
a := endpointEvent{
EndpointRecord: EndpointRecord{
Name: "foo",
ServiceName: "bar",
ServiceID: "baz",
IngressPorts: []*PortConfig{
{
Protocol: ProtocolTCP,
TargetPort: 80,
},
{
Name: "dns",
Protocol: ProtocolUDP,
TargetPort: 5353,
PublishedPort: 53,
},
},
},
VirtualIP: netip.MustParseAddr("10.0.0.42"),
EndpointIP: netip.MustParseAddr("192.168.69.42"),
}
assert.Check(t, a.EquivalentTo(&a))
reflexiveEquiv := func(a, b *endpointEvent) bool {
t.Helper()
assert.Check(t, a.EquivalentTo(b) == b.EquivalentTo(a), "reflexive equivalence")
return a.EquivalentTo(b)
}
b := a
b.ServiceDisabled = true
assert.Check(t, !reflexiveEquiv(&a, &b), "differing by ServiceDisabled")
c := a
c.IngressPorts = slices.Clone(a.IngressPorts)
slices.Reverse(c.IngressPorts)
assert.Check(t, reflexiveEquiv(&a, &c), "IngressPorts order should not matter")
d := a
d.IngressPorts = append(d.IngressPorts, a.IngressPorts[0])
assert.Check(t, !reflexiveEquiv(&a, &d), "Differing number of copies of IngressPort entries should not be equivalent")
d.IngressPorts = a.IngressPorts[:1]
assert.Check(t, !reflexiveEquiv(&a, &d), "Removing an IngressPort entry should not be equivalent")
e := a
e.Aliases = []string{"alias1", "alias2"}
assert.Check(t, !reflexiveEquiv(&a, &e), "Differing Aliases should not be equivalent")
f := a
f.TaskAliases = []string{"taskalias1", "taskalias2"}
assert.Check(t, !reflexiveEquiv(&a, &f), "Adding TaskAliases should not be equivalent")
g := a
g.TaskAliases = []string{"taskalias2", "taskalias1"}
assert.Check(t, reflexiveEquiv(&f, &g), "TaskAliases order should not matter")
g.TaskAliases = g.TaskAliases[:1]
assert.Check(t, !reflexiveEquiv(&f, &g), "Differing number of TaskAliases should not be equivalent")
h := a
h.EndpointIP = netip.MustParseAddr("192.168.69.43")
assert.Check(t, !reflexiveEquiv(&a, &h), "Differing EndpointIP should not be equivalent")
i := a
i.VirtualIP = netip.MustParseAddr("10.0.0.69")
assert.Check(t, !reflexiveEquiv(&a, &i), "Differing VirtualIP should not be equivalent")
j := a
j.ServiceID = "qux"
assert.Check(t, !reflexiveEquiv(&a, &j), "Differing ServiceID should not be equivalent")
k := a
k.ServiceName = "quux"
assert.Check(t, !reflexiveEquiv(&a, &k), "Differing ServiceName should not be equivalent")
l := a
l.Name = "aaaaa"
assert.Check(t, !reflexiveEquiv(&a, &l), "Differing Name should not be equivalent")
}