mirror of
https://github.com/moby/moby.git
synced 2026-01-11 18:51:37 +00:00
Merge pull request #50842 from corhere/locked-manager-action-ctx
daemon/cluster: improve use of lockedManagerAction
This commit is contained in:
@@ -432,7 +432,7 @@ func detectLockedError(err error) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
|
||||
func (c *Cluster) lockedManagerAction(ctx context.Context, fn func(ctx context.Context, state nodeState) error) error {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
@@ -441,7 +441,6 @@ func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeSta
|
||||
return c.errNoManager(state)
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
ctx, cancel := context.WithTimeout(ctx, swarmRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
func (c *Cluster) GetConfig(input string) (types.Config, error) {
|
||||
var config *swarmapi.Config
|
||||
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
s, err := getConfig(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -29,26 +29,19 @@ func (c *Cluster) GetConfig(input string) (types.Config, error) {
|
||||
|
||||
// GetConfigs returns all configs of a managed swarm cluster.
|
||||
func (c *Cluster) GetConfigs(options swarmbackend.ConfigListOptions) ([]types.Config, error) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
state := c.currentNodeState()
|
||||
if !state.IsActiveManager() {
|
||||
return nil, c.errNoManager(state)
|
||||
}
|
||||
|
||||
filters, err := newListConfigsFilters(options.Filters)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
ctx, cancel := context.WithTimeout(ctx, swarmRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
r, err := state.controlClient.ListConfigs(ctx,
|
||||
&swarmapi.ListConfigsRequest{Filters: filters},
|
||||
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse))
|
||||
var r *swarmapi.ListConfigsResponse
|
||||
err = c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
var err error
|
||||
r, err = state.controlClient.ListConfigs(ctx,
|
||||
&swarmapi.ListConfigsRequest{Filters: filters},
|
||||
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse))
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -65,7 +58,7 @@ func (c *Cluster) GetConfigs(options swarmbackend.ConfigListOptions) ([]types.Co
|
||||
// CreateConfig creates a new config in a managed swarm cluster.
|
||||
func (c *Cluster) CreateConfig(s types.ConfigSpec) (string, error) {
|
||||
var resp *swarmapi.CreateConfigResponse
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
configSpec := convert.ConfigSpecToGRPC(s)
|
||||
|
||||
r, err := state.controlClient.CreateConfig(ctx,
|
||||
@@ -83,7 +76,7 @@ func (c *Cluster) CreateConfig(s types.ConfigSpec) (string, error) {
|
||||
|
||||
// RemoveConfig removes a config from a managed swarm cluster.
|
||||
func (c *Cluster) RemoveConfig(input string) error {
|
||||
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
config, err := getConfig(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -101,7 +94,7 @@ func (c *Cluster) RemoveConfig(input string) error {
|
||||
// UpdateConfig updates a config in a managed swarm cluster.
|
||||
// Note: this is not exposed to the CLI but is available from the API only
|
||||
func (c *Cluster) UpdateConfig(input string, version uint64, spec types.ConfigSpec) error {
|
||||
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
config, err := getConfig(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -58,19 +58,12 @@ func filterPredefinedNetworks(networks *[]network.Inspect) {
|
||||
}
|
||||
|
||||
func (c *Cluster) getNetworks(filters *swarmapi.ListNetworksRequest_Filters) ([]network.Inspect, error) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
state := c.currentNodeState()
|
||||
if !state.IsActiveManager() {
|
||||
return nil, c.errNoManager(state)
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
ctx, cancel := context.WithTimeout(ctx, swarmRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
r, err := state.controlClient.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: filters})
|
||||
var r *swarmapi.ListNetworksResponse
|
||||
err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
var err error
|
||||
r, err = state.controlClient.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: filters})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -88,7 +81,7 @@ func (c *Cluster) getNetworks(filters *swarmapi.ListNetworksRequest_Filters) ([]
|
||||
func (c *Cluster) GetNetwork(input string) (network.Inspect, error) {
|
||||
var nw *swarmapi.Network
|
||||
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
n, err := getNetwork(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -274,7 +267,7 @@ func (c *Cluster) CreateNetwork(s network.CreateRequest) (string, error) {
|
||||
}
|
||||
|
||||
var resp *swarmapi.CreateNetworkResponse
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
networkSpec := convert.BasicNetworkCreateToGRPC(s)
|
||||
r, err := state.controlClient.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
|
||||
if err != nil {
|
||||
@@ -291,7 +284,7 @@ func (c *Cluster) CreateNetwork(s network.CreateRequest) (string, error) {
|
||||
|
||||
// RemoveNetwork removes a cluster network.
|
||||
func (c *Cluster) RemoveNetwork(input string) error {
|
||||
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
nw, err := getNetwork(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -13,28 +13,21 @@ import (
|
||||
|
||||
// GetNodes returns a list of all nodes known to a cluster.
|
||||
func (c *Cluster) GetNodes(options swarmbackend.NodeListOptions) ([]types.Node, error) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
state := c.currentNodeState()
|
||||
if !state.IsActiveManager() {
|
||||
return nil, c.errNoManager(state)
|
||||
}
|
||||
|
||||
filters, err := newListNodesFilters(options.Filters)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
ctx, cancel := context.WithTimeout(ctx, swarmRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
r, err := state.controlClient.ListNodes(
|
||||
ctx,
|
||||
&swarmapi.ListNodesRequest{Filters: filters},
|
||||
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
||||
)
|
||||
var r *swarmapi.ListNodesResponse
|
||||
err = c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
var err error
|
||||
r, err = state.controlClient.ListNodes(
|
||||
ctx,
|
||||
&swarmapi.ListNodesRequest{Filters: filters},
|
||||
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
||||
)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -51,7 +44,7 @@ func (c *Cluster) GetNodes(options swarmbackend.NodeListOptions) ([]types.Node,
|
||||
func (c *Cluster) GetNode(input string) (types.Node, error) {
|
||||
var node *swarmapi.Node
|
||||
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
n, err := getNode(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -67,7 +60,7 @@ func (c *Cluster) GetNode(input string) (types.Node, error) {
|
||||
|
||||
// UpdateNode updates existing nodes properties.
|
||||
func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error {
|
||||
return c.lockedManagerAction(func(_ context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(_ context.Context, state nodeState) error {
|
||||
nodeSpec, err := convert.NodeSpecToGRPC(spec)
|
||||
if err != nil {
|
||||
return errdefs.InvalidParameter(err)
|
||||
@@ -98,7 +91,7 @@ func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec)
|
||||
|
||||
// RemoveNode removes a node from a cluster
|
||||
func (c *Cluster) RemoveNode(input string, force bool) error {
|
||||
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
node, err := getNode(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
func (c *Cluster) GetSecret(input string) (types.Secret, error) {
|
||||
var secret *swarmapi.Secret
|
||||
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
s, err := getSecret(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -29,27 +29,20 @@ func (c *Cluster) GetSecret(input string) (types.Secret, error) {
|
||||
|
||||
// GetSecrets returns all secrets of a managed swarm cluster.
|
||||
func (c *Cluster) GetSecrets(options swarmbackend.SecretListOptions) ([]types.Secret, error) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
state := c.currentNodeState()
|
||||
if !state.IsActiveManager() {
|
||||
return nil, c.errNoManager(state)
|
||||
}
|
||||
|
||||
filters, err := newListSecretsFilters(options.Filters)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
ctx, cancel := context.WithTimeout(ctx, swarmRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
r, err := state.controlClient.ListSecrets(ctx,
|
||||
&swarmapi.ListSecretsRequest{Filters: filters},
|
||||
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
||||
)
|
||||
var r *swarmapi.ListSecretsResponse
|
||||
err = c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
var err error
|
||||
r, err = state.controlClient.ListSecrets(ctx,
|
||||
&swarmapi.ListSecretsRequest{Filters: filters},
|
||||
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
||||
)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -66,7 +59,7 @@ func (c *Cluster) GetSecrets(options swarmbackend.SecretListOptions) ([]types.Se
|
||||
// CreateSecret creates a new secret in a managed swarm cluster.
|
||||
func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) {
|
||||
var resp *swarmapi.CreateSecretResponse
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
secretSpec := convert.SecretSpecToGRPC(s)
|
||||
|
||||
r, err := state.controlClient.CreateSecret(ctx,
|
||||
@@ -84,7 +77,7 @@ func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) {
|
||||
|
||||
// RemoveSecret removes a secret from a managed swarm cluster.
|
||||
func (c *Cluster) RemoveSecret(input string) error {
|
||||
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
secret, err := getSecret(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -102,7 +95,7 @@ func (c *Cluster) RemoveSecret(input string) error {
|
||||
// UpdateSecret updates a secret in a managed swarm cluster.
|
||||
// Note: this is not exposed to the CLI but is available from the API only
|
||||
func (c *Cluster) UpdateSecret(input string, version uint64, spec types.SecretSpec) error {
|
||||
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
secret, err := getSecret(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -28,14 +28,6 @@ import (
|
||||
|
||||
// GetServices returns all services of a managed swarm cluster.
|
||||
func (c *Cluster) GetServices(options swarmbackend.ServiceListOptions) ([]swarm.Service, error) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
state := c.currentNodeState()
|
||||
if !state.IsActiveManager() {
|
||||
return nil, c.errNoManager(state)
|
||||
}
|
||||
|
||||
// We move the accepted filter check here as "mode" filter
|
||||
// is processed in the daemon, not in SwarmKit. So it might
|
||||
// be good to have accepted file check in the same file as
|
||||
@@ -63,97 +55,101 @@ func (c *Cluster) GetServices(options swarmbackend.ServiceListOptions) ([]swarm.
|
||||
Runtimes: options.Filters.Get("runtime"),
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
ctx, cancel := context.WithTimeout(ctx, swarmRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
r, err := state.controlClient.ListServices(
|
||||
ctx,
|
||||
&swarmapi.ListServicesRequest{Filters: filters},
|
||||
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
services := make([]swarm.Service, 0, len(r.Services))
|
||||
|
||||
// if the user requests the service statuses, we'll store the IDs needed
|
||||
// in this slice
|
||||
var serviceIDs []string
|
||||
if options.Status {
|
||||
serviceIDs = make([]string, 0, len(r.Services))
|
||||
}
|
||||
for _, service := range r.Services {
|
||||
if options.Filters.Contains("mode") {
|
||||
var mode string
|
||||
switch service.Spec.GetMode().(type) {
|
||||
case *swarmapi.ServiceSpec_Global:
|
||||
mode = "global"
|
||||
case *swarmapi.ServiceSpec_Replicated:
|
||||
mode = "replicated"
|
||||
case *swarmapi.ServiceSpec_ReplicatedJob:
|
||||
mode = "replicated-job"
|
||||
case *swarmapi.ServiceSpec_GlobalJob:
|
||||
mode = "global-job"
|
||||
}
|
||||
|
||||
if !options.Filters.ExactMatch("mode", mode) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if options.Status {
|
||||
serviceIDs = append(serviceIDs, service.ID)
|
||||
}
|
||||
svcs, err := convert.ServiceFromGRPC(*service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
services = append(services, svcs)
|
||||
}
|
||||
|
||||
if options.Status {
|
||||
// Listing service statuses is a separate call because, while it is the
|
||||
// most common UI operation, it is still just a UI operation, and it
|
||||
// would be improper to include this data in swarm's Service object.
|
||||
// We pay the cost with some complexity here, but this is still way
|
||||
// more efficient than marshalling and unmarshalling all the JSON
|
||||
// needed to list tasks and get this data otherwise client-side
|
||||
resp, err := state.controlClient.ListServiceStatuses(
|
||||
var services []swarm.Service
|
||||
err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
var err error
|
||||
r, err := state.controlClient.ListServices(
|
||||
ctx,
|
||||
&swarmapi.ListServiceStatusesRequest{Services: serviceIDs},
|
||||
&swarmapi.ListServicesRequest{Filters: filters},
|
||||
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
// we'll need to match up statuses in the response with the services in
|
||||
// the list operation. if we did this by operating on two lists, the
|
||||
// result would be quadratic. instead, make a mapping of service IDs to
|
||||
// service statuses so that this is roughly linear. additionally,
|
||||
// convert the status response to an engine api service status here.
|
||||
serviceMap := map[string]*swarm.ServiceStatus{}
|
||||
for _, status := range resp.Statuses {
|
||||
serviceMap[status.ServiceID] = &swarm.ServiceStatus{
|
||||
RunningTasks: status.RunningTasks,
|
||||
DesiredTasks: status.DesiredTasks,
|
||||
CompletedTasks: status.CompletedTasks,
|
||||
services = make([]swarm.Service, 0, len(r.Services))
|
||||
|
||||
// if the user requests the service statuses, we'll store the IDs needed
|
||||
// in this slice
|
||||
var serviceIDs []string
|
||||
if options.Status {
|
||||
serviceIDs = make([]string, 0, len(r.Services))
|
||||
}
|
||||
for _, service := range r.Services {
|
||||
if options.Filters.Contains("mode") {
|
||||
var mode string
|
||||
switch service.Spec.GetMode().(type) {
|
||||
case *swarmapi.ServiceSpec_Global:
|
||||
mode = "global"
|
||||
case *swarmapi.ServiceSpec_Replicated:
|
||||
mode = "replicated"
|
||||
case *swarmapi.ServiceSpec_ReplicatedJob:
|
||||
mode = "replicated-job"
|
||||
case *swarmapi.ServiceSpec_GlobalJob:
|
||||
mode = "global-job"
|
||||
}
|
||||
|
||||
if !options.Filters.ExactMatch("mode", mode) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if options.Status {
|
||||
serviceIDs = append(serviceIDs, service.ID)
|
||||
}
|
||||
svcs, err := convert.ServiceFromGRPC(*service)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
services = append(services, svcs)
|
||||
}
|
||||
|
||||
if options.Status {
|
||||
// Listing service statuses is a separate call because, while it is the
|
||||
// most common UI operation, it is still just a UI operation, and it
|
||||
// would be improper to include this data in swarm's Service object.
|
||||
// We pay the cost with some complexity here, but this is still way
|
||||
// more efficient than marshalling and unmarshalling all the JSON
|
||||
// needed to list tasks and get this data otherwise client-side
|
||||
resp, err := state.controlClient.ListServiceStatuses(
|
||||
ctx,
|
||||
&swarmapi.ListServiceStatusesRequest{Services: serviceIDs},
|
||||
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// we'll need to match up statuses in the response with the services in
|
||||
// the list operation. if we did this by operating on two lists, the
|
||||
// result would be quadratic. instead, make a mapping of service IDs to
|
||||
// service statuses so that this is roughly linear. additionally,
|
||||
// convert the status response to an engine api service status here.
|
||||
serviceMap := map[string]*swarm.ServiceStatus{}
|
||||
for _, status := range resp.Statuses {
|
||||
serviceMap[status.ServiceID] = &swarm.ServiceStatus{
|
||||
RunningTasks: status.RunningTasks,
|
||||
DesiredTasks: status.DesiredTasks,
|
||||
CompletedTasks: status.CompletedTasks,
|
||||
}
|
||||
}
|
||||
|
||||
// because this is a list of values and not pointers, make sure we
|
||||
// actually alter the value when iterating.
|
||||
for i, service := range services {
|
||||
// the return value of the ListServiceStatuses operation is
|
||||
// guaranteed to contain a value in the response for every argument
|
||||
// in the request, so we can safely do this assignment. and even if
|
||||
// it wasn't, and the service ID was for some reason absent from
|
||||
// this map, the resulting value of service.Status would just be
|
||||
// nil -- the same thing it was before
|
||||
service.ServiceStatus = serviceMap[service.ID]
|
||||
services[i] = service
|
||||
}
|
||||
}
|
||||
|
||||
// because this is a list of values and not pointers, make sure we
|
||||
// actually alter the value when iterating.
|
||||
for i, service := range services {
|
||||
// the return value of the ListServiceStatuses operation is
|
||||
// guaranteed to contain a value in the response for every argument
|
||||
// in the request, so we can safely do this assignment. and even if
|
||||
// it wasn't, and the service ID was for some reason absent from
|
||||
// this map, the resulting value of service.Status would just be
|
||||
// nil -- the same thing it was before
|
||||
service.ServiceStatus = serviceMap[service.ID]
|
||||
services[i] = service
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return services, nil
|
||||
@@ -162,7 +158,7 @@ func (c *Cluster) GetServices(options swarmbackend.ServiceListOptions) ([]swarm.
|
||||
// GetService returns a service based on an ID or name.
|
||||
func (c *Cluster) GetService(input string, insertDefaults bool) (swarm.Service, error) {
|
||||
var service *swarmapi.Service
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
s, err := getService(ctx, state.controlClient, input, insertDefaults)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -182,7 +178,7 @@ func (c *Cluster) GetService(input string, insertDefaults bool) (swarm.Service,
|
||||
// CreateService creates a new service in a managed swarm cluster.
|
||||
func (c *Cluster) CreateService(s swarm.ServiceSpec, encodedAuth string, queryRegistry bool) (*swarm.ServiceCreateResponse, error) {
|
||||
var resp *swarm.ServiceCreateResponse
|
||||
err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
err := c.populateNetworkID(ctx, state.controlClient, &s)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -284,7 +280,7 @@ func (c *Cluster) CreateService(s swarm.ServiceSpec, encodedAuth string, queryRe
|
||||
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec swarm.ServiceSpec, flags swarmbackend.ServiceUpdateOptions, queryRegistry bool) (*swarm.ServiceUpdateResponse, error) {
|
||||
var resp *swarm.ServiceUpdateResponse
|
||||
|
||||
err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
err := c.populateNetworkID(ctx, state.controlClient, &spec)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -414,7 +410,7 @@ func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec swa
|
||||
|
||||
// RemoveService removes a service from a managed swarm cluster.
|
||||
func (c *Cluster) RemoveService(input string) error {
|
||||
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
service, err := getService(ctx, state.controlClient, input, false)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -427,28 +423,6 @@ func (c *Cluster) RemoveService(input string) error {
|
||||
|
||||
// ServiceLogs collects service logs and writes them back to `config.OutStream`
|
||||
func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *container.LogsOptions) (<-chan *backend.LogMessage, error) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
state := c.currentNodeState()
|
||||
if !state.IsActiveManager() {
|
||||
return nil, c.errNoManager(state)
|
||||
}
|
||||
|
||||
swarmSelector, err := convertSelector(ctx, state.controlClient, selector)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error making log selector")
|
||||
}
|
||||
|
||||
// set the streams we'll use
|
||||
stdStreams := []swarmapi.LogStream{}
|
||||
if config.ShowStdout {
|
||||
stdStreams = append(stdStreams, swarmapi.LogStreamStdout)
|
||||
}
|
||||
if config.ShowStderr {
|
||||
stdStreams = append(stdStreams, swarmapi.LogStreamStderr)
|
||||
}
|
||||
|
||||
// Get tail value squared away - the number of previous log lines we look at
|
||||
var tail int64
|
||||
// in ContainerLogs, if the tail value is ANYTHING non-integer, we just set
|
||||
@@ -476,6 +450,15 @@ func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector
|
||||
tail = int64(-(t + 1))
|
||||
}
|
||||
|
||||
// set the streams we'll use
|
||||
stdStreams := []swarmapi.LogStream{}
|
||||
if config.ShowStdout {
|
||||
stdStreams = append(stdStreams, swarmapi.LogStreamStdout)
|
||||
}
|
||||
if config.ShowStderr {
|
||||
stdStreams = append(stdStreams, swarmapi.LogStreamStderr)
|
||||
}
|
||||
|
||||
// get the since value - the time in the past we're looking at logs starting from
|
||||
var sinceProto *gogotypes.Timestamp
|
||||
if config.Since != "" {
|
||||
@@ -490,14 +473,24 @@ func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector
|
||||
}
|
||||
}
|
||||
|
||||
stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
|
||||
Selector: swarmSelector,
|
||||
Options: &swarmapi.LogSubscriptionOptions{
|
||||
Follow: config.Follow,
|
||||
Streams: stdStreams,
|
||||
Tail: tail,
|
||||
Since: sinceProto,
|
||||
},
|
||||
var stream swarmapi.Logs_SubscribeLogsClient
|
||||
// Ignore the context passed to the closure as it has a deadline.
|
||||
err := c.lockedManagerAction(ctx, func(_ context.Context, state nodeState) error {
|
||||
swarmSelector, err := convertSelector(ctx, state.controlClient, selector)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error making log selector")
|
||||
}
|
||||
|
||||
stream, err = state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
|
||||
Selector: swarmSelector,
|
||||
Options: &swarmapi.LogSubscriptionOptions{
|
||||
Follow: config.Follow,
|
||||
Streams: stdStreams,
|
||||
Tail: tail,
|
||||
Since: sinceProto,
|
||||
},
|
||||
})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -219,7 +219,7 @@ func (c *Cluster) Join(req types.JoinRequest) error {
|
||||
// Inspect retrieves the configuration properties of a managed swarm cluster.
|
||||
func (c *Cluster) Inspect() (types.Swarm, error) {
|
||||
var swarm types.Swarm
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
s, err := c.inspect(ctx, state)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -242,7 +242,7 @@ func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, er
|
||||
|
||||
// Update updates configuration of a managed swarm cluster.
|
||||
func (c *Cluster) Update(version uint64, spec types.Spec, flags swarmbackend.UpdateFlags) error {
|
||||
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
swarm, err := getSwarm(ctx, state.controlClient)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -285,7 +285,7 @@ func (c *Cluster) Update(version uint64, spec types.Spec, flags swarmbackend.Upd
|
||||
// GetUnlockKey returns the unlock key for the swarm.
|
||||
func (c *Cluster) GetUnlockKey() (string, error) {
|
||||
var resp *swarmapi.GetUnlockKeyResponse
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
client := swarmapi.NewCAClient(state.grpcConn)
|
||||
|
||||
r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
func (c *Cluster) GetTasks(options swarmbackend.TaskListOptions) ([]types.Task, error) {
|
||||
var r *swarmapi.ListTasksResponse
|
||||
|
||||
err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
filterTransform := func(filter filters.Args) error {
|
||||
if filter.Contains("service") {
|
||||
serviceFilters := filter.Get("service")
|
||||
@@ -77,7 +77,7 @@ func (c *Cluster) GetTasks(options swarmbackend.TaskListOptions) ([]types.Task,
|
||||
// GetTask returns a task by an ID.
|
||||
func (c *Cluster) GetTask(input string) (types.Task, error) {
|
||||
var task *swarmapi.Task
|
||||
err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
t, err := getTask(ctx, state.controlClient, input)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
func (c *Cluster) GetVolume(nameOrID string) (volumetypes.Volume, error) {
|
||||
var volume *swarmapi.Volume
|
||||
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
v, err := getVolume(ctx, state.controlClient, nameOrID)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -33,7 +33,7 @@ func (c *Cluster) GetVolume(nameOrID string) (volumetypes.Volume, error) {
|
||||
// GetVolumes returns all of the volumes matching the given options from a swarm cluster.
|
||||
func (c *Cluster) GetVolumes(options volumebackend.ListOptions) ([]*volumetypes.Volume, error) {
|
||||
var volumes []*volumetypes.Volume
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
r, err := state.controlClient.ListVolumes(
|
||||
ctx, &swarmapi.ListVolumesRequest{},
|
||||
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
||||
@@ -61,7 +61,7 @@ func (c *Cluster) GetVolumes(options volumebackend.ListOptions) ([]*volumetypes.
|
||||
// Returns the volume ID if creation is successful, or an error if not.
|
||||
func (c *Cluster) CreateVolume(v volumetypes.CreateOptions) (*volumetypes.Volume, error) {
|
||||
var resp *swarmapi.CreateVolumeResponse
|
||||
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
if err := c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
volumeSpec := convert.VolumeCreateToGRPC(&v)
|
||||
|
||||
r, err := state.controlClient.CreateVolume(
|
||||
@@ -89,7 +89,7 @@ func (c *Cluster) CreateVolume(v volumetypes.CreateOptions) (*volumetypes.Volume
|
||||
|
||||
// RemoveVolume removes a volume from the swarm cluster.
|
||||
func (c *Cluster) RemoveVolume(nameOrID string, force bool) error {
|
||||
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
volume, err := getVolume(ctx, state.controlClient, nameOrID)
|
||||
if err != nil {
|
||||
if force && cerrdefs.IsNotFound(err) {
|
||||
@@ -108,7 +108,7 @@ func (c *Cluster) RemoveVolume(nameOrID string, force bool) error {
|
||||
|
||||
// UpdateVolume updates a volume in the swarm cluster.
|
||||
func (c *Cluster) UpdateVolume(nameOrID string, version uint64, volume volumetypes.UpdateOptions) error {
|
||||
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
||||
return c.lockedManagerAction(context.TODO(), func(ctx context.Context, state nodeState) error {
|
||||
v, err := getVolume(ctx, state.controlClient, nameOrID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user