diff --git a/daemon/network.go b/daemon/network.go index 70b62df838..5b54d5d296 100644 --- a/daemon/network.go +++ b/daemon/network.go @@ -360,7 +360,7 @@ func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string n, err := c.NewNetwork(driver, create.Name, id, nwOptions...) if err != nil { - if _, ok := err.(libnetwork.ErrDataStoreNotInitialized); ok { + if errors.Is(err, libnetwork.ErrDataStoreNotInitialized) { //nolint: revive return nil, errors.New("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.") } diff --git a/libnetwork/controller.go b/libnetwork/controller.go index 9e745bcd2d..3d9f1fd8d8 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -88,7 +88,7 @@ type Controller struct { drvRegistry *drvregistry.DrvRegistry sandboxes sandboxTable cfg *config.Config - stores []datastore.DataStore + store datastore.DataStore extKeyListener net.Listener watchCh chan *Endpoint unWatchCh chan *Endpoint @@ -130,7 +130,7 @@ func New(cfgOptions ...config.Option) (*Controller, error) { return nil, err } - drvRegistry, err := drvregistry.New(c.getStore(datastore.LocalScope), c.getStore(datastore.GlobalScope), c.RegisterDriver, nil, c.cfg.PluginGetter) + drvRegistry, err := drvregistry.New(c.getStore(), nil, c.RegisterDriver, nil, c.cfg.PluginGetter) if err != nil { return nil, err } @@ -149,7 +149,7 @@ func New(cfgOptions ...config.Option) (*Controller, error) { } } - if err = initIPAMDrivers(drvRegistry, nil, c.getStore(datastore.GlobalScope), c.cfg.DefaultAddressPool); err != nil { + if err = initIPAMDrivers(drvRegistry, nil, nil, c.cfg.DefaultAddressPool); err != nil { return nil, err } @@ -696,7 +696,7 @@ var joinCluster NetworkWalker = func(nw Network) bool { } func (c *Controller) reservePools() { - networks, err := c.getNetworksForScope(datastore.LocalScope) + networks, err := c.getNetworks() if err != nil { logrus.Warnf("Could not retrieve networks from local store during ipam allocation for existing networks: %v", err) return diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index 9d59a1cacd..e0cab348e3 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -1151,7 +1151,7 @@ func (c *Controller) cleanupLocalEndpoints() { eps[ep.id] = true } } - nl, err := c.getNetworksForScope(datastore.LocalScope) + nl, err := c.getNetworks() if err != nil { logrus.Warnf("Could not get list of networks during endpoint cleanup: %v", err) return diff --git a/libnetwork/endpoint_cnt.go b/libnetwork/endpoint_cnt.go index c4670335ce..38a8d89351 100644 --- a/libnetwork/endpoint_cnt.go +++ b/libnetwork/endpoint_cnt.go @@ -109,7 +109,7 @@ func (ec *endpointCnt) EndpointCnt() uint64 { } func (ec *endpointCnt) updateStore() error { - store := ec.n.getController().getStore(ec.DataScope()) + store := ec.n.getController().getStore() if store == nil { return fmt.Errorf("store not found for scope %s on endpoint count update", ec.DataScope()) } @@ -138,7 +138,7 @@ func (ec *endpointCnt) setCnt(cnt uint64) error { } func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error { - store := ec.n.getController().getStore(ec.DataScope()) + store := ec.n.getController().getStore() if store == nil { return fmt.Errorf("store not found for scope %s", ec.DataScope()) } diff --git a/libnetwork/error.go b/libnetwork/error.go index 5f00709ff9..31fd70e54c 100644 --- a/libnetwork/error.go +++ b/libnetwork/error.go @@ -1,6 +1,7 @@ package libnetwork import ( + "errors" "fmt" ) @@ -186,8 +187,4 @@ func (mr ManagerRedirectError) Maskable() {} // ErrDataStoreNotInitialized is returned if an invalid data scope is passed // for getting data store -type ErrDataStoreNotInitialized string - -func (dsni ErrDataStoreNotInitialized) Error() string { - return fmt.Sprintf("datastore for scope %q is not initialized", string(dsni)) -} +var ErrDataStoreNotInitialized = errors.New("datastore is not initialized") diff --git a/libnetwork/sandbox_store.go b/libnetwork/sandbox_store.go index 866c620470..b71549a1c2 100644 --- a/libnetwork/sandbox_store.go +++ b/libnetwork/sandbox_store.go @@ -187,7 +187,7 @@ func (sb *Sandbox) storeDelete() error { } func (c *Controller) sandboxCleanup(activeSandboxes map[string]interface{}) { - store := c.getStore(datastore.LocalScope) + store := c.getStore() if store == nil { logrus.Error("Could not find local scope store while trying to cleanup sandboxes") return diff --git a/libnetwork/store.go b/libnetwork/store.go index 3c7a324a79..f64924d614 100644 --- a/libnetwork/store.go +++ b/libnetwork/store.go @@ -22,40 +22,27 @@ func (c *Controller) initStores() error { if c.cfg == nil { return nil } - store, err := datastore.NewDataStore(c.cfg.Scope) + var err error + c.store, err = datastore.NewDataStore(c.cfg.Scope) if err != nil { return err } - c.stores = []datastore.DataStore{store} c.startWatch() return nil } func (c *Controller) closeStores() { - for _, store := range c.getStores() { + if store := c.store; store != nil { store.Close() } } -func (c *Controller) getStore(scope string) datastore.DataStore { +func (c *Controller) getStore() datastore.DataStore { c.mu.Lock() defer c.mu.Unlock() - for _, store := range c.stores { - if store.Scope() == scope { - return store - } - } - - return nil -} - -func (c *Controller) getStores() []datastore.DataStore { - c.mu.Lock() - defer c.mu.Unlock() - - return c.stores + return c.store } func (c *Controller) getNetworkFromStore(nid string) (*network, error) { @@ -67,10 +54,10 @@ func (c *Controller) getNetworkFromStore(nid string) (*network, error) { return nil, ErrNoSuchNetwork(nid) } -func (c *Controller) getNetworksForScope(scope string) ([]*network, error) { +func (c *Controller) getNetworks() ([]*network, error) { var nl []*network - store := c.getStore(scope) + store := c.getStore() if store == nil { return nil, nil } @@ -78,8 +65,7 @@ func (c *Controller) getNetworksForScope(scope string) ([]*network, error) { kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c}) if err != nil && err != datastore.ErrKeyNotFound { - return nil, fmt.Errorf("failed to get networks for scope %s: %v", - scope, err) + return nil, fmt.Errorf("failed to get networks: %w", err) } for _, kvo := range kvol { @@ -95,7 +81,7 @@ func (c *Controller) getNetworksForScope(scope string) ([]*network, error) { n.epCnt = ec if n.scope == "" { - n.scope = scope + n.scope = store.Scope() } nl = append(nl, n) } @@ -103,92 +89,80 @@ func (c *Controller) getNetworksForScope(scope string) ([]*network, error) { return nl, nil } -func (c *Controller) getNetworksFromStore() []*network { +func (c *Controller) getNetworksFromStore() []*network { // FIXME: unify with c.getNetworks() var nl []*network - for _, store := range c.getStores() { - kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c}) - // Continue searching in the next store if no keys found in this store - if err != nil { - if err != datastore.ErrKeyNotFound { - logrus.Debugf("failed to get networks for scope %s: %v", store.Scope(), err) - } - continue + store := c.getStore() + kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c}) + if err != nil { + if err != datastore.ErrKeyNotFound { + logrus.Debugf("failed to get networks from store: %v", err) } + return nil + } - kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{}) - if err != nil && err != datastore.ErrKeyNotFound { - logrus.Warnf("failed to get endpoint_count map for scope %s: %v", store.Scope(), err) - } + kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{}) + if err != nil && err != datastore.ErrKeyNotFound { + logrus.Warnf("failed to get endpoint_count map from store: %v", err) + } - for _, kvo := range kvol { - n := kvo.(*network) - n.mu.Lock() - n.ctrlr = c - ec := &endpointCnt{n: n} - // Trim the leading & trailing "/" to make it consistent across all stores - if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok { - ec = val.(*endpointCnt) - ec.n = n - n.epCnt = ec - } - if n.scope == "" { - n.scope = store.Scope() - } - n.mu.Unlock() - nl = append(nl, n) + for _, kvo := range kvol { + n := kvo.(*network) + n.mu.Lock() + n.ctrlr = c + ec := &endpointCnt{n: n} + // Trim the leading & trailing "/" to make it consistent across all stores + if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok { + ec = val.(*endpointCnt) + ec.n = n + n.epCnt = ec } + if n.scope == "" { + n.scope = store.Scope() + } + n.mu.Unlock() + nl = append(nl, n) } return nl } func (n *network) getEndpointFromStore(eid string) (*Endpoint, error) { - var errors []string - for _, store := range n.ctrlr.getStores() { - ep := &Endpoint{id: eid, network: n} - err := store.GetObject(datastore.Key(ep.Key()...), ep) - // Continue searching in the next store if the key is not found in this store - if err != nil { - if err != datastore.ErrKeyNotFound { - errors = append(errors, fmt.Sprintf("{%s:%v}, ", store.Scope(), err)) - logrus.Debugf("could not find endpoint %s in %s: %v", eid, store.Scope(), err) - } - continue - } - return ep, nil + store := n.ctrlr.getStore() + ep := &Endpoint{id: eid, network: n} + err := store.GetObject(datastore.Key(ep.Key()...), ep) + if err != nil { + return nil, fmt.Errorf("could not find endpoint %s: %w", eid, err) } - return nil, fmt.Errorf("could not find endpoint %s: %v", eid, errors) + return ep, nil } func (n *network) getEndpointsFromStore() ([]*Endpoint, error) { var epl []*Endpoint tmp := Endpoint{network: n} - for _, store := range n.getController().getStores() { - kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &Endpoint{network: n}) - // Continue searching in the next store if no keys found in this store - if err != nil { - if err != datastore.ErrKeyNotFound { - logrus.Debugf("failed to get endpoints for network %s scope %s: %v", - n.Name(), store.Scope(), err) - } - continue + store := n.getController().getStore() + kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &Endpoint{network: n}) + if err != nil { + if err != datastore.ErrKeyNotFound { + return nil, fmt.Errorf("failed to get endpoints for network %s scope %s: %w", + n.Name(), store.Scope(), err) } + return nil, nil + } - for _, kvo := range kvol { - ep := kvo.(*Endpoint) - epl = append(epl, ep) - } + for _, kvo := range kvol { + ep := kvo.(*Endpoint) + epl = append(epl, ep) } return epl, nil } func (c *Controller) updateToStore(kvObject datastore.KVObject) error { - cs := c.getStore(kvObject.DataScope()) + cs := c.getStore() if cs == nil { - return ErrDataStoreNotInitialized(kvObject.DataScope()) + return ErrDataStoreNotInitialized } if err := cs.PutObjectAtomic(kvObject); err != nil { @@ -202,9 +176,9 @@ func (c *Controller) updateToStore(kvObject datastore.KVObject) error { } func (c *Controller) deleteFromStore(kvObject datastore.KVObject) error { - cs := c.getStore(kvObject.DataScope()) + cs := c.getStore() if cs == nil { - return ErrDataStoreNotInitialized(kvObject.DataScope()) + return ErrDataStoreNotInitialized } retry: @@ -258,7 +232,8 @@ func (c *Controller) networkWatchLoop(nw *netWatch, ep *Endpoint, ecCh <-chan da epl, err := ec.n.getEndpointsFromStore() if err != nil { - break + logrus.WithError(err).Debug("error getting endpoints from store") + continue } c.mu.Lock() @@ -356,7 +331,7 @@ func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoi nw.stopCh = make(chan struct{}) c.mu.Unlock() - store := c.getStore(n.DataScope()) + store := c.getStore() if store == nil { return } diff --git a/libnetwork/store_linux_test.go b/libnetwork/store_linux_test.go index 2d06af4281..f6213540fb 100644 --- a/libnetwork/store_linux_test.go +++ b/libnetwork/store_linux_test.go @@ -30,7 +30,7 @@ func TestNoPersist(t *testing.T) { if err != nil { t.Fatalf("Error creating endpoint: %v", err) } - store := ctrl.getStore(datastore.LocalScope).KVStore() + store := ctrl.getStore().KVStore() if exists, _ := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); exists { t.Fatalf("Network with persist=false should not be stored in KV Store") } diff --git a/libnetwork/store_test.go b/libnetwork/store_test.go index 2623c16923..c857aeb63f 100644 --- a/libnetwork/store_test.go +++ b/libnetwork/store_test.go @@ -36,7 +36,7 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con if err != nil { t.Fatalf("Error creating endpoint: %v", err) } - store := ctrl.getStore(datastore.LocalScope).KVStore() + store := ctrl.getStore().KVStore() if exists, err := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); !exists || err != nil { t.Fatalf("Network key should have been created.") }