package networkdb

//go:generate protoc -I=. -I=../../../vendor/ --gogofaster_out=import_path=github.com/docker/docker/daemon/libnetwork/networkdb:. networkdb.proto

import (
	"context"
	cryptorand "crypto/rand"
	"fmt"
	"math/rand/v2"
	"net/netip"
	"os"
	"slices"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/containerd/log"
	"github.com/docker/go-events"
	iradix "github.com/hashicorp/go-immutable-radix/v2"
	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/serf/serf"
	"github.com/moby/moby/v2/daemon/internal/stringid"
	"github.com/moby/moby/v2/daemon/libnetwork/types"
)

const (
	byTable int = 1 + iota
	byNetwork
)

// NetworkDB instance drives the networkdb cluster and acts as the broker
// for cluster-scoped and network-scoped gossip and watches.
type NetworkDB struct {
	// The clocks MUST be the first things
	// in this struct due to Golang issue #599.

	// Global lamport clock for node network attach events.
	networkClock serf.LamportClock

	// Global lamport clock for table events.
	tableClock serf.LamportClock

	sync.RWMutex

	// NetworkDB configuration.
	config *Config

	// All the tree indexes (byTable, byNetwork) that we maintain
	// for the db.
	indexes map[int]*iradix.Tree[*entry]

	// Memberlist we use to drive the cluster.
	memberlist *memberlist.Memberlist

	// List of all peer nodes in the cluster, not limited to any
	// network.
	nodes map[string]*node

	// An approximation of len(nodes) that can be accessed without
	// synchronization.
	estNodes atomic.Int32

	// List of all peer nodes which have failed
	failedNodes map[string]*node

	// List of all peer nodes which have left
	leftNodes map[string]*node

	// A multi-dimensional map of network/node attachments for peer nodes.
	// The first key is a node name and the second key is a network ID for
	// the network that node is participating in.
	networks map[string]map[string]*network

	// A map of this node's network attachments.
	thisNodeNetworks map[string]*thisNodeNetwork

	// A map of nodes which are participating in a given
	// network. The key is a network ID.
	networkNodes map[string][]string

	// A table of ack channels for every node from which we are
	// waiting for an ack.
	bulkSyncAckTbl map[string]chan struct{}

	// Broadcast queue for network event gossip.
	networkBroadcasts *memberlist.TransmitLimitedQueue

	// Broadcast queue for node event gossip.
	nodeBroadcasts *memberlist.TransmitLimitedQueue

	// A central context to stop all go routines running on
	// behalf of the NetworkDB instance.
	ctx       context.Context
	cancelCtx context.CancelFunc

	// A central broadcaster for all local watchers watching table
	// events.
	broadcaster *events.Broadcaster

	// List of all tickers which need to be stopped when
	// cleaning up.
	tickers []*time.Ticker

	// Reference to the memberlist's keyring to add & remove keys
	keyring *memberlist.Keyring

	// bootStrapIP is the list of IPs that can be used to bootstrap
	// the gossip.
	bootStrapIP []string

	// lastStatsTimestamp is the last timestamp when the stats got printed
	lastStatsTimestamp time.Time

	// lastHealthTimestamp is the last timestamp when the health score got printed
	lastHealthTimestamp time.Time

	rngMu sync.Mutex
	rng   *rand.Rand
}

// PeerInfo represents the peer (gossip cluster) nodes of a network
type PeerInfo struct {
	Name string
	IP   netip.Addr
}

// PeerClusterInfo represents the peer (gossip cluster) nodes
type PeerClusterInfo struct {
	PeerInfo
}

type node struct {
	memberlist.Node
	ltime serf.LamportTime
	// Number of hours left before the reaper removes the node
	reapTime time.Duration
}

// network describes the node/network attachment.
type network struct {
	// Lamport time for the latest state of the entry.
	ltime serf.LamportTime

	// Node leave is in progress.
	leaving bool

	// Number of seconds still left before a deleted network entry gets
	// removed from networkDB
	reapTime time.Duration
}

// thisNodeNetwork describes a network attachment on the local node.
type thisNodeNetwork struct {
	network

	// Gets set to true after the first bulk sync happens
	inSync bool

	// The broadcast queue for this network's table event gossip
	// for entries owned by this node.
	tableBroadcasts *memberlist.TransmitLimitedQueue

	// The broadcast queue for this network's table event gossip
	// relayed from other nodes.
	//
	// Messages in this queue are broadcasted when there is space available
	// in the gossip packet after filling it with tableBroadcast messages.
	// Relayed messages are broadcasted at a lower priority than messages
	// originating from this node to ensure that local messages are always
	// broadcasted in a timely manner, irrespective of how many messages
	// from other nodes are queued for rebroadcasting.
	tableRebroadcasts *memberlist.TransmitLimitedQueue

	// Number of gossip messages sent related to this network during the last stats collection period
	qMessagesSent atomic.Int64

	// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
	// Its use is for statistics purposes. It keeps track of database size and is printed per network every StatsPrintPeriod
	// interval.
	entriesNumber atomic.Int64

	// An approximation of len(nDB.networkNodes[nid]) that can be accessed
	// without synchronization.
	networkNodes atomic.Int32
}

// Config represents the configuration of the networkdb instance and
// can be passed by the caller.
type Config struct {
	// NodeID is the unique identifier of the node when it is part of the cluster
	NodeID string

	// Hostname is the node hostname.
	Hostname string

	// BindAddr is the IP on which networkdb listens. It can be
	// 0.0.0.0 to listen on all addresses on the host.
	BindAddr string

	// AdvertiseAddr is the node's IP address that we advertise for
	// cluster communication.
	AdvertiseAddr string

	// BindPort is the local node's port to which we bind for
	// cluster communication.
	BindPort int

	// Keys to be added to the Keyring of the memberlist. Key at index
	// 0 is the primary key
	Keys [][]byte

	// PacketBufferSize is the maximum number of bytes that memberlist will
	// put in a packet (this will be for UDP packets by default with a NetTransport).
	// A safe value for this is typically 1400 bytes (which is the default). However,
	// depending on your network's MTU (Maximum Transmission Unit) you may
	// be able to increase this to get more content into each gossip packet.
	PacketBufferSize int

	// reapEntryInterval duration of a deleted entry before being garbage collected
	reapEntryInterval time.Duration

	// reapNetworkInterval duration of a deleted network before being garbage collected
	// NOTE this MUST always be higher than reapEntryInterval
	reapNetworkInterval time.Duration

	// rejoinClusterDuration represents retryJoin timeout used by rejoinClusterBootStrap.
	// Default is 10sec.
	rejoinClusterDuration time.Duration

	// rejoinClusterInterval represents interval on which rejoinClusterBootStrap runs.
	// Default is 60sec.
	rejoinClusterInterval time.Duration

	// StatsPrintPeriod the period to use to print queue stats
	// Default is 5min
	StatsPrintPeriod time.Duration

	// HealthPrintPeriod the period to use to print the health score
	// Default is 1min
	HealthPrintPeriod time.Duration
}

// entry defines a table entry
type entry struct {
	// node from which this entry was learned.
	node string

	// Lamport time for the most recent update to the entry
	ltime serf.LamportTime

	// Opaque value stored in the entry
	value []byte

	// Deleting the entry is in progress. All entries linger in
	// the cluster for a certain amount of time after deletion.
	deleting bool

	// Number of seconds still left before a deleted table entry gets
	// removed from networkDB
	reapTime time.Duration
}

// DefaultConfig returns a NetworkDB config with default values
func DefaultConfig() *Config {
	hostname, _ := os.Hostname()
	return &Config{
		NodeID:                stringid.TruncateID(stringid.GenerateRandomID()),
		Hostname:              hostname,
		BindAddr:              "0.0.0.0",
		PacketBufferSize:      1400,
		StatsPrintPeriod:      5 * time.Minute,
		HealthPrintPeriod:     1 * time.Minute,
		reapEntryInterval:     30 * time.Minute,
		rejoinClusterDuration: 10 * time.Second,
		rejoinClusterInterval: 60 * time.Second,
	}
}

// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
	nDB := newNetworkDB(c)
	log.G(context.TODO()).Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
	if err := nDB.clusterInit(); err != nil {
		return nil, err
	}
	return nDB, nil
}

func newNetworkDB(c *Config) *NetworkDB {
	// The garbage collection logic for entries leverages the presence of the network.
	// For this reason the expiration time of the network is set slightly higher than the entry expiration so that
	// there are at least 5 extra cycles to make sure that all the entries are properly deleted before deleting the network.
	c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod

	var rngSeed [32]byte
	_, _ = cryptorand.Read(rngSeed[:]) // Documented never to return an error

	return &NetworkDB{
		config: c,
		indexes: map[int]*iradix.Tree[*entry]{
			byTable:   iradix.New[*entry](),
			byNetwork: iradix.New[*entry](),
		},
		networks:         make(map[string]map[string]*network),
		thisNodeNetworks: make(map[string]*thisNodeNetwork),
		nodes:            make(map[string]*node),
		failedNodes:      make(map[string]*node),
		leftNodes:        make(map[string]*node),
		networkNodes:     make(map[string][]string),
		bulkSyncAckTbl:   make(map[string]chan struct{}),
		broadcaster:      events.NewBroadcaster(),
		rng:              rand.New(rand.NewChaCha8(rngSeed)), //gosec:disable G404 -- not used in a security sensitive context
	}
}

// Join joins this NetworkDB instance with a list of peer NetworkDB
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
	nDB.Lock()
	nDB.bootStrapIP = append([]string(nil), members...)
	log.G(context.TODO()).Infof("The new bootstrap node list is:%v", nDB.bootStrapIP)
	nDB.Unlock()
	return nDB.clusterJoin(members)
}

// Close destroys this NetworkDB instance by leaving the cluster,
// stopping timers, canceling goroutines, etc.
func (nDB *NetworkDB) Close() {
	if err := nDB.clusterLeave(); err != nil {
		log.G(context.TODO()).Errorf("%v(%v) Could not close DB: %v", nDB.config.Hostname, nDB.config.NodeID, err)
	}

	// Avoid (*Broadcaster).run goroutine leak
	nDB.broadcaster.Close()
}
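
// Illustrative lifecycle sketch (not part of this package's code; the bind
// port, peer address, and error handling below are hypothetical):
//
//	conf := networkdb.DefaultConfig()
//	conf.AdvertiseAddr = "192.0.2.10"
//	conf.BindPort = 7946
//
//	db, err := networkdb.New(conf)
//	if err != nil {
//		// handle init error
//	}
//	defer db.Close()
//
//	// Bootstrap gossip against one or more existing peers ("addr:port").
//	if err := db.Join([]string{"192.0.2.11:7946"}); err != nil {
//		// handle join error
//	}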

// ClusterPeers returns all the gossip cluster peers.
func (nDB *NetworkDB) ClusterPeers() []PeerInfo {
	nDB.RLock()
	defer nDB.RUnlock()
	peers := make([]PeerInfo, 0, len(nDB.nodes))
	for _, node := range nDB.nodes {
		ip, _ := netip.AddrFromSlice(node.Node.Addr)
		peers = append(peers, PeerInfo{
			Name: node.Name,
			IP:   ip.Unmap(),
		})
	}
	return peers
}

// Peers returns the gossip peers for a given network.
func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
	nDB.RLock()
	defer nDB.RUnlock()
	peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid]))
	for _, nodeName := range nDB.networkNodes[nid] {
		if node, ok := nDB.nodes[nodeName]; ok {
			ip, _ := netip.AddrFromSlice(node.Node.Addr)
			peers = append(peers, PeerInfo{
				Name: node.Name,
				IP:   ip.Unmap(),
			})
		} else {
			// Added for testing purposes; this condition should never happen,
			// otherwise it means that the network list is out of sync with
			// the node list.
			peers = append(peers, PeerInfo{Name: nodeName})
		}
	}
	return peers
}

// GetEntry retrieves the value of a table entry in a given (network,
// table, key) tuple
func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
	nDB.RLock()
	defer nDB.RUnlock()
	v, err := nDB.getEntry(tname, nid, key)
	if err != nil {
		return nil, err
	}
	if v != nil && v.deleting {
		return nil, types.NotFoundErrorf("entry in table %s network id %s and key %s deleted and pending garbage collection", tname, nid, key)
	}

	// note: this panics if a nil entry was stored in the table; after
	// discussion, we decided to not gracefully handle this situation as
	// this would be an unexpected situation;
	// see https://github.com/moby/moby/pull/48157#discussion_r1674428635
	return v.value, nil
}

func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
	e, ok := nDB.indexes[byTable].Get(fmt.Appendf(nil, "/%s/%s/%s", tname, nid, key))
	if !ok {
		return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
	}
	return e, nil
}

// CreateEntry creates a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propagates this event to the cluster. It is an error to create an
// entry for the same tuple for which there is already an existing
// entry unless the current entry is in deleting state.
func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
	nDB.Lock()
	oldEntry, err := nDB.getEntry(tname, nid, key)
	if err == nil || (oldEntry != nil && !oldEntry.deleting) {
		nDB.Unlock()
		return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
	}

	entry := &entry{
		ltime: nDB.tableClock.Increment(),
		node:  nDB.config.NodeID,
		value: value,
	}

	nDB.createOrUpdateEntry(nid, tname, key, entry)
	nDB.Unlock()

	if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
		return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
	}
	return nil
}
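
// Illustrative table-entry sketch (hedged; the table name, network ID and key
// below are hypothetical, and the caller is assumed to have already joined
// the network):
//
//	if err := db.CreateEntry("endpoint_table", nid, "ep1", []byte("v1")); err != nil {
//		// an entry for this tuple already exists and is not pending deletion
//	}
//	if err := db.UpdateEntry("endpoint_table", nid, "ep1", []byte("v2")); err != nil {
//		// the entry does not exist
//	}
//	value, err := db.GetEntry("endpoint_table", nid, "ep1")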

// UpdateEntry updates a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propagates this event to the cluster. It is an error to update a
// non-existent entry.
func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
	nDB.Lock()
	if _, err := nDB.getEntry(tname, nid, key); err != nil {
		nDB.Unlock()
		return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
	}

	entry := &entry{
		ltime: nDB.tableClock.Increment(),
		node:  nDB.config.NodeID,
		value: value,
	}

	nDB.createOrUpdateEntry(nid, tname, key, entry)
	nDB.Unlock()

	if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
		return fmt.Errorf("cannot send table update event: %v", err)
	}
	return nil
}

// TableElem represents a table entry value along with the node that owns it.
type TableElem struct {
	Value []byte
	owner string
}

// GetTableByNetwork walks the networkdb by the given table and network id and
// returns a map of keys and values
func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem {
	nDB.RLock()
	root := nDB.indexes[byTable].Root()
	nDB.RUnlock()

	entries := make(map[string]*TableElem)
	root.WalkPrefix(fmt.Appendf(nil, "/%s/%s", tname, nid), func(k []byte, v *entry) bool {
		if v.deleting {
			return false
		}
		key := string(k)
		key = key[strings.LastIndex(key, "/")+1:]
		entries[key] = &TableElem{Value: v.value, owner: v.node}
		return false
	})
	return entries
}

// DeleteEntry deletes a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propagates this event to the cluster.
func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
	nDB.Lock()
	oldEntry, err := nDB.getEntry(tname, nid, key)
	if err != nil || oldEntry == nil || oldEntry.deleting {
		nDB.Unlock()
		return fmt.Errorf("cannot delete entry %s with network id %s and key %s "+
			"does not exist or is already being deleted", tname, nid, key)
	}

	entry := &entry{
		ltime:    nDB.tableClock.Increment(),
		node:     nDB.config.NodeID,
		value:    oldEntry.value,
		deleting: true,
		reapTime: nDB.config.reapEntryInterval,
	}

	nDB.createOrUpdateEntry(nid, tname, key, entry)
	nDB.Unlock()

	if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
		return fmt.Errorf("cannot send table delete event: %v", err)
	}
	return nil
}

func (nDB *NetworkDB) deleteNodeFromNetworks(deletedNode string) {
	for nid, nodes := range nDB.networkNodes {
		updatedNodes := make([]string, 0, len(nodes))
		for _, node := range nodes {
			if node == deletedNode {
				continue
			}
			updatedNodes = append(updatedNodes, node)
		}
		nDB.networkNodes[nid] = updatedNodes
	}

	delete(nDB.networks, deletedNode)
}

// deleteNodeNetworkEntries deletes all table entries for a network owned by
// node from the local store.
func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
	nDB.indexes[byNetwork].Root().WalkPrefix([]byte("/"+nid), func(path []byte, oldEntry *entry) bool {
		// Do nothing if the entry is owned by a remote node that is not leaving the network
		// because the event is triggered for a node that does not own this entry.
		if oldEntry.node != node {
			return false
		}

		params := strings.Split(string(path[1:]), "/")
		nwID, tName, key := params[0], params[1], params[2]

		nDB.deleteEntry(nwID, tName, key)

		// Notify the upper layer only of entries not already marked for deletion
		if !oldEntry.deleting {
			nDB.broadcaster.Write(WatchEvent{
				Table:     tName,
				NetworkID: nwID,
				Key:       key,
				Prev:      oldEntry.value,
			})
		}
		return false
	})
}
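
// Illustrative read/delete sketch (hedged; "endpoint_table" and the key are
// hypothetical names, not identifiers defined by this package):
//
//	for key, elem := range db.GetTableByNetwork("endpoint_table", nid) {
//		fmt.Printf("%s => %d bytes\n", key, len(elem.Value))
//	}
//
//	// Deleting tombstones the entry locally and gossips the delete event;
//	// the tombstone is reaped after reapEntryInterval.
//	_ = db.DeleteEntry("endpoint_table", nid, "ep1")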

// deleteNodeTableEntries deletes all table entries owned by node from the local
// store, across all networks.
func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
	nDB.indexes[byTable].Root().Walk(func(path []byte, oldEntry *entry) bool {
		if oldEntry.node != node {
			return false
		}

		params := strings.Split(string(path[1:]), "/")
		tName, nwID, key := params[0], params[1], params[2]

		nDB.deleteEntry(nwID, tName, key)

		if !oldEntry.deleting {
			nDB.broadcaster.Write(WatchEvent{
				Table:     tName,
				NetworkID: nwID,
				Key:       key,
				Prev:      oldEntry.value,
			})
		}
		return false
	})
}

// WalkTable walks a single table in NetworkDB and invokes the passed
// function for each entry in the table, passing the network, key, and
// value. The walk stops if the passed function returns true.
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
	nDB.RLock()
	root := nDB.indexes[byTable].Root()
	nDB.RUnlock()

	root.WalkPrefix([]byte("/"+tname), func(path []byte, v *entry) bool {
		params := strings.Split(string(path[1:]), "/")
		nid := params[1]
		key := params[2]
		return fn(nid, key, v.value, v.deleting)
	})

	return nil
}

// JoinNetwork joins this node to a given network and propagates this
// event across the cluster. This triggers this node joining the
// sub-cluster of this network and participating in the network-scoped
// gossip and bulk sync for this network.
func (nDB *NetworkDB) JoinNetwork(nid string) error {
	ltime := nDB.networkClock.Increment()

	nDB.Lock()
	n, ok := nDB.thisNodeNetworks[nid]
	if ok {
		if !n.leaving {
			nDB.Unlock()
			return fmt.Errorf("networkdb: network %s is already joined", nid)
		}
		n.network = network{ltime: ltime}
		n.inSync = false
	} else {
		n = &thisNodeNetwork{
			network: network{ltime: ltime},
			tableBroadcasts: &memberlist.TransmitLimitedQueue{
				RetransmitMult: 4,
			},
			tableRebroadcasts: &memberlist.TransmitLimitedQueue{
				RetransmitMult: 4,
			},
		}
		numNodes := func() int { return int(n.networkNodes.Load()) }
		n.tableBroadcasts.NumNodes = numNodes
		n.tableRebroadcasts.NumNodes = numNodes
	}
	nDB.addNetworkNode(nid, nDB.config.NodeID)
	if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
		nDB.Unlock()
		return fmt.Errorf("failed to send join network event for %s: %v", nid, err)
	}
	networkNodes := nDB.networkNodes[nid]
	n.networkNodes.Store(int32(len(networkNodes)))
	nDB.thisNodeNetworks[nid] = n
	nDB.Unlock()

	log.G(context.TODO()).Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
	if _, err := nDB.bulkSync(networkNodes, true); err != nil {
		log.G(context.TODO()).Errorf("Error bulk syncing while joining network %s: %v", nid, err)
	}

	// Mark the network as being synced
	// note this is a best effort, we are not checking the result of the bulk sync
	nDB.Lock()
	n.inSync = true
	nDB.Unlock()

	return nil
}

// LeaveNetwork leaves this node from a given network and propagates
// this event across the cluster. This triggers this node leaving the
// sub-cluster of this network and as a result will no longer
// participate in the network-scoped gossip and bulk sync for this
// network.
// It also removes all the table entries for this network from
// networkdb.
func (nDB *NetworkDB) LeaveNetwork(nid string) error {
	ltime := nDB.networkClock.Increment()
	if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
		return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
	}

	nDB.Lock()
	defer nDB.Unlock()

	// Remove myself from the list of the nodes participating in the network
	nDB.deleteNetworkNode(nid, nDB.config.NodeID)

	// Mark all the local entries for deletion
	// so that if we rejoin the network
	// before another node has received the network-leave notification,
	// the old entries owned by us will still be purged as expected.
	// Delete all the remote entries from our local store
	// without leaving any tombstone.
	// This ensures that we will accept the CREATE events
	// for entries owned by remote nodes
	// if we later rejoin the network.
	nDB.indexes[byNetwork].Root().WalkPrefix([]byte("/"+nid), func(path []byte, oldEntry *entry) bool {
		owned := oldEntry.node == nDB.config.NodeID
		if owned && oldEntry.deleting {
			return false
		}
		params := strings.Split(string(path[1:]), "/")
		nwID, tName, key := params[0], params[1], params[2]
		if owned {
			newEntry := &entry{
				ltime:    nDB.tableClock.Increment(),
				node:     oldEntry.node,
				value:    oldEntry.value,
				deleting: true,
				reapTime: nDB.config.reapEntryInterval,
			}
			nDB.createOrUpdateEntry(nwID, tName, key, newEntry)
		} else {
			nDB.deleteEntry(nwID, tName, key)
		}
		if !oldEntry.deleting {
			nDB.broadcaster.Write(WatchEvent{
				Table:     tName,
				NetworkID: nwID,
				Key:       key,
				Prev:      oldEntry.value,
			})
		}
		return false
	})

	n, ok := nDB.thisNodeNetworks[nid]
	if !ok {
		return fmt.Errorf("could not find network %s while trying to leave", nid)
	}

	log.G(context.TODO()).Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
	n.ltime = ltime
	n.reapTime = nDB.config.reapNetworkInterval
	n.leaving = true
	return nil
}

// addNetworkNode adds the node to the list of nodes which participate
// in the passed network only if it is not already present. Caller
// should hold the NetworkDB lock while calling this
func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
	if slices.Contains(nDB.networkNodes[nid], nodeName) {
		return
	}
	nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
	if n, ok := nDB.thisNodeNetworks[nid]; ok {
		n.networkNodes.Store(int32(len(nDB.networkNodes[nid])))
	}
}

// Deletes the node from the list of nodes which participate in the
// passed network. Caller should hold the NetworkDB lock while calling
// this
func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
	nodes, ok := nDB.networkNodes[nid]
	if !ok || len(nodes) == 0 {
		return
	}
	newNodes := make([]string, 0, len(nodes)-1)
	for _, name := range nodes {
		if name == nodeName {
			continue
		}
		newNodes = append(newNodes, name)
	}
	nDB.networkNodes[nid] = newNodes
	if n, ok := nDB.thisNodeNetworks[nid]; ok {
		n.networkNodes.Store(int32(len(newNodes)))
	}
}
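
// Illustrative network join/leave sketch (hedged; the network ID and table
// name are hypothetical and error handling is elided):
//
//	if err := db.JoinNetwork(nid); err != nil {
//		// already joined, or the join event could not be sent
//	}
//
//	// Enumerate entries across all networks for one table.
//	_ = db.WalkTable("endpoint_table", func(nid, key string, value []byte, deleting bool) bool {
//		return false // keep walking
//	})
//
//	// Leaving tombstones locally-owned entries and drops remote ones.
//	_ = db.LeaveNetwork(nid)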

// findCommonNetworks finds the networks that both this node and the
// passed node have joined.
func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
	nDB.RLock()
	defer nDB.RUnlock()

	var networks []string
	for nid := range nDB.thisNodeNetworks {
		if n, ok := nDB.networks[nodeName][nid]; ok {
			if !n.leaving {
				networks = append(networks, nid)
			}
		}
	}

	return networks
}

func (nDB *NetworkDB) updateLocalNetworkTime() {
	nDB.Lock()
	defer nDB.Unlock()

	ltime := nDB.networkClock.Increment()
	for _, n := range nDB.thisNodeNetworks {
		n.ltime = ltime
	}
}

// createOrUpdateEntry handles the creation or update of entries in the local
// tree store. It is also used to keep the entries number of the network in sync (all tables are aggregated)
func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, v *entry) (okTable bool, okNetwork bool) {
	nDB.indexes[byTable], _, okTable = nDB.indexes[byTable].Insert(fmt.Appendf(nil, "/%s/%s/%s", tname, nid, key), v)
	nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Insert(fmt.Appendf(nil, "/%s/%s/%s", nid, tname, key), v)
	if !okNetwork {
		// Add only if it is an insert, not an update
		n, ok := nDB.thisNodeNetworks[nid]
		if ok {
			n.entriesNumber.Add(1)
		}
	}
	return okTable, okNetwork
}

// deleteEntry handles the deletion of entries from the local tree store.
// It is also used to keep the entries number of the network in sync (all tables are aggregated)
func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (okTable bool, okNetwork bool) {
	nDB.indexes[byTable], _, okTable = nDB.indexes[byTable].Delete(fmt.Appendf(nil, "/%s/%s/%s", tname, nid, key))
	nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Delete(fmt.Appendf(nil, "/%s/%s/%s", nid, tname, key))
	if okNetwork {
		// Remove only if the delete is successful
		n, ok := nDB.thisNodeNetworks[nid]
		if ok {
			n.entriesNumber.Add(-1)
		}
	}
	return okTable, okNetwork
}
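
// Index-layout note (illustrative only; the table name in the examples is
// hypothetical): every entry is stored under two radix keys so that both
// per-table and per-network prefix walks stay cheap:
//
//	byTable:   "/<table>/<network-id>/<key>"   e.g. "/endpoint_table/n1/ep1"
//	byNetwork: "/<network-id>/<table>/<key>"   e.g. "/n1/endpoint_table/ep1"
//
// createOrUpdateEntry and deleteEntry keep both indexes and the per-network
// entriesNumber counter consistent; both helpers assume the caller holds the
// NetworkDB lock.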