From 30b27ab6eaed38232bb3fb0f56f743cb5e76073f Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Fri, 13 Jun 2025 16:17:25 -0400 Subject: [PATCH 1/5] libnetwork/networkdb: drop id field from network The map key for nDB.networks is the network ID. The struct field is not actually used anywhere in practice. Signed-off-by: Cory Snider --- libnetwork/networkdb/delegate.go | 5 ++--- libnetwork/networkdb/networkdb.go | 5 +---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go index 9aa876058f..678ceaa336 100644 --- a/libnetwork/networkdb/delegate.go +++ b/libnetwork/networkdb/delegate.go @@ -136,7 +136,6 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { // This remote network join is being seen the first time. nodeNetworks[nEvent.NetworkID] = &network{ - id: nEvent.NetworkID, ltime: nEvent.LTime, } @@ -452,10 +451,10 @@ func (d *delegate) LocalState(join bool) []byte { } for name, nn := range d.nDB.networks { - for _, n := range nn { + for nid, n := range nn { pp.Networks = append(pp.Networks, &NetworkEntry{ LTime: n.ltime, - NetworkID: n.id, + NetworkID: nid, NodeName: name, Leaving: n.leaving, }) diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index fdc43f36b6..e6da7ebd10 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -128,9 +128,6 @@ type node struct { // network describes the node/network attachment. type network struct { - // Network ID - id string - // Lamport time for the latest state of the entry. ltime serf.LamportTime @@ -621,7 +618,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { if ok { entries = n.entriesNumber.Load() } - nodeNetworks[nid] = &network{id: nid, ltime: ltime} + nodeNetworks[nid] = &network{ltime: ltime} nodeNetworks[nid].entriesNumber.Store(entries) nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: func() int { From 51f31826eea3bd33b3175c879d38901fe8c55ceb Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Fri, 13 Jun 2025 16:44:25 -0400 Subject: [PATCH 2/5] libnetwork/networkdb: don't clear queue on rejoin When joining a network that was previously joined but not yet reaped, NetworkDB replaces the network struct value with a zeroed-out one with the entries count copied over. This is also the case when joining a network that is currently joined! Consequently, joining a network has the side effect of clearing the broadcast queue. If the queue is cleared while messages are still pending broadcast, convergence may be delayed until the next bulk sync cycle. Make it an error to join a network twice without leaving. Retain the existing broadcast queue when rejoining a network that has not yet been reaped. 
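For illustration, a rough sketch of the caller-visible behaviour after this change (db is assumed to be a previously created *NetworkDB; only the error text below is taken from this patch):

    // Sketch only; not part of this patch.
    func joinTwice(db *networkdb.NetworkDB) {
        if err := db.JoinNetwork("net1"); err != nil {
            // first join: creates the attachment and its table-event
            // broadcast queue
        }
        if err := db.JoinNetwork("net1"); err != nil {
            // now fails with "networkdb: network net1 is already joined";
            // previously this second join silently replaced the attachment
            // and cleared any queued broadcasts
        }
        if err := db.LeaveNetwork("net1"); err == nil {
            // rejoining before the leave is reaped succeeds and keeps the
            // existing broadcast queue
            _ = db.JoinNetwork("net1")
        }
    }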
Signed-off-by: Cory Snider --- libnetwork/networkdb/networkdb.go | 48 +++++++++++++++++++------------ 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index e6da7ebd10..a89eb28cad 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -614,32 +614,42 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { nDB.networks[nDB.config.NodeID] = nodeNetworks } n, ok := nodeNetworks[nid] - var entries int64 if ok { - entries = n.entriesNumber.Load() - } - nodeNetworks[nid] = &network{ltime: ltime} - nodeNetworks[nid].entriesNumber.Store(entries) - nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - // TODO fcrisciani this can be optimized maybe avoiding the lock? - // this call is done each GetBroadcasts call to evaluate the number of - // replicas for the message - nDB.RLock() - defer nDB.RUnlock() - return len(nDB.networkNodes[nid]) - }, - RetransmitMult: 4, + if !n.leaving { + nDB.Unlock() + return fmt.Errorf("networkdb: network %s is already joined", nid) + } + n.ltime = ltime + n.inSync = false + n.leaving = false + n.reapTime = 0 + } else { + n = &network{ + ltime: ltime, + tableBroadcasts: &memberlist.TransmitLimitedQueue{ + NumNodes: func() int { + // TODO fcrisciani this can be optimized maybe avoiding the lock? + // this call is done each GetBroadcasts call to evaluate the number of + // replicas for the message + nDB.RLock() + defer nDB.RUnlock() + return len(nDB.networkNodes[nid]) + }, + RetransmitMult: 4, + }, + } } nDB.addNetworkNode(nid, nDB.config.NodeID) - networkNodes := nDB.networkNodes[nid] - n = nodeNetworks[nid] - nDB.Unlock() if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil { - return fmt.Errorf("failed to send leave network event for %s: %v", nid, err) + nDB.Unlock() + return fmt.Errorf("failed to send join network event for %s: %v", nid, err) } + nodeNetworks[nid] = n + networkNodes := nDB.networkNodes[nid] + nDB.Unlock() + log.G(context.TODO()).Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid) if _, err := nDB.bulkSync(networkNodes, true); err != nil { log.G(context.TODO()).Errorf("Error bulk syncing while joining network %s: %v", nid, err) From dbb0d881098987c6e90f937dfe7cccc581f1e6ba Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Fri, 13 Jun 2025 15:53:01 -0400 Subject: [PATCH 3/5] libn/networkdb: use distinct type for own networks NetworkDB uses a muli-dimensional map of struct network to keep track of network attachments for both remote nodes and the local node. Only a subset of the struct fields are used for remote nodes' network attachments. The tableBroadcasts pointer field in particular is always initialized for network values representing local attachments (read: nDB.networks[nDB.config.NodeID]) and always nil for remote attachments. Consequently, unnecessary defensive nil-pointer checks are peppered throughout the code despite the aforementioned invariant. Enshrine the invariant that tableBroadcasts is initialized iff the network attachment is for the local node in the type system. Pare down struct network to only the fields needed for remote network attachments and move the local-only fields into a new struct thisNodeNetwork. Elide the unnecessary nil-checks. 
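Schematically, the split looks like this (simplified; the full definitions, including the per-network counters, are in the networkdb.go hunk below):

    // network holds what is tracked for a remote node's attachment.
    type network struct {
        ltime    serf.LamportTime // Lamport time of the latest state
        leaving  bool             // node leave is in progress
        reapTime time.Duration    // time left before a left network is reaped
    }

    // thisNodeNetwork describes the local node's attachment. It embeds
    // network and adds the local-only state, so tableBroadcasts is non-nil
    // for every value reachable through nDB.thisNodeNetworks.
    type thisNodeNetwork struct {
        network
        inSync          bool
        tableBroadcasts *memberlist.TransmitLimitedQueue
    }

Lookups through nDB.thisNodeNetworks therefore no longer need a tableBroadcasts == nil guard.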
Signed-off-by: Cory Snider --- libnetwork/networkdb/broadcast.go | 17 +----- libnetwork/networkdb/cluster.go | 29 ++++----- libnetwork/networkdb/delegate.go | 21 ++++--- libnetwork/networkdb/networkdb.go | 67 ++++++++++----------- libnetwork/networkdb/networkdb_test.go | 48 +++++++-------- libnetwork/networkdb/networkdbdiagnostic.go | 3 +- 6 files changed, 87 insertions(+), 98 deletions(-) diff --git a/libnetwork/networkdb/broadcast.go b/libnetwork/networkdb/broadcast.go index efcfcc2426..d13e3a202b 100644 --- a/libnetwork/networkdb/broadcast.go +++ b/libnetwork/networkdb/broadcast.go @@ -142,27 +142,16 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st return err } - var broadcastQ *memberlist.TransmitLimitedQueue nDB.RLock() - thisNodeNetworks, ok := nDB.networks[nDB.config.NodeID] - if ok { - // The network may have been removed - network, networkOk := thisNodeNetworks[nid] - if !networkOk { - nDB.RUnlock() - return nil - } - - broadcastQ = network.tableBroadcasts - } + n, ok := nDB.thisNodeNetworks[nid] nDB.RUnlock() // The network may have been removed - if broadcastQ == nil { + if !ok { return nil } - broadcastQ.QueueBroadcast(&tableEventMessage{ + n.tableBroadcasts.QueueBroadcast(&tableEventMessage{ msg: raw, id: nid, tname: tname, diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go index 94239c3a11..645a9c77cd 100644 --- a/libnetwork/networkdb/cluster.go +++ b/libnetwork/networkdb/cluster.go @@ -369,6 +369,15 @@ func (nDB *NetworkDB) reapState() { func (nDB *NetworkDB) reapNetworks() { nDB.Lock() + for id, n := range nDB.thisNodeNetworks { + if n.leaving { + if n.reapTime <= 0 { + delete(nDB.thisNodeNetworks, id) + continue + } + n.reapTime -= reapPeriod + } + } for _, nn := range nDB.networks { for id, n := range nn { if n.leaving { @@ -387,7 +396,7 @@ func (nDB *NetworkDB) reapTableEntries() { var nodeNetworks []string // This is best effort, if the list of network changes will be picked up in the next cycle nDB.RLock() - for nid := range nDB.networks[nDB.config.NodeID] { + for nid := range nDB.thisNodeNetworks { nodeNetworks = append(nodeNetworks, nid) } nDB.RUnlock() @@ -430,8 +439,7 @@ func (nDB *NetworkDB) reapTableEntries() { func (nDB *NetworkDB) gossip() { networkNodes := make(map[string][]string) nDB.RLock() - thisNodeNetworks := nDB.networks[nDB.config.NodeID] - for nid := range thisNodeNetworks { + for nid := range nDB.thisNodeNetworks { networkNodes[nid] = nDB.networkNodes[nid] } printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod @@ -451,7 +459,7 @@ func (nDB *NetworkDB) gossip() { bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead nDB.RLock() - network, ok := thisNodeNetworks[nid] + network, ok := nDB.thisNodeNetworks[nid] nDB.RUnlock() if !ok || network == nil { // It is normal for the network to be removed @@ -461,21 +469,14 @@ func (nDB *NetworkDB) gossip() { continue } - broadcastQ := network.tableBroadcasts - - if broadcastQ == nil { - log.G(context.TODO()).Errorf("Invalid broadcastQ encountered while gossiping for network %s", nid) - continue - } - - msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail) + msgs := network.tableBroadcasts.GetBroadcasts(compoundOverhead, bytesAvail) // Collect stats and print the queue info, note this code is here also to have a view of the queues empty network.qMessagesSent.Add(int64(len(msgs))) if printStats { msent := network.qMessagesSent.Swap(0) log.G(context.TODO()).Infof("NetworkDB stats %v(%v) - netID:%s 
leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d", nDB.config.Hostname, nDB.config.NodeID, - nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber.Load(), broadcastQ.NumQueued(), + nid, network.leaving, network.tableBroadcasts.NumNodes(), network.entriesNumber.Load(), network.tableBroadcasts.NumQueued(), msent/int64((nDB.config.StatsPrintPeriod/time.Second))) } @@ -510,7 +511,7 @@ func (nDB *NetworkDB) gossip() { func (nDB *NetworkDB) bulkSyncTables() { var networks []string nDB.RLock() - for nid, network := range nDB.networks[nDB.config.NodeID] { + for nid, network := range nDB.thisNodeNetworks { if network.leaving { continue } diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go index 678ceaa336..bae02a959f 100644 --- a/libnetwork/networkdb/delegate.go +++ b/libnetwork/networkdb/delegate.go @@ -135,9 +135,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { } // This remote network join is being seen the first time. - nodeNetworks[nEvent.NetworkID] = &network{ - ltime: nEvent.LTime, - } + nodeNetworks[nEvent.NetworkID] = &network{ltime: nEvent.LTime} nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName) return true @@ -154,8 +152,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool defer nDB.Unlock() // Ignore the table events for networks that are in the process of going away - networks := nDB.networks[nDB.config.NodeID] - network, ok := networks[tEvent.NetworkID] + network, ok := nDB.thisNodeNetworks[tEvent.NetworkID] // Check if the owner of the event is still part of the network nodes := nDB.networkNodes[tEvent.NetworkID] var nodePresent bool @@ -286,11 +283,11 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { } nDB.RLock() - n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID] + n, ok := nDB.thisNodeNetworks[tEvent.NetworkID] nDB.RUnlock() - // if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present - if !ok || n.leaving || n.tableBroadcasts == nil { + // if the network is not there anymore, OR we are leaving the network + if !ok || n.leaving { return } @@ -450,6 +447,14 @@ func (d *delegate) LocalState(join bool) []byte { NodeName: d.nDB.config.NodeID, } + for nid, n := range d.nDB.thisNodeNetworks { + pp.Networks = append(pp.Networks, &NetworkEntry{ + LTime: n.ltime, + NetworkID: nid, + NodeName: d.nDB.config.NodeID, + Leaving: n.leaving, + }) + } for name, nn := range d.nDB.networks { for nid, n := range nn { pp.Networks = append(pp.Networks, &NetworkEntry{ diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index a89eb28cad..0d77748f2a 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -62,11 +62,14 @@ type NetworkDB struct { // List of all peer nodes which have left leftNodes map[string]*node - // A multi-dimensional map of network/node attachments. The - // first key is a node name and the second key is a network ID - // for the network that node is participating in. + // A multi-dimensional map of network/node attachments for peer nodes. + // The first key is a node name and the second key is a network ID for + // the network that node is participating in. networks map[string]map[string]*network + // A map of this node's network attachments. + thisNodeNetworks map[string]*thisNodeNetwork + // A map of nodes which are participating in a given // network. The key is a network ID. 
networkNodes map[string][]string @@ -131,15 +134,20 @@ type network struct { // Lamport time for the latest state of the entry. ltime serf.LamportTime - // Gets set to true after the first bulk sync happens - inSync bool - // Node leave is in progress. leaving bool // Number of seconds still left before a deleted network entry gets // removed from networkDB reapTime time.Duration +} + +// thisNodeNetwork describes a network attachment on the local node. +type thisNodeNetwork struct { + network + + // Gets set to true after the first bulk sync happens + inSync bool // The broadcast queue for table event gossip. This is only // initialized for this node's network attachment entries. @@ -270,13 +278,14 @@ func new(c *Config) *NetworkDB { byTable: iradix.New[*entry](), byNetwork: iradix.New[*entry](), }, - networks: make(map[string]map[string]*network), - nodes: make(map[string]*node), - failedNodes: make(map[string]*node), - leftNodes: make(map[string]*node), - networkNodes: make(map[string][]string), - bulkSyncAckTbl: make(map[string]chan struct{}), - broadcaster: events.NewBroadcaster(), + networks: make(map[string]map[string]*network), + thisNodeNetworks: make(map[string]*thisNodeNetwork), + nodes: make(map[string]*node), + failedNodes: make(map[string]*node), + leftNodes: make(map[string]*node), + networkNodes: make(map[string][]string), + bulkSyncAckTbl: make(map[string]chan struct{}), + broadcaster: events.NewBroadcaster(), } } @@ -608,24 +617,17 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { ltime := nDB.networkClock.Increment() nDB.Lock() - nodeNetworks, ok := nDB.networks[nDB.config.NodeID] - if !ok { - nodeNetworks = make(map[string]*network) - nDB.networks[nDB.config.NodeID] = nodeNetworks - } - n, ok := nodeNetworks[nid] + n, ok := nDB.thisNodeNetworks[nid] if ok { if !n.leaving { nDB.Unlock() return fmt.Errorf("networkdb: network %s is already joined", nid) } - n.ltime = ltime + n.network = network{ltime: ltime} n.inSync = false - n.leaving = false - n.reapTime = 0 } else { - n = &network{ - ltime: ltime, + n = &thisNodeNetwork{ + network: network{ltime: ltime}, tableBroadcasts: &memberlist.TransmitLimitedQueue{ NumNodes: func() int { // TODO fcrisciani this can be optimized maybe avoiding the lock? 
@@ -646,7 +648,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { return fmt.Errorf("failed to send join network event for %s: %v", nid, err) } - nodeNetworks[nid] = n + nDB.thisNodeNetworks[nid] = n networkNodes := nDB.networkNodes[nid] nDB.Unlock() @@ -685,12 +687,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error { // Update all the local entries marking them for deletion and delete all the remote entries nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID) - nodeNetworks, ok := nDB.networks[nDB.config.NodeID] - if !ok { - return fmt.Errorf("could not find self node for network %s while trying to leave", nid) - } - - n, ok := nodeNetworks[nid] + n, ok := nDB.thisNodeNetworks[nid] if !ok { return fmt.Errorf("could not find network %s while trying to leave", nid) } @@ -741,7 +738,7 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string { defer nDB.RUnlock() var networks []string - for nid := range nDB.networks[nDB.config.NodeID] { + for nid := range nDB.thisNodeNetworks { if n, ok := nDB.networks[nodeName][nid]; ok { if !n.leaving { networks = append(networks, nid) @@ -757,7 +754,7 @@ func (nDB *NetworkDB) updateLocalNetworkTime() { defer nDB.Unlock() ltime := nDB.networkClock.Increment() - for _, n := range nDB.networks[nDB.config.NodeID] { + for _, n := range nDB.thisNodeNetworks { n.ltime = ltime } } @@ -769,7 +766,7 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, v *entry) (okT nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Insert([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key)), v) if !okNetwork { // Add only if it is an insert not an update - n, ok := nDB.networks[nDB.config.NodeID][nid] + n, ok := nDB.thisNodeNetworks[nid] if ok { n.entriesNumber.Add(1) } @@ -784,7 +781,7 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (okTable bool, okNetwo nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Delete([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key))) if okNetwork { // Remove only if the delete is successful - n, ok := nDB.networks[nDB.config.NodeID][nid] + n, ok := nDB.thisNodeNetworks[nid] if ok { n.entriesNumber.Add(-1) } diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go index 93b31eac42..111de19b5f 100644 --- a/libnetwork/networkdb/networkdb_test.go +++ b/libnetwork/networkdb/networkdb_test.go @@ -111,25 +111,23 @@ func (nDB *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id strin } for i := int64(0); i < maxRetries; i++ { nDB.RLock() - nn, nnok := nDB.networks[node] - if nnok { - n, ok := nn[id] - var leaving bool - if ok { - leaving = n.leaving - } - nDB.RUnlock() - if present && ok { - return - } - - if !present && - ((ok && leaving) || - !ok) { - return + var vn *network + if node == nDB.config.NodeID { + if n, ok := nDB.thisNodeNetworks[id]; ok { + vn = &n.network } } else { - nDB.RUnlock() + if nn, nnok := nDB.networks[node]; nnok { + if n, ok := nn[id]; ok { + vn = n + } + } + } + exists := vn != nil && !vn.leaving + nDB.RUnlock() + + if present == exists { + return } time.Sleep(sleepInterval) @@ -546,15 +544,15 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) { // Wait for the propagation on db[0] dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true) witness0("network1", 2) - if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving { - t.Fatalf("The network should not be marked as leaving:%t", n.leaving) + if n, ok := dbs[0].thisNodeNetworks["network1"]; !ok || n.leaving { + 
t.Fatal("The network should not be marked as leaving") } // Wait for the propagation on db[1] dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) witness1("network1", 2) - if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving { - t.Fatalf("The network should not be marked as leaving:%t", n.leaving) + if n, ok := dbs[1].thisNodeNetworks["network1"]; !ok || n.leaving { + t.Fatal("The network should not be marked as leaving") } // Try a quick leave/join @@ -599,7 +597,7 @@ func TestNetworkDBGarbageCollection(t *testing.T) { } for i := 0; i < 2; i++ { dbs[i].Lock() - assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match") + assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].thisNodeNetworks["network1"].entriesNumber.Load()), "entries number should match") dbs[i].Unlock() } @@ -610,14 +608,14 @@ func TestNetworkDBGarbageCollection(t *testing.T) { assert.NilError(t, err) for i := 0; i < 3; i++ { dbs[i].Lock() - assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match") + assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].thisNodeNetworks["network1"].entriesNumber.Load()), "entries number should match") dbs[i].Unlock() } // at this point the entries should had been all deleted time.Sleep(30 * time.Second) for i := 0; i < 3; i++ { dbs[i].Lock() - assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected") + assert.Check(t, is.Equal(int64(0), dbs[i].thisNodeNetworks["network1"].entriesNumber.Load()), "entries should had been garbage collected") dbs[i].Unlock() } @@ -625,7 +623,7 @@ func TestNetworkDBGarbageCollection(t *testing.T) { time.Sleep(15 * time.Second) for i := 0; i < 3; i++ { dbs[i].Lock() - assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected") + assert.Check(t, is.Equal(int64(0), dbs[i].thisNodeNetworks["network1"].entriesNumber.Load()), "entries should had been garbage collected") dbs[i].Unlock() } diff --git a/libnetwork/networkdb/networkdbdiagnostic.go b/libnetwork/networkdb/networkdbdiagnostic.go index 9646453f3f..8b3ca13916 100644 --- a/libnetwork/networkdb/networkdbdiagnostic.go +++ b/libnetwork/networkdb/networkdbdiagnostic.go @@ -439,8 +439,7 @@ func (nDB *NetworkDB) dbNetworkStats(w http.ResponseWriter, r *http.Request) { } nDB.RLock() - networks := nDB.networks[nDB.config.NodeID] - network, ok := networks[r.Form["nid"][0]] + network, ok := nDB.thisNodeNetworks[r.Form["nid"][0]] entries := -1 qLen := -1 From e9a715490924910a3b543bd08c44452fc6050867 Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Thu, 12 Jun 2025 17:30:09 -0400 Subject: [PATCH 4/5] libnetwork/networkdb: improve TestCRUDTableEntries Log more details when assertions fail to provide a more complete picture of what went wrong when TestCRUDTableEntries fails. Log the state of each NetworkDB instance at various points in TestCRUDTableEntries to provide an even more complete picture. Increase the global logger verbosity in tests so warnings and debug logs are printed to the test log. 
Signed-off-by: Cory Snider --- libnetwork/networkdb/networkdb_test.go | 69 +++++++++++++++++++------- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go index 111de19b5f..3414a2f69d 100644 --- a/libnetwork/networkdb/networkdb_test.go +++ b/libnetwork/networkdb/networkdb_test.go @@ -6,10 +6,13 @@ import ( "net" "os" "strconv" + "strings" "sync/atomic" "testing" + "text/tabwriter" "time" + cerrdefs "github.com/containerd/errdefs" "github.com/containerd/log" "github.com/docker/docker/pkg/stringid" "github.com/docker/go-events" @@ -27,7 +30,7 @@ func init() { func TestMain(m *testing.M) { os.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0o644) - log.SetLevel("error") + log.SetLevel("debug") os.Exit(m.Run()) } @@ -85,18 +88,14 @@ func (nDB *NetworkDB) verifyNodeExistence(t *testing.T, node string, present boo nDB.RLock() _, ok := nDB.nodes[node] nDB.RUnlock() - if present && ok { - return - } - - if !present && !ok { + if present == ok { return } time.Sleep(50 * time.Millisecond) } - t.Errorf("%v(%v): Node existence verification for node %s failed", nDB.config.Hostname, nDB.config.NodeID, node) + t.Errorf("%v(%v): expected node %s existence in the cluster = %v, got %v", nDB.config.Hostname, nDB.config.NodeID, node, present, !present) } func (nDB *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) { @@ -109,6 +108,7 @@ func (nDB *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id strin } else { maxRetries = 80 } + var ok, leaving bool for i := int64(0); i < maxRetries; i++ { nDB.RLock() var vn *network @@ -123,35 +123,49 @@ func (nDB *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id strin } } } - exists := vn != nil && !vn.leaving + ok = vn != nil + leaving = ok && vn.leaving nDB.RUnlock() - if present == exists { + if present == (ok && !leaving) { return } time.Sleep(sleepInterval) } - t.Error("Network existence verification failed") + if present { + t.Errorf("%v(%v): want node %v to be a member of network %q, got that it is not a member (ok=%v, leaving=%v)", + nDB.config.Hostname, nDB.config.NodeID, node, id, ok, leaving) + } else { + t.Errorf("%v(%v): want node %v to not be a member of network %q, got that it is a member (ok=%v, leaving=%v)", + nDB.config.Hostname, nDB.config.NodeID, node, id, ok, leaving) + } } func (nDB *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) { t.Helper() n := 80 + var v []byte for i := 0; i < n; i++ { - v, err := nDB.GetEntry(tname, nid, key) + var err error + v, err = nDB.GetEntry(tname, nid, key) if present && err == nil && string(v) == value { return } - if err != nil && !present { + if cerrdefs.IsNotFound(err) && !present { return } + if err != nil && !cerrdefs.IsNotFound(err) { + t.Errorf("%v(%v): unexpected error while getting entry %v/%v in network %q: %v", + nDB.config.Hostname, nDB.config.NodeID, tname, key, nid, err) + } time.Sleep(50 * time.Millisecond) } - t.Errorf("Entry existence verification test failed for %v(%v)", nDB.config.Hostname, nDB.config.NodeID) + t.Errorf("%v(%v): want entry %v/%v in network %q to be (present=%v, value=%q), got (present=%v, value=%q)", + nDB.config.Hostname, nDB.config.NodeID, tname, key, nid, present, value, !present, string(v)) } func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) { @@ -274,6 +288,19 @@ func TestNetworkDBCRUDTableEntry(t *testing.T) { 
closeNetworkDBInstances(t, dbs) } +func (nDB *NetworkDB) dumpTable(t *testing.T, tname string) { + t.Helper() + var b strings.Builder + tw := tabwriter.NewWriter(&b, 10, 1, 1, ' ', 0) + tw.Write([]byte("NetworkID\tKey\tValue\tFlags\n")) + nDB.WalkTable(tname, func(nid, key string, value []byte, deleting bool) bool { + fmt.Fprintf(tw, "%s\t%s\t%s\t%v\n", nid, key, string(value), map[bool]string{true: "D"}[deleting]) + return false + }) + tw.Flush() + t.Logf("%s(%s): Table %s:\n%s", nDB.config.Hostname, nDB.config.NodeID, tname, b.String()) +} + func TestNetworkDBCRUDTableEntries(t *testing.T) { dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig()) @@ -302,18 +329,24 @@ func TestNetworkDBCRUDTableEntries(t *testing.T) { assert.NilError(t, err) } + for n := range dbs { + dbs[n].dumpTable(t, "test_table") + } + for i := 1; i <= n; i++ { dbs[0].verifyEntryExistence(t, "test_table", "network1", fmt.Sprintf("test_key1%d", i), fmt.Sprintf("test_value1%d", i), true) - assert.NilError(t, err) } for i := 1; i <= n; i++ { dbs[1].verifyEntryExistence(t, "test_table", "network1", fmt.Sprintf("test_key0%d", i), fmt.Sprintf("test_value0%d", i), true) - assert.NilError(t, err) + } + + for n := range dbs { + dbs[n].dumpTable(t, "test_table") } // Verify deletes @@ -332,13 +365,15 @@ func TestNetworkDBCRUDTableEntries(t *testing.T) { for i := 1; i <= n; i++ { dbs[0].verifyEntryExistence(t, "test_table", "network1", fmt.Sprintf("test_key1%d", i), "", false) - assert.NilError(t, err) } for i := 1; i <= n; i++ { dbs[1].verifyEntryExistence(t, "test_table", "network1", fmt.Sprintf("test_key0%d", i), "", false) - assert.NilError(t, err) + } + + for n := range dbs { + dbs[n].dumpTable(t, "test_table") } closeNetworkDBInstances(t, dbs) From 6ec6e0991a8054bfac511d984575bffceaaeb873 Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Thu, 12 Jun 2025 17:35:07 -0400 Subject: [PATCH 5/5] libnetwork/networkdb: prioritize local broadcasts A network node is responsible for both broadcasting table events for entries it owns and for rebroadcasting table events from other nodes it has received. Table events to be broadcast are added to a single queue per network, including events for rebroadcasting. As the memberlist TransmitLimitedQueue is (to a first approximation) LIFO, a flood of events from other nodes could delay the broadcasting of locally-generated events indefinitely. Prioritize broadcasting local events by splitting up the queues and only pulling from the rebroadcast queue if there is free space in the gossip packet after draining the local-broadcast queue. Signed-off-by: Cory Snider --- libnetwork/networkdb/broadcast.go | 15 +++++++++++++ libnetwork/networkdb/cluster.go | 7 +++--- libnetwork/networkdb/delegate.go | 13 +++-------- libnetwork/networkdb/networkdb.go | 36 ++++++++++++++++++++++--------- 4 files changed, 48 insertions(+), 23 deletions(-) diff --git a/libnetwork/networkdb/broadcast.go b/libnetwork/networkdb/broadcast.go index d13e3a202b..8b414b5613 100644 --- a/libnetwork/networkdb/broadcast.go +++ b/libnetwork/networkdb/broadcast.go @@ -159,3 +159,18 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st }) return nil } + +func getBroadcasts(overhead, limit int, queues ...*memberlist.TransmitLimitedQueue) [][]byte { + var msgs [][]byte + for _, q := range queues { + b := q.GetBroadcasts(overhead, limit) + for _, m := range b { + limit -= overhead + len(m) + } + msgs = append(msgs, b...) 
+ if limit <= 0 { + break + } + } + return msgs +} diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go index 645a9c77cd..131318ee81 100644 --- a/libnetwork/networkdb/cluster.go +++ b/libnetwork/networkdb/cluster.go @@ -469,14 +469,15 @@ func (nDB *NetworkDB) gossip() { continue } - msgs := network.tableBroadcasts.GetBroadcasts(compoundOverhead, bytesAvail) + msgs := getBroadcasts(compoundOverhead, bytesAvail, network.tableBroadcasts, network.tableRebroadcasts) // Collect stats and print the queue info, note this code is here also to have a view of the queues empty network.qMessagesSent.Add(int64(len(msgs))) if printStats { msent := network.qMessagesSent.Swap(0) - log.G(context.TODO()).Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d", + log.G(context.TODO()).Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d+%d netMsg/s:%d", nDB.config.Hostname, nDB.config.NodeID, - nid, network.leaving, network.tableBroadcasts.NumNodes(), network.entriesNumber.Load(), network.tableBroadcasts.NumQueued(), + nid, network.leaving, network.tableBroadcasts.NumNodes(), network.entriesNumber.Load(), + network.tableBroadcasts.NumQueued(), network.tableRebroadcasts.NumQueued(), msent/int64((nDB.config.StatsPrintPeriod/time.Second))) } diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go index bae02a959f..6b0ec99f70 100644 --- a/libnetwork/networkdb/delegate.go +++ b/libnetwork/networkdb/delegate.go @@ -292,11 +292,11 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { } // if the queue is over the threshold, avoid distributing information coming from TCP sync - if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync { + if isBulkSync && n.tableRebroadcasts.NumQueued() > maxQueueLenBroadcastOnSync { return } - n.tableBroadcasts.QueueBroadcast(&tableEventMessage{ + n.tableRebroadcasts.QueueBroadcast(&tableEventMessage{ msg: buf, id: tEvent.NetworkID, tname: tEvent.TableName, @@ -419,14 +419,7 @@ func (d *delegate) NotifyMsg(buf []byte) { } func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { - msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit) - for _, m := range msgs { - limit -= overhead + len(m) - } - if limit > 0 { - msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...) - } - return msgs + return getBroadcasts(overhead, limit, d.nDB.networkBroadcasts, d.nDB.nodeBroadcasts) } func (d *delegate) LocalState(join bool) []byte { diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index 0d77748f2a..da6bac4d97 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -149,10 +149,21 @@ type thisNodeNetwork struct { // Gets set to true after the first bulk sync happens inSync bool - // The broadcast queue for table event gossip. This is only - // initialized for this node's network attachment entries. + // The broadcast queue for this network's table event gossip + // for entries owned by this node. tableBroadcasts *memberlist.TransmitLimitedQueue + // The broadcast queue for this network's table event gossip + // relayed from other nodes. + // + // Messages in this queue are broadcasted when there is space available + // in the gossip packet after filling it with tableBroadcast messages. 
+ // Relayed messages are broadcasted at a lower priority than messages + // originating from this node to ensure that local messages are always + // broadcasted in a timely manner, irrespective of how many messages + // from other nodes are queued for rebroadcasting. + tableRebroadcasts *memberlist.TransmitLimitedQueue + // Number of gossip messages sent related to this network during the last stats collection period qMessagesSent atomic.Int64 @@ -626,17 +637,22 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { n.network = network{ltime: ltime} n.inSync = false } else { + numNodes := func() int { + // TODO fcrisciani this can be optimized maybe avoiding the lock? + // this call is done each GetBroadcasts call to evaluate the number of + // replicas for the message + nDB.RLock() + defer nDB.RUnlock() + return len(nDB.networkNodes[nid]) + } n = &thisNodeNetwork{ network: network{ltime: ltime}, tableBroadcasts: &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - // TODO fcrisciani this can be optimized maybe avoiding the lock? - // this call is done each GetBroadcasts call to evaluate the number of - // replicas for the message - nDB.RLock() - defer nDB.RUnlock() - return len(nDB.networkNodes[nid]) - }, + NumNodes: numNodes, + RetransmitMult: 4, + }, + tableRebroadcasts: &memberlist.TransmitLimitedQueue{ + NumNodes: numNodes, RetransmitMult: 4, }, }