diff --git a/libnetwork/controller.go b/libnetwork/controller.go index 2e485a7336..156232e982 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -100,6 +100,23 @@ type Controller struct { diagnosticServer *diagnostic.Server mu sync.Mutex + // endpoints is an in-memory cache of Endpoint. Do not use this map unless + // you're sure your code is thread-safe. + // + // The data persistence layer is instantiating new Endpoint objects every + // time it loads an object from its store or in-memory cache. This leads to + // multiple instances representing the same endpoint to concurrently live + // in memory. As such, the Endpoint mutex might be ineffective and not + // correctly protect against data races. + // + // If you want to use this map for new or existing code, you need to make + // sure: 1. the Endpoint object is correctly locked; 2. the lock order + // between Sandbox, Network and Endpoint is the same as the rest of the + // code (in order to avoid deadlocks). + endpoints map[string]*Endpoint + // endpointsMu protects the endpoints map. + endpointsMu sync.Mutex + // FIXME(thaJeztah): defOsSbox is always nil on non-Linux: move these fields to Linux-only files. defOsSboxOnce sync.Once defOsSbox *osl.Namespace @@ -118,6 +135,7 @@ func New(cfgOptions ...config.Option) (*Controller, error) { cfg: cfg, store: store, sandboxes: map[string]*Sandbox{}, + endpoints: map[string]*Endpoint{}, svcRecords: make(map[string]*svcInfo), serviceBindings: make(map[serviceKey]*service), agentInitDone: make(chan struct{}), diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index 3ac2c2112a..c100fd0235 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -582,7 +582,7 @@ func (ep *Endpoint) sbJoin(ctx context.Context, sb *Sandbox, options ...Endpoint return errdefs.System(err) } - if err := n.getController().updateToStore(ctx, ep); err != nil { + if err := n.getController().storeEndpoint(ctx, ep); err != nil { return err } @@ -762,7 +762,7 @@ func (ep *Endpoint) rename(name string) error { ep.mu.Unlock() // Update the store with the updated name - if err := ep.getNetwork().getController().updateToStore(context.TODO(), ep); err != nil { + if err := ep.getNetwork().getController().storeEndpoint(context.TODO(), ep); err != nil { return err } @@ -798,7 +798,7 @@ func (ep *Endpoint) UpdateDNSNames(dnsNames []string) error { } // Update the store with the updated name - if err := c.updateToStore(context.TODO(), ep); err != nil { + if err := c.storeEndpoint(context.TODO(), ep); err != nil { return err } @@ -904,7 +904,7 @@ func (ep *Endpoint) sbLeave(ctx context.Context, sb *Sandbox, force bool) error // spurious logs when cleaning up the sandbox when the daemon // ungracefully exits and restarts before completing sandbox // detach but after store has been updated. - if err := n.getController().updateToStore(ctx, ep); err != nil { + if err := n.getController().storeEndpoint(ctx, ep); err != nil { return err } @@ -1024,14 +1024,14 @@ func (ep *Endpoint) Delete(ctx context.Context, force bool) error { } } - if err = n.getController().deleteFromStore(ep); err != nil { + if err = n.getController().deleteStoredEndpoint(ep); err != nil { return err } defer func() { if err != nil && !force { ep.dbExists = false - if e := n.getController().updateToStore(context.WithoutCancel(ctx), ep); e != nil { + if e := n.getController().storeEndpoint(context.WithoutCancel(ctx), ep); e != nil { log.G(ctx).Warnf("failed to recreate endpoint in store %s : %v", name, e) } } diff --git a/libnetwork/endpoint_store.go b/libnetwork/endpoint_store.go new file mode 100644 index 0000000000..3f2fd133f2 --- /dev/null +++ b/libnetwork/endpoint_store.go @@ -0,0 +1,41 @@ +package libnetwork + +import "context" + +// storeEndpoint inserts or updates the endpoint in the store and the in-memory +// cache maintained by the Controller. +// +// This method is thread-safe. +func (c *Controller) storeEndpoint(ctx context.Context, ep *Endpoint) error { + if err := c.updateToStore(ctx, ep); err != nil { + return err + } + c.cacheEndpoint(ep) + return nil +} + +// deleteStoredEndpoint deletes the endpoint from the store and the in-memory +// cache maintained by the Controller. +// +// This method is thread-safe. +func (c *Controller) deleteStoredEndpoint(ep *Endpoint) error { + if err := c.deleteFromStore(ep); err != nil { + return err + } + + c.endpointsMu.Lock() + defer c.endpointsMu.Unlock() + delete(c.endpoints, ep.id) + + return nil +} + +// cacheEndpoint caches the endpoint in the in-memory cache of endpoints +// maintained by the Controller. +// +// This method is thread-safe. +func (c *Controller) cacheEndpoint(ep *Endpoint) { + c.endpointsMu.Lock() + defer c.endpointsMu.Unlock() + c.endpoints[ep.id] = ep +} diff --git a/libnetwork/endpoint_store_test.go b/libnetwork/endpoint_store_test.go new file mode 100644 index 0000000000..7372832d6d --- /dev/null +++ b/libnetwork/endpoint_store_test.go @@ -0,0 +1,35 @@ +package libnetwork + +import ( + "context" + "testing" + + "github.com/docker/docker/libnetwork/config" + "gotest.tools/v3/assert" +) + +func TestEndpointStore(t *testing.T) { + configOption := config.OptionDataDir(t.TempDir()) + c, err := New(configOption) + assert.NilError(t, err) + defer c.Stop() + + // Insert a first endpoint + nw := &Network{id: "testNetwork"} + ep1 := &Endpoint{network: nw, id: "testEndpoint1"} + err = c.storeEndpoint(context.Background(), ep1) + assert.NilError(t, err) + + // Then a second endpoint + ep2 := &Endpoint{network: nw, id: "testEndpoint2"} + err = c.storeEndpoint(context.Background(), ep2) + assert.NilError(t, err) + + // Delete the first endpoint + err = c.deleteStoredEndpoint(ep1) + assert.NilError(t, err) + + // Store the second endpoint again + err = c.storeEndpoint(context.Background(), ep2) + assert.NilError(t, err) +} diff --git a/libnetwork/network.go b/libnetwork/network.go index c29c5ea9cc..5577f7fca9 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -1271,14 +1271,14 @@ func (n *Network) createEndpoint(ctx context.Context, name string, options ...En } }() - // We should perform updateToStore call right after addEndpoint + // We should perform storeEndpoint call right after addEndpoint // in order to have iface properly configured - if err = n.getController().updateToStore(ctx, ep); err != nil { + if err = n.getController().storeEndpoint(ctx, ep); err != nil { return nil, err } defer func() { if err != nil { - if e := n.getController().deleteFromStore(ep); e != nil { + if e := n.getController().deleteStoredEndpoint(ep); e != nil { log.G(ctx).Warnf("error rolling back endpoint %s from store: %v", name, e) } } diff --git a/libnetwork/sandbox_store.go b/libnetwork/sandbox_store.go index 3239591d2b..7736e477fe 100644 --- a/libnetwork/sandbox_store.go +++ b/libnetwork/sandbox_store.go @@ -231,6 +231,7 @@ func (c *Controller) sandboxRestore(activeSandboxes map[string]interface{}) erro }, sandboxID: sbs.ID, } + c.cacheEndpoint(ep) } else { ep, err = n.getEndpointFromStore(eps.Eid) if err != nil { @@ -240,6 +241,7 @@ func (c *Controller) sandboxRestore(activeSandboxes map[string]interface{}) erro network: n, sandboxID: sbs.ID, } + c.cacheEndpoint(ep) } } if _, ok := activeSandboxes[sb.ID()]; ok && err != nil { diff --git a/libnetwork/store.go b/libnetwork/store.go index 7d0e03e778..30cd8a0943 100644 --- a/libnetwork/store.go +++ b/libnetwork/store.go @@ -92,6 +92,7 @@ func (n *Network) getEndpointFromStore(eid string) (*Endpoint, error) { if err != nil { return nil, fmt.Errorf("could not find endpoint %s: %w", eid, err) } + n.ctrlr.cacheEndpoint(ep) return ep, nil } @@ -110,6 +111,7 @@ func (n *Network) getEndpointsFromStore() ([]*Endpoint, error) { for _, kvo := range kvol { ep := kvo.(*Endpoint) epl = append(epl, ep) + n.ctrlr.cacheEndpoint(ep) } return epl, nil