Mirror of https://github.com/moby/moby.git, synced 2026-01-11 18:51:37 +00:00
Merge pull request #50193 from corhere/libn/networkdb-fix-crudtable-flakes-harder
libnetwork/networkdb: prioritize local table broadcasts over event rebroadcasts
@@ -142,27 +142,16 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
         return err
     }
 
-    var broadcastQ *memberlist.TransmitLimitedQueue
     nDB.RLock()
-    thisNodeNetworks, ok := nDB.networks[nDB.config.NodeID]
-    if ok {
-        // The network may have been removed
-        network, networkOk := thisNodeNetworks[nid]
-        if !networkOk {
-            nDB.RUnlock()
-            return nil
-        }
-
-        broadcastQ = network.tableBroadcasts
-    }
+    n, ok := nDB.thisNodeNetworks[nid]
     nDB.RUnlock()
 
     // The network may have been removed
-    if broadcastQ == nil {
+    if !ok {
         return nil
     }
 
-    broadcastQ.QueueBroadcast(&tableEventMessage{
+    n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
         msg: raw,
         id: nid,
         tname: tname,
@@ -170,3 +159,18 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
     })
     return nil
 }
+
+func getBroadcasts(overhead, limit int, queues ...*memberlist.TransmitLimitedQueue) [][]byte {
+    var msgs [][]byte
+    for _, q := range queues {
+        b := q.GetBroadcasts(overhead, limit)
+        for _, m := range b {
+            limit -= overhead + len(m)
+        }
+        msgs = append(msgs, b...)
+        if limit <= 0 {
+            break
+        }
+    }
+    return msgs
+}
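The getBroadcasts helper above is what lets the gossip loop drain the per-network tableBroadcasts queue ahead of the tableRebroadcasts queue: queues passed earlier get first claim on the packet's byte budget, and later queues only see whatever limit remains. The following is a minimal, self-contained usage sketch of that priority behaviour, not code from this PR: it assumes the hashicorp/memberlist API, the simpleBroadcast type and queue names are hypothetical stand-ins for NetworkDB's tableEventMessage values, and the helper body is copied from the hunk above so the sketch compiles on its own.

package main

import (
    "fmt"

    "github.com/hashicorp/memberlist"
)

// simpleBroadcast is a hypothetical stand-in for NetworkDB's tableEventMessage,
// implementing just enough of memberlist.Broadcast for this illustration.
type simpleBroadcast []byte

func (b simpleBroadcast) Message() []byte                       { return b }
func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b simpleBroadcast) Finished()                             {}

// getBroadcasts mirrors the helper added in this diff: drain the queues in
// order, charging each returned message (plus per-message overhead) against
// the remaining byte limit.
func getBroadcasts(overhead, limit int, queues ...*memberlist.TransmitLimitedQueue) [][]byte {
    var msgs [][]byte
    for _, q := range queues {
        b := q.GetBroadcasts(overhead, limit)
        for _, m := range b {
            limit -= overhead + len(m)
        }
        msgs = append(msgs, b...)
        if limit <= 0 {
            break
        }
    }
    return msgs
}

func main() {
    numNodes := func() int { return 3 }
    local := &memberlist.TransmitLimitedQueue{NumNodes: numNodes, RetransmitMult: 4}
    relayed := &memberlist.TransmitLimitedQueue{NumNodes: numNodes, RetransmitMult: 4}

    relayed.QueueBroadcast(simpleBroadcast("rebroadcast from a peer"))
    local.QueueBroadcast(simpleBroadcast("event that originated locally"))

    // The local queue is passed first, so with a tight byte budget the locally
    // originated event is sent and the relayed one waits for a later packet.
    for _, m := range getBroadcasts(2, 40, local, relayed) {
        fmt.Printf("%s\n", m)
    }
}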
@@ -369,6 +369,15 @@ func (nDB *NetworkDB) reapState() {
 
 func (nDB *NetworkDB) reapNetworks() {
     nDB.Lock()
+    for id, n := range nDB.thisNodeNetworks {
+        if n.leaving {
+            if n.reapTime <= 0 {
+                delete(nDB.thisNodeNetworks, id)
+                continue
+            }
+            n.reapTime -= reapPeriod
+        }
+    }
     for _, nn := range nDB.networks {
         for id, n := range nn {
             if n.leaving {
@@ -387,7 +396,7 @@ func (nDB *NetworkDB) reapTableEntries() {
     var nodeNetworks []string
     // This is best effort, if the list of network changes will be picked up in the next cycle
     nDB.RLock()
-    for nid := range nDB.networks[nDB.config.NodeID] {
+    for nid := range nDB.thisNodeNetworks {
         nodeNetworks = append(nodeNetworks, nid)
     }
     nDB.RUnlock()
@@ -430,8 +439,7 @@ func (nDB *NetworkDB) reapTableEntries() {
 func (nDB *NetworkDB) gossip() {
     networkNodes := make(map[string][]string)
     nDB.RLock()
-    thisNodeNetworks := nDB.networks[nDB.config.NodeID]
-    for nid := range thisNodeNetworks {
+    for nid := range nDB.thisNodeNetworks {
         networkNodes[nid] = nDB.networkNodes[nid]
     }
     printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
@@ -451,7 +459,7 @@ func (nDB *NetworkDB) gossip() {
         bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead
 
         nDB.RLock()
-        network, ok := thisNodeNetworks[nid]
+        network, ok := nDB.thisNodeNetworks[nid]
         nDB.RUnlock()
         if !ok || network == nil {
             // It is normal for the network to be removed
@@ -461,21 +469,15 @@ func (nDB *NetworkDB) gossip() {
             continue
         }
 
-        broadcastQ := network.tableBroadcasts
-
-        if broadcastQ == nil {
-            log.G(context.TODO()).Errorf("Invalid broadcastQ encountered while gossiping for network %s", nid)
-            continue
-        }
-
-        msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
+        msgs := getBroadcasts(compoundOverhead, bytesAvail, network.tableBroadcasts, network.tableRebroadcasts)
         // Collect stats and print the queue info, note this code is here also to have a view of the queues empty
         network.qMessagesSent.Add(int64(len(msgs)))
         if printStats {
             msent := network.qMessagesSent.Swap(0)
-            log.G(context.TODO()).Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
+            log.G(context.TODO()).Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d+%d netMsg/s:%d",
                 nDB.config.Hostname, nDB.config.NodeID,
-                nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber.Load(), broadcastQ.NumQueued(),
+                nid, network.leaving, network.tableBroadcasts.NumNodes(), network.entriesNumber.Load(),
+                network.tableBroadcasts.NumQueued(), network.tableRebroadcasts.NumQueued(),
                 msent/int64((nDB.config.StatsPrintPeriod/time.Second)))
         }
 
@@ -510,7 +512,7 @@ func (nDB *NetworkDB) gossip() {
 func (nDB *NetworkDB) bulkSyncTables() {
     var networks []string
     nDB.RLock()
-    for nid, network := range nDB.networks[nDB.config.NodeID] {
+    for nid, network := range nDB.thisNodeNetworks {
         if network.leaving {
             continue
         }
@@ -135,10 +135,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
     }
 
     // This remote network join is being seen the first time.
-    nodeNetworks[nEvent.NetworkID] = &network{
-        id: nEvent.NetworkID,
-        ltime: nEvent.LTime,
-    }
+    nodeNetworks[nEvent.NetworkID] = &network{ltime: nEvent.LTime}
 
     nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
     return true
@@ -155,8 +152,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
     defer nDB.Unlock()
 
     // Ignore the table events for networks that are in the process of going away
-    networks := nDB.networks[nDB.config.NodeID]
-    network, ok := networks[tEvent.NetworkID]
+    network, ok := nDB.thisNodeNetworks[tEvent.NetworkID]
     // Check if the owner of the event is still part of the network
     nodes := nDB.networkNodes[tEvent.NetworkID]
     var nodePresent bool
@@ -287,20 +283,20 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
     }
 
     nDB.RLock()
-    n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID]
+    n, ok := nDB.thisNodeNetworks[tEvent.NetworkID]
     nDB.RUnlock()
 
-    // if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
-    if !ok || n.leaving || n.tableBroadcasts == nil {
+    // if the network is not there anymore, OR we are leaving the network
+    if !ok || n.leaving {
         return
     }
 
     // if the queue is over the threshold, avoid distributing information coming from TCP sync
-    if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
+    if isBulkSync && n.tableRebroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
        return
     }
 
-    n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
+    n.tableRebroadcasts.QueueBroadcast(&tableEventMessage{
        msg: buf,
        id: tEvent.NetworkID,
        tname: tEvent.TableName,
@@ -423,14 +419,7 @@ func (d *delegate) NotifyMsg(buf []byte) {
 }
 
 func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
-    msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
-    for _, m := range msgs {
-        limit -= overhead + len(m)
-    }
-    if limit > 0 {
-        msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
-    }
-    return msgs
+    return getBroadcasts(overhead, limit, d.nDB.networkBroadcasts, d.nDB.nodeBroadcasts)
 }
 
 func (d *delegate) LocalState(join bool) []byte {
@@ -451,11 +440,19 @@ func (d *delegate) LocalState(join bool) []byte {
         NodeName: d.nDB.config.NodeID,
     }
 
+    for nid, n := range d.nDB.thisNodeNetworks {
+        pp.Networks = append(pp.Networks, &NetworkEntry{
+            LTime: n.ltime,
+            NetworkID: nid,
+            NodeName: d.nDB.config.NodeID,
+            Leaving: n.leaving,
+        })
+    }
     for name, nn := range d.nDB.networks {
-        for _, n := range nn {
+        for nid, n := range nn {
             pp.Networks = append(pp.Networks, &NetworkEntry{
                 LTime: n.ltime,
-                NetworkID: n.id,
+                NetworkID: nid,
                 NodeName: name,
                 Leaving: n.leaving,
             })
@@ -62,11 +62,14 @@ type NetworkDB struct {
     // List of all peer nodes which have left
     leftNodes map[string]*node
 
-    // A multi-dimensional map of network/node attachments. The
-    // first key is a node name and the second key is a network ID
-    // for the network that node is participating in.
+    // A multi-dimensional map of network/node attachments for peer nodes.
+    // The first key is a node name and the second key is a network ID for
+    // the network that node is participating in.
     networks map[string]map[string]*network
 
+    // A map of this node's network attachments.
+    thisNodeNetworks map[string]*thisNodeNetwork
+
     // A map of nodes which are participating in a given
     // network. The key is a network ID.
     networkNodes map[string][]string
@@ -128,26 +131,39 @@ type node struct {
 
 // network describes the node/network attachment.
 type network struct {
-    // Network ID
-    id string
-
     // Lamport time for the latest state of the entry.
     ltime serf.LamportTime
 
-    // Gets set to true after the first bulk sync happens
-    inSync bool
-
     // Node leave is in progress.
     leaving bool
 
     // Number of seconds still left before a deleted network entry gets
     // removed from networkDB
     reapTime time.Duration
+}
 
-    // The broadcast queue for table event gossip. This is only
-    // initialized for this node's network attachment entries.
+// thisNodeNetwork describes a network attachment on the local node.
+type thisNodeNetwork struct {
+    network
+
+    // Gets set to true after the first bulk sync happens
+    inSync bool
+
+    // The broadcast queue for this network's table event gossip
+    // for entries owned by this node.
     tableBroadcasts *memberlist.TransmitLimitedQueue
+
+    // The broadcast queue for this network's table event gossip
+    // relayed from other nodes.
+    //
+    // Messages in this queue are broadcasted when there is space available
+    // in the gossip packet after filling it with tableBroadcast messages.
+    // Relayed messages are broadcasted at a lower priority than messages
+    // originating from this node to ensure that local messages are always
+    // broadcasted in a timely manner, irrespective of how many messages
+    // from other nodes are queued for rebroadcasting.
+    tableRebroadcasts *memberlist.TransmitLimitedQueue
 
     // Number of gossip messages sent related to this network during the last stats collection period
     qMessagesSent atomic.Int64
@@ -273,13 +289,14 @@ func new(c *Config) *NetworkDB {
             byTable:   iradix.New[*entry](),
             byNetwork: iradix.New[*entry](),
         },
-        networks:       make(map[string]map[string]*network),
-        nodes:          make(map[string]*node),
-        failedNodes:    make(map[string]*node),
-        leftNodes:      make(map[string]*node),
-        networkNodes:   make(map[string][]string),
-        bulkSyncAckTbl: make(map[string]chan struct{}),
-        broadcaster:    events.NewBroadcaster(),
+        networks:         make(map[string]map[string]*network),
+        thisNodeNetworks: make(map[string]*thisNodeNetwork),
+        nodes:            make(map[string]*node),
+        failedNodes:      make(map[string]*node),
+        leftNodes:        make(map[string]*node),
+        networkNodes:     make(map[string][]string),
+        bulkSyncAckTbl:   make(map[string]chan struct{}),
+        broadcaster:      events.NewBroadcaster(),
     }
 }
 
@@ -611,38 +628,46 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
     ltime := nDB.networkClock.Increment()
 
     nDB.Lock()
-    nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
-    if !ok {
-        nodeNetworks = make(map[string]*network)
-        nDB.networks[nDB.config.NodeID] = nodeNetworks
-    }
-    n, ok := nodeNetworks[nid]
-    var entries int64
+    n, ok := nDB.thisNodeNetworks[nid]
     if ok {
-        entries = n.entriesNumber.Load()
-    }
-    nodeNetworks[nid] = &network{id: nid, ltime: ltime}
-    nodeNetworks[nid].entriesNumber.Store(entries)
-    nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
-        NumNodes: func() int {
+        if !n.leaving {
+            nDB.Unlock()
+            return fmt.Errorf("networkdb: network %s is already joined", nid)
+        }
+        n.network = network{ltime: ltime}
+        n.inSync = false
+    } else {
+        numNodes := func() int {
            // TODO fcrisciani this can be optimized maybe avoiding the lock?
            // this call is done each GetBroadcasts call to evaluate the number of
            // replicas for the message
            nDB.RLock()
            defer nDB.RUnlock()
            return len(nDB.networkNodes[nid])
-        },
-        RetransmitMult: 4,
-    }
+        }
+        n = &thisNodeNetwork{
+            network: network{ltime: ltime},
+            tableBroadcasts: &memberlist.TransmitLimitedQueue{
+                NumNodes: numNodes,
+                RetransmitMult: 4,
+            },
+            tableRebroadcasts: &memberlist.TransmitLimitedQueue{
+                NumNodes: numNodes,
+                RetransmitMult: 4,
+            },
+        }
+    }
     nDB.addNetworkNode(nid, nDB.config.NodeID)
-    networkNodes := nDB.networkNodes[nid]
-    n = nodeNetworks[nid]
-    nDB.Unlock()
 
     if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
-        return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
+        nDB.Unlock()
+        return fmt.Errorf("failed to send join network event for %s: %v", nid, err)
     }
 
+    nDB.thisNodeNetworks[nid] = n
+    networkNodes := nDB.networkNodes[nid]
+    nDB.Unlock()
+
     log.G(context.TODO()).Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
     if _, err := nDB.bulkSync(networkNodes, true); err != nil {
         log.G(context.TODO()).Errorf("Error bulk syncing while joining network %s: %v", nid, err)
@@ -678,12 +703,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
     // Update all the local entries marking them for deletion and delete all the remote entries
     nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID)
 
-    nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
-    if !ok {
-        return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
-    }
-
-    n, ok := nodeNetworks[nid]
+    n, ok := nDB.thisNodeNetworks[nid]
     if !ok {
         return fmt.Errorf("could not find network %s while trying to leave", nid)
     }
@@ -734,7 +754,7 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
     defer nDB.RUnlock()
 
     var networks []string
-    for nid := range nDB.networks[nDB.config.NodeID] {
+    for nid := range nDB.thisNodeNetworks {
         if n, ok := nDB.networks[nodeName][nid]; ok {
             if !n.leaving {
                 networks = append(networks, nid)
@@ -750,7 +770,7 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
     defer nDB.Unlock()
 
     ltime := nDB.networkClock.Increment()
-    for _, n := range nDB.networks[nDB.config.NodeID] {
+    for _, n := range nDB.thisNodeNetworks {
         n.ltime = ltime
     }
 }
@@ -762,7 +782,7 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, v *entry) (okT
     nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Insert([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key)), v)
     if !okNetwork {
         // Add only if it is an insert not an update
-        n, ok := nDB.networks[nDB.config.NodeID][nid]
+        n, ok := nDB.thisNodeNetworks[nid]
         if ok {
             n.entriesNumber.Add(1)
         }
@@ -777,7 +797,7 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (okTable bool, okNetwo
     nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Delete([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key)))
     if okNetwork {
         // Remove only if the delete is successful
-        n, ok := nDB.networks[nDB.config.NodeID][nid]
+        n, ok := nDB.thisNodeNetworks[nid]
         if ok {
             n.entriesNumber.Add(-1)
         }
@@ -6,10 +6,13 @@ import (
     "net"
     "os"
     "strconv"
+    "strings"
     "sync/atomic"
     "testing"
+    "text/tabwriter"
     "time"
 
+    cerrdefs "github.com/containerd/errdefs"
     "github.com/containerd/log"
     "github.com/docker/docker/pkg/stringid"
     "github.com/docker/go-events"
@@ -27,7 +30,7 @@ func init() {
 
 func TestMain(m *testing.M) {
     os.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0o644)
-    log.SetLevel("error")
+    log.SetLevel("debug")
     os.Exit(m.Run())
 }
 
@@ -85,18 +88,14 @@ func (nDB *NetworkDB) verifyNodeExistence(t *testing.T, node string, present boo
         nDB.RLock()
         _, ok := nDB.nodes[node]
         nDB.RUnlock()
-        if present && ok {
-            return
-        }
-
-        if !present && !ok {
+        if present == ok {
             return
         }
 
         time.Sleep(50 * time.Millisecond)
     }
 
-    t.Errorf("%v(%v): Node existence verification for node %s failed", nDB.config.Hostname, nDB.config.NodeID, node)
+    t.Errorf("%v(%v): expected node %s existence in the cluster = %v, got %v", nDB.config.Hostname, nDB.config.NodeID, node, present, !present)
 }
 
 func (nDB *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
@@ -109,51 +108,64 @@ func (nDB *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id strin
     } else {
         maxRetries = 80
     }
+    var ok, leaving bool
     for i := int64(0); i < maxRetries; i++ {
         nDB.RLock()
-        nn, nnok := nDB.networks[node]
-        if nnok {
-            n, ok := nn[id]
-            var leaving bool
-            if ok {
-                leaving = n.leaving
-            }
-            nDB.RUnlock()
-            if present && ok {
-                return
-            }
-
-            if !present &&
-                ((ok && leaving) ||
-                    !ok) {
-                return
+        var vn *network
+        if node == nDB.config.NodeID {
+            if n, ok := nDB.thisNodeNetworks[id]; ok {
+                vn = &n.network
             }
         } else {
-            nDB.RUnlock()
+            if nn, nnok := nDB.networks[node]; nnok {
+                if n, ok := nn[id]; ok {
+                    vn = n
+                }
+            }
         }
+        ok = vn != nil
+        leaving = ok && vn.leaving
+        nDB.RUnlock()
+
+        if present == (ok && !leaving) {
+            return
+        }
 
         time.Sleep(sleepInterval)
     }
 
-    t.Error("Network existence verification failed")
+    if present {
+        t.Errorf("%v(%v): want node %v to be a member of network %q, got that it is not a member (ok=%v, leaving=%v)",
+            nDB.config.Hostname, nDB.config.NodeID, node, id, ok, leaving)
+    } else {
+        t.Errorf("%v(%v): want node %v to not be a member of network %q, got that it is a member (ok=%v, leaving=%v)",
+            nDB.config.Hostname, nDB.config.NodeID, node, id, ok, leaving)
+    }
 }
 
 func (nDB *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
     t.Helper()
     n := 80
+    var v []byte
     for i := 0; i < n; i++ {
-        v, err := nDB.GetEntry(tname, nid, key)
+        var err error
+        v, err = nDB.GetEntry(tname, nid, key)
         if present && err == nil && string(v) == value {
            return
        }
-        if err != nil && !present {
+        if cerrdefs.IsNotFound(err) && !present {
            return
        }
+        if err != nil && !cerrdefs.IsNotFound(err) {
+            t.Errorf("%v(%v): unexpected error while getting entry %v/%v in network %q: %v",
+                nDB.config.Hostname, nDB.config.NodeID, tname, key, nid, err)
+        }
 
         time.Sleep(50 * time.Millisecond)
     }
 
-    t.Errorf("Entry existence verification test failed for %v(%v)", nDB.config.Hostname, nDB.config.NodeID)
+    t.Errorf("%v(%v): want entry %v/%v in network %q to be (present=%v, value=%q), got (present=%v, value=%q)",
+        nDB.config.Hostname, nDB.config.NodeID, tname, key, nid, present, value, !present, string(v))
 }
 
 func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
@@ -276,6 +288,19 @@ func TestNetworkDBCRUDTableEntry(t *testing.T) {
     closeNetworkDBInstances(t, dbs)
 }
 
+func (nDB *NetworkDB) dumpTable(t *testing.T, tname string) {
+    t.Helper()
+    var b strings.Builder
+    tw := tabwriter.NewWriter(&b, 10, 1, 1, ' ', 0)
+    tw.Write([]byte("NetworkID\tKey\tValue\tFlags\n"))
+    nDB.WalkTable(tname, func(nid, key string, value []byte, deleting bool) bool {
+        fmt.Fprintf(tw, "%s\t%s\t%s\t%v\n", nid, key, string(value), map[bool]string{true: "D"}[deleting])
+        return false
+    })
+    tw.Flush()
+    t.Logf("%s(%s): Table %s:\n%s", nDB.config.Hostname, nDB.config.NodeID, tname, b.String())
+}
+
 func TestNetworkDBCRUDTableEntries(t *testing.T) {
     dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
 
@@ -304,18 +329,24 @@ func TestNetworkDBCRUDTableEntries(t *testing.T) {
         assert.NilError(t, err)
     }
 
+    for n := range dbs {
+        dbs[n].dumpTable(t, "test_table")
+    }
+
     for i := 1; i <= n; i++ {
         dbs[0].verifyEntryExistence(t, "test_table", "network1",
             fmt.Sprintf("test_key1%d", i),
             fmt.Sprintf("test_value1%d", i), true)
-        assert.NilError(t, err)
     }
 
     for i := 1; i <= n; i++ {
         dbs[1].verifyEntryExistence(t, "test_table", "network1",
             fmt.Sprintf("test_key0%d", i),
             fmt.Sprintf("test_value0%d", i), true)
-        assert.NilError(t, err)
     }
 
+    for n := range dbs {
+        dbs[n].dumpTable(t, "test_table")
+    }
+
     // Verify deletes
@@ -334,13 +365,15 @@ func TestNetworkDBCRUDTableEntries(t *testing.T) {
     for i := 1; i <= n; i++ {
         dbs[0].verifyEntryExistence(t, "test_table", "network1",
             fmt.Sprintf("test_key1%d", i), "", false)
-        assert.NilError(t, err)
     }
 
     for i := 1; i <= n; i++ {
         dbs[1].verifyEntryExistence(t, "test_table", "network1",
             fmt.Sprintf("test_key0%d", i), "", false)
-        assert.NilError(t, err)
     }
 
+    for n := range dbs {
+        dbs[n].dumpTable(t, "test_table")
+    }
+
     closeNetworkDBInstances(t, dbs)
@@ -546,15 +579,15 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
     // Wait for the propagation on db[0]
     dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
     witness0("network1", 2)
-    if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
-        t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
+    if n, ok := dbs[0].thisNodeNetworks["network1"]; !ok || n.leaving {
+        t.Fatal("The network should not be marked as leaving")
     }
 
     // Wait for the propagation on db[1]
     dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
     witness1("network1", 2)
-    if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
-        t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
+    if n, ok := dbs[1].thisNodeNetworks["network1"]; !ok || n.leaving {
+        t.Fatal("The network should not be marked as leaving")
     }
 
     // Try a quick leave/join
@@ -599,7 +632,7 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
     }
     for i := 0; i < 2; i++ {
         dbs[i].Lock()
-        assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
+        assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].thisNodeNetworks["network1"].entriesNumber.Load()), "entries number should match")
         dbs[i].Unlock()
     }
 
@@ -610,14 +643,14 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
     assert.NilError(t, err)
     for i := 0; i < 3; i++ {
         dbs[i].Lock()
-        assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
+        assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].thisNodeNetworks["network1"].entriesNumber.Load()), "entries number should match")
         dbs[i].Unlock()
     }
     // at this point the entries should had been all deleted
     time.Sleep(30 * time.Second)
     for i := 0; i < 3; i++ {
         dbs[i].Lock()
-        assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
+        assert.Check(t, is.Equal(int64(0), dbs[i].thisNodeNetworks["network1"].entriesNumber.Load()), "entries should had been garbage collected")
         dbs[i].Unlock()
     }
 
@@ -625,7 +658,7 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
     time.Sleep(15 * time.Second)
     for i := 0; i < 3; i++ {
         dbs[i].Lock()
-        assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
+        assert.Check(t, is.Equal(int64(0), dbs[i].thisNodeNetworks["network1"].entriesNumber.Load()), "entries should had been garbage collected")
         dbs[i].Unlock()
     }
 
@@ -439,8 +439,7 @@ func (nDB *NetworkDB) dbNetworkStats(w http.ResponseWriter, r *http.Request) {
     }
 
     nDB.RLock()
-    networks := nDB.networks[nDB.config.NodeID]
-    network, ok := networks[r.Form["nid"][0]]
+    network, ok := nDB.thisNodeNetworks[r.Form["nid"][0]]
 
     entries := -1
     qLen := -1