mirror of
https://github.com/moby/moby.git
synced 2026-01-11 18:51:37 +00:00
libnet: Controller: cache endpoints in-memory
The `Controller`'s store is used by: - `deleteFromStore` - `getEndpointFromStore` - `getEndpointsFromStore` - `updateToStore` - … and other methods that can't store / delete / retrieve an Endpoint Calls to `updateToStore` and `deleteFromStore` have been replaced with `upsertEndpoint` and `deleteEndpoint`. Both `getEndpointFromStore` and `getEndpointsFromStore` call `cacheEndpoint` to ensure endpoints loaded from the datastore are kept in-memory. Finally, `sandboxRestore` was instantiating `Endpoint` itself. These are cached too. Signed-off-by: Albin Kerouanton <albinker@gmail.com>
This commit is contained in:
@@ -100,6 +100,23 @@ type Controller struct {
|
||||
diagnosticServer *diagnostic.Server
|
||||
mu sync.Mutex
|
||||
|
||||
// endpoints is an in-memory cache of Endpoint. Do not use this map unless
|
||||
// you're sure your code is thread-safe.
|
||||
//
|
||||
// The data persistence layer is instantiating new Endpoint objects every
|
||||
// time it loads an object from its store or in-memory cache. This leads to
|
||||
// multiple instances representing the same endpoint to concurrently live
|
||||
// in memory. As such, the Endpoint mutex might be ineffective and not
|
||||
// correctly protect against data races.
|
||||
//
|
||||
// If you want to use this map for new or existing code, you need to make
|
||||
// sure: 1. the Endpoint object is correctly locked; 2. the lock order
|
||||
// between Sandbox, Network and Endpoint is the same as the rest of the
|
||||
// code (in order to avoid deadlocks).
|
||||
endpoints map[string]*Endpoint
|
||||
// endpointsMu protects the endpoints map.
|
||||
endpointsMu sync.Mutex
|
||||
|
||||
// FIXME(thaJeztah): defOsSbox is always nil on non-Linux: move these fields to Linux-only files.
|
||||
defOsSboxOnce sync.Once
|
||||
defOsSbox *osl.Namespace
|
||||
@@ -118,6 +135,7 @@ func New(cfgOptions ...config.Option) (*Controller, error) {
|
||||
cfg: cfg,
|
||||
store: store,
|
||||
sandboxes: map[string]*Sandbox{},
|
||||
endpoints: map[string]*Endpoint{},
|
||||
svcRecords: make(map[string]*svcInfo),
|
||||
serviceBindings: make(map[serviceKey]*service),
|
||||
agentInitDone: make(chan struct{}),
|
||||
|
||||
@@ -582,7 +582,7 @@ func (ep *Endpoint) sbJoin(ctx context.Context, sb *Sandbox, options ...Endpoint
|
||||
return errdefs.System(err)
|
||||
}
|
||||
|
||||
if err := n.getController().updateToStore(ctx, ep); err != nil {
|
||||
if err := n.getController().storeEndpoint(ctx, ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -762,7 +762,7 @@ func (ep *Endpoint) rename(name string) error {
|
||||
ep.mu.Unlock()
|
||||
|
||||
// Update the store with the updated name
|
||||
if err := ep.getNetwork().getController().updateToStore(context.TODO(), ep); err != nil {
|
||||
if err := ep.getNetwork().getController().storeEndpoint(context.TODO(), ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -798,7 +798,7 @@ func (ep *Endpoint) UpdateDNSNames(dnsNames []string) error {
|
||||
}
|
||||
|
||||
// Update the store with the updated name
|
||||
if err := c.updateToStore(context.TODO(), ep); err != nil {
|
||||
if err := c.storeEndpoint(context.TODO(), ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -904,7 +904,7 @@ func (ep *Endpoint) sbLeave(ctx context.Context, sb *Sandbox, force bool) error
|
||||
// spurious logs when cleaning up the sandbox when the daemon
|
||||
// ungracefully exits and restarts before completing sandbox
|
||||
// detach but after store has been updated.
|
||||
if err := n.getController().updateToStore(ctx, ep); err != nil {
|
||||
if err := n.getController().storeEndpoint(ctx, ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1024,14 +1024,14 @@ func (ep *Endpoint) Delete(ctx context.Context, force bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
if err = n.getController().deleteFromStore(ep); err != nil {
|
||||
if err = n.getController().deleteStoredEndpoint(ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil && !force {
|
||||
ep.dbExists = false
|
||||
if e := n.getController().updateToStore(context.WithoutCancel(ctx), ep); e != nil {
|
||||
if e := n.getController().storeEndpoint(context.WithoutCancel(ctx), ep); e != nil {
|
||||
log.G(ctx).Warnf("failed to recreate endpoint in store %s : %v", name, e)
|
||||
}
|
||||
}
|
||||
|
||||
41
libnetwork/endpoint_store.go
Normal file
41
libnetwork/endpoint_store.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package libnetwork
|
||||
|
||||
import "context"
|
||||
|
||||
// storeEndpoint inserts or updates the endpoint in the store and the in-memory
|
||||
// cache maintained by the Controller.
|
||||
//
|
||||
// This method is thread-safe.
|
||||
func (c *Controller) storeEndpoint(ctx context.Context, ep *Endpoint) error {
|
||||
if err := c.updateToStore(ctx, ep); err != nil {
|
||||
return err
|
||||
}
|
||||
c.cacheEndpoint(ep)
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteStoredEndpoint deletes the endpoint from the store and the in-memory
|
||||
// cache maintained by the Controller.
|
||||
//
|
||||
// This method is thread-safe.
|
||||
func (c *Controller) deleteStoredEndpoint(ep *Endpoint) error {
|
||||
if err := c.deleteFromStore(ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.endpointsMu.Lock()
|
||||
defer c.endpointsMu.Unlock()
|
||||
delete(c.endpoints, ep.id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// cacheEndpoint caches the endpoint in the in-memory cache of endpoints
|
||||
// maintained by the Controller.
|
||||
//
|
||||
// This method is thread-safe.
|
||||
func (c *Controller) cacheEndpoint(ep *Endpoint) {
|
||||
c.endpointsMu.Lock()
|
||||
defer c.endpointsMu.Unlock()
|
||||
c.endpoints[ep.id] = ep
|
||||
}
|
||||
35
libnetwork/endpoint_store_test.go
Normal file
35
libnetwork/endpoint_store_test.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package libnetwork
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/docker/docker/libnetwork/config"
|
||||
"gotest.tools/v3/assert"
|
||||
)
|
||||
|
||||
func TestEndpointStore(t *testing.T) {
|
||||
configOption := config.OptionDataDir(t.TempDir())
|
||||
c, err := New(configOption)
|
||||
assert.NilError(t, err)
|
||||
defer c.Stop()
|
||||
|
||||
// Insert a first endpoint
|
||||
nw := &Network{id: "testNetwork"}
|
||||
ep1 := &Endpoint{network: nw, id: "testEndpoint1"}
|
||||
err = c.storeEndpoint(context.Background(), ep1)
|
||||
assert.NilError(t, err)
|
||||
|
||||
// Then a second endpoint
|
||||
ep2 := &Endpoint{network: nw, id: "testEndpoint2"}
|
||||
err = c.storeEndpoint(context.Background(), ep2)
|
||||
assert.NilError(t, err)
|
||||
|
||||
// Delete the first endpoint
|
||||
err = c.deleteStoredEndpoint(ep1)
|
||||
assert.NilError(t, err)
|
||||
|
||||
// Store the second endpoint again
|
||||
err = c.storeEndpoint(context.Background(), ep2)
|
||||
assert.NilError(t, err)
|
||||
}
|
||||
@@ -1271,14 +1271,14 @@ func (n *Network) createEndpoint(ctx context.Context, name string, options ...En
|
||||
}
|
||||
}()
|
||||
|
||||
// We should perform updateToStore call right after addEndpoint
|
||||
// We should perform storeEndpoint call right after addEndpoint
|
||||
// in order to have iface properly configured
|
||||
if err = n.getController().updateToStore(ctx, ep); err != nil {
|
||||
if err = n.getController().storeEndpoint(ctx, ep); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if e := n.getController().deleteFromStore(ep); e != nil {
|
||||
if e := n.getController().deleteStoredEndpoint(ep); e != nil {
|
||||
log.G(ctx).Warnf("error rolling back endpoint %s from store: %v", name, e)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -231,6 +231,7 @@ func (c *Controller) sandboxRestore(activeSandboxes map[string]interface{}) erro
|
||||
},
|
||||
sandboxID: sbs.ID,
|
||||
}
|
||||
c.cacheEndpoint(ep)
|
||||
} else {
|
||||
ep, err = n.getEndpointFromStore(eps.Eid)
|
||||
if err != nil {
|
||||
@@ -240,6 +241,7 @@ func (c *Controller) sandboxRestore(activeSandboxes map[string]interface{}) erro
|
||||
network: n,
|
||||
sandboxID: sbs.ID,
|
||||
}
|
||||
c.cacheEndpoint(ep)
|
||||
}
|
||||
}
|
||||
if _, ok := activeSandboxes[sb.ID()]; ok && err != nil {
|
||||
|
||||
@@ -92,6 +92,7 @@ func (n *Network) getEndpointFromStore(eid string) (*Endpoint, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not find endpoint %s: %w", eid, err)
|
||||
}
|
||||
n.ctrlr.cacheEndpoint(ep)
|
||||
return ep, nil
|
||||
}
|
||||
|
||||
@@ -110,6 +111,7 @@ func (n *Network) getEndpointsFromStore() ([]*Endpoint, error) {
|
||||
for _, kvo := range kvol {
|
||||
ep := kvo.(*Endpoint)
|
||||
epl = append(epl, ep)
|
||||
n.ctrlr.cacheEndpoint(ep)
|
||||
}
|
||||
|
||||
return epl, nil
|
||||
|
||||
Reference in New Issue
Block a user