mirror of
https://github.com/moby/moby.git
synced 2026-01-11 18:51:37 +00:00
libnet: Controller: add ctx to store methods
Signed-off-by: Albin Kerouanton <albinker@gmail.com>
This commit is contained in:
@@ -656,7 +656,7 @@ addToStore:
|
||||
// end up with a datastore containing a network and not an epCnt,
|
||||
// in case of an ungraceful shutdown during this function call.
|
||||
epCnt := &endpointCnt{n: nw}
|
||||
if err := c.updateToStore(epCnt); err != nil {
|
||||
if err := c.updateToStore(context.TODO(), epCnt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
@@ -668,7 +668,7 @@ addToStore:
|
||||
}()
|
||||
|
||||
nw.epCnt = epCnt
|
||||
if err := c.updateToStore(nw); err != nil {
|
||||
if err := c.updateToStore(context.TODO(), nw); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
@@ -967,7 +967,7 @@ func (c *Controller) NewSandbox(ctx context.Context, containerID string, options
|
||||
}
|
||||
}()
|
||||
|
||||
if err := sb.storeUpdate(); err != nil {
|
||||
if err := sb.storeUpdate(ctx); err != nil {
|
||||
return nil, fmt.Errorf("failed to update the store state of sandbox: %v", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -750,7 +750,7 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
|
||||
return err
|
||||
}
|
||||
|
||||
return d.storeUpdate(config)
|
||||
return d.storeUpdate(context.TODO(), config)
|
||||
}
|
||||
|
||||
func (d *driver) checkConflict(config *networkConfiguration) error {
|
||||
@@ -1183,7 +1183,7 @@ func (d *driver) CreateEndpoint(ctx context.Context, nid, eid string, ifInfo dri
|
||||
}
|
||||
}
|
||||
|
||||
if err = d.storeUpdate(endpoint); err != nil {
|
||||
if err = d.storeUpdate(ctx, endpoint); err != nil {
|
||||
return fmt.Errorf("failed to save bridge endpoint %.7s to store: %v", endpoint.id, err)
|
||||
}
|
||||
|
||||
@@ -1443,7 +1443,7 @@ func (d *driver) ProgramExternalConnectivity(ctx context.Context, nid, eid strin
|
||||
// be bound to the local proxy, or to the host (for UDP packets), and won't be redirected to the new endpoints.
|
||||
clearConntrackEntries(d.nlh, endpoint)
|
||||
|
||||
if err = d.storeUpdate(endpoint); err != nil {
|
||||
if err = d.storeUpdate(ctx, endpoint); err != nil {
|
||||
return fmt.Errorf("failed to update bridge endpoint %.7s to store: %v", endpoint.id, err)
|
||||
}
|
||||
|
||||
@@ -1481,7 +1481,7 @@ func (d *driver) RevokeExternalConnectivity(nid, eid string) error {
|
||||
// to bad NATing.
|
||||
clearConntrackEntries(d.nlh, endpoint)
|
||||
|
||||
if err = d.storeUpdate(endpoint); err != nil {
|
||||
if err = d.storeUpdate(context.TODO(), endpoint); err != nil {
|
||||
return fmt.Errorf("failed to update bridge endpoint %.7s to store: %v", endpoint.id, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,9 @@ import (
|
||||
"github.com/docker/docker/libnetwork/datastore"
|
||||
"github.com/docker/docker/libnetwork/netlabel"
|
||||
"github.com/docker/docker/libnetwork/types"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -95,9 +98,13 @@ func (d *driver) populateEndpoints() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *driver) storeUpdate(kvObject datastore.KVObject) error {
|
||||
func (d *driver) storeUpdate(ctx context.Context, kvObject datastore.KVObject) error {
|
||||
ctx, span := otel.Tracer("").Start(ctx, "libnetwork.drivers.bridge.storeUpdate", trace.WithAttributes(
|
||||
attribute.String("kvObject", fmt.Sprintf("%+v", kvObject.Key()))))
|
||||
defer span.End()
|
||||
|
||||
if d.store == nil {
|
||||
log.G(context.TODO()).Warnf("bridge store not initialized. kv object %s is not added to the store", datastore.Key(kvObject.Key()...))
|
||||
log.G(ctx).Warnf("bridge store not initialized. kv object %s is not added to the store", datastore.Key(kvObject.Key()...))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -562,7 +562,7 @@ func (ep *Endpoint) sbJoin(ctx context.Context, sb *Sandbox, options ...Endpoint
|
||||
return errdefs.System(err)
|
||||
}
|
||||
|
||||
if err = n.getController().updateToStore(ep); err != nil {
|
||||
if err = n.getController().updateToStore(ctx, ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -646,7 +646,7 @@ func (ep *Endpoint) rename(name string) error {
|
||||
ep.mu.Unlock()
|
||||
|
||||
// Update the store with the updated name
|
||||
if err := ep.getNetwork().getController().updateToStore(ep); err != nil {
|
||||
if err := ep.getNetwork().getController().updateToStore(context.TODO(), ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -682,7 +682,7 @@ func (ep *Endpoint) UpdateDNSNames(dnsNames []string) error {
|
||||
}
|
||||
|
||||
// Update the store with the updated name
|
||||
if err := c.updateToStore(ep); err != nil {
|
||||
if err := c.updateToStore(context.TODO(), ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -777,7 +777,7 @@ func (ep *Endpoint) sbLeave(sb *Sandbox, force bool) error {
|
||||
// spurious logs when cleaning up the sandbox when the daemon
|
||||
// ungracefully exits and restarts before completing sandbox
|
||||
// detach but after store has been updated.
|
||||
if err := n.getController().updateToStore(ep); err != nil {
|
||||
if err := n.getController().updateToStore(context.TODO(), ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -860,7 +860,7 @@ func (ep *Endpoint) Delete(force bool) error {
|
||||
defer func() {
|
||||
if err != nil && !force {
|
||||
ep.dbExists = false
|
||||
if e := n.getController().updateToStore(ep); e != nil {
|
||||
if e := n.getController().updateToStore(context.WithoutCancel(context.TODO()), ep); e != nil {
|
||||
log.G(context.TODO()).Warnf("failed to recreate endpoint in store %s : %v", name, e)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package libnetwork
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
@@ -110,7 +111,7 @@ func (ec *endpointCnt) updateStore() error {
|
||||
count := ec.EndpointCnt()
|
||||
n := ec.n
|
||||
for {
|
||||
if err := c.updateToStore(ec); err == nil || err != datastore.ErrKeyModified {
|
||||
if err := c.updateToStore(context.TODO(), ec); err == nil || err != datastore.ErrKeyModified {
|
||||
return err
|
||||
}
|
||||
if err := c.store.GetObject(ec); err != nil {
|
||||
@@ -148,7 +149,7 @@ retry:
|
||||
}
|
||||
ec.Unlock()
|
||||
|
||||
if err := ec.n.getController().updateToStore(ec); err != nil {
|
||||
if err := ec.n.getController().updateToStore(context.TODO(), ec); err != nil {
|
||||
if err == datastore.ErrKeyModified {
|
||||
if err := store.GetObject(ec); err != nil {
|
||||
return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
|
||||
|
||||
@@ -1016,7 +1016,7 @@ func (n *Network) delete(force bool, rmLBEndpoint bool) error {
|
||||
|
||||
// Mark the network for deletion
|
||||
n.inDelete = true
|
||||
if err = c.updateToStore(n); err != nil {
|
||||
if err = c.updateToStore(context.TODO(), n); err != nil {
|
||||
return fmt.Errorf("error marking network %s (%s) for deletion: %v", n.Name(), n.ID(), err)
|
||||
}
|
||||
|
||||
@@ -1211,7 +1211,7 @@ func (n *Network) createEndpoint(ctx context.Context, name string, options ...En
|
||||
|
||||
// We should perform updateToStore call right after addEndpoint
|
||||
// in order to have iface properly configured
|
||||
if err = n.getController().updateToStore(ep); err != nil {
|
||||
if err = n.getController().updateToStore(ctx, ep); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
|
||||
@@ -638,7 +638,7 @@ func (sb *Sandbox) clearNetworkResources(origEp *Endpoint) error {
|
||||
// not bother updating the store. The sandbox object will be
|
||||
// deleted anyway
|
||||
if !inDelete {
|
||||
return sb.storeUpdate()
|
||||
return sb.storeUpdate(context.TODO())
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -375,7 +375,7 @@ func (sb *Sandbox) populateNetworkResources(ctx context.Context, ep *Endpoint) e
|
||||
// not bother updating the store. The sandbox object will be
|
||||
// deleted anyway
|
||||
if !inDelete {
|
||||
return sb.storeUpdate()
|
||||
return sb.storeUpdate(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -122,7 +122,7 @@ func (sbs *sbState) CopyTo(o datastore.KVObject) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sb *Sandbox) storeUpdate() error {
|
||||
func (sb *Sandbox) storeUpdate(ctx context.Context) error {
|
||||
sbs := &sbState{
|
||||
c: sb.controller,
|
||||
ID: sb.id,
|
||||
@@ -146,7 +146,7 @@ retry:
|
||||
})
|
||||
}
|
||||
|
||||
err := sb.controller.updateToStore(sbs)
|
||||
err := sb.controller.updateToStore(ctx, sbs)
|
||||
if err == datastore.ErrKeyModified {
|
||||
// When we get ErrKeyModified it is sufficient to just
|
||||
// go back and retry. No need to get the object from
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/containerd/log"
|
||||
"github.com/docker/docker/libnetwork/datastore"
|
||||
"github.com/docker/docker/libnetwork/scope"
|
||||
"go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
func (c *Controller) getNetworkFromStore(nid string) (*Network, error) {
|
||||
@@ -114,7 +115,10 @@ func (n *Network) getEndpointsFromStore() ([]*Endpoint, error) {
|
||||
return epl, nil
|
||||
}
|
||||
|
||||
func (c *Controller) updateToStore(kvObject datastore.KVObject) error {
|
||||
func (c *Controller) updateToStore(ctx context.Context, kvObject datastore.KVObject) error {
|
||||
ctx, span := otel.Tracer("").Start(ctx, "libnetwork.Controller.updateToStore")
|
||||
defer span.End()
|
||||
|
||||
if err := c.store.PutObjectAtomic(kvObject); err != nil {
|
||||
if err == datastore.ErrKeyModified {
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user