Merge pull request #49736 from akerouanton/cache-endpoint-in-memory

libnet: Controller: cache networks & endpoints in-memory
This commit is contained in:
Paweł Gronowski
2025-04-07 10:00:36 +00:00
committed by GitHub
11 changed files with 334 additions and 300 deletions

View File

@@ -0,0 +1,14 @@
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
//go:build go1.22
package maputil
// FilterValues returns all values in the map for which fn returns true.
// The result order is unspecified (map iteration order), and the result
// is nil when no value matches.
func FilterValues[K comparable, V any](in map[K]V, fn func(V) bool) []V {
	var matched []V
	for _, value := range in {
		if !fn(value) {
			continue
		}
		matched = append(matched, value)
	}
	return matched
}

View File

@@ -100,6 +100,40 @@ type Controller struct {
diagnosticServer *diagnostic.Server
mu sync.Mutex
// networks is an in-memory cache of Network. Do not use this map unless
// you're sure your code is thread-safe.
//
// The data persistence layer is instantiating new Network objects every
// time it loads an object from its store or in-memory cache. This leads to
// multiple instances representing the same network to concurrently live in
// memory. As such, the Network mutex might be ineffective and not
// correctly protect against data races.
//
// If you want to use this map for new or existing code, you need to make
// sure: 1. the Network object is correctly locked; 2. the lock order
// between Sandbox, Network and Endpoint is the same as the rest of the
// code (in order to avoid deadlocks).
networks map[string]*Network
// networksMu protects the networks map.
networksMu sync.Mutex
// endpoints is an in-memory cache of Endpoint. Do not use this map unless
// you're sure your code is thread-safe.
//
// The data persistence layer is instantiating new Endpoint objects every
// time it loads an object from its store or in-memory cache. This leads to
// multiple instances representing the same endpoint to concurrently live
// in memory. As such, the Endpoint mutex might be ineffective and not
// correctly protect against data races.
//
// If you want to use this map for new or existing code, you need to make
// sure: 1. the Endpoint object is correctly locked; 2. the lock order
// between Sandbox, Network and Endpoint is the same as the rest of the
// code (in order to avoid deadlocks).
endpoints map[string]*Endpoint
// endpointsMu protects the endpoints map.
endpointsMu sync.Mutex
// FIXME(thaJeztah): defOsSbox is always nil on non-Linux: move these fields to Linux-only files.
defOsSboxOnce sync.Once
defOsSbox *osl.Namespace
@@ -118,6 +152,8 @@ func New(cfgOptions ...config.Option) (*Controller, error) {
cfg: cfg,
store: store,
sandboxes: map[string]*Sandbox{},
networks: map[string]*Network{},
endpoints: map[string]*Endpoint{},
svcRecords: make(map[string]*svcInfo),
serviceBindings: make(map[serviceKey]*service),
agentInitDone: make(chan struct{}),
@@ -518,8 +554,6 @@ func (c *Controller) NewNetwork(networkType, name string, id string, options ...
var (
caps driverapi.Capability
err error
skipCfgEpCount bool
)
// Reset network types, force local scope and skip allocation and
@@ -554,13 +588,6 @@ func (c *Controller) NewNetwork(networkType, name string, id string, options ...
if err := configNetwork.applyConfigurationTo(nw); err != nil {
return nil, types.InternalErrorf("Failed to apply configuration: %v", err)
}
defer func() {
if retErr == nil && !skipCfgEpCount {
if err := configNetwork.getEpCnt().IncEndpointCnt(); err != nil {
log.G(context.TODO()).Warnf("Failed to update reference count for configuration network %q on creation of network %q: %v", configNetwork.Name(), nw.name, err)
}
}
}()
}
// At this point the network scope is still unknown if not set by user
@@ -626,11 +653,7 @@ func (c *Controller) NewNetwork(networkType, name string, id string, options ...
//
// To cut a long story short: if this broke anything, you know who to blame :)
if err := c.addNetwork(nw); err != nil {
if _, ok := err.(types.MaskableError); ok { //nolint:gosimple
// This error can be ignored and set this boolean
// value to skip a refcount increment for configOnly networks
skipCfgEpCount = true
} else {
if _, ok := err.(types.MaskableError); !ok { //nolint:gosimple
return nil, err
}
}
@@ -658,28 +681,12 @@ func (c *Controller) NewNetwork(networkType, name string, id string, options ...
}
addToStore:
// First store the endpoint count, then the network. To avoid to
// end up with a datastore containing a network and not an epCnt,
// in case of an ungraceful shutdown during this function call.
epCnt := &endpointCnt{n: nw}
if err := c.updateToStore(context.TODO(), epCnt); err != nil {
if err := c.storeNetwork(context.TODO(), nw); err != nil {
return nil, err
}
defer func() {
if retErr != nil {
if err := c.deleteFromStore(epCnt); err != nil {
log.G(context.TODO()).Warnf("could not rollback from store, epCnt %v on failure (%v): %v", epCnt, retErr, err)
}
}
}()
nw.epCnt = epCnt
if err := c.updateToStore(context.TODO(), nw); err != nil {
return nil, err
}
defer func() {
if retErr != nil {
if err := c.deleteFromStore(nw); err != nil {
if err := c.deleteStoredNetwork(nw); err != nil {
log.G(context.TODO()).Warnf("could not rollback from store, network %v on failure (%v): %v", nw, retErr, err)
}
}

View File

@@ -582,7 +582,7 @@ func (ep *Endpoint) sbJoin(ctx context.Context, sb *Sandbox, options ...Endpoint
return errdefs.System(err)
}
if err := n.getController().updateToStore(ctx, ep); err != nil {
if err := n.getController().storeEndpoint(ctx, ep); err != nil {
return err
}
@@ -762,7 +762,7 @@ func (ep *Endpoint) rename(name string) error {
ep.mu.Unlock()
// Update the store with the updated name
if err := ep.getNetwork().getController().updateToStore(context.TODO(), ep); err != nil {
if err := ep.getNetwork().getController().storeEndpoint(context.TODO(), ep); err != nil {
return err
}
@@ -798,7 +798,7 @@ func (ep *Endpoint) UpdateDNSNames(dnsNames []string) error {
}
// Update the store with the updated name
if err := c.updateToStore(context.TODO(), ep); err != nil {
if err := c.storeEndpoint(context.TODO(), ep); err != nil {
return err
}
@@ -904,7 +904,7 @@ func (ep *Endpoint) sbLeave(ctx context.Context, sb *Sandbox, force bool) error
// spurious logs when cleaning up the sandbox when the daemon
// ungracefully exits and restarts before completing sandbox
// detach but after store has been updated.
if err := n.getController().updateToStore(ctx, ep); err != nil {
if err := n.getController().storeEndpoint(ctx, ep); err != nil {
return err
}
@@ -1024,14 +1024,14 @@ func (ep *Endpoint) Delete(ctx context.Context, force bool) error {
}
}
if err = n.getController().deleteFromStore(ep); err != nil {
if err = n.getController().deleteStoredEndpoint(ep); err != nil {
return err
}
defer func() {
if err != nil && !force {
ep.dbExists = false
if e := n.getController().updateToStore(context.WithoutCancel(ctx), ep); e != nil {
if e := n.getController().storeEndpoint(context.WithoutCancel(ctx), ep); e != nil {
log.G(ctx).Warnf("failed to recreate endpoint in store %s : %v", name, e)
}
}
@@ -1047,10 +1047,6 @@ func (ep *Endpoint) Delete(ctx context.Context, force bool) error {
ep.releaseAddress()
if err := n.getEpCnt().DecEndpointCnt(); err != nil {
log.G(ctx).Warnf("failed to decrement endpoint count for ep %s: %v", ep.ID(), err)
}
return nil
}
@@ -1399,20 +1395,6 @@ func (c *Controller) cleanupLocalEndpoints() error {
log.G(context.TODO()).Warnf("Could not delete local endpoint %s during endpoint cleanup: %v", ep.name, err)
}
}
epl, err = n.getEndpointsFromStore()
if err != nil {
log.G(context.TODO()).Warnf("Could not get list of endpoints in network %s for count update: %v", n.name, err)
continue
}
epCnt := n.getEpCnt().EndpointCnt()
if epCnt != uint64(len(epl)) {
log.G(context.TODO()).Infof("Fixing inconsistent endpoint_cnt for network %s. Expected=%d, Actual=%d", n.name, len(epl), epCnt)
if err := n.getEpCnt().setCnt(uint64(len(epl))); err != nil {
log.G(context.TODO()).WithField("network", n.name).WithError(err).Warn("Error while fixing inconsistent endpoint_cnt for network")
}
}
}
return nil

View File

@@ -1,173 +0,0 @@
package libnetwork
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/docker/docker/libnetwork/datastore"
)
// endpointCnt tracks the number of endpoints attached to a network. It is
// persisted in the datastore as its own KVObject, under the
// "endpoint_count" key prefix keyed by network ID, so the count can be
// updated with optimistic concurrency and survives daemon restarts.
type endpointCnt struct {
	n *Network
	// Count is the only persisted field: it is exported so that it is
	// round-tripped by the JSON (un)marshalling in Value/SetValue.
	Count uint64
	// dbIndex and dbExists are datastore bookkeeping, managed through
	// Index/SetIndex/Exists.
	dbIndex  uint64
	dbExists bool
	sync.Mutex
}

// epCntKeyPrefix is the datastore key prefix for endpoint-count objects.
const epCntKeyPrefix = "endpoint_count"

// Key returns the datastore key: endpoint_count/<network-id>.
func (ec *endpointCnt) Key() []string {
	ec.Lock()
	defer ec.Unlock()
	return []string{epCntKeyPrefix, ec.n.id}
}

// KeyPrefix returns the same components as Key: there is at most one
// endpoint-count object per network, so key and prefix coincide.
func (ec *endpointCnt) KeyPrefix() []string {
	ec.Lock()
	defer ec.Unlock()
	return []string{epCntKeyPrefix, ec.n.id}
}

// Value serializes the object to JSON for storage; it returns nil if
// marshalling fails.
func (ec *endpointCnt) Value() []byte {
	ec.Lock()
	defer ec.Unlock()
	b, err := json.Marshal(ec)
	if err != nil {
		return nil
	}
	return b
}

// SetValue restores the object from its stored JSON representation.
func (ec *endpointCnt) SetValue(value []byte) error {
	ec.Lock()
	defer ec.Unlock()
	return json.Unmarshal(value, &ec)
}

// Index returns the datastore version index of this object.
func (ec *endpointCnt) Index() uint64 {
	ec.Lock()
	defer ec.Unlock()
	return ec.dbIndex
}

// SetIndex records the datastore version index and marks the object as
// existing in the store.
func (ec *endpointCnt) SetIndex(index uint64) {
	ec.Lock()
	ec.dbIndex = index
	ec.dbExists = true
	ec.Unlock()
}

// Exists reports whether the object is known to be present in the store.
func (ec *endpointCnt) Exists() bool {
	ec.Lock()
	defer ec.Unlock()
	return ec.dbExists
}

// Skip reports whether persistence should be skipped; it mirrors the
// owning network's persist flag.
func (ec *endpointCnt) Skip() bool {
	ec.Lock()
	defer ec.Unlock()
	return !ec.n.persist
}

// New returns a zero-count endpointCnt for the same network, as required
// by the datastore.KVObject interface.
func (ec *endpointCnt) New() datastore.KVObject {
	ec.Lock()
	defer ec.Unlock()
	return &endpointCnt{
		n: ec.n,
	}
}

// CopyTo copies every field into the destination endpointCnt.
func (ec *endpointCnt) CopyTo(o datastore.KVObject) error {
	ec.Lock()
	defer ec.Unlock()
	dstEc := o.(*endpointCnt)
	dstEc.n = ec.n
	dstEc.Count = ec.Count
	dstEc.dbExists = ec.dbExists
	dstEc.dbIndex = ec.dbIndex
	return nil
}

// EndpointCnt returns the current endpoint count.
func (ec *endpointCnt) EndpointCnt() uint64 {
	ec.Lock()
	defer ec.Unlock()
	return ec.Count
}

// updateStore writes the current count to the datastore, retrying on
// datastore.ErrKeyModified: on conflict it re-fetches the latest stored
// object (which overwrites ec's fields) and re-applies the locally
// captured count and network pointer before trying again.
func (ec *endpointCnt) updateStore() error {
	c := ec.n.getController()
	// make a copy of count and n to avoid being overwritten by store.GetObject
	count := ec.EndpointCnt()
	n := ec.n
	for {
		if err := c.updateToStore(context.TODO(), ec); err == nil || err != datastore.ErrKeyModified {
			return err
		}
		if err := c.store.GetObject(ec); err != nil {
			return fmt.Errorf("could not update the kvobject to latest on endpoint count update: %v", err)
		}
		ec.Lock()
		ec.Count = count
		ec.n = n
		ec.Unlock()
	}
}

// setCnt overwrites the in-memory count and persists it.
func (ec *endpointCnt) setCnt(cnt uint64) error {
	ec.Lock()
	ec.Count = cnt
	ec.Unlock()
	return ec.updateStore()
}

// atomicIncDecEpCnt increments (inc == true) or decrements (saturating
// at zero) the count and persists it, retrying on
// datastore.ErrKeyModified with the freshly fetched object. The initial
// GetObject into tmp only verifies the object is present in the store
// before mutating.
func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error {
	store := ec.n.getController().store
	tmp := &endpointCnt{n: ec.n}
	if err := store.GetObject(tmp); err != nil {
		return err
	}
retry:
	ec.Lock()
	if inc {
		ec.Count++
	} else {
		if ec.Count > 0 {
			ec.Count--
		}
	}
	ec.Unlock()
	if err := ec.n.getController().updateToStore(context.TODO(), ec); err != nil {
		if err == datastore.ErrKeyModified {
			if err := store.GetObject(ec); err != nil {
				return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
			}
			goto retry
		}
		return err
	}
	return nil
}

// IncEndpointCnt atomically increments the persisted endpoint count.
func (ec *endpointCnt) IncEndpointCnt() error {
	return ec.atomicIncDecEpCnt(true)
}

// DecEndpointCnt atomically decrements the persisted endpoint count.
func (ec *endpointCnt) DecEndpointCnt() error {
	return ec.atomicIncDecEpCnt(false)
}

View File

@@ -0,0 +1,66 @@
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
//go:build go1.22
package libnetwork
import (
"context"
"github.com/docker/docker/internal/maputil"
)
// storeEndpoint persists the endpoint to the datastore and, on success,
// records it in the in-memory endpoint cache maintained by the Controller.
//
// This method is thread-safe.
func (c *Controller) storeEndpoint(ctx context.Context, ep *Endpoint) error {
	err := c.updateToStore(ctx, ep)
	if err != nil {
		return err
	}
	c.cacheEndpoint(ep)
	return nil
}
// deleteStoredEndpoint removes the endpoint from the datastore and, on
// success, drops it from the in-memory endpoint cache maintained by the
// Controller.
//
// This method is thread-safe.
func (c *Controller) deleteStoredEndpoint(ep *Endpoint) error {
	if err := c.deleteFromStore(ep); err != nil {
		return err
	}
	c.endpointsMu.Lock()
	delete(c.endpoints, ep.id)
	c.endpointsMu.Unlock()
	return nil
}
// cacheEndpoint records the endpoint in the in-memory endpoint cache
// maintained by the Controller, overwriting any previous entry with the
// same ID.
//
// This method is thread-safe.
func (c *Controller) cacheEndpoint(ep *Endpoint) {
	c.endpointsMu.Lock()
	c.endpoints[ep.id] = ep
	c.endpointsMu.Unlock()
}
// findEndpoints returns every endpoint in the Controller's in-memory
// cache that satisfies the given filter. The result order is unspecified.
//
// This method is thread-safe, but do not use it unless you're sure your
// code uses the returned endpoints in thread-safe way (see the comment on
// Controller.endpoints).
func (c *Controller) findEndpoints(filter func(ep *Endpoint) bool) []*Endpoint {
	c.endpointsMu.Lock()
	defer c.endpointsMu.Unlock()
	var matches []*Endpoint
	for _, ep := range c.endpoints {
		if filter(ep) {
			matches = append(matches, ep)
		}
	}
	return matches
}
// filterEndpointByNetworkId returns a predicate matching endpoints that
// belong to the network with the given ID. Endpoints with no associated
// network never match.
func filterEndpointByNetworkId(networkID string) func(ep *Endpoint) bool {
	matches := func(ep *Endpoint) bool {
		if ep.network == nil {
			return false
		}
		return ep.network.id == networkID
	}
	return matches
}

View File

@@ -0,0 +1,46 @@
package libnetwork
import (
"context"
"testing"
"github.com/docker/docker/libnetwork/config"
"gotest.tools/v3/assert"
)
// TestEndpointStore exercises the Controller's endpoint store/cache
// round-trip: insert, lookup via findEndpoints, delete, and re-insert.
func TestEndpointStore(t *testing.T) {
	configOption := config.OptionDataDir(t.TempDir())
	c, err := New(configOption)
	assert.NilError(t, err)
	defer c.Stop()

	// Insert a first endpoint
	nw := &Network{id: "testNetwork"}
	ep1 := &Endpoint{network: nw, id: "testEndpoint1"}
	err = c.storeEndpoint(context.Background(), ep1)
	assert.NilError(t, err)

	// Then a second endpoint
	ep2 := &Endpoint{network: nw, id: "testEndpoint2"}
	err = c.storeEndpoint(context.Background(), ep2)
	assert.NilError(t, err)

	// Check that we can find both endpoints. findEndpoints iterates over a
	// map, so its result order is unspecified; normalize the order before
	// asserting to keep this test deterministic.
	found := c.findEndpoints(filterEndpointByNetworkId("testNetwork"))
	assert.Equal(t, len(found), 2)
	if found[0] != ep1 {
		found[0], found[1] = found[1], found[0]
	}
	assert.Equal(t, found[0], ep1)
	assert.Equal(t, found[1], ep2)

	// Delete the first endpoint
	err = c.deleteStoredEndpoint(ep1)
	assert.NilError(t, err)

	// Check that we can only find the second endpoint
	found = c.findEndpoints(filterEndpointByNetworkId("testNetwork"))
	assert.Equal(t, len(found), 1)
	assert.Equal(t, found[0], ep2)

	// Store the second endpoint again
	err = c.storeEndpoint(context.Background(), ep2)
	assert.NilError(t, err)
}

View File

@@ -190,7 +190,6 @@ type Network struct {
ipamV6Info []*IpamInfo
enableIPv4 bool
enableIPv6 bool
epCnt *endpointCnt
generic options.Generic
dbIndex uint64
dbExists bool
@@ -542,13 +541,6 @@ func (n *Network) CopyTo(o datastore.KVObject) error {
return nil
}
func (n *Network) getEpCnt() *endpointCnt {
n.mu.Lock()
defer n.mu.Unlock()
return n.epCnt
}
func (n *Network) validateAdvertiseAddrConfig() error {
var errs []error
_, err := n.validatedAdvertiseAddrNMsgs()
@@ -1040,15 +1032,20 @@ func (n *Network) delete(force bool, rmLBEndpoint bool) error {
return &ActiveEndpointsError{name: n.name, id: n.id}
}
if !force && n.configOnly {
refNws := c.findNetworks(filterNetworkByConfigFrom(n.name))
if len(refNws) > 0 {
return types.ForbiddenErrorf("configuration network %q is in use", n.Name())
}
}
// Check that the network is empty
var emptyCount uint64
var emptyCount int
if n.hasLoadBalancerEndpoint() {
emptyCount = 1
}
if !force && n.getEpCnt().EndpointCnt() > emptyCount {
if n.configOnly {
return types.ForbiddenErrorf("configuration network %q is in use", n.Name())
}
eps := c.findEndpoints(filterEndpointByNetworkId(n.id))
if !force && len(eps) > emptyCount {
return &ActiveEndpointsError{name: n.name, id: n.id}
}
@@ -1062,11 +1059,6 @@ func (n *Network) delete(force bool, rmLBEndpoint bool) error {
// continue deletion when force is true even on error
log.G(context.TODO()).Warnf("Error deleting load balancer sandbox: %v", err)
}
// Reload the network from the store to update the epcnt.
n, err = c.getNetworkFromStore(id)
if err != nil {
return errdefs.NotFound(fmt.Errorf("unknown network %s id %s", name, id))
}
}
// Up to this point, errors that we returned were recoverable.
@@ -1076,21 +1068,10 @@ func (n *Network) delete(force bool, rmLBEndpoint bool) error {
// Mark the network for deletion
n.inDelete = true
if err = c.updateToStore(context.TODO(), n); err != nil {
if err = c.storeNetwork(context.TODO(), n); err != nil {
return fmt.Errorf("error marking network %s (%s) for deletion: %v", n.Name(), n.ID(), err)
}
if n.ConfigFrom() != "" {
if t, err := c.getConfigNetwork(n.ConfigFrom()); err == nil {
if err := t.getEpCnt().DecEndpointCnt(); err != nil {
log.G(context.TODO()).Warnf("Failed to update reference count for configuration network %q on removal of network %q: %v",
t.Name(), n.Name(), err)
}
} else {
log.G(context.TODO()).Warnf("Could not find configuration network %q during removal of network %q", n.configFrom, n.Name())
}
}
if n.configOnly {
goto removeFromStore
}
@@ -1127,17 +1108,7 @@ func (n *Network) delete(force bool, rmLBEndpoint bool) error {
}
removeFromStore:
// deleteFromStore performs an atomic delete operation and the
// Network.epCnt will help prevent any possible
// race between endpoint join and network delete
if err = c.deleteFromStore(n.getEpCnt()); err != nil {
if !force {
return fmt.Errorf("error deleting network endpoint count from store: %v", err)
}
log.G(context.TODO()).Debugf("Error deleting endpoint count from store for stale network %s (%s) for deletion: %v", n.Name(), n.ID(), err)
}
if err = c.deleteFromStore(n); err != nil {
if err = c.deleteStoredNetwork(n); err != nil {
return fmt.Errorf("error deleting network from store: %v", err)
}
@@ -1271,14 +1242,14 @@ func (n *Network) createEndpoint(ctx context.Context, name string, options ...En
}
}()
// We should perform updateToStore call right after addEndpoint
// We should perform storeEndpoint call right after addEndpoint
// in order to have iface properly configured
if err = n.getController().updateToStore(ctx, ep); err != nil {
if err = n.getController().storeEndpoint(ctx, ep); err != nil {
return nil, err
}
defer func() {
if err != nil {
if e := n.getController().deleteFromStore(ep); e != nil {
if e := n.getController().deleteStoredEndpoint(ep); e != nil {
log.G(ctx).Warnf("error rolling back endpoint %s from store: %v", name, e)
}
}
@@ -1293,11 +1264,6 @@ func (n *Network) createEndpoint(ctx context.Context, name string, options ...En
}()
}
// Increment endpoint count to indicate completion of endpoint addition
if err = n.getEpCnt().IncEndpointCnt(); err != nil {
return nil, err
}
return ep, nil
}

View File

@@ -0,0 +1,66 @@
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
//go:build go1.22
package libnetwork
import (
"context"
"github.com/docker/docker/internal/maputil"
)
// storeNetwork persists the network to the datastore and, on success,
// records it in the in-memory network cache maintained by the Controller.
//
// This method is thread-safe.
func (c *Controller) storeNetwork(ctx context.Context, n *Network) error {
	err := c.updateToStore(ctx, n)
	if err != nil {
		return err
	}
	c.cacheNetwork(n)
	return nil
}
// deleteStoredNetwork removes the network from the datastore and, on
// success, drops it from the in-memory network cache maintained by the
// Controller.
//
// This method is thread-safe.
func (c *Controller) deleteStoredNetwork(n *Network) error {
	if err := c.deleteFromStore(n); err != nil {
		return err
	}
	c.networksMu.Lock()
	delete(c.networks, n.id)
	c.networksMu.Unlock()
	return nil
}
// cacheNetwork records the network in the in-memory network cache
// maintained by the Controller, overwriting any previous entry with the
// same ID.
//
// This method is thread-safe.
func (c *Controller) cacheNetwork(n *Network) {
	c.networksMu.Lock()
	defer c.networksMu.Unlock()
	// Use the raw n.id rather than the n.ID() getter: this keeps the cache
	// key consistent with deleteStoredNetwork (which deletes by n.id) and
	// avoids taking the Network's own lock while holding networksMu —
	// presumably relevant to the Sandbox/Network/Endpoint lock ordering
	// described on the Controller.networks field; confirm ID() locks n.mu.
	c.networks[n.id] = n
}
// findNetworks returns every network in the Controller's in-memory cache
// that satisfies the given filter. The result order is unspecified.
//
// This method is thread-safe, but do not use it unless you're sure your
// code uses the returned networks in thread-safe way (see the comment on
// Controller.networks).
func (c *Controller) findNetworks(filter func(nw *Network) bool) []*Network {
	c.networksMu.Lock()
	defer c.networksMu.Unlock()
	var matches []*Network
	for _, nw := range c.networks {
		if filter(nw) {
			matches = append(matches, nw)
		}
	}
	return matches
}
// filterNetworkByConfigFrom returns a predicate matching networks whose
// configFrom field equals the given configuration-network name.
func filterNetworkByConfigFrom(configNetwork string) func(nw *Network) bool {
	matches := func(nw *Network) bool {
		return nw.configFrom == configNetwork
	}
	return matches
}

View File

@@ -0,0 +1,76 @@
package libnetwork
import (
"context"
"slices"
"testing"
"github.com/docker/docker/libnetwork/config"
"gotest.tools/v3/assert"
)
// TestNetworkStore exercises the Controller's network store/cache
// round-trip: insert, filtered lookup via findNetworks, delete, and
// re-insert.
func TestNetworkStore(t *testing.T) {
	configOption := config.OptionDataDir(t.TempDir())
	c, err := New(configOption)
	assert.NilError(t, err)
	defer c.Stop()

	// Insert a first network
	nw1 := &Network{id: "testNetwork1", configFrom: "config-network"}
	err = c.storeNetwork(context.Background(), nw1)
	assert.NilError(t, err)

	// Then a second network
	nw2 := &Network{id: "testNetwork2"}
	err = c.storeNetwork(context.Background(), nw2)
	assert.NilError(t, err)

	// findNetworks iterates over a map, so its result order is
	// unspecified. Sort by ID — the test networks have no name, so sorting
	// by name would leave equal keys in a nondeterministic order
	// (slices.SortFunc is not stable) — to keep the assertions below
	// deterministic.
	netSorter := func(a, b *Network) int {
		if a.id < b.id {
			return -1
		}
		if a.id > b.id {
			return 1
		}
		return 0
	}

	for _, tc := range []struct {
		name        string
		filter      func(nw *Network) bool
		expNetworks []*Network
	}{
		{
			name:        "no filter",
			filter:      func(nw *Network) bool { return true },
			expNetworks: []*Network{nw1, nw2},
		},
		{
			name:        "filter by configFrom",
			filter:      filterNetworkByConfigFrom("config-network"),
			expNetworks: []*Network{nw1},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			found := c.findNetworks(tc.filter)
			assert.Equal(t, len(found), len(tc.expNetworks))
			slices.SortFunc(found, netSorter)
			for i, nw := range tc.expNetworks {
				assert.Check(t, found[i] == nw, "got: %s; expected: %s", found[i].id, nw.id)
			}
		})
	}

	// Delete the first network
	err = c.deleteStoredNetwork(nw1)
	assert.NilError(t, err)

	// Check that we can only find the second network
	found := c.findNetworks(func(nw *Network) bool { return true })
	assert.Equal(t, len(found), 1)
	assert.Check(t, found[0] == nw2)

	// Store the second network again
	err = c.storeNetwork(context.Background(), nw2)
	assert.NilError(t, err)
}

View File

@@ -231,6 +231,8 @@ func (c *Controller) sandboxRestore(activeSandboxes map[string]interface{}) erro
},
sandboxID: sbs.ID,
}
c.cacheEndpoint(ep)
c.cacheNetwork(ep.network)
} else {
ep, err = n.getEndpointFromStore(eps.Eid)
if err != nil {
@@ -240,6 +242,7 @@ func (c *Controller) sandboxRestore(activeSandboxes map[string]interface{}) erro
network: n,
sandboxID: sbs.ID,
}
c.cacheEndpoint(ep)
}
}
if _, ok := activeSandboxes[sb.ID()]; ok && err != nil {

View File

@@ -3,7 +3,6 @@ package libnetwork
import (
"context"
"fmt"
"strings"
"github.com/containerd/log"
"github.com/docker/docker/libnetwork/datastore"
@@ -31,15 +30,7 @@ func (c *Controller) getNetworks() ([]*Network, error) {
for _, kvo := range kvol {
n := kvo.(*Network)
n.ctrlr = c
ec := &endpointCnt{n: n}
err = c.store.GetObject(ec)
if err != nil && !n.inDelete {
log.G(context.TODO()).Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
continue
}
n.epCnt = ec
c.cacheNetwork(n)
if n.scope == "" {
n.scope = scope.Local
}
@@ -60,22 +51,10 @@ func (c *Controller) getNetworksFromStore(ctx context.Context) []*Network { // F
return nil
}
kvep, err := c.store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
if err != nil && err != datastore.ErrKeyNotFound {
log.G(ctx).Warnf("failed to get endpoint_count map from store: %v", err)
}
for _, kvo := range kvol {
n := kvo.(*Network)
n.mu.Lock()
n.ctrlr = c
ec := &endpointCnt{n: n}
// Trim the leading & trailing "/" to make it consistent across all stores
if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
ec = val.(*endpointCnt)
ec.n = n
n.epCnt = ec
}
if n.scope == "" {
n.scope = scope.Local
}
@@ -92,6 +71,7 @@ func (n *Network) getEndpointFromStore(eid string) (*Endpoint, error) {
if err != nil {
return nil, fmt.Errorf("could not find endpoint %s: %w", eid, err)
}
n.ctrlr.cacheEndpoint(ep)
return ep, nil
}
@@ -110,6 +90,7 @@ func (n *Network) getEndpointsFromStore() ([]*Endpoint, error) {
for _, kvo := range kvol {
ep := kvo.(*Endpoint)
epl = append(epl, ep)
n.ctrlr.cacheEndpoint(ep)
}
return epl, nil