Compare commits


15 Commits

Author SHA1 Message Date
Paweł Gronowski
6a1fb46d48 Merge pull request #50169 from robmry/revert_overlay_refactoring
[28.x]: Revert overlay bug fixes / refactoring
2025-06-13 15:49:07 +00:00
Rob Murray
7acb079403 Revert "libn/networkdb: don't exceed broadcast size limit"
This reverts commit dacf445614.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:49 +01:00
Rob Murray
0df31cf585 Revert "libn/networkdb: fix data race in GetTableByNetwork"
This reverts commit ec65f2d21b.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:48 +01:00
Rob Murray
83b2fc245d Revert "Fix possible overlapping IPs when ingressNA == nil"
This reverts commit 56ad941564.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:46 +01:00
Rob Murray
e079583ab4 Revert "libnetwork/networkdb: use correct index in GetTableByNetwork"
This reverts commit d5c370dee6.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:45 +01:00
Rob Murray
cfd5e5e4d4 Revert "libn/networkdb: b'cast watch events from local POV"
This reverts commit c68671d908.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:44 +01:00
Rob Murray
576cf73add Revert "libn/networkdb: record tombstones for all deletes"
This reverts commit ada8bc3695.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:43 +01:00
Rob Murray
2297ae3e64 Revert "libn/networkdb: Watch() without race conditions"
This reverts commit a3aea15257.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:41 +01:00
Rob Murray
cc60ec8d3c Revert "libn/networkdb: stop table events from racing network leaves"
This reverts commit 270a4d41dc.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:40 +01:00
Rob Murray
b5b349dbd6 Revert "libn/osl: drop unused AddNeighbor force parameter"
This reverts commit 3bdf99d127.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:39 +01:00
Rob Murray
35916f0869 Revert "libn/osl: refactor func (*Namespace) AddNeighbor"
This reverts commit b6d76eb572.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:38 +01:00
Rob Murray
3eb59ba5a2 Revert "libnetwork/osl: remove superfluous locks in Namespace"
This reverts commit 9866738736.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:37 +01:00
Rob Murray
5d6ae34753 Revert "libnetwork/osl: stop tracking neighbor entries"
This reverts commit 0d6e7cd983.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:36 +01:00
Rob Murray
ea818a7f6f Revert "libnetwork/internal/setmatrix: make keys generic"
This reverts commit 0317f773a6.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:33 +01:00
Rob Murray
78ccc20545 Revert "libn/d/overlay: use netip types more"
This reverts commit d188df0039.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:26 +01:00
24 changed files with 509 additions and 788 deletions

View File

@@ -213,35 +213,36 @@ func (e *executor) Configure(ctx context.Context, node *api.Node) error {
if ingressNA == nil {
e.backend.ReleaseIngress()
} else {
options := network.CreateOptions{
Driver: ingressNA.Network.DriverState.Name,
IPAM: &network.IPAM{
Driver: ingressNA.Network.IPAM.Driver.Name,
},
Options: ingressNA.Network.DriverState.Options,
Ingress: true,
}
return e.backend.GetAttachmentStore().ResetAttachments(attachments)
}
for _, ic := range ingressNA.Network.IPAM.Configs {
c := network.IPAMConfig{
Subnet: ic.Subnet,
IPRange: ic.Range,
Gateway: ic.Gateway,
}
options.IPAM.Config = append(options.IPAM.Config, c)
}
options := network.CreateOptions{
Driver: ingressNA.Network.DriverState.Name,
IPAM: &network.IPAM{
Driver: ingressNA.Network.IPAM.Driver.Name,
},
Options: ingressNA.Network.DriverState.Options,
Ingress: true,
}
_, err := e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
ID: ingressNA.Network.ID,
CreateRequest: network.CreateRequest{
Name: ingressNA.Network.Spec.Annotations.Name,
CreateOptions: options,
},
}, ingressNA.Addresses[0])
if err != nil {
return err
for _, ic := range ingressNA.Network.IPAM.Configs {
c := network.IPAMConfig{
Subnet: ic.Subnet,
IPRange: ic.Range,
Gateway: ic.Gateway,
}
options.IPAM.Config = append(options.IPAM.Config, c)
}
_, err := e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
ID: ingressNA.Network.ID,
CreateRequest: network.CreateRequest{
Name: ingressNA.Network.Spec.Annotations.Name,
CreateOptions: options,
},
}, ingressNA.Addresses[0])
if err != nil {
return err
}
var (

View File

@@ -777,6 +777,23 @@ func (n *Network) addDriverWatches() {
agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
agent.mu.Unlock()
go c.handleTableEvents(ch, n.handleDriverTableEvent)
d, err := n.driver(false)
if err != nil {
log.G(context.TODO()).Errorf("Could not resolve driver %s while walking driver table: %v", n.networkType, err)
return
}
err = agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
// skip the entries that are marked for deletion; this is safe because this function is
// called at initialization time so there is no state to delete
if nid == n.ID() && !deleted {
d.EventNotify(driverapi.Create, nid, table.name, key, value)
}
return false
})
if err != nil {
log.G(context.TODO()).WithError(err).Warn("Error while walking networkdb")
}
}
}
@@ -894,7 +911,7 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
err := proto.Unmarshal(value, &epRec)
if err != nil {
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value")
log.G(context.TODO()).Errorf("Failed to unmarshal service table value: %v", err)
return
}
@@ -907,54 +924,53 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
serviceAliases := epRec.Aliases
taskAliases := epRec.TaskAliases
logger := log.G(context.TODO()).WithFields(log.Fields{
"nid": nid,
"eid": eid,
"T": fmt.Sprintf("%T", ev),
"R": epRec,
})
if containerName == "" || ip == nil {
logger.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
log.G(context.TODO()).Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
return
}
logger.Debug("handleEpTableEvent")
switch ev.(type) {
case networkdb.CreateEvent, networkdb.UpdateEvent:
case networkdb.CreateEvent:
log.G(context.TODO()).Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if epRec.ServiceDisabled {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
logger.WithError(err).Error("failed disabling service binding")
return
}
} else {
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logger.WithError(err).Error("failed adding service binding")
return
}
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
log.G(context.TODO()).Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logger.WithError(err).Errorf("failed adding container name resolution")
log.G(context.TODO()).Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
case networkdb.DeleteEvent:
log.G(context.TODO()).Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
logger.WithError(err).Error("failed removing service binding")
log.G(context.TODO()).Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logger.WithError(err).Errorf("failed removing container name resolution")
log.G(context.TODO()).Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
case networkdb.UpdateEvent:
log.G(context.TODO()).Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
// We currently should only get these to inform us that an endpoint
// is disabled. Report if otherwise.
if svcID == "" || !epRec.ServiceDisabled {
log.G(context.TODO()).Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
return
}
// This is a remote task that is part of a service that is now disabled
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
log.G(context.TODO()).Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
}
}

View File

@@ -11,7 +11,6 @@ import (
"fmt"
"hash/fnv"
"net"
"net/netip"
"strconv"
"sync"
"syscall"
@@ -92,7 +91,7 @@ func (s *spi) String() string {
}
type encrMap struct {
nodes map[netip.Addr][]*spi
nodes map[string][]*spi
sync.Mutex
}
@@ -102,7 +101,7 @@ func (e *encrMap) String() string {
b := new(bytes.Buffer)
for k, v := range e.nodes {
b.WriteString("\n")
b.WriteString(k.String())
b.WriteString(k)
b.WriteString(":")
b.WriteString("[")
for _, s := range v {
@@ -114,7 +113,7 @@ func (e *encrMap) String() string {
return b.String()
}
func (d *driver) checkEncryption(nid string, rIP netip.Addr, isLocal, add bool) error {
func (d *driver) checkEncryption(nid string, rIP net.IP, isLocal, add bool) error {
log.G(context.TODO()).Debugf("checkEncryption(%.7s, %v, %t)", nid, rIP, isLocal)
n := d.network(nid)
@@ -128,13 +127,13 @@ func (d *driver) checkEncryption(nid string, rIP netip.Addr, isLocal, add bool)
lIP := d.bindAddress
aIP := d.advertiseAddress
nodes := map[netip.Addr]struct{}{}
nodes := map[string]net.IP{}
switch {
case isLocal:
if err := d.peerDbNetworkWalk(nid, func(_ netip.Addr, _ net.HardwareAddr, pEntry *peerEntry) bool {
if aIP != pEntry.vtep {
nodes[pEntry.vtep] = struct{}{}
if err := d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
if !aIP.Equal(pEntry.vtep) {
nodes[pEntry.vtep.String()] = pEntry.vtep
}
return false
}); err != nil {
@@ -142,14 +141,14 @@ func (d *driver) checkEncryption(nid string, rIP netip.Addr, isLocal, add bool)
}
default:
if len(d.network(nid).endpoints) > 0 {
nodes[rIP] = struct{}{}
nodes[rIP.String()] = rIP
}
}
log.G(context.TODO()).Debugf("List of nodes: %s", nodes)
if add {
for rIP := range nodes {
for _, rIP := range nodes {
if err := setupEncryption(lIP, aIP, rIP, d.secMap, d.keys); err != nil {
log.G(context.TODO()).Warnf("Failed to program network encryption between %s and %s: %v", lIP, rIP, err)
}
@@ -167,18 +166,19 @@ func (d *driver) checkEncryption(nid string, rIP netip.Addr, isLocal, add bool)
// setupEncryption programs the encryption parameters for secure communication
// between the local node and a remote node.
func setupEncryption(localIP, advIP, remoteIP netip.Addr, em *encrMap, keys []*key) error {
func setupEncryption(localIP, advIP, remoteIP net.IP, em *encrMap, keys []*key) error {
log.G(context.TODO()).Debugf("Programming encryption between %s and %s", localIP, remoteIP)
rIPs := remoteIP.String()
indices := make([]*spi, 0, len(keys))
for i, k := range keys {
spis := &spi{buildSPI(advIP.AsSlice(), remoteIP.AsSlice(), k.tag), buildSPI(remoteIP.AsSlice(), advIP.AsSlice(), k.tag)}
spis := &spi{buildSPI(advIP, remoteIP, k.tag), buildSPI(remoteIP, advIP, k.tag)}
dir := reverse
if i == 0 {
dir = bidir
}
fSA, rSA, err := programSA(localIP.AsSlice(), remoteIP.AsSlice(), spis, k, dir, true)
fSA, rSA, err := programSA(localIP, remoteIP, spis, k, dir, true)
if err != nil {
log.G(context.TODO()).Warn(err)
}
@@ -193,15 +193,15 @@ func setupEncryption(localIP, advIP, remoteIP netip.Addr, em *encrMap, keys []*k
}
em.Lock()
em.nodes[remoteIP] = indices
em.nodes[rIPs] = indices
em.Unlock()
return nil
}
func removeEncryption(localIP, remoteIP netip.Addr, em *encrMap) error {
func removeEncryption(localIP, remoteIP net.IP, em *encrMap) error {
em.Lock()
indices, ok := em.nodes[remoteIP]
indices, ok := em.nodes[remoteIP.String()]
em.Unlock()
if !ok {
return nil
@@ -211,7 +211,7 @@ func removeEncryption(localIP, remoteIP netip.Addr, em *encrMap) error {
if i == 0 {
dir = bidir
}
fSA, rSA, err := programSA(localIP.AsSlice(), remoteIP.AsSlice(), idxs, nil, dir, false)
fSA, rSA, err := programSA(localIP, remoteIP, idxs, nil, dir, false)
if err != nil {
log.G(context.TODO()).Warn(err)
}
@@ -478,7 +478,7 @@ func buildAeadAlgo(k *key, s int) *netlink.XfrmStateAlgo {
}
}
func (d *driver) secMapWalk(f func(netip.Addr, []*spi) ([]*spi, bool)) error {
func (d *driver) secMapWalk(f func(string, []*spi) ([]*spi, bool)) error {
d.secMap.Lock()
for node, indices := range d.secMap.nodes {
idxs, stop := f(node, indices)
@@ -499,7 +499,7 @@ func (d *driver) setKeys(keys []*key) error {
// Accept the encryption keys and clear any stale encryption map
d.Lock()
d.keys = keys
d.secMap = &encrMap{nodes: map[netip.Addr][]*spi{}}
d.secMap = &encrMap{nodes: map[string][]*spi{}}
d.Unlock()
log.G(context.TODO()).Debugf("Initial encryption keys: %v", keys)
return nil
@@ -548,8 +548,9 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
return types.InvalidParameterErrorf("attempting to both make a key (index %d) primary and delete it", priIdx)
}
d.secMapWalk(func(rIP netip.Addr, spis []*spi) ([]*spi, bool) {
return updateNodeKey(lIP.AsSlice(), aIP.AsSlice(), rIP.AsSlice(), spis, d.keys, newIdx, priIdx, delIdx), false
d.secMapWalk(func(rIPs string, spis []*spi) ([]*spi, bool) {
rIP := net.ParseIP(rIPs)
return updateNodeKey(lIP, aIP, rIP, spis, d.keys, newIdx, priIdx, delIdx), false
})
// swap primary
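
Background for the map-key change in this hunk, as a standalone sketch (not part of the diff): net.IP is a byte slice and is not comparable, so the restored encrMap keys entries by the address's string form, whereas the refactoring being reverted used netip.Addr, which can key a map directly. The names below are illustrative only.

package main

import (
    "fmt"
    "net"
    "net/netip"
)

func main() {
    // Restored style: net.IP cannot be a map key, so key by its string form.
    byString := map[string][]int{}
    ip := net.ParseIP("192.0.2.10")
    byString[ip.String()] = append(byString[ip.String()], 1)

    // Style being reverted: netip.Addr is comparable and keys the map directly.
    byAddr := map[netip.Addr][]int{}
    addr := netip.MustParseAddr("192.0.2.10")
    byAddr[addr] = append(byAddr[addr], 1)

    fmt.Println(len(byString), len(byAddr)) // 1 1
}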

View File

@@ -6,12 +6,10 @@ import (
"context"
"fmt"
"net"
"net/netip"
"syscall"
"github.com/containerd/log"
"github.com/docker/docker/libnetwork/driverapi"
"github.com/docker/docker/libnetwork/internal/netiputil"
"github.com/docker/docker/libnetwork/netlabel"
"github.com/docker/docker/libnetwork/ns"
"github.com/docker/docker/libnetwork/osl"
@@ -107,7 +105,7 @@ func (d *driver) Join(ctx context.Context, nid, eid string, sboxKey string, jinf
if sub == s {
continue
}
if err = jinfo.AddStaticRoute(netiputil.ToIPNet(sub.subnetIP), types.NEXTHOP, s.gwIP.Addr().AsSlice()); err != nil {
if err = jinfo.AddStaticRoute(sub.subnetIP, types.NEXTHOP, s.gwIP.IP); err != nil {
log.G(ctx).Errorf("Adding subnet %s static route in network %q failed\n", s.subnetIP, n.id)
}
}
@@ -119,9 +117,9 @@ func (d *driver) Join(ctx context.Context, nid, eid string, sboxKey string, jinf
}
}
d.peerAdd(nid, eid, ep.addr, ep.mac, d.advertiseAddress, true)
d.peerAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, d.advertiseAddress, true)
if err = d.checkEncryption(nid, netip.Addr{}, true, true); err != nil {
if err = d.checkEncryption(nid, nil, true, true); err != nil {
log.G(ctx).Warn(err)
}
@@ -174,34 +172,34 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
// Ignore local peers. We already know about them and they
// should not be added to vxlan fdb.
if addr, _ := netip.ParseAddr(peer.TunnelEndpointIP); addr == d.advertiseAddress {
if net.ParseIP(peer.TunnelEndpointIP).Equal(d.advertiseAddress) {
return
}
addr, err := netip.ParsePrefix(peer.EndpointIP)
addr, err := types.ParseCIDR(peer.EndpointIP)
if err != nil {
log.G(context.TODO()).WithError(err).Errorf("Invalid peer IP %s received in event notify", peer.EndpointIP)
log.G(context.TODO()).Errorf("Invalid peer IP %s received in event notify", peer.EndpointIP)
return
}
mac, err := net.ParseMAC(peer.EndpointMAC)
if err != nil {
log.G(context.TODO()).WithError(err).Errorf("Invalid mac %s received in event notify", peer.EndpointMAC)
log.G(context.TODO()).Errorf("Invalid mac %s received in event notify", peer.EndpointMAC)
return
}
vtep, err := netip.ParseAddr(peer.TunnelEndpointIP)
if err != nil {
log.G(context.TODO()).WithError(err).Errorf("Invalid VTEP %s received in event notify", peer.TunnelEndpointIP)
vtep := net.ParseIP(peer.TunnelEndpointIP)
if vtep == nil {
log.G(context.TODO()).Errorf("Invalid VTEP %s received in event notify", peer.TunnelEndpointIP)
return
}
if etype == driverapi.Delete {
d.peerDelete(nid, eid, addr, mac, vtep, false)
d.peerDelete(nid, eid, addr.IP, addr.Mask, mac, vtep, false)
return
}
d.peerAdd(nid, eid, addr, mac, vtep, false)
d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, false)
}
// Leave method is invoked when a Sandbox detaches from an endpoint.
@@ -221,7 +219,7 @@ func (d *driver) Leave(nid, eid string) error {
return types.InternalMaskableErrorf("could not find endpoint with id %s", eid)
}
d.peerDelete(nid, eid, ep.addr, ep.mac, d.advertiseAddress, true)
d.peerDelete(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, d.advertiseAddress, true)
n.leaveSandbox()

View File

@@ -6,11 +6,9 @@ import (
"context"
"fmt"
"net"
"net/netip"
"github.com/containerd/log"
"github.com/docker/docker/libnetwork/driverapi"
"github.com/docker/docker/libnetwork/internal/netiputil"
"github.com/docker/docker/libnetwork/netutils"
"github.com/docker/docker/libnetwork/ns"
)
@@ -22,7 +20,7 @@ type endpoint struct {
nid string
ifName string
mac net.HardwareAddr
addr netip.Prefix
addr *net.IPNet
}
func (n *network) endpoint(eid string) *endpoint {
@@ -63,13 +61,12 @@ func (d *driver) CreateEndpoint(_ context.Context, nid, eid string, ifInfo drive
}
ep := &endpoint{
id: eid,
nid: n.id,
mac: ifInfo.MacAddress(),
id: eid,
nid: n.id,
addr: ifInfo.Address(),
mac: ifInfo.MacAddress(),
}
var ok bool
ep.addr, ok = netiputil.ToPrefix(ifInfo.Address())
if !ok {
if ep.addr == nil {
return fmt.Errorf("create endpoint was not passed interface IP address")
}
@@ -78,7 +75,7 @@ func (d *driver) CreateEndpoint(_ context.Context, nid, eid string, ifInfo drive
}
if ep.mac == nil {
ep.mac = netutils.GenerateMACFromIP(ep.addr.Addr().AsSlice())
ep.mac = netutils.GenerateMACFromIP(ep.addr.IP)
if err := ifInfo.SetMacAddress(ep.mac); err != nil {
return err
}

View File

@@ -6,7 +6,7 @@ import (
"context"
"errors"
"fmt"
"net/netip"
"net"
"os"
"path/filepath"
"runtime"
@@ -18,7 +18,6 @@ import (
"github.com/docker/docker/internal/nlwrap"
"github.com/docker/docker/libnetwork/driverapi"
"github.com/docker/docker/libnetwork/drivers/overlay/overlayutils"
"github.com/docker/docker/libnetwork/internal/netiputil"
"github.com/docker/docker/libnetwork/netlabel"
"github.com/docker/docker/libnetwork/ns"
"github.com/docker/docker/libnetwork/osl"
@@ -43,8 +42,8 @@ type subnet struct {
brName string
vni uint32
initErr error
subnetIP netip.Prefix
gwIP netip.Prefix
subnetIP *net.IPNet
gwIP *net.IPNet
}
type network struct {
@@ -139,9 +138,11 @@ func (d *driver) CreateNetwork(ctx context.Context, id string, option map[string
}
for i, ipd := range ipV4Data {
s := &subnet{vni: vnis[i]}
s.subnetIP, _ = netiputil.ToPrefix(ipd.Pool)
s.gwIP, _ = netiputil.ToPrefix(ipd.Gateway)
s := &subnet{
subnetIP: ipd.Pool,
gwIP: ipd.Gateway,
vni: vnis[i],
}
n.subnets = append(n.subnets, s)
}
@@ -426,7 +427,7 @@ func (n *network) setupSubnetSandbox(s *subnet, brName, vxlanName string) error
// create a bridge and vxlan device for this subnet and move it to the sandbox
sbox := n.sbox
if err := sbox.AddInterface(context.TODO(), brName, "br", "", osl.WithIPv4Address(netiputil.ToIPNet(s.gwIP)), osl.WithIsBridge(true)); err != nil {
if err := sbox.AddInterface(context.TODO(), brName, "br", "", osl.WithIPv4Address(s.gwIP), osl.WithIsBridge(true)); err != nil {
return fmt.Errorf("bridge creation in sandbox failed for subnet %q: %v", s.subnetIP.String(), err)
}
@@ -613,13 +614,15 @@ func (n *network) sandbox() *osl.Namespace {
}
// getSubnetforIP returns the subnet to which the given IP belongs
func (n *network) getSubnetforIP(ip netip.Prefix) *subnet {
func (n *network) getSubnetforIP(ip *net.IPNet) *subnet {
for _, s := range n.subnets {
// first check if the mask lengths are the same
if s.subnetIP.Bits() != ip.Bits() {
i, _ := s.subnetIP.Mask.Size()
j, _ := ip.Mask.Size()
if i != j {
continue
}
if s.subnetIP.Contains(ip.Addr()) {
if s.subnetIP.Contains(ip.IP) {
return s
}
}
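
The restored getSubnetforIP matches a peer address to a subnet by comparing mask lengths and then checking membership; the netip.Prefix form being reverted does the same with Bits and Contains. A standalone sketch with illustrative addresses:

package main

import (
    "fmt"
    "net"
    "net/netip"
)

func main() {
    // Restored form: *net.IPNet, compare mask lengths, then check membership.
    _, subnet, _ := net.ParseCIDR("10.1.0.0/16")
    peer := &net.IPNet{IP: net.ParseIP("10.1.2.3"), Mask: net.CIDRMask(16, 32)}
    subnetOnes, _ := subnet.Mask.Size()
    peerOnes, _ := peer.Mask.Size()
    fmt.Println(subnetOnes == peerOnes && subnet.Contains(peer.IP)) // true

    // Form being reverted: netip.Prefix, same check via Bits and Contains.
    p := netip.MustParsePrefix("10.1.0.0/16")
    q := netip.MustParsePrefix("10.1.2.3/16")
    fmt.Println(p.Bits() == q.Bits() && p.Contains(q.Addr())) // true
}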

View File

@@ -7,7 +7,7 @@ package overlay
import (
"context"
"fmt"
"net/netip"
"net"
"sync"
"github.com/containerd/log"
@@ -28,7 +28,7 @@ const (
var _ discoverapi.Discover = (*driver)(nil)
type driver struct {
bindAddress, advertiseAddress netip.Addr
bindAddress, advertiseAddress net.IP
config map[string]interface{}
peerDb peerNetworkMap
@@ -48,7 +48,7 @@ func Register(r driverapi.Registerer, config map[string]interface{}) error {
peerDb: peerNetworkMap{
mp: map[string]*peerMap{},
},
secMap: &encrMap{nodes: map[netip.Addr][]*spi{}},
secMap: &encrMap{nodes: map[string][]*spi{}},
config: config,
}
return r.RegisterDriver(NetworkType, d, driverapi.Capability{
@@ -78,17 +78,16 @@ func (d *driver) isIPv6Transport() (bool, error) {
// from the address family of our own advertise address. This is a
// reasonable inference to make as Linux VXLAN links do not support
// mixed-address-family remote peers.
if !d.advertiseAddress.IsValid() {
if d.advertiseAddress == nil {
return false, fmt.Errorf("overlay: cannot determine address family of transport: the local data-plane address is not currently known")
}
return d.advertiseAddress.Is6(), nil
return d.advertiseAddress.To4() == nil, nil
}
func (d *driver) nodeJoin(data discoverapi.NodeDiscoveryData) error {
if data.Self {
advAddr, _ := netip.ParseAddr(data.Address)
bindAddr, _ := netip.ParseAddr(data.BindAddress)
if !advAddr.IsValid() {
advAddr, bindAddr := net.ParseIP(data.Address), net.ParseIP(data.BindAddress)
if advAddr == nil {
return fmt.Errorf("invalid discovery data")
}
d.Lock()
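
The restored isIPv6Transport inference boils down to net.IP's To4 conversion: an address with no 4-byte form is treated as IPv6, matching the comment about inferring the VXLAN transport family from the advertise address. A tiny standalone sketch (isIPv6 here is a hypothetical helper, not part of the driver):

package main

import (
    "fmt"
    "net"
)

// isIPv6 mirrors the restored check: an address whose To4() conversion
// fails has no IPv4 form and is treated as IPv6.
func isIPv6(ip net.IP) bool {
    return ip.To4() == nil
}

func main() {
    fmt.Println(isIPv6(net.ParseIP("10.0.0.1")))    // false
    fmt.Println(isIPv6(net.ParseIP("2001:db8::1"))) // true
}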

View File

@@ -7,7 +7,6 @@ import (
"context"
"fmt"
"net"
"net/netip"
"sync"
"syscall"
@@ -18,16 +17,52 @@ import (
const ovPeerTable = "overlay_peer_table"
type peerKey struct {
peerIP net.IP
peerMac net.HardwareAddr
}
type peerEntry struct {
eid string
vtep netip.Addr
prefixBits int // number of 1-bits in network mask of peerIP
vtep net.IP
peerIPMask net.IPMask
isLocal bool
}
func (p *peerEntry) MarshalDB() peerEntryDB {
ones, bits := p.peerIPMask.Size()
return peerEntryDB{
eid: p.eid,
vtep: p.vtep.String(),
peerIPMaskOnes: ones,
peerIPMaskBits: bits,
isLocal: p.isLocal,
}
}
// This is the structure saved into the set (SetMatrix); due to its implementation,
// the value inserted in the set has to be hashable, so the []byte had to be converted into
// strings
type peerEntryDB struct {
eid string
vtep string
peerIPMaskOnes int
peerIPMaskBits int
isLocal bool
}
func (p *peerEntryDB) UnMarshalDB() peerEntry {
return peerEntry{
eid: p.eid,
vtep: net.ParseIP(p.vtep),
peerIPMask: net.CIDRMask(p.peerIPMaskOnes, p.peerIPMaskBits),
isLocal: p.isLocal,
}
}
type peerMap struct {
// set of peerEntry, note the values have to be objects and not pointers to maintain the proper equality checks
mp setmatrix.SetMatrix[ipmac, peerEntry]
mp setmatrix.SetMatrix[peerEntryDB]
sync.Mutex
}
@@ -37,7 +72,28 @@ type peerNetworkMap struct {
sync.Mutex
}
func (d *driver) peerDbWalk(f func(string, netip.Addr, net.HardwareAddr, *peerEntry) bool) error {
func (pKey peerKey) String() string {
return fmt.Sprintf("%s %s", pKey.peerIP, pKey.peerMac)
}
func (pKey *peerKey) Scan(state fmt.ScanState, verb rune) error {
ipB, err := state.Token(true, nil)
if err != nil {
return err
}
pKey.peerIP = net.ParseIP(string(ipB))
macB, err := state.Token(true, nil)
if err != nil {
return err
}
pKey.peerMac, err = net.ParseMAC(string(macB))
return err
}
func (d *driver) peerDbWalk(f func(string, *peerKey, *peerEntry) bool) error {
d.peerDb.Lock()
nids := []string{}
for nid := range d.peerDb.mp {
@@ -46,14 +102,14 @@ func (d *driver) peerDbWalk(f func(string, netip.Addr, net.HardwareAddr, *peerEn
d.peerDb.Unlock()
for _, nid := range nids {
d.peerDbNetworkWalk(nid, func(peerIP netip.Addr, peerMac net.HardwareAddr, pEntry *peerEntry) bool {
return f(nid, peerIP, peerMac, pEntry)
d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
return f(nid, pKey, pEntry)
})
}
return nil
}
func (d *driver) peerDbNetworkWalk(nid string, f func(netip.Addr, net.HardwareAddr, *peerEntry) bool) error {
func (d *driver) peerDbNetworkWalk(nid string, f func(*peerKey, *peerEntry) bool) error {
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
d.peerDb.Unlock()
@@ -62,18 +118,22 @@ func (d *driver) peerDbNetworkWalk(nid string, f func(netip.Addr, net.HardwareAd
return nil
}
mp := map[ipmac]peerEntry{}
mp := map[string]peerEntry{}
pMap.Lock()
for _, pKey := range pMap.mp.Keys() {
entryDBList, ok := pMap.mp.Get(pKey)
for _, pKeyStr := range pMap.mp.Keys() {
entryDBList, ok := pMap.mp.Get(pKeyStr)
if ok {
mp[pKey] = entryDBList[0]
mp[pKeyStr] = entryDBList[0].UnMarshalDB()
}
}
pMap.Unlock()
for pKey, pEntry := range mp {
if f(pKey.ip, pKey.mac.HardwareAddr(), &pEntry) {
for pKeyStr, pEntry := range mp {
var pKey peerKey
if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil {
log.G(context.TODO()).Warnf("Peer key scan on network %s failed: %v", nid, err)
}
if f(&pKey, &pEntry) {
return nil
}
}
@@ -81,14 +141,12 @@ func (d *driver) peerDbNetworkWalk(nid string, f func(netip.Addr, net.HardwareAd
return nil
}
func (d *driver) peerDbSearch(nid string, peerIP netip.Addr) (netip.Addr, net.HardwareAddr, *peerEntry, error) {
var peerIPMatched netip.Addr
var peerMacMatched net.HardwareAddr
func (d *driver) peerDbSearch(nid string, peerIP net.IP) (*peerKey, *peerEntry, error) {
var pKeyMatched *peerKey
var pEntryMatched *peerEntry
err := d.peerDbNetworkWalk(nid, func(ip netip.Addr, mac net.HardwareAddr, pEntry *peerEntry) bool {
if ip == peerIP {
peerIPMatched = ip
peerMacMatched = mac
err := d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
if pKey.peerIP.Equal(peerIP) {
pKeyMatched = pKey
pEntryMatched = pEntry
return true
}
@@ -96,17 +154,17 @@ func (d *driver) peerDbSearch(nid string, peerIP netip.Addr) (netip.Addr, net.Ha
return false
})
if err != nil {
return netip.Addr{}, nil, nil, fmt.Errorf("peerdb search for peer ip %q failed: %v", peerIP, err)
return nil, nil, fmt.Errorf("peerdb search for peer ip %q failed: %v", peerIP, err)
}
if !peerIPMatched.IsValid() || pEntryMatched == nil {
return netip.Addr{}, nil, nil, fmt.Errorf("peer ip %q not found in peerdb", peerIP)
if pKeyMatched == nil || pEntryMatched == nil {
return nil, nil, fmt.Errorf("peer ip %q not found in peerdb", peerIP)
}
return peerIPMatched, peerMacMatched, pEntryMatched, nil
return pKeyMatched, pEntryMatched, nil
}
func (d *driver) peerDbAdd(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, isLocal bool) (bool, int) {
func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, isLocal bool) (bool, int) {
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
if !ok {
@@ -115,27 +173,30 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP netip.Prefix, peerMac net.Har
}
d.peerDb.Unlock()
pKey := ipmacOf(peerIP.Addr(), peerMac)
pKey := peerKey{
peerIP: peerIP,
peerMac: peerMac,
}
pEntry := peerEntry{
eid: eid,
vtep: vtep,
prefixBits: peerIP.Bits(),
peerIPMask: peerIPMask,
isLocal: isLocal,
}
pMap.Lock()
defer pMap.Unlock()
b, i := pMap.mp.Insert(pKey, pEntry)
b, i := pMap.mp.Insert(pKey.String(), pEntry.MarshalDB())
if i != 1 {
// Transient case, there is more than one endpoint that is using the same IP,MAC pair
s, _ := pMap.mp.String(pKey)
s, _ := pMap.mp.String(pKey.String())
log.G(context.TODO()).Warnf("peerDbAdd transient condition - Key:%s cardinality:%d db state:%s", pKey.String(), i, s)
}
return b, i
}
func (d *driver) peerDbDelete(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, isLocal bool) (bool, int) {
func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, isLocal bool) (bool, int) {
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
if !ok {
@@ -144,22 +205,25 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP netip.Prefix, peerMac net.
}
d.peerDb.Unlock()
pKey := ipmacOf(peerIP.Addr(), peerMac)
pKey := peerKey{
peerIP: peerIP,
peerMac: peerMac,
}
pEntry := peerEntry{
eid: eid,
vtep: vtep,
prefixBits: peerIP.Bits(),
peerIPMask: peerIPMask,
isLocal: isLocal,
}
pMap.Lock()
defer pMap.Unlock()
b, i := pMap.mp.Remove(pKey, pEntry)
b, i := pMap.mp.Remove(pKey.String(), pEntry.MarshalDB())
if i != 0 {
// Transient case, there is more than one endpoint that is using the same IP,MAC pair
s, _ := pMap.mp.String(pKey)
log.G(context.TODO()).Warnf("peerDbDelete transient condition - Key:%s cardinality:%d db state:%s", pKey, i, s)
s, _ := pMap.mp.String(pKey.String())
log.G(context.TODO()).Warnf("peerDbDelete transient condition - Key:%s cardinality:%d db state:%s", pKey.String(), i, s)
}
return b, i
}
@@ -184,28 +248,28 @@ func (d *driver) initSandboxPeerDB(nid string) {
}
func (d *driver) peerInitOp(nid string) error {
return d.peerDbNetworkWalk(nid, func(peerIP netip.Addr, peerMac net.HardwareAddr, pEntry *peerEntry) bool {
return d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
// Local entries do not need to be added
if pEntry.isLocal {
return false
}
d.peerAddOp(nid, pEntry.eid, netip.PrefixFrom(peerIP, pEntry.prefixBits), peerMac, pEntry.vtep, false, pEntry.isLocal)
d.peerAddOp(nid, pEntry.eid, pKey.peerIP, pEntry.peerIPMask, pKey.peerMac, pEntry.vtep, false, pEntry.isLocal)
// return false to loop on all entries
return false
})
}
func (d *driver) peerAdd(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, localPeer bool) {
func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, localPeer bool) {
d.peerOpMu.Lock()
defer d.peerOpMu.Unlock()
err := d.peerAddOp(nid, eid, peerIP, peerMac, vtep, true, localPeer)
err := d.peerAddOp(nid, eid, peerIP, peerIPMask, peerMac, vtep, true, localPeer)
if err != nil {
log.G(context.TODO()).WithError(err).Warn("Peer add operation failed")
}
}
func (d *driver) peerAddOp(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, updateDB, localPeer bool) error {
func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, updateDB, localPeer bool) error {
if err := validateID(nid, eid); err != nil {
return err
}
@@ -213,7 +277,7 @@ func (d *driver) peerAddOp(nid, eid string, peerIP netip.Prefix, peerMac net.Har
var dbEntries int
var inserted bool
if updateDB {
inserted, dbEntries = d.peerDbAdd(nid, eid, peerIP, peerMac, vtep, localPeer)
inserted, dbEntries = d.peerDbAdd(nid, eid, peerIP, peerIPMask, peerMac, vtep, localPeer)
if !inserted {
log.G(context.TODO()).Warnf("Entry already present in db: nid:%s eid:%s peerIP:%v peerMac:%v isLocal:%t vtep:%v",
nid, eid, peerIP, peerMac, localPeer, vtep)
@@ -238,9 +302,14 @@ func (d *driver) peerAddOp(nid, eid string, peerIP netip.Prefix, peerMac net.Har
return nil
}
s := n.getSubnetforIP(peerIP)
IP := &net.IPNet{
IP: peerIP,
Mask: peerIPMask,
}
s := n.getSubnetforIP(IP)
if s == nil {
return fmt.Errorf("couldn't find the subnet %q in network %q", peerIP.String(), n.id)
return fmt.Errorf("couldn't find the subnet %q in network %q", IP.String(), n.id)
}
if err := n.joinSandbox(s, false); err != nil {
@@ -252,7 +321,7 @@ func (d *driver) peerAddOp(nid, eid string, peerIP netip.Prefix, peerMac net.Har
}
// Add neighbor entry for the peer IP
if err := sbox.AddNeighbor(peerIP.Addr().AsSlice(), peerMac, osl.WithLinkName(s.vxlanName)); err != nil {
if err := sbox.AddNeighbor(peerIP, peerMac, false, osl.WithLinkName(s.vxlanName)); err != nil {
if _, ok := err.(osl.NeighborSearchError); ok && dbEntries > 1 {
// We are in the transient case so only the first configuration is programmed into the kernel
// Upon deletion if the active configuration is deleted the next one from the database will be restored
@@ -263,28 +332,28 @@ func (d *driver) peerAddOp(nid, eid string, peerIP netip.Prefix, peerMac net.Har
}
// Add fdb entry to the bridge for the peer mac
if err := sbox.AddNeighbor(vtep.AsSlice(), peerMac, osl.WithLinkName(s.vxlanName), osl.WithFamily(syscall.AF_BRIDGE)); err != nil {
if err := sbox.AddNeighbor(vtep, peerMac, false, osl.WithLinkName(s.vxlanName), osl.WithFamily(syscall.AF_BRIDGE)); err != nil {
return fmt.Errorf("could not add fdb entry for nid:%s eid:%s into the sandbox:%v", nid, eid, err)
}
return nil
}
func (d *driver) peerDelete(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, localPeer bool) {
func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, localPeer bool) {
d.peerOpMu.Lock()
defer d.peerOpMu.Unlock()
err := d.peerDeleteOp(nid, eid, peerIP, peerMac, vtep, localPeer)
err := d.peerDeleteOp(nid, eid, peerIP, peerIPMask, peerMac, vtep, localPeer)
if err != nil {
log.G(context.TODO()).WithError(err).Warn("Peer delete operation failed")
}
}
func (d *driver) peerDeleteOp(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, localPeer bool) error {
func (d *driver) peerDeleteOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, localPeer bool) error {
if err := validateID(nid, eid); err != nil {
return err
}
deleted, dbEntries := d.peerDbDelete(nid, eid, peerIP, peerMac, vtep, localPeer)
deleted, dbEntries := d.peerDbDelete(nid, eid, peerIP, peerIPMask, peerMac, vtep, localPeer)
if !deleted {
log.G(context.TODO()).Warnf("Entry was not in db: nid:%s eid:%s peerIP:%v peerMac:%v isLocal:%t vtep:%v",
nid, eid, peerIP, peerMac, localPeer, vtep)
@@ -306,12 +375,8 @@ func (d *driver) peerDeleteOp(nid, eid string, peerIP netip.Prefix, peerMac net.
// Local peers do not have any local configuration to delete
if !localPeer {
s := n.getSubnetforIP(peerIP)
if s == nil {
return fmt.Errorf("could not find the subnet %q in network %q", peerIP.String(), n.id)
}
// Remove fdb entry to the bridge for the peer mac
if err := sbox.DeleteNeighbor(vtep.AsSlice(), peerMac, osl.WithLinkName(s.vxlanName)); err != nil {
if err := sbox.DeleteNeighbor(vtep, peerMac); err != nil {
if _, ok := err.(osl.NeighborSearchError); ok && dbEntries > 0 {
// We fall in here if there is a transient state and if the neighbor that is being deleted
// was never been configured into the kernel (we allow only 1 configuration at the time per <ip,mac> mapping)
@@ -321,7 +386,7 @@ func (d *driver) peerDeleteOp(nid, eid string, peerIP netip.Prefix, peerMac net.
}
// Delete neighbor entry for the peer IP
if err := sbox.DeleteNeighbor(peerIP.Addr().AsSlice(), peerMac, osl.WithLinkName(s.vxlanName), osl.WithFamily(syscall.AF_BRIDGE)); err != nil {
if err := sbox.DeleteNeighbor(peerIP, peerMac); err != nil {
return fmt.Errorf("could not delete neighbor entry for nid:%s eid:%s into the sandbox:%v", nid, eid, err)
}
}
@@ -333,12 +398,12 @@ func (d *driver) peerDeleteOp(nid, eid string, peerIP netip.Prefix, peerMac net.
// If there is still an entry into the database and the deletion went through without errors means that there is now no
// configuration active in the kernel.
// Restore one configuration for the <ip,mac> directly from the database, note that is guaranteed that there is one
peerIPAddr, peerMac, peerEntry, err := d.peerDbSearch(nid, peerIP.Addr())
peerKey, peerEntry, err := d.peerDbSearch(nid, peerIP)
if err != nil {
log.G(context.TODO()).Errorf("peerDeleteOp unable to restore a configuration for nid:%s ip:%v mac:%v err:%s", nid, peerIP, peerMac, err)
return err
}
return d.peerAddOp(nid, peerEntry.eid, netip.PrefixFrom(peerIPAddr, peerEntry.prefixBits), peerMac, peerEntry.vtep, false, peerEntry.isLocal)
return d.peerAddOp(nid, peerEntry.eid, peerIP, peerEntry.peerIPMask, peerKey.peerMac, peerEntry.vtep, false, peerEntry.isLocal)
}
func (d *driver) peerFlush(nid string) {
@@ -361,7 +426,7 @@ func (d *driver) peerFlushOp(nid string) error {
}
func (d *driver) peerDBUpdateSelf() {
d.peerDbWalk(func(nid string, _ netip.Addr, _ net.HardwareAddr, pEntry *peerEntry) bool {
d.peerDbWalk(func(nid string, pkey *peerKey, pEntry *peerEntry) bool {
if pEntry.isLocal {
pEntry.vtep = d.advertiseAddress
}
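
The restored peer database keys entries by the string produced by peerKey.String and recovers the struct with fmt.Sscan through the Scan method shown above. A self-contained sketch of that round trip, duplicating the struct outside the driver purely for illustration:

package main

import (
    "fmt"
    "net"
)

// peerKey mirrors the reverted struct: an IP and a MAC serialized as
// "<ip> <mac>" so the pair can be used as a string map key.
type peerKey struct {
    peerIP  net.IP
    peerMac net.HardwareAddr
}

func (k peerKey) String() string {
    return fmt.Sprintf("%s %s", k.peerIP, k.peerMac)
}

func (k *peerKey) Scan(state fmt.ScanState, verb rune) error {
    ipB, err := state.Token(true, nil)
    if err != nil {
        return err
    }
    k.peerIP = net.ParseIP(string(ipB))
    macB, err := state.Token(true, nil)
    if err != nil {
        return err
    }
    k.peerMac, err = net.ParseMAC(string(macB))
    return err
}

func main() {
    orig := peerKey{
        peerIP:  net.ParseIP("10.0.0.2"),
        peerMac: net.HardwareAddr{0x02, 0x42, 0xac, 0x11, 0x00, 0x02},
    }
    key := orig.String() // "10.0.0.2 02:42:ac:11:00:02"

    var parsed peerKey
    if _, err := fmt.Sscan(key, &parsed); err != nil {
        panic(err)
    }
    fmt.Println(parsed.peerIP.Equal(orig.peerIP), parsed.peerMac.String() == orig.peerMac.String())
    // true true
}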

View File

@@ -0,0 +1,32 @@
//go:build linux
package overlay
import (
"net"
"testing"
)
func TestPeerMarshal(t *testing.T) {
_, ipNet, _ := net.ParseCIDR("192.168.0.1/24")
p := &peerEntry{
eid: "eid",
isLocal: true,
peerIPMask: ipNet.Mask,
vtep: ipNet.IP,
}
entryDB := p.MarshalDB()
x := entryDB.UnMarshalDB()
if x.eid != p.eid {
t.Fatalf("Incorrect Unmarshalling for eid: %v != %v", x.eid, p.eid)
}
if x.isLocal != p.isLocal {
t.Fatalf("Incorrect Unmarshalling for isLocal: %v != %v", x.isLocal, p.isLocal)
}
if x.peerIPMask.String() != p.peerIPMask.String() {
t.Fatalf("Incorrect Unmarshalling for eid: %v != %v", x.peerIPMask, p.peerIPMask)
}
if x.vtep.String() != p.vtep.String() {
t.Fatalf("Incorrect Unmarshalling for eid: %v != %v", x.vtep, p.vtep)
}
}

View File

@@ -1,52 +0,0 @@
package overlay
// Handy utility types for making unhashable values hashable.
import (
"net"
"net/netip"
)
// macAddr is a hashable encoding of a MAC address.
type macAddr uint64
// macAddrOf converts a net.HardwareAddr to a macAddr.
func macAddrOf(mac net.HardwareAddr) macAddr {
if len(mac) != 6 {
return 0
}
return macAddr(mac[0])<<40 | macAddr(mac[1])<<32 | macAddr(mac[2])<<24 |
macAddr(mac[3])<<16 | macAddr(mac[4])<<8 | macAddr(mac[5])
}
// HardwareAddr converts a macAddr back to a net.HardwareAddr.
func (p macAddr) HardwareAddr() net.HardwareAddr {
mac := [6]byte{
byte(p >> 40), byte(p >> 32), byte(p >> 24),
byte(p >> 16), byte(p >> 8), byte(p),
}
return mac[:]
}
// String returns p.HardwareAddr().String().
func (p macAddr) String() string {
return p.HardwareAddr().String()
}
// ipmac is a hashable tuple of an IP address and a MAC address suitable for use as a map key.
type ipmac struct {
ip netip.Addr
mac macAddr
}
// ipmacOf is a convenience constructor for creating an ipmac from a [net.HardwareAddr].
func ipmacOf(ip netip.Addr, mac net.HardwareAddr) ipmac {
return ipmac{
ip: ip,
mac: macAddrOf(mac),
}
}
func (i ipmac) String() string {
return i.ip.String() + " " + i.mac.String()
}

View File

@@ -1,29 +0,0 @@
package overlay
import (
"net"
"net/netip"
"testing"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
func TestMACAddrOf(t *testing.T) {
want := net.HardwareAddr{0x01, 0x02, 0x03, 0x04, 0x05, 0x06}
assert.DeepEqual(t, macAddrOf(want).HardwareAddr(), want)
}
func TestIPMACOf(t *testing.T) {
assert.Check(t, is.Equal(ipmacOf(netip.Addr{}, nil), ipmac{}))
assert.Check(t, is.Equal(
ipmacOf(
netip.MustParseAddr("11.22.33.44"),
net.HardwareAddr{0x01, 0x02, 0x03, 0x04, 0x05, 0x06},
),
ipmac{
ip: netip.MustParseAddr("11.22.33.44"),
mac: macAddrOf(net.HardwareAddr{0x01, 0x02, 0x03, 0x04, 0x05, 0x06}),
},
))
}

View File

@@ -13,14 +13,14 @@ import (
// The zero value is an empty set matrix ready to use.
//
// SetMatrix values are safe for concurrent use.
type SetMatrix[K, V comparable] struct {
matrix map[K]mapset.Set[V]
type SetMatrix[T comparable] struct {
matrix map[string]mapset.Set[T]
mu sync.Mutex
}
// Get returns the members of the set for a specific key as a slice.
func (s *SetMatrix[K, V]) Get(key K) ([]V, bool) {
func (s *SetMatrix[T]) Get(key string) ([]T, bool) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
@@ -31,7 +31,7 @@ func (s *SetMatrix[K, V]) Get(key K) ([]V, bool) {
}
// Contains is used to verify if an element is in a set for a specific key.
func (s *SetMatrix[K, V]) Contains(key K, value V) (containsElement, setExists bool) {
func (s *SetMatrix[T]) Contains(key string, value T) (containsElement, setExists bool) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
@@ -43,13 +43,13 @@ func (s *SetMatrix[K, V]) Contains(key K, value V) (containsElement, setExists b
// Insert inserts the value in the set of a key and returns whether the value is
// inserted (was not already in the set) and the number of elements in the set.
func (s *SetMatrix[K, V]) Insert(key K, value V) (inserted bool, cardinality int) {
func (s *SetMatrix[T]) Insert(key string, value T) (inserted bool, cardinality int) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
if !ok {
if s.matrix == nil {
s.matrix = make(map[K]mapset.Set[V])
s.matrix = make(map[string]mapset.Set[T])
}
s.matrix[key] = mapset.NewThreadUnsafeSet(value)
return true, 1
@@ -59,7 +59,7 @@ func (s *SetMatrix[K, V]) Insert(key K, value V) (inserted bool, cardinality int
}
// Remove removes the value in the set for a specific key.
func (s *SetMatrix[K, V]) Remove(key K, value V) (removed bool, cardinality int) {
func (s *SetMatrix[T]) Remove(key string, value T) (removed bool, cardinality int) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
@@ -80,7 +80,7 @@ func (s *SetMatrix[K, V]) Remove(key K, value V) (removed bool, cardinality int)
}
// Cardinality returns the number of elements in the set for a key.
func (s *SetMatrix[K, V]) Cardinality(key K) (cardinality int, ok bool) {
func (s *SetMatrix[T]) Cardinality(key string) (cardinality int, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
@@ -93,7 +93,7 @@ func (s *SetMatrix[K, V]) Cardinality(key K) (cardinality int, ok bool) {
// String returns the string version of the set.
// The empty string is returned if there is no set for key.
func (s *SetMatrix[K, V]) String(key K) (v string, ok bool) {
func (s *SetMatrix[T]) String(key string) (v string, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
@@ -104,10 +104,10 @@ func (s *SetMatrix[K, V]) String(key K) (v string, ok bool) {
}
// Keys returns all the keys in the map.
func (s *SetMatrix[K, V]) Keys() []K {
func (s *SetMatrix[T]) Keys() []string {
s.mu.Lock()
defer s.mu.Unlock()
keys := make([]K, 0, len(s.matrix))
keys := make([]string, 0, len(s.matrix))
for k := range s.matrix {
keys = append(keys, k)
}
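
After this revert, SetMatrix is generic only in its element type and keys are always strings. A short usage sketch under that assumption (the key mimics the overlay driver's "<ip> <mac>" peer key, and the import only builds inside the moby tree because the package is internal):

package main

import (
    "fmt"

    "github.com/docker/docker/libnetwork/internal/setmatrix"
)

func main() {
    // The zero value is ready to use; keys are strings after the revert.
    var peers setmatrix.SetMatrix[string]

    inserted, n := peers.Insert("10.0.0.2 02:42:ac:11:00:02", "endpoint-1")
    fmt.Println(inserted, n) // true 1

    // A second endpoint claiming the same <ip,mac> pair is the "transient
    // condition" the driver warns about: cardinality goes above one.
    _, n = peers.Insert("10.0.0.2 02:42:ac:11:00:02", "endpoint-2")
    fmt.Println(n) // 2

    removed, n := peers.Remove("10.0.0.2 02:42:ac:11:00:02", "endpoint-1")
    fmt.Println(removed, n) // true 1
}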

View File

@@ -9,7 +9,7 @@ import (
)
func TestSetSerialInsertDelete(t *testing.T) {
var s SetMatrix[string, string]
var s SetMatrix[string]
b, i := s.Insert("a", "1")
if !b || i != 1 {
@@ -135,7 +135,7 @@ func TestSetSerialInsertDelete(t *testing.T) {
}
}
func insertDeleteRotuine(ctx context.Context, endCh chan int, s *SetMatrix[string, string], key, value string) {
func insertDeleteRotuine(ctx context.Context, endCh chan int, s *SetMatrix[string], key, value string) {
for {
select {
case <-ctx.Done():
@@ -158,7 +158,7 @@ func insertDeleteRotuine(ctx context.Context, endCh chan int, s *SetMatrix[strin
}
func TestSetParallelInsertDelete(t *testing.T) {
var s SetMatrix[string, string]
var s SetMatrix[string]
parallelRoutines := 6
endCh := make(chan int)
// Let the routines running and competing for 10s

View File

@@ -457,7 +457,7 @@ func getSvcRecords(t *testing.T, n *Network, key string) (addrs []netip.Addr, fo
sr, ok := n.ctrlr.svcRecords[n.id]
assert.Assert(t, ok)
lookup := func(svcMap *setmatrix.SetMatrix[string, svcMapEntry]) bool {
lookup := func(svcMap *setmatrix.SetMatrix[svcMapEntry]) bool {
mapEntryList, ok := svcMap.Get(key)
if !ok {
return false

View File

@@ -57,9 +57,9 @@ type svcMapEntry struct {
}
type svcInfo struct {
svcMap setmatrix.SetMatrix[string, svcMapEntry]
svcIPv6Map setmatrix.SetMatrix[string, svcMapEntry]
ipMap setmatrix.SetMatrix[string, ipInfo]
svcMap setmatrix.SetMatrix[svcMapEntry]
svcIPv6Map setmatrix.SetMatrix[svcMapEntry]
ipMap setmatrix.SetMatrix[ipInfo]
service map[string][]servicePorts
}
@@ -1370,7 +1370,7 @@ func (n *Network) updateSvcRecord(ctx context.Context, ep *Endpoint, isAdd bool)
}
}
func addIPToName(ipMap *setmatrix.SetMatrix[string, ipInfo], name, serviceID string, ip net.IP) {
func addIPToName(ipMap *setmatrix.SetMatrix[ipInfo], name, serviceID string, ip net.IP) {
reverseIP := netutils.ReverseIP(ip.String())
ipMap.Insert(reverseIP, ipInfo{
name: name,
@@ -1378,7 +1378,7 @@ func addIPToName(ipMap *setmatrix.SetMatrix[string, ipInfo], name, serviceID str
})
}
func delIPToName(ipMap *setmatrix.SetMatrix[string, ipInfo], name, serviceID string, ip net.IP) {
func delIPToName(ipMap *setmatrix.SetMatrix[ipInfo], name, serviceID string, ip net.IP) {
reverseIP := netutils.ReverseIP(ip.String())
ipMap.Remove(reverseIP, ipInfo{
name: name,
@@ -1386,7 +1386,7 @@ func delIPToName(ipMap *setmatrix.SetMatrix[string, ipInfo], name, serviceID str
})
}
func addNameToIP(svcMap *setmatrix.SetMatrix[string, svcMapEntry], name, serviceID string, epIP net.IP) {
func addNameToIP(svcMap *setmatrix.SetMatrix[svcMapEntry], name, serviceID string, epIP net.IP) {
// Since DNS name resolution is case-insensitive, Use the lower-case form
// of the name as the key into svcMap
lowerCaseName := strings.ToLower(name)
@@ -1396,7 +1396,7 @@ func addNameToIP(svcMap *setmatrix.SetMatrix[string, svcMapEntry], name, service
})
}
func delNameToIP(svcMap *setmatrix.SetMatrix[string, svcMapEntry], name, serviceID string, epIP net.IP) {
func delNameToIP(svcMap *setmatrix.SetMatrix[svcMapEntry], name, serviceID string, epIP net.IP) {
lowerCaseName := strings.ToLower(name)
svcMap.Remove(lowerCaseName, svcMapEntry{
ip: epIP.String(),

View File

@@ -148,13 +148,8 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
// Update our local clock if the received messages has newer time.
nDB.tableClock.Witness(tEvent.LTime)
nDB.Lock()
// Hold the lock until after we broadcast the event to watchers so that
// the new watch receives either the synthesized event or the event we
// broadcast, never both.
defer nDB.Unlock()
// Ignore the table events for networks that are in the process of going away
nDB.RLock()
networks := nDB.networks[nDB.config.NodeID]
network, ok := networks[tEvent.NetworkID]
// Check if the owner of the event is still part of the network
@@ -166,24 +161,33 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
break
}
}
nDB.RUnlock()
if !ok || network.leaving || !nodePresent {
// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
return false
}
var entryPresent bool
prev, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
nDB.Lock()
e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
if err == nil {
entryPresent = true
// We have the latest state. Ignore the event
// since it is stale.
if prev.ltime >= tEvent.LTime {
if e.ltime >= tEvent.LTime {
nDB.Unlock()
return false
}
} else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
nDB.Unlock()
// We don't know the entry, the entry is being deleted and the message is an async message
// In this case the safest approach is to ignore it, it is possible that the queue grew so much to
// exceed the garbage collection time (the residual reap time that is in the message is not being
// updated, to avoid inserting too many messages in the queue).
// Instead the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time
return false
}
e := &entry{
e = &entry{
ltime: tEvent.LTime,
node: tEvent.NodeName,
value: tEvent.Value,
@@ -200,55 +204,35 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
e.reapTime = nDB.config.reapEntryInterval
}
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
nDB.Unlock()
if !entryPresent && tEvent.Type == TableEventTypeDelete {
// We will rebroadcast the message for an unknown entry if all the conditions are met:
// 1) the message was received from a bulk sync
// 2) we had already synced this network (during the network join)
// 3) the residual reapTime is higher than 1/6 of the total reapTime.
//
// If the residual reapTime is lower or equal to 1/6 of the total reapTime
// don't bother broadcasting it around as most likely the cluster is already aware of it.
// This also reduces the possibility that deletion of entries close to their garbage collection
// ends up circling around forever.
//
// The safest approach is to not rebroadcast async messages for unknown entries.
// It is possible that the queue grew so much to exceed the garbage collection time
// (the residual reap time that is in the message is not being updated, to avoid
// inserting too many messages in the queue).
if err != nil && tEvent.Type == TableEventTypeDelete {
// Again we don't know the entry but this is coming from a TCP sync so the message body is up to date.
// We had saved the state so to speed up convergence and be able to avoid accepting create events.
// Now we will rebroadcast the message if 2 conditions are met:
// 1) we had already synced this network (during the network join)
// 2) the residual reapTime is higher than 1/6 of the total reapTime.
// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
// most likely the cluster is already aware of it
// This also reduces the possibility that deletion of entries close to their garbage collection ends up circling around
// forever
// log.G(ctx).Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
return isBulkSync && network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
return network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
}
var op opType
value := tEvent.Value
switch tEvent.Type {
case TableEventTypeCreate, TableEventTypeUpdate:
// Gossip messages could arrive out-of-order so it is possible
// for an entry's UPDATE event to be received before its CREATE
// event. The local watchers should not need to care about such
// nuances. Broadcast events to watchers based only on what
// changed in the local NetworkDB state.
case TableEventTypeCreate:
op = opCreate
if entryPresent && !prev.deleting {
op = opUpdate
}
case TableEventTypeUpdate:
op = opUpdate
case TableEventTypeDelete:
if !entryPresent || prev.deleting {
goto SkipBroadcast
}
op = opDelete
// Broadcast the value most recently observed by watchers,
// which may be different from the value in the DELETE event
// (e.g. if the DELETE event was received out-of-order).
value = prev.value
default:
// TODO(thaJeztah): make switch exhaustive; add networkdb.TableEventTypeInvalid
}
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, value))
SkipBroadcast:
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
return network.inSync
}
@@ -424,12 +408,7 @@ func (d *delegate) NotifyMsg(buf []byte) {
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
for _, m := range msgs {
limit -= overhead + len(m)
}
if limit > 0 {
msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
}
msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
return msgs
}
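
The last hunk restores the pre-fix behaviour in which nodeBroadcasts is queried with the original byte limit, so the combined result can exceed it; the change being reverted subtracted the bytes already consumed by network broadcasts first. A standalone sketch of the two variants (combinedBroadcasts and its capRemaining flag are hypothetical; only the memberlist queue API is real):

package main

import (
    "fmt"

    "github.com/hashicorp/memberlist"
)

// combinedBroadcasts illustrates both behaviours: with capRemaining=false the
// original limit is passed to both queues (restored behaviour); with
// capRemaining=true the bytes used by the first queue are subtracted before
// querying the second (the fix being reverted).
func combinedBroadcasts(networkQ, nodeQ *memberlist.TransmitLimitedQueue, overhead, limit int, capRemaining bool) [][]byte {
    msgs := networkQ.GetBroadcasts(overhead, limit)
    if capRemaining {
        for _, m := range msgs {
            limit -= overhead + len(m)
        }
        if limit <= 0 {
            return msgs
        }
    }
    return append(msgs, nodeQ.GetBroadcasts(overhead, limit)...)
}

func main() {
    newQ := func() *memberlist.TransmitLimitedQueue {
        return &memberlist.TransmitLimitedQueue{NumNodes: func() int { return 3 }, RetransmitMult: 3}
    }
    // Both queues are empty here, so either variant returns no messages.
    fmt.Println(len(combinedBroadcasts(newQ(), newQ(), 2, 1400, false))) // 0
}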

View File

@@ -252,27 +252,14 @@ func DefaultConfig() *Config {
// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
nDB := new(c)
log.G(context.TODO()).Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
if err := nDB.clusterInit(); err != nil {
return nil, err
}
return nDB, nil
}
func new(c *Config) *NetworkDB {
// The garbage collection logic for entries leverage the presence of the network.
// For this reason the expiration time of the network is put slightly higher than the entry expiration so that
// there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod
return &NetworkDB{
config: c,
indexes: map[int]*iradix.Tree[*entry]{
byTable: iradix.New[*entry](),
byNetwork: iradix.New[*entry](),
},
nDB := &NetworkDB{
config: c,
indexes: make(map[int]*iradix.Tree[*entry]),
networks: make(map[string]map[string]*network),
nodes: make(map[string]*node),
failedNodes: make(map[string]*node),
@@ -281,6 +268,16 @@ func new(c *Config) *NetworkDB {
bulkSyncAckTbl: make(map[string]chan struct{}),
broadcaster: events.NewBroadcaster(),
}
nDB.indexes[byTable] = iradix.New[*entry]()
nDB.indexes[byNetwork] = iradix.New[*entry]()
log.G(context.TODO()).Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
if err := nDB.clusterInit(); err != nil {
return nil, err
}
return nDB, nil
}
// Join joins this NetworkDB instance with a list of peer NetworkDB
@@ -432,11 +429,8 @@ type TableElem struct {
// GetTableByNetwork walks the networkdb by the give table and network id and
// returns a map of keys and values
func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem {
nDB.RLock()
root := nDB.indexes[byTable].Root()
nDB.RUnlock()
entries := make(map[string]*TableElem)
root.WalkPrefix([]byte(fmt.Sprintf("/%s/%s", tname, nid)), func(k []byte, v *entry) bool {
nDB.indexes[byTable].Root().WalkPrefix([]byte(fmt.Sprintf("/%s/%s", tname, nid)), func(k []byte, v *entry) bool {
if v.deleting {
return false
}
@@ -591,14 +585,21 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
// value. The walk stops if the passed function returns a true.
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
nDB.RLock()
root := nDB.indexes[byTable].Root()
values := make(map[string]*entry)
nDB.indexes[byTable].Root().WalkPrefix([]byte("/"+tname), func(path []byte, v *entry) bool {
values[string(path)] = v
return false
})
nDB.RUnlock()
root.WalkPrefix([]byte("/"+tname), func(path []byte, v *entry) bool {
params := strings.Split(string(path[1:]), "/")
for k, v := range values {
params := strings.Split(k[1:], "/")
nid := params[1]
key := params[2]
return fn(nid, key, v.value, v.deleting)
})
if fn(nid, key, v.value, v.deleting) {
return nil
}
}
return nil
}
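
The code removed in these hunks took a snapshot of the radix-tree root under the lock and walked it after releasing it, which is safe because go-immutable-radix trees are immutable; the restored code instead copies matching entries into a map while holding the lock. A standalone sketch of the snapshot property (keys are illustrative):

package main

import (
    "fmt"

    iradix "github.com/hashicorp/go-immutable-radix/v2"
)

func main() {
    tree := iradix.New[string]()
    tree, _, _ = tree.Insert([]byte("/table1/net1/key1"), "v1")

    // Root() is a snapshot: inserts made afterwards produce a new tree and
    // are never observed by walks over the old root.
    root := tree.Root()
    tree, _, _ = tree.Insert([]byte("/table1/net1/key2"), "v2")

    root.WalkPrefix([]byte("/table1/"), func(k []byte, v string) bool {
        fmt.Printf("%s = %s\n", k, v)
        return false
    })
    // Prints only: /table1/net1/key1 = v1
}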

View File

@@ -430,22 +430,6 @@ func TestNetworkDBCRUDMediumCluster(t *testing.T) {
dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
// Shake out any data races.
done := make(chan struct{})
defer close(done)
for _, db := range dbs {
go func(db *NetworkDB) {
for {
select {
case <-done:
return
default:
}
_ = db.GetTableByNetwork("test_table", "network1")
}
}(db)
}
for i := 0; i < n; i++ {
for j := 0; j < n; j++ {
if i == j {

View File

@@ -2,7 +2,6 @@ package networkdb
import (
"net"
"strings"
"github.com/docker/go-events"
)
@@ -43,8 +42,7 @@ type DeleteEvent event
// network or any combination of the tuple. If any of the
// filter is an empty string it acts as a wildcard for that
// field. Watch returns a channel of events, where the events will be
// sent. The watch channel is initialized with synthetic create events for all
// the existing table entries not owned by this node which match the filters.
// sent.
func (nDB *NetworkDB) Watch(tname, nid string) (*events.Channel, func()) {
var matcher events.Matcher
@@ -79,45 +77,6 @@ func (nDB *NetworkDB) Watch(tname, nid string) (*events.Channel, func()) {
sink = events.NewFilter(sink, matcher)
}
// Synthesize events for all the existing table entries not owned by
// this node so that the watcher receives all state without racing with
// any concurrent mutations to the table.
nDB.RLock()
defer nDB.RUnlock()
if tname == "" {
var prefix []byte
if nid != "" {
prefix = []byte("/" + nid + "/")
} else {
prefix = []byte("/")
}
nDB.indexes[byNetwork].Root().WalkPrefix(prefix, func(path []byte, v *entry) bool {
if !v.deleting && v.node != nDB.config.NodeID {
tuple := strings.SplitN(string(path[1:]), "/", 3)
if len(tuple) == 3 {
entryNid, entryTname, key := tuple[0], tuple[1], tuple[2]
sink.Write(makeEvent(opCreate, entryTname, entryNid, key, v.value))
}
}
return false
})
} else {
prefix := []byte("/" + tname + "/")
if nid != "" {
prefix = append(prefix, []byte(nid+"/")...)
}
nDB.indexes[byTable].Root().WalkPrefix(prefix, func(path []byte, v *entry) bool {
if !v.deleting && v.node != nDB.config.NodeID {
tuple := strings.SplitN(string(path[1:]), "/", 3)
if len(tuple) == 3 {
entryTname, entryNid, key := tuple[0], tuple[1], tuple[2]
sink.Write(makeEvent(opCreate, entryTname, entryNid, key, v.value))
}
}
return false
})
}
nDB.broadcaster.Add(sink)
return ch, func() {
nDB.broadcaster.Remove(sink)
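
With the replay logic removed, Watch once again only delivers events broadcast after the watcher is registered; callers that need existing state have to read it separately, e.g. with WalkTable, around the time they subscribe. A consumer sketch under that assumption (the package, function name, table name, and logging are illustrative):

package watchsketch

import (
    "log"

    "github.com/docker/docker/libnetwork/networkdb"
)

// watchEndpointTable subscribes to one table on one network and logs the
// events delivered after the subscription is registered.
func watchEndpointTable(nDB *networkdb.NetworkDB, nid string) (cancel func()) {
    ch, cancel := nDB.Watch("endpoint_table", nid)
    go func() {
        for ev := range ch.C {
            switch e := ev.(type) {
            case networkdb.CreateEvent:
                log.Printf("create %s/%s/%s", e.Table, e.NetworkID, e.Key)
            case networkdb.UpdateEvent:
                log.Printf("update %s/%s/%s", e.Table, e.NetworkID, e.Key)
            case networkdb.DeleteEvent:
                log.Printf("delete %s/%s/%s", e.Table, e.NetworkID, e.Key)
            }
        }
    }()
    return cancel
}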

View File

@@ -1,273 +0,0 @@
package networkdb
import (
"net"
"testing"
"time"
"github.com/docker/go-events"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
func TestWatch_out_of_order(t *testing.T) {
nDB := newNetworkDB(DefaultConfig())
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
assert.Assert(t, nDB.JoinNetwork("network1"))
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
Name: "node1",
Addr: net.IPv4(1, 2, 3, 4),
})
d := &delegate{nDB}
msgs := messageBuffer{t: t}
appendTableEvent := tableEventHelper(&msgs, "node1", "network1", "table1")
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: 1,
NodeName: "node1",
NetworkID: "network1",
})
appendTableEvent(1, TableEventTypeCreate, "tombstone1", []byte("a"))
appendTableEvent(2, TableEventTypeDelete, "tombstone1", []byte("b"))
appendTableEvent(3, TableEventTypeCreate, "key1", []byte("value1"))
d.NotifyMsg(msgs.Compound())
msgs.Reset()
nDB.CreateEntry("table1", "network1", "local1", []byte("should not see me in watch events"))
watch, cancel := nDB.Watch("table1", "network1")
defer cancel()
got := drainChannel(watch.C)
assert.Check(t, is.DeepEqual(got, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key1", Value: []byte("value1")}),
}))
// Receive events from node1, with events not received or received out of order
// Create, (hidden update), delete
appendTableEvent(4, TableEventTypeCreate, "key2", []byte("a"))
appendTableEvent(6, TableEventTypeDelete, "key2", []byte("b"))
// (Hidden recreate), delete
appendTableEvent(8, TableEventTypeDelete, "key2", []byte("c"))
// (Hidden recreate), update
appendTableEvent(10, TableEventTypeUpdate, "key2", []byte("d"))
// Update, create
appendTableEvent(11, TableEventTypeUpdate, "key3", []byte("b"))
appendTableEvent(10, TableEventTypeCreate, "key3", []byte("a"))
// (Hidden create), update, update
appendTableEvent(13, TableEventTypeUpdate, "key4", []byte("b"))
appendTableEvent(14, TableEventTypeUpdate, "key4", []byte("c"))
// Delete, create
appendTableEvent(16, TableEventTypeDelete, "key5", []byte("a"))
appendTableEvent(15, TableEventTypeCreate, "key5", []byte("a"))
// (Hidden recreate), delete
appendTableEvent(18, TableEventTypeDelete, "key5", []byte("b"))
d.NotifyMsg(msgs.Compound())
msgs.Reset()
got = drainChannel(watch.C)
assert.Check(t, is.DeepEqual(got, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("a")}),
// Delete value should match last observed value,
// irrespective of the content of the delete event over the wire.
DeleteEvent(event{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("a")}),
// Updates to previously-deleted keys should be observed as creates.
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("d")}),
// Out-of-order update events should be observed as creates.
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key3", Value: []byte("b")}),
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key4", Value: []byte("b")}),
UpdateEvent(event{Table: "table1", NetworkID: "network1", Key: "key4", Value: []byte("c")}),
// key5 should not appear in the events.
}))
}
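Editorial sketch, not part of the diff: the assertions above spell out how out-of-order table events surface to a watcher (a delete echoes the last value the watcher observed, and updates to unknown or previously-deleted keys surface as creates). A consumer maintaining a local view can therefore apply events naively; this hypothetical helper shows that.
// applyEvent folds a single watch event into a caller-owned view keyed by
// table/network/key. All names here are illustrative only.
func applyEvent(view map[string][]byte, ev events.Event) {
	switch e := ev.(type) {
	case CreateEvent:
		view[e.Table+"/"+e.NetworkID+"/"+e.Key] = e.Value
	case UpdateEvent:
		view[e.Table+"/"+e.NetworkID+"/"+e.Key] = e.Value
	case DeleteEvent:
		// Value matches the last observed value for the key, per the test above.
		delete(view, e.Table+"/"+e.NetworkID+"/"+e.Key)
	}
}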
func TestWatch_filters(t *testing.T) {
nDB := newNetworkDB(DefaultConfig())
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
assert.Assert(t, nDB.JoinNetwork("network1"))
assert.Assert(t, nDB.JoinNetwork("network2"))
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
Name: "node1",
Addr: net.IPv4(1, 2, 3, 4),
})
var ltime serf.LamportClock
msgs := messageBuffer{t: t}
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: "network1",
})
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: "network2",
})
for _, nid := range []string{"network1", "network2"} {
for _, tname := range []string{"table1", "table2"} {
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeCreate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".dead",
Value: []byte("deaddead"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeDelete,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".dead",
Value: []byte("deaddead"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeCreate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".update",
Value: []byte("initial"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeCreate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname,
Value: []byte("a"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeUpdate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".update",
Value: []byte("updated"),
})
}
}
(&delegate{nDB}).NotifyMsg(msgs.Compound())
watchAll, cancel := nDB.Watch("", "")
defer cancel()
watchNetwork1Tables, cancel := nDB.Watch("", "network1")
defer cancel()
watchTable1AllNetworks, cancel := nDB.Watch("table1", "")
defer cancel()
watchTable1Network1, cancel := nDB.Watch("table1", "network1")
defer cancel()
var gotAll, gotNetwork1Tables, gotTable1AllNetworks, gotTable1Network1 []events.Event
L:
for {
select {
case ev := <-watchAll.C:
gotAll = append(gotAll, ev)
case ev := <-watchNetwork1Tables.C:
gotNetwork1Tables = append(gotNetwork1Tables, ev)
case ev := <-watchTable1AllNetworks.C:
gotTable1AllNetworks = append(gotTable1AllNetworks, ev)
case ev := <-watchTable1Network1.C:
gotTable1Network1 = append(gotTable1Network1, ev)
case <-time.After(time.Second):
break L
}
}
assert.Check(t, is.DeepEqual(gotAll, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")}),
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")}),
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")}),
CreateEvent(event{Table: "table2", NetworkID: "network2", Key: "network2.table2", Value: []byte("a")}),
CreateEvent(event{Table: "table2", NetworkID: "network2", Key: "network2.table2.update", Value: []byte("updated")}),
}))
assert.Check(t, is.DeepEqual(gotNetwork1Tables, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")}),
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")}),
}))
assert.Check(t, is.DeepEqual(gotTable1AllNetworks, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")}),
}))
assert.Check(t, is.DeepEqual(gotTable1Network1, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
}))
}
func drainChannel(ch <-chan events.Event) []events.Event {
var events []events.Event
for {
select {
case ev := <-ch:
events = append(events, ev)
case <-time.After(time.Second):
return events
}
}
}
type messageBuffer struct {
t *testing.T
msgs [][]byte
}
func (mb *messageBuffer) Append(typ MessageType, msg any) {
mb.t.Helper()
buf, err := encodeMessage(typ, msg)
if err != nil {
mb.t.Fatalf("failed to encode message: %v", err)
}
mb.msgs = append(mb.msgs, buf)
}
func (mb *messageBuffer) Compound() []byte {
return makeCompoundMessage(mb.msgs)
}
func (mb *messageBuffer) Reset() {
mb.msgs = nil
}
func tableEventHelper(mb *messageBuffer, nodeName, networkID, tableName string) func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) {
return func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) {
mb.t.Helper()
mb.Append(MessageTypeTableEvent, &TableEvent{
Type: typ,
LTime: ltime,
NodeName: nodeName,
NetworkID: networkID,
TableName: tableName,
Key: key,
Value: value,
})
}
}

View File

@@ -804,20 +804,24 @@ func (n *Namespace) prepAdvertiseAddrs(ctx context.Context, i *Interface, ifInde
// original name and moving it out of the sandbox.
func (n *Namespace) RemoveInterface(i *Interface) error {
close(i.stopCh)
n.mu.Lock()
isDefault := n.isDefault
nlh := n.nlHandle
n.mu.Unlock()
// Find the network interface identified by the DstName attribute.
iface, err := n.nlHandle.LinkByName(i.DstName())
iface, err := nlh.LinkByName(i.DstName())
if err != nil {
return err
}
// Down the interface before configuring
if err := n.nlHandle.LinkSetDown(iface); err != nil {
if err := nlh.LinkSetDown(iface); err != nil {
return err
}
// TODO(aker): Why are we doing this? This would fail if the initial interface set up failed before the "dest interface" was moved into its own namespace; see https://github.com/moby/moby/pull/46315/commits/108595c2fe852a5264b78e96f9e63cda284990a6#r1331253578
err = n.nlHandle.LinkSetName(iface, i.SrcName())
err = nlh.LinkSetName(iface, i.SrcName())
if err != nil {
log.G(context.TODO()).Debugf("LinkSetName failed for interface %s: %v", i.SrcName(), err)
return err
@@ -825,13 +829,13 @@ func (n *Namespace) RemoveInterface(i *Interface) error {
// if it is a bridge just delete it.
if i.Bridge() {
if err := n.nlHandle.LinkDel(iface); err != nil {
if err := nlh.LinkDel(iface); err != nil {
return fmt.Errorf("failed deleting bridge %q: %v", i.SrcName(), err)
}
} else if !n.isDefault {
} else if !isDefault {
// Move the network interface to caller namespace.
// TODO(aker): What's this really doing? There are no calls to LinkDel in this package: is this code really used? (Interface.Remove() has 3 callers); see https://github.com/moby/moby/pull/46315/commits/108595c2fe852a5264b78e96f9e63cda284990a6#r1331265335
if err := n.nlHandle.LinkSetNsFd(iface, ns.ParseHandlerInt()); err != nil {
if err := nlh.LinkSetNsFd(iface, ns.ParseHandlerInt()); err != nil {
log.G(context.TODO()).Debugf("LinkSetNsFd failed for interface %s: %v", i.SrcName(), err)
return err
}
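Editorial sketch, not part of the diff: the restored hunk copies n.isDefault and n.nlHandle while holding the mutex and then issues the netlink calls through the local copies, so n.mu is never held across a syscall. A minimal illustration of that pattern follows; the method name is hypothetical, the field names match the diff.
func (n *Namespace) linkDown(name string) error {
	n.mu.Lock()
	nlh := n.nlHandle // snapshot the handle under the lock...
	n.mu.Unlock()
	iface, err := nlh.LinkByName(name) // ...then use it without holding n.mu
	if err != nil {
		return err
	}
	return nlh.LinkSetDown(iface)
}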

View File

@@ -232,6 +232,7 @@ type Namespace struct {
defRoute4SrcName string
defRoute6SrcName string
staticRoutes []*types.StaticRoute
neighbors []*neigh
isDefault bool // isDefault is true when Namespace represents the host network namespace. It is safe to access it concurrently.
ipv6LoEnabledOnce sync.Once
ipv6LoEnabledCached bool

View File

@@ -1,12 +1,12 @@
package osl
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"os"
"strings"
"github.com/containerd/log"
"github.com/vishvananda/netlink"
@@ -14,115 +14,145 @@ import (
// NeighborSearchError indicates that the neighbor is already present
type NeighborSearchError struct {
ip net.IP
mac net.HardwareAddr
linkName string
present bool
ip net.IP
mac net.HardwareAddr
present bool
}
func (n NeighborSearchError) Error() string {
var b strings.Builder
b.WriteString("neighbor entry ")
if n.present {
b.WriteString("already exists ")
} else {
b.WriteString("not found ")
}
b.WriteString("for IP ")
b.WriteString(n.ip.String())
b.WriteString(", mac ")
b.WriteString(n.mac.String())
if n.linkName != "" {
b.WriteString(", link ")
b.WriteString(n.linkName)
}
return b.String()
return fmt.Sprintf("Search neighbor failed for IP %v, mac %v, present in db:%t", n.ip, n.mac, n.present)
}
// DeleteNeighbor deletes a neighbor entry from the sandbox.
//
// To delete an entry inserted by [AddNeighbor] the caller must provide the same
// parameters used to add it.
func (n *Namespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, options ...NeighOption) error {
nlnh, linkName, err := n.nlNeigh(dstIP, dstMac, options...)
if err != nil {
return err
type neigh struct {
dstIP net.IP
dstMac net.HardwareAddr
linkName string
linkDst string
family int
}
func (n *Namespace) findNeighbor(dstIP net.IP, dstMac net.HardwareAddr) *neigh {
n.mu.Lock()
defer n.mu.Unlock()
for _, nh := range n.neighbors {
if nh.dstIP.Equal(dstIP) && bytes.Equal(nh.dstMac, dstMac) {
return nh
}
}
if err := n.nlHandle.NeighDel(nlnh); err != nil {
log.G(context.TODO()).WithFields(log.Fields{
"ip": dstIP,
"mac": dstMac,
"ifc": linkName,
"error": err,
}).Warn("error deleting neighbor entry")
if errors.Is(err, os.ErrNotExist) {
return NeighborSearchError{dstIP, dstMac, linkName, false}
return nil
}
// DeleteNeighbor deletes neighbor entry from the sandbox.
func (n *Namespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr) error {
nh := n.findNeighbor(dstIP, dstMac)
if nh == nil {
return NeighborSearchError{dstIP, dstMac, false}
}
n.mu.Lock()
nlh := n.nlHandle
n.mu.Unlock()
var linkIndex int
if nh.linkDst != "" {
iface, err := nlh.LinkByName(nh.linkDst)
if err != nil {
return fmt.Errorf("could not find interface with destination name %s: %v", nh.linkDst, err)
}
return fmt.Errorf("could not delete neighbor %+v: %w", nlnh, err)
linkIndex = iface.Attrs().Index
}
nlnh := &netlink.Neigh{
LinkIndex: linkIndex,
IP: dstIP,
State: netlink.NUD_PERMANENT,
Family: nh.family,
}
if nh.family > 0 {
nlnh.HardwareAddr = dstMac
nlnh.Flags = netlink.NTF_SELF
}
// If the kernel deletion fails for the neighbor entry still remove it
// from the namespace cache, otherwise kernel update can fail if the
// neighbor moves back to the same host again.
if err := nlh.NeighDel(nlnh); err != nil && !errors.Is(err, os.ErrNotExist) {
log.G(context.TODO()).Warnf("Deleting neighbor IP %s, mac %s failed, %v", dstIP, dstMac, err)
}
// Delete the dynamic entry in the bridge
if nlnh.Family > 0 {
nlnh.Flags = netlink.NTF_MASTER
if err := n.nlHandle.NeighDel(nlnh); err != nil && !errors.Is(err, os.ErrNotExist) {
log.G(context.TODO()).WithFields(log.Fields{
"ip": dstIP,
"mac": dstMac,
"ifc": linkName,
"error": err,
}).Warn("error deleting dynamic neighbor entry")
if nh.family > 0 {
if err := nlh.NeighDel(&netlink.Neigh{
LinkIndex: linkIndex,
IP: dstIP,
Family: nh.family,
HardwareAddr: dstMac,
Flags: netlink.NTF_MASTER,
}); err != nil && !errors.Is(err, os.ErrNotExist) {
log.G(context.TODO()).WithError(err).Warn("error while deleting neighbor entry")
}
}
log.G(context.TODO()).WithFields(log.Fields{
"ip": dstIP,
"mac": dstMac,
"ifc": linkName,
}).Debug("Neighbor entry deleted")
n.mu.Lock()
for i, neighbor := range n.neighbors {
if neighbor.dstIP.Equal(dstIP) && bytes.Equal(neighbor.dstMac, dstMac) {
n.neighbors = append(n.neighbors[:i], n.neighbors[i+1:]...)
break
}
}
n.mu.Unlock()
log.G(context.TODO()).Debugf("Neighbor entry deleted for IP %v, mac %v", dstIP, dstMac)
return nil
}
// AddNeighbor adds a neighbor entry into the sandbox.
func (n *Namespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, options ...NeighOption) error {
nlnh, linkName, err := n.nlNeigh(dstIP, dstMac, options...)
if err != nil {
return err
}
func (n *Namespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, force bool, options ...NeighOption) error {
var (
iface netlink.Link
err error
neighborAlreadyPresent bool
)
if err := n.nlHandle.NeighAdd(nlnh); err != nil {
if errors.Is(err, os.ErrExist) {
log.G(context.TODO()).WithFields(log.Fields{
"ip": dstIP,
"mac": dstMac,
"ifc": linkName,
"neigh": fmt.Sprintf("%+v", nlnh),
}).Warn("Neighbor entry already present")
return NeighborSearchError{dstIP, dstMac, linkName, true}
} else {
return fmt.Errorf("could not add neighbor entry %+v: %w", nlnh, err)
// If the namespace already has the neighbor entry but the AddNeighbor is called
// because of a miss notification (force flag) program the kernel anyway.
nh := n.findNeighbor(dstIP, dstMac)
if nh != nil {
neighborAlreadyPresent = true
log.G(context.TODO()).Warnf("Neighbor entry already present for IP %v, mac %v neighbor:%+v forceUpdate:%t", dstIP, dstMac, nh, force)
if !force {
return NeighborSearchError{dstIP, dstMac, true}
}
}
log.G(context.TODO()).WithFields(log.Fields{
"ip": dstIP,
"mac": dstMac,
"ifc": linkName,
}).Debug("Neighbor entry added")
nh = &neigh{
dstIP: dstIP,
dstMac: dstMac,
}
return nil
}
type neigh struct {
linkName string
family int
}
func (n *Namespace) nlNeigh(dstIP net.IP, dstMac net.HardwareAddr, options ...NeighOption) (*netlink.Neigh, string, error) {
var nh neigh
nh.processNeighOptions(options...)
if nh.linkName != "" {
nh.linkDst = n.findDst(nh.linkName, false)
if nh.linkDst == "" {
return fmt.Errorf("could not find the interface with name %s", nh.linkName)
}
}
n.mu.Lock()
nlh := n.nlHandle
n.mu.Unlock()
if nh.linkDst != "" {
iface, err = nlh.LinkByName(nh.linkDst)
if err != nil {
return fmt.Errorf("could not find interface with destination name %s: %v", nh.linkDst, err)
}
}
nlnh := &netlink.Neigh{
IP: dstIP,
HardwareAddr: dstMac,
@@ -134,17 +164,22 @@ func (n *Namespace) nlNeigh(dstIP net.IP, dstMac net.HardwareAddr, options ...Ne
nlnh.Flags = netlink.NTF_SELF
}
if nh.linkName != "" {
linkDst := n.findDst(nh.linkName, false)
if linkDst == "" {
return nil, nh.linkName, fmt.Errorf("could not find the interface with name %s", nh.linkName)
}
iface, err := n.nlHandle.LinkByName(linkDst)
if err != nil {
return nil, nh.linkName, fmt.Errorf("could not find interface with destination name %s: %w", linkDst, err)
}
if nh.linkDst != "" {
nlnh.LinkIndex = iface.Attrs().Index
}
return nlnh, nh.linkName, nil
if err := nlh.NeighSet(nlnh); err != nil {
return fmt.Errorf("could not add neighbor entry:%+v error:%v", nlnh, err)
}
if neighborAlreadyPresent {
return nil
}
n.mu.Lock()
n.neighbors = append(n.neighbors, nh)
n.mu.Unlock()
log.G(context.TODO()).Debugf("Neighbor entry added for IP:%v, mac:%v on ifc:%s", dstIP, dstMac, nh.linkName)
return nil
}
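Editorial sketch, not part of the diff: with the signatures being restored by these reverts, AddNeighbor takes a force flag plus options while DeleteNeighbor takes only the IP/MAC pair, and deletion must use the same pair that was added. The helper name and the option-less calls below are assumptions for illustration.
func programNeighbor(sb *Namespace, ip net.IP, mac net.HardwareAddr) error {
	// force=false: returns NeighborSearchError if the entry is already cached.
	if err := sb.AddNeighbor(ip, mac, false); err != nil {
		return err
	}
	// Removing the entry requires the same IP/MAC used when adding it.
	return sb.DeleteNeighbor(ip, mac)
}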

View File

@@ -57,7 +57,7 @@ type service struct {
// associated with it. At steady state a single endpoint ID is expected,
// but during transitions and service changes it is possible to
// temporarily have more than one.
ipToEndpoint setmatrix.SetMatrix[string, string]
ipToEndpoint setmatrix.SetMatrix[string]
deleted bool
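Editorial sketch, not part of the diff: the comment above describes one virtual IP mapping to a set of endpoint IDs that can transiently hold more than one member. This is a plain-map illustration of that shape only, not the setmatrix API; all names and values are hypothetical.
// exampleServiceState shows an IP briefly owning two endpoint IDs while a
// service task is being replaced; steady state keeps a single entry per IP.
func exampleServiceState() map[string]map[string]struct{} {
	return map[string]map[string]struct{}{
		"10.0.0.2": {
			"ep-old": {}, // endpoint being removed during the update
			"ep-new": {}, // its replacement
		},
	}
}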