Compare commits

...

21 Commits

Author SHA1 Message Date
Flavio Crisciani
b6d4a90f86 [17.07] vndr libnetwork to latest bump_17.07
Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
2020-03-23 09:48:45 +00:00
Andrew Hsu
6cfa171a10 bump version to 17.07.0-ce-rc2
Signed-off-by: Andrew Hsu <andrewhsu@docker.com>
2020-03-23 09:48:45 +00:00
Alessandro Boch
351409f5c0 Fix api server null pointer deref on inspect/ls null ipam-driver networks
- When a network is created with the null ipam driver, the docker api server
  thread will dereference a nil pointer on `docker network ls` and on
  `docker network inspect <nw>`. This is because buildIpamResource()
  assumes a gateway address is always present, which is not correct.

Signed-off-by: Alessandro Boch <aboch@tetrationanalytics.com>
(cherry picked from commit beebfc0cf6)
Signed-off-by: Victor Vieux <victorvieux@gmail.com>
2020-03-23 09:48:45 +00:00
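
A minimal sketch of the gateway guard the fix above applies, using illustrative stand-in types rather than the actual moby/libnetwork definitions: with the null IPAM driver a pool is reported but no gateway, so the string conversion has to be skipped instead of dereferencing a nil pointer.

package main

import (
	"fmt"
	"net"
)

// ipamData mirrors the shape of the IPAM info consumed by the API server:
// Pool is always set, Gateway may be nil (e.g. with the null IPAM driver).
type ipamData struct {
	Pool    *net.IPNet
	Gateway *net.IPNet
}

// ipamConfig is the API-facing view built from ipamData.
type ipamConfig struct {
	Subnet  string
	Gateway string
}

func buildConfig(d ipamData) ipamConfig {
	c := ipamConfig{Subnet: d.Pool.String()}
	// Guard against a missing gateway instead of calling
	// d.Gateway.IP.String() unconditionally, which would panic.
	if d.Gateway != nil {
		c.Gateway = d.Gateway.IP.String()
	}
	return c
}

func main() {
	_, pool, _ := net.ParseCIDR("0.0.0.0/0")
	fmt.Printf("%+v\n", buildConfig(ipamData{Pool: pool})) // Gateway stays ""
}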
Brian Goff
b4f8c127a4 Make plugins dir private.
This prevents mounts in the plugins dir from leaking into other
namespaces which can prevent removal (`device or resource busy`),
particularly on older kernels.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit 0c2821d6f2)
Signed-off-by: Victor Vieux <victorvieux@gmail.com>
2020-03-23 09:48:44 +00:00
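
A rough sketch of what marking the plugins directory private amounts to on Linux. This uses the raw mount syscalls via golang.org/x/sys/unix rather than the moby mount helper itself, and assumes a Linux host with root privileges; the path is illustrative.

package main

import (
	"log"

	"golang.org/x/sys/unix"
)

// makeMountPrivate bind-mounts dir onto itself so it becomes a mount point,
// then flips its propagation to private, so mounts created underneath no
// longer propagate into other mount namespaces.
func makeMountPrivate(dir string) error {
	if err := unix.Mount(dir, dir, "", unix.MS_BIND, ""); err != nil {
		return err
	}
	return unix.Mount("", dir, "", unix.MS_PRIVATE, "")
}

func main() {
	// Illustrative path; the real daemon derives it from its configured root.
	if err := makeMountPrivate("/var/lib/docker/plugins"); err != nil {
		log.Fatal(err)
	}
}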
Andrew Hsu
d349008e19 vndr libnetwork to latest bump_17.07
Signed-off-by: Andrew Hsu <andrewhsu@docker.com>
2020-03-23 09:48:44 +00:00
Brian Goff
99262c939f Fix error handling with not-exist errors on remove
Specifically, none of the graphdrivers are supposed to return a
not-exist type of error on remove (or at least that's how they are
currently handled).

Found that AUFS still had one case where a not-exist error could escape:
when checking if the directory is mounted, we call `Statfs` on the
path.

This fixes AUFS to not return an error in this case, but also
double-checks at the daemon level on layer remove that the error is not
a `not-exist` type of error.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit d42dbdd3d4)
Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2020-03-23 09:48:44 +00:00
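
A hedged sketch of the daemon-level double-check described above: the removal is treated as successful when the only failure is that the path is already gone. The releaseRWLayer parameter stands in for the real layer-store call, and github.com/pkg/errors is used for cause unwrapping, as in the diff further down.

package main

import (
	"fmt"
	"os"

	"github.com/pkg/errors"
)

// removeLayer treats a not-exist failure from the graphdriver as success:
// if the mount or directory is already gone there is nothing left to remove.
func removeLayer(releaseRWLayer func() error) error {
	err := releaseRWLayer()
	if err != nil && !os.IsNotExist(errors.Cause(err)) {
		return errors.Wrap(err, "failed to remove root filesystem")
	}
	return nil
}

func main() {
	// Simulate the AUFS case: Statfs on a missing mountpoint surfaces ENOENT.
	gone := func() error {
		return errors.Wrap(&os.PathError{Op: "statfs", Path: "/var/lib/docker/aufs/mnt/x", Err: os.ErrNotExist}, "checking mount")
	}
	fmt.Println(removeLayer(gone)) // <nil>
}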
Justin Menga
4769aac754 Fix awslogs driver repeating last event - #34292
Signed-off-by: Justin Menga <justin.menga@gmail.com>
(cherry picked from commit 0fd5a0bab7)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2020-03-23 09:48:44 +00:00
Stefan Wernli
a66498b5d2 Fixing releaseableLayer handling of layer streams and mounts.
releaseableLayer includes automatic handling for creating a read/write layer and mounting it on a call to Mount(), but then does not correspondingly unmount the layer before trying to delete it, which will fail for some graphdrivers. Commit on a releaseableLayer also leaks the tar stream for the layer. To fix this, the stream close is deferred in Commit, and releaseRWLayer now correctly handles unmounting the layer before trying to delete it. In addition, the changes include better error handling in Release() to make sure that errors are returned to the caller for failures on read/write layers instead of being ignored.

Signed-off-by: Stefan Wernli <swernli@ntdev.microsoft.com>
(cherry picked from commit 1d457999c4)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2020-03-23 09:48:44 +00:00
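
A compact sketch of the two cleanup rules the commit above enforces, with stand-in types rather than the real layer-store interfaces: the tar stream is closed via defer once consumed, and the read/write layer is unmounted before it is released.

package main

import (
	"fmt"
	"io"
	"strings"
)

// rwLayer models only the calls relevant to the cleanup ordering.
type rwLayer interface {
	TarStream() (io.ReadCloser, error)
	Unmount() error
	Release() error
}

func commitAndRelease(l rwLayer, register func(io.Reader) error) error {
	stream, err := l.TarStream()
	if err != nil {
		return err
	}
	defer stream.Close() // previously this stream was leaked on Commit

	if err := register(stream); err != nil {
		return err
	}
	if err := l.Unmount(); err != nil { // unmount first ...
		return err
	}
	return l.Release() // ... some graphdrivers refuse to delete a mounted layer
}

// fakeLayer refuses Release while mounted, like the graphdrivers mentioned above.
type fakeLayer struct{ mounted bool }

func (f *fakeLayer) TarStream() (io.ReadCloser, error) {
	return io.NopCloser(strings.NewReader("layer-data")), nil
}
func (f *fakeLayer) Unmount() error { f.mounted = false; return nil }
func (f *fakeLayer) Release() error {
	if f.mounted {
		return fmt.Errorf("layer is still mounted")
	}
	return nil
}

func main() {
	l := &fakeLayer{mounted: true}
	register := func(r io.Reader) error { _, err := io.Copy(io.Discard, r); return err }
	fmt.Println(commitAndRelease(l, register)) // <nil>
}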
Sebastiaan van Stijn
1dcdee3b5e Fix RestartPolicy default value
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
(cherry picked from commit fc48b5529d)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2020-03-23 09:48:44 +00:00
Sebastiaan van Stijn
8c66a1526b Add API documentation for pluggable secret backends
Documents the API changes introduced in

0304c98d85 and
08f7cf0526

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
(cherry picked from commit c8dad44c32)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2020-03-23 09:48:44 +00:00
Sebastiaan van Stijn
c0d95e7ffe Update API plugin response examples
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
(cherry picked from commit 4735c76632)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2020-03-23 09:48:44 +00:00
Aaron Lehmann
1f18524c98 cluster: Avoid recursive RLock
GetTasks can call GetService and GetNode with the read lock held. These
methods try to acquire the read side of the same lock. According to the
sync package documentation, this is not safe:

> If a goroutine holds a RWMutex for reading, it must not expect this or
> any other goroutine to be able to also take the read lock until the
> first read lock is released. In particular, this prohibits recursive
> read locking. This is to ensure that the lock eventually becomes
> available; a blocked Lock call excludes new readers from acquiring the
> lock.

Fix GetTasks to use the lower-level getService and getNode methods
instead. Also, use lockedManagerAction to simplify GetTasks.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
(cherry picked from commit bd4f66c8f1)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2020-03-23 09:48:44 +00:00
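
A toy illustration of the locking pattern the fix above moves to, not the actual cluster code: exported methods take the read lock, unexported helpers assume it is already held, so a method that holds RLock never re-enters it.

package main

import (
	"fmt"
	"sync"
)

type cluster struct {
	mu       sync.RWMutex
	services map[string]string
}

// getService assumes c.mu is already held by the caller.
func (c *cluster) getService(id string) string { return c.services[id] }

// GetService is the exported variant that manages its own locking.
func (c *cluster) GetService(id string) string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.getService(id)
}

// GetTasks holds the read lock for the whole operation, so it must call the
// lock-free helper; calling GetService here would be a recursive RLock, which
// can deadlock once a writer queues up between the two acquisitions.
func (c *cluster) GetTasks(serviceID string) []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return []string{"task-for-" + c.getService(serviceID)}
}

func main() {
	c := &cluster{services: map[string]string{"s1": "web"}}
	fmt.Println(c.GetTasks("s1"))
}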
Aaron Lehmann
4644653c72 container: Fix Delete on nonexistent container
Delete needs to release names related to a container even if that
container isn't present in the db. However, slightly overzealous error
checking causes the transaction to get rolled back. Ignore the error
from Delete on the container itself, since it may not be present.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
(cherry picked from commit 1d9546fc62)
Signed-off-by: Victor Vieux <victorvieux@gmail.com>
2020-03-23 09:48:44 +00:00
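
A small sketch of the ordering the fix above relies on, with plain maps standing in for the memdb tables: name reservations are always released, and the failure from deleting a container row that was never stored is deliberately ignored rather than rolling back the whole transaction.

package main

import "fmt"

// deleteRecord mimics a transactional delete that fails when the row is
// missing, similar to what memdb's txn.Delete does.
func deleteRecord(table map[string]struct{}, id string) error {
	if _, ok := table[id]; !ok {
		return fmt.Errorf("record %s does not exist", id)
	}
	delete(table, id)
	return nil
}

func deleteContainer(id string, names map[string]string, containers map[string]struct{}) {
	for name, owner := range names {
		if owner == id {
			delete(names, name) // always release the name associations
		}
	}
	// Ignore the error: the container row may legitimately be absent, but the
	// names above still had to be cleaned up so they can be reused.
	_ = deleteRecord(containers, id)
}

func main() {
	names := map[string]string{"name1": "containerid1", "name2": "containerid4"}
	containers := map[string]struct{}{} // containerid1 was never stored
	deleteContainer("containerid1", names, containers)
	fmt.Println(names) // map[name2:containerid4]
}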
Abhinandan Prativadi
f938a1e204 Changing the get network request to return swarm scope predefined networks
Starting with 17.06, swarm service create supports creating services with predefined
networks like host and bridge. Due to the nature of the feature, the swarm manager
has swarm-scoped predefined networks in addition to the locally scoped
predefined networks present on all nodes. However, network inspect for swarm-scoped
predefined networks was not possible. This fix adds support for network inspect
of swarm-scoped predefined networks.

Signed-off-by: Abhinandan Prativadi <abhi@docker.com>
(cherry picked from commit 5bfefb2d36)
Signed-off-by: Victor Vieux <victorvieux@gmail.com>
2020-03-23 09:48:44 +00:00
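
A simplified sketch of the selection rule the network GET handler gains in the fix above, with stand-in types rather than the real API structures: a swarm-scope result from the cluster is returned when the lookup term is an ID prefix of it or when the caller explicitly asked for scope=swarm; otherwise the locally scoped network still wins.

package main

import (
	"fmt"
	"strings"
)

type network struct {
	ID    string
	Name  string
	Scope string
}

func pickNetwork(term, scope string, swarmNw *network, localNws []network) *network {
	if swarmNw != nil && (strings.HasPrefix(swarmNw.ID, term) || scope == "swarm") {
		return swarmNw
	}
	for i, nw := range localNws {
		if (nw.ID == term || nw.Name == term) && (scope == "" || scope == nw.Scope) {
			return &localNws[i]
		}
	}
	return nil
}

func main() {
	swarmHost := &network{ID: "abc123", Name: "host", Scope: "swarm"}
	local := []network{{ID: "def456", Name: "host", Scope: "local"}}
	fmt.Println(pickNetwork("host", "swarm", swarmHost, local).Scope) // swarm
	fmt.Println(pickNetwork("host", "local", swarmHost, local).Scope) // local
}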
Abhinandan Prativadi
a148de2e2f Fixing issue with driver opt not passed to drivers
Signed-off-by: Abhinandan Prativadi <abhi@docker.com>
(cherry picked from commit bcb55c6202)
Signed-off-by: Victor Vieux <victorvieux@gmail.com>
2020-03-23 09:48:44 +00:00
Andrew Hsu
01019b4690 bump version to 17.07.0-ce-rc1
Signed-off-by: Andrew Hsu <andrewhsu@docker.com>
2020-03-23 09:48:44 +00:00
Ying
b908d6ab3b Re-vendors swarmkit to include the following fix:
- https://github.com/docker/swarmkit/pull/2323 (fix for watch server being run only on leader)

Signed-off-by: Ying <ying.li@docker.com>
2020-03-23 09:48:43 +00:00
Ying
d9d7cda5fa Re-vendors swarmkit to include the following fixes:
- https://github.com/docker/swarmkit/pull/2288 (Allow updates of failed services with restart policy "none")
- https://github.com/docker/swarmkit/pull/2304 (Reset restart history when task spec changes)
- https://github.com/docker/swarmkit/pull/2309 (updating the service spec version when rolling back)
- https://github.com/docker/swarmkit/pull/2310 (fix for slow swarm shutdown)

Signed-off-by: Ying <ying.li@docker.com>
2020-03-23 09:48:43 +00:00
Tibor Vass
484dc6bfa7 [engine] Graceful upgrade of containerd and runc state files upon live-restore
Vendors new dependency github.com/crosbymichael/upgrade

Signed-off-by: Tibor Vass <tibor@docker.com>
(cherry picked from commit 358c36e93093961a512face64b6b7701b04e55a2)
Signed-off-by: Victor Vieux <victorvieux@gmail.com>
2020-03-23 09:48:43 +00:00
Eli Uriegas
0f91b0f2fb Add go-autogen to integration tests
Integration tests were failing in trial runs for docker-ce 17.07 due to
the lack of go-autogen being sourced in `hack/make.sh`. This re-adds
go-autogen to be sourced for test-integration-cli so that we can
actually run tests without the error found in:
https://github.com/moby/moby/pull/33857

Signed-off-by: Eli Uriegas <eli.uriegas@docker.com>
(cherry picked from commit 3cdd471cac)
Signed-off-by: Eli Uriegas <eli.uriegas@docker.com>
2020-03-23 09:48:43 +00:00
Eli Uriegas
4bc0d61036 Increment engine version to 17.07
Release jobs will fail if the version files are out of sync due to how
`hack/make.sh` builds binaries (putting them into the version folder
according to `components/engine/VERSION` instead of the base `VERSION`
file)

Signed-off-by: Eli Uriegas <eli.uriegas@docker.com>
2020-03-23 09:48:43 +00:00
68 changed files with 1752 additions and 536 deletions

View File

@@ -1 +1 @@
17.06.0-dev
17.07.0-ce-rc2

View File

@@ -16,6 +16,7 @@ import (
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/versions"
"github.com/docker/libnetwork"
netconst "github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/networkdb"
)
@@ -135,6 +136,17 @@ func (n *networkRouter) getNetwork(ctx context.Context, w http.ResponseWriter, r
}
}
nwk, err := n.cluster.GetNetwork(term)
if err == nil {
// If the get network is passed with a specific network ID / partial network ID
// or if the get network was passed with a network name and scope as swarm
// return the network. Skipped using isMatchingScope because it is true if the scope
// is not set which would be case if the client API v1.30
if strings.HasPrefix(nwk.ID, term) || (netconst.SwarmScope == scope) {
return httputils.WriteJSON(w, http.StatusOK, nwk)
}
}
nr, _ := n.cluster.GetNetworks()
for _, network := range nr {
if network.ID == term && isMatchingScope(network.Scope, scope) {
@@ -397,7 +409,9 @@ func buildIpamResources(r *types.NetworkResource, nwInfo libnetwork.NetworkInfo)
for _, ip4Info := range ipv4Info {
iData := network.IPAMConfig{}
iData.Subnet = ip4Info.IPAMData.Pool.String()
iData.Gateway = ip4Info.IPAMData.Gateway.IP.String()
if ip4Info.IPAMData.Gateway != nil {
iData.Gateway = ip4Info.IPAMData.Gateway.IP.String()
}
r.IPAM.Config = append(r.IPAM.Config, iData)
}
}

View File

@@ -322,7 +322,6 @@ definitions:
MaximumRetryCount:
type: "integer"
description: "If `on-failure` is used, the number of times to retry before giving up"
default: {}
Resources:
description: "A container's resources (cgroups config, ulimits, etc)"
@@ -1349,26 +1348,33 @@ definitions:
Name:
type: "string"
x-nullable: false
example: "some-mount"
Description:
type: "string"
x-nullable: false
example: "This is a mount that's used by the plugin."
Settable:
type: "array"
items:
type: "string"
Source:
type: "string"
example: "/var/lib/docker/plugins/"
Destination:
type: "string"
x-nullable: false
example: "/mnt/state"
Type:
type: "string"
x-nullable: false
example: "bind"
Options:
type: "array"
items:
type: "string"
example:
- "rbind"
- "rw"
PluginDevice:
type: "object"
required: [Name, Description, Settable, Path]
@@ -1386,6 +1392,7 @@ definitions:
type: "string"
Path:
type: "string"
example: "/dev/fuse"
PluginEnv:
type: "object"
@@ -1427,13 +1434,16 @@ definitions:
properties:
Id:
type: "string"
example: "5724e2c8652da337ab2eedd19fc6fc0ec908e4bd907c7421bf6a8dfc70c4c078"
Name:
type: "string"
x-nullable: false
example: "tiborvass/sample-volume-plugin"
Enabled:
description: "True when the plugin is running. False when the plugin is not running, only installed."
description: "True if the plugin is running. False if the plugin is not running, only installed."
type: "boolean"
x-nullable: false
example: true
Settings:
description: "Settings that can be modified by users."
type: "object"
@@ -1448,6 +1458,8 @@ definitions:
type: "array"
items:
type: "string"
example:
- "DEBUG=0"
Args:
type: "array"
items:
@@ -1460,6 +1472,7 @@ definitions:
description: "plugin remote reference used to push/pull the plugin"
type: "string"
x-nullable: false
example: "localhost:5000/tiborvass/sample-volume-plugin:latest"
Config:
description: "The config of a plugin."
type: "object"
@@ -1483,12 +1496,15 @@ definitions:
description: "Docker Version used to create the plugin"
type: "string"
x-nullable: false
example: "17.06.0-ce"
Description:
type: "string"
x-nullable: false
example: "A sample volume plugin for Docker"
Documentation:
type: "string"
x-nullable: false
example: "https://docs.docker.com/engine/extend/plugins/"
Interface:
description: "The interface between Docker and the plugin"
x-nullable: false
@@ -1499,16 +1515,23 @@ definitions:
type: "array"
items:
$ref: "#/definitions/PluginInterfaceType"
example:
- "docker.volumedriver/1.0"
Socket:
type: "string"
x-nullable: false
example: "plugins.sock"
Entrypoint:
type: "array"
items:
type: "string"
example:
- "/usr/bin/sample-volume-plugin"
- "/data"
WorkDir:
type: "string"
x-nullable: false
example: "/bin/"
User:
type: "object"
x-nullable: false
@@ -1516,9 +1539,11 @@ definitions:
UID:
type: "integer"
format: "uint32"
example: 1000
GID:
type: "integer"
format: "uint32"
example: 1000
Network:
type: "object"
x-nullable: false
@@ -1527,6 +1552,7 @@ definitions:
Type:
x-nullable: false
type: "string"
example: "host"
Linux:
type: "object"
x-nullable: false
@@ -1536,9 +1562,13 @@ definitions:
type: "array"
items:
type: "string"
example:
- "CAP_SYS_ADMIN"
- "CAP_SYSLOG"
AllowAllDevices:
type: "boolean"
x-nullable: false
example: false
Devices:
type: "array"
items:
@@ -1546,12 +1576,15 @@ definitions:
PropagatedMount:
type: "string"
x-nullable: false
example: "/mnt/volumes"
IpcHost:
type: "boolean"
x-nullable: false
example: false
PidHost:
type: "boolean"
x-nullable: false
example: false
Mounts:
type: "array"
items:
@@ -1560,6 +1593,11 @@ definitions:
type: "array"
items:
$ref: "#/definitions/PluginEnv"
example:
- Name: "DEBUG"
Description: "If set, prints debug messages"
Settable: null
Value: "0"
Args:
type: "object"
x-nullable: false
@@ -1568,9 +1606,11 @@ definitions:
Name:
x-nullable: false
type: "string"
example: "args"
Description:
x-nullable: false
type: "string"
example: "command line arguments"
Settable:
type: "array"
items:
@@ -1584,50 +1624,14 @@ definitions:
properties:
type:
type: "string"
example: "layers"
diff_ids:
type: "array"
items:
type: "string"
example:
Id: "5724e2c8652da337ab2eedd19fc6fc0ec908e4bd907c7421bf6a8dfc70c4c078"
Name: "tiborvass/sample-volume-plugin"
Tag: "latest"
Active: true
Settings:
Env:
- "DEBUG=0"
Args: null
Devices: null
Config:
Description: "A sample volume plugin for Docker"
Documentation: "https://docs.docker.com/engine/extend/plugins/"
Interface:
Types:
- "docker.volumedriver/1.0"
Socket: "plugins.sock"
Entrypoint:
- "/usr/bin/sample-volume-plugin"
- "/data"
WorkDir: ""
User: {}
Network:
Type: ""
Linux:
Capabilities: null
AllowAllDevices: false
Devices: null
Mounts: null
PropagatedMount: "/data"
Env:
- Name: "DEBUG"
Description: "If set, prints debug messages"
Settable: null
Value: "0"
Args:
Name: "args"
Description: "command line arguments"
Settable: null
Value: []
example:
- "sha256:675532206fbf3030b8458f88d6e26d4eb1577688a25efec97154c94e8b6b4887"
- "sha256:e216a057b1cb1efc11f8a268f37ef62083e70b1b38323ba252e25ac88904a7e8"
ObjectVersion:
description: |
@@ -2780,6 +2784,27 @@ definitions:
type: "array"
items:
$ref: "#/definitions/Mount"
Driver:
description: "Driver represents a driver (network, logging, secrets)."
type: "object"
required: [Name]
properties:
Name:
description: "Name of the driver."
type: "string"
x-nullable: false
example: "some-driver"
Options:
description: "Key/value map of driver-specific options."
type: "object"
x-nullable: false
additionalProperties:
type: "string"
example:
OptionA: "value for driver-specific option A"
OptionB: "value for driver-specific option B"
SecretSpec:
type: "object"
properties:
@@ -2791,24 +2816,38 @@ definitions:
type: "object"
additionalProperties:
type: "string"
example:
com.example.some-label: "some-value"
com.example.some-other-label: "some-other-value"
Data:
description: "Base64-url-safe-encoded secret data"
type: "array"
items:
type: "string"
description: |
Base64-url-safe-encoded ([RFC 4648](https://tools.ietf.org/html/rfc4648#section-3.2))
data to store as secret.
This field is only used to _create_ a secret, and is not returned by
other endpoints.
type: "string"
example: ""
Driver:
description: "Name of the secrets driver used to fetch the secret's value from an external secret store"
$ref: "#/definitions/Driver"
Secret:
type: "object"
properties:
ID:
type: "string"
example: "blt1owaxmitz71s9v5zh81zun"
Version:
$ref: "#/definitions/ObjectVersion"
CreatedAt:
type: "string"
format: "dateTime"
example: "2017-07-20T13:55:28.678958722Z"
UpdatedAt:
type: "string"
format: "dateTime"
example: "2017-07-20T13:55:28.678958722Z"
Spec:
$ref: "#/definitions/SecretSpec"
ConfigSpec:
@@ -2823,10 +2862,10 @@ definitions:
additionalProperties:
type: "string"
Data:
description: "Base64-url-safe-encoded config data"
type: "array"
items:
type: "string"
description: |
Base64-url-safe-encoded ([RFC 4648](https://tools.ietf.org/html/rfc4648#section-3.2))
config data.
type: "string"
Config:
type: "object"
properties:
@@ -6887,46 +6926,6 @@ paths:
type: "array"
items:
$ref: "#/definitions/Plugin"
example:
- Id: "5724e2c8652da337ab2eedd19fc6fc0ec908e4bd907c7421bf6a8dfc70c4c078"
Name: "tiborvass/sample-volume-plugin"
Tag: "latest"
Active: true
Settings:
Env:
- "DEBUG=0"
Args: null
Devices: null
Config:
Description: "A sample volume plugin for Docker"
Documentation: "https://docs.docker.com/engine/extend/plugins/"
Interface:
Types:
- "docker.volumedriver/1.0"
Socket: "plugins.sock"
Entrypoint:
- "/usr/bin/sample-volume-plugin"
- "/data"
WorkDir: ""
User: {}
Network:
Type: ""
Linux:
Capabilities: null
AllowAllDevices: false
Devices: null
Mounts: null
PropagatedMount: "/data"
Env:
- Name: "DEBUG"
Description: "If set, prints debug messages"
Settable: null
Value: "0"
Args:
Name: "args"
Description: "command line arguments"
Settable: null
Value: []
500:
description: "Server error"
schema:
@@ -8416,6 +8415,20 @@ paths:
items:
$ref: "#/definitions/Secret"
example:
- ID: "blt1owaxmitz71s9v5zh81zun"
Version:
Index: 85
CreatedAt: "2017-07-20T13:55:28.678958722Z"
UpdatedAt: "2017-07-20T13:55:28.678958722Z"
Spec:
Name: "mysql-passwd"
Labels:
some.label: "some.value"
Driver:
Name: "secret-bucket"
Options:
OptionA: "value for driver option A"
OptionB: "value for driver option B"
- ID: "ktnbjxoalbkvbvedmg1urrz8h"
Version:
Index: 11
@@ -8423,6 +8436,8 @@ paths:
UpdatedAt: "2016-11-05T01:20:17.327670065Z"
Spec:
Name: "app-dev.crt"
Labels:
foo: "bar"
500:
description: "server error"
schema:
@@ -8486,6 +8501,11 @@ paths:
Labels:
foo: "bar"
Data: "VEhJUyBJUyBOT1QgQSBSRUFMIENFUlRJRklDQVRFCg=="
Driver:
Name: "secret-bucket"
Options:
OptionA: "value for driver option A"
OptionB: "value for driver option B"
tags: ["Secret"]
/secrets/{id}:
get:
@@ -8507,6 +8527,14 @@ paths:
UpdatedAt: "2016-11-05T01:20:17.327670065Z"
Spec:
Name: "app-dev.crt"
Labels:
foo: "bar"
Driver:
Name: "secret-bucket"
Options:
OptionA: "value for driver option A"
OptionB: "value for driver option B"
404:
description: "secret not found"
schema:

View File

@@ -11,7 +11,7 @@ type Plugin struct {
// Required: true
Config PluginConfig `json:"Config"`
// True when the plugin is running. False when the plugin is not running, only installed.
// True if the plugin is running. False if the plugin is not running, only installed.
// Required: true
Enabled bool `json:"Enabled"`

View File

@@ -20,7 +20,7 @@ type Annotations struct {
Labels map[string]string `json:"Labels"`
}
// Driver represents a driver (network, logging).
// Driver represents a driver (network, logging, secrets backend).
type Driver struct {
Name string `json:",omitempty"`
Options map[string]string `json:",omitempty"`

View File

@@ -745,6 +745,9 @@ func (container *Container) BuildCreateEndpointOptions(n libnetwork.Network, epC
for _, alias := range epConfig.Aliases {
createOptions = append(createOptions, libnetwork.CreateOptionMyAlias(alias))
}
for k, v := range epConfig.DriverOpts {
createOptions = append(createOptions, libnetwork.EndpointOptionGeneric(options.Generic{k: v}))
}
}
if container.NetworkSettings.Service != nil {
@@ -790,9 +793,6 @@ func (container *Container) BuildCreateEndpointOptions(n libnetwork.Network, epC
createOptions = append(createOptions, libnetwork.EndpointOptionGeneric(genericOption))
}
for k, v := range epConfig.DriverOpts {
createOptions = append(createOptions, libnetwork.EndpointOptionGeneric(options.Generic{k: v}))
}
}

View File

@@ -168,9 +168,9 @@ func (db *memDB) Delete(c *Container) error {
txn.Delete(memdbNamesTable, nameAssociation{name: name})
}
if err := txn.Delete(memdbContainersTable, NewBaseContainer(c.ID, c.Root)); err != nil {
return err
}
// Ignore error - the container may not actually exist in the
// db, but we still need to clean up associated names.
txn.Delete(memdbContainersTable, NewBaseContainer(c.ID, c.Root))
return nil
})
}

View File

@@ -150,4 +150,12 @@ func TestNames(t *testing.T) {
view = db.Snapshot()
assert.Equal(t, map[string][]string{"containerid1": {"name1", "name3", "name4"}, "containerid4": {"name2"}}, view.GetAllNames())
// Release containerid1's names with Delete even though no container exists
assert.NoError(t, db.Delete(&Container{ID: "containerid1"}))
// Reusing one of those names should work
assert.NoError(t, db.ReserveName("name1", "containerid4"))
view = db.Snapshot()
assert.Equal(t, map[string][]string{"containerid4": {"name1", "name2"}}, view.GetAllNames())
}

View File

@@ -27,6 +27,7 @@ type releaseableLayer struct {
func (rl *releaseableLayer) Mount() (string, error) {
var err error
var mountPath string
var chainID layer.ChainID
if rl.roLayer != nil {
chainID = rl.roLayer.ChainID()
@@ -38,7 +39,19 @@ func (rl *releaseableLayer) Mount() (string, error) {
return "", errors.Wrap(err, "failed to create rwlayer")
}
return rl.rwLayer.Mount("")
mountPath, err = rl.rwLayer.Mount("")
if err != nil {
// Clean up the layer if we fail to mount it here.
metadata, err := rl.layerStore.ReleaseRWLayer(rl.rwLayer)
layer.LogReleaseMetadata(metadata)
if err != nil {
logrus.Errorf("Failed to release RWLayer: %s", err)
}
rl.rwLayer = nil
return "", err
}
return mountPath, nil
}
func (rl *releaseableLayer) Commit(platform string) (builder.ReleaseableLayer, error) {
@@ -51,6 +64,7 @@ func (rl *releaseableLayer) Commit(platform string) (builder.ReleaseableLayer, e
if err != nil {
return nil, err
}
defer stream.Close()
newLayer, err := rl.layerStore.Register(stream, chainID, layer.Platform(platform))
if err != nil {
@@ -75,20 +89,32 @@ func (rl *releaseableLayer) Release() error {
if rl.released {
return nil
}
if err := rl.releaseRWLayer(); err != nil {
// Best effort attempt at releasing read-only layer before returning original error.
rl.releaseROLayer()
return err
}
if err := rl.releaseROLayer(); err != nil {
return err
}
rl.released = true
rl.releaseRWLayer()
return rl.releaseROLayer()
return nil
}
func (rl *releaseableLayer) releaseRWLayer() error {
if rl.rwLayer == nil {
return nil
}
if err := rl.rwLayer.Unmount(); err != nil {
logrus.Errorf("Failed to unmount RWLayer: %s", err)
return err
}
metadata, err := rl.layerStore.ReleaseRWLayer(rl.rwLayer)
layer.LogReleaseMetadata(metadata)
if err != nil {
logrus.Errorf("Failed to release RWLayer: %s", err)
}
rl.rwLayer = nil
return err
}
@@ -98,6 +124,10 @@ func (rl *releaseableLayer) releaseROLayer() error {
}
metadata, err := rl.layerStore.Release(rl.roLayer)
layer.LogReleaseMetadata(metadata)
if err != nil {
logrus.Errorf("Failed to release ROLayer: %s", err)
}
rl.roLayer = nil
return err
}

View File

@@ -11,57 +11,50 @@ import (
// GetTasks returns a list of tasks matching the filter options.
func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
c.mu.RLock()
defer c.mu.RUnlock()
var r *swarmapi.ListTasksResponse
state := c.currentNodeState()
if !state.IsActiveManager() {
return nil, c.errNoManager(state)
}
filterTransform := func(filter filters.Args) error {
if filter.Include("service") {
serviceFilters := filter.Get("service")
for _, serviceFilter := range serviceFilters {
service, err := c.GetService(serviceFilter, false)
if err != nil {
return err
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
filterTransform := func(filter filters.Args) error {
if filter.Include("service") {
serviceFilters := filter.Get("service")
for _, serviceFilter := range serviceFilters {
service, err := getService(ctx, state.controlClient, serviceFilter, false)
if err != nil {
return err
}
filter.Del("service", serviceFilter)
filter.Add("service", service.ID)
}
filter.Del("service", serviceFilter)
filter.Add("service", service.ID)
}
}
if filter.Include("node") {
nodeFilters := filter.Get("node")
for _, nodeFilter := range nodeFilters {
node, err := c.GetNode(nodeFilter)
if err != nil {
return err
if filter.Include("node") {
nodeFilters := filter.Get("node")
for _, nodeFilter := range nodeFilters {
node, err := getNode(ctx, state.controlClient, nodeFilter)
if err != nil {
return err
}
filter.Del("node", nodeFilter)
filter.Add("node", node.ID)
}
filter.Del("node", nodeFilter)
filter.Add("node", node.ID)
}
if !filter.Include("runtime") {
// default to only showing container tasks
filter.Add("runtime", "container")
filter.Add("runtime", "")
}
return nil
}
if !filter.Include("runtime") {
// default to only showing container tasks
filter.Add("runtime", "container")
filter.Add("runtime", "")
filters, err := newListTasksFilters(options.Filters, filterTransform)
if err != nil {
return err
}
return nil
}
filters, err := newListTasksFilters(options.Filters, filterTransform)
if err != nil {
return nil, err
}
ctx, cancel := c.getRequestContext()
defer cancel()
r, err := state.controlClient.ListTasks(
ctx,
&swarmapi.ListTasksRequest{Filters: filters})
if err != nil {
r, err = state.controlClient.ListTasks(
ctx,
&swarmapi.ListTasksRequest{Filters: filters})
return err
}); err != nil {
return nil, err
}

View File

@@ -119,7 +119,7 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo
if container.RWLayer != nil {
metadata, err := daemon.stores[container.Platform].layerStore.ReleaseRWLayer(container.RWLayer)
layer.LogReleaseMetadata(metadata)
if err != nil && err != layer.ErrMountDoesNotExist {
if err != nil && err != layer.ErrMountDoesNotExist && !os.IsNotExist(errors.Cause(err)) {
return errors.Wrapf(err, "driver %q failed to remove root filesystem for %s", daemon.GraphDriverName(container.Platform), container.ID)
}
}

View File

@@ -46,6 +46,7 @@ import (
"github.com/docker/docker/pkg/system"
rsystem "github.com/opencontainers/runc/libcontainer/system"
"github.com/opencontainers/selinux/go-selinux/label"
"github.com/pkg/errors"
"github.com/vbatts/tar-split/tar/storage"
"golang.org/x/sys/unix"
)
@@ -282,30 +283,41 @@ func (a *Driver) Remove(id string) error {
mountpoint = a.getMountpoint(id)
}
logger := logrus.WithFields(logrus.Fields{
"module": "graphdriver",
"driver": "aufs",
"layer": id,
})
var retries int
for {
mounted, err := a.mounted(mountpoint)
if err != nil {
if os.IsNotExist(err) {
break
}
return err
}
if !mounted {
break
}
if err := a.unmount(mountpoint); err != nil {
if err != unix.EBUSY {
return fmt.Errorf("aufs: unmount error: %s: %v", mountpoint, err)
}
if retries >= 5 {
return fmt.Errorf("aufs: unmount error after retries: %s: %v", mountpoint, err)
}
// If unmount returns EBUSY, it could be a transient error. Sleep and retry.
retries++
logrus.Warnf("unmount failed due to EBUSY: retry count: %d", retries)
time.Sleep(100 * time.Millisecond)
continue
err = a.unmount(mountpoint)
if err == nil {
break
}
break
if err != unix.EBUSY {
return errors.Wrapf(err, "aufs: unmount error: %s", mountpoint)
}
if retries >= 5 {
return errors.Wrapf(err, "aufs: unmount error after retries: %s", mountpoint)
}
// If unmount returns EBUSY, it could be a transient error. Sleep and retry.
retries++
logger.Warnf("unmount failed due to EBUSY: retry count: %d", retries)
time.Sleep(100 * time.Millisecond)
continue
}
// Atomically remove each directory in turn by first moving it out of the
@@ -314,21 +326,22 @@ func (a *Driver) Remove(id string) error {
tmpMntPath := path.Join(a.mntPath(), fmt.Sprintf("%s-removing", id))
if err := os.Rename(mountpoint, tmpMntPath); err != nil && !os.IsNotExist(err) {
if err == unix.EBUSY {
logrus.Warn("os.Rename err due to EBUSY")
logger.WithField("dir", mountpoint).WithError(err).Warn("os.Rename err due to EBUSY")
}
return err
return errors.Wrapf(err, "error preparing atomic delete of aufs mountpoint for id: %s", id)
}
if err := system.EnsureRemoveAll(tmpMntPath); err != nil {
return errors.Wrapf(err, "error removing aufs layer %s", id)
}
defer system.EnsureRemoveAll(tmpMntPath)
tmpDiffpath := path.Join(a.diffPath(), fmt.Sprintf("%s-removing", id))
if err := os.Rename(a.getDiffPath(id), tmpDiffpath); err != nil && !os.IsNotExist(err) {
return err
return errors.Wrapf(err, "error preparing atomic delete of aufs diff dir for id: %s", id)
}
defer system.EnsureRemoveAll(tmpDiffpath)
// Remove the layers file for the id
if err := os.Remove(path.Join(a.rootPath(), "layers", id)); err != nil && !os.IsNotExist(err) {
return err
return errors.Wrapf(err, "error removing layers dir for %s", id)
}
a.pathCacheLock.Lock()

View File

@@ -384,15 +384,18 @@ func (l *logStream) collectBatch() {
eventBufferNegative := eventBufferAge < 0
if eventBufferExpired || eventBufferNegative {
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
}
}
l.publishBatch(events)
events = events[:0]
case msg, more := <-l.messages:
if !more {
// Flush event buffer
// Flush event buffer and release resources
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
l.publishBatch(events)
events = events[:0]
return
}
if eventBufferTimestamp == 0 {
@@ -400,17 +403,13 @@ func (l *logStream) collectBatch() {
}
unprocessedLine := msg.Line
if l.multilinePattern != nil {
if l.multilinePattern.Match(unprocessedLine) {
// This is a new log event so flush the current eventBuffer to events
if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
// This is a new log event or we will exceed max bytes per event
// so flush the current eventBuffer to events and reset timestamp
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
eventBuffer = eventBuffer[:0]
}
// If we will exceed max bytes per event flush the current event buffer before appending
if len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
}
// Append new line
processedLine := append(unprocessedLine, "\n"...)
eventBuffer = append(eventBuffer, processedLine...)

View File

@@ -641,7 +641,7 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
})
// Fire ticker batchPublishFrequency seconds later
ticks <- time.Now().Add(batchPublishFrequency * time.Second)
ticks <- time.Now().Add(batchPublishFrequency + time.Second)
// Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
argument := <-mockClient.putLogEventsArgument
@@ -649,6 +649,20 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
// Log an event 1 second later
stream.Log(&logger.Message{
Line: []byte(logline),
Timestamp: time.Now().Add(time.Second),
})
// Fire ticker another batchPublishFrequency seconds later
ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)
// Verify the event buffer is truly flushed - we should only receive a single event
argument = <-mockClient.putLogEventsArgument
assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
assert.Equal(t, logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
stream.Close()
}

View File

@@ -19,6 +19,14 @@ keywords: "API, Docker, rcli, REST, documentation"
* `DELETE /secrets/(name)` now returns status code 404 instead of 500 when the secret does not exist.
* `POST /secrets/create` now returns status code 409 instead of 500 when creating an already existing secret.
* `POST /secrets/create` now accepts a `Driver` struct, allowing the
`Name` and driver-specific `Options` to be passed to store a secret
in an external secrets store. The `Driver` property can be omitted
if the default (internal) secrets store is used.
* `GET /secrets/(id)` and `GET /secrets` now return a `Driver` struct,
containing the `Name` and driver-specific `Options` of the external
secrets store used to store the secret. The `Driver` property is
omitted if no external store is used.
* `POST /secrets/(name)/update` now returns status code 400 instead of 500 when updating a secret's content which is not the labels.
* `POST /nodes/(name)/update` now returns status code 400 instead of 500 when demoting last node fails.
* `GET /networks/(id or name)` now takes an optional query parameter `scope` that will filter the network based on the scope (`local`, `swarm`, or `global`).

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env bash
set -e
source "${MAKEDIR}/.go-autogen"
source hack/make/.integration-test-helpers
# subshell so that we can export PATH without breaking other things

View File

@@ -690,6 +690,21 @@ func (s *DockerNetworkSuite) TestDockerNetworkIPAMOptions(c *check.C) {
c.Assert(opts["opt2"], checker.Equals, "drv2")
}
func (s *DockerNetworkSuite) TestDockerNetworkNullIPAMDriver(c *check.C) {
// Create a network with null ipam driver
_, _, err := dockerCmdWithError("network", "create", "-d", dummyNetworkDriver, "--ipam-driver", "null", "test000")
c.Assert(err, check.IsNil)
assertNwIsAvailable(c, "test000")
// Verify the inspect data contains the default subnet provided by the null
// ipam driver and no gateway, as the null ipam driver does not provide one
nr := getNetworkResource(c, "test000")
c.Assert(nr.IPAM.Driver, checker.Equals, "null")
c.Assert(len(nr.IPAM.Config), checker.Equals, 1)
c.Assert(nr.IPAM.Config[0].Subnet, checker.Equals, "0.0.0.0/0")
c.Assert(nr.IPAM.Config[0].Gateway, checker.Equals, "")
}
func (s *DockerNetworkSuite) TestDockerNetworkInspectDefault(c *check.C) {
nr := getNetworkResource(c, "none")
c.Assert(nr.Driver, checker.Equals, "null")

View File

@@ -19,6 +19,7 @@ import (
"github.com/Sirupsen/logrus"
containerd "github.com/containerd/containerd/api/grpc/types"
"github.com/crosbymichael/upgrade/v17_06_1"
"github.com/docker/docker/pkg/locker"
"github.com/docker/docker/pkg/system"
"github.com/golang/protobuf/ptypes"
@@ -39,7 +40,13 @@ const (
containerdPidFilename = "docker-containerd.pid"
containerdSockFilename = "docker-containerd.sock"
containerdStateDir = "containerd"
containerdInitDir = "init"
eventTimestampFilename = "event.ts"
processFilename = "process.json"
// TODO: Use user's --root parameter for runc, if possible
runcStateDir = "/run/runc"
runcStateFilename = "state.json"
)
type remote struct {
@@ -89,6 +96,7 @@ func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
}
if r.startDaemon {
r.makeUpgradeProof()
if err := r.runContainerdDaemon(); err != nil {
return nil, err
}
@@ -128,6 +136,37 @@ func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
return r, nil
}
func (r *remote) makeUpgradeProof() {
dir := filepath.Join(r.stateDir, containerdStateDir)
f, err := os.Open(dir)
if err != nil {
logrus.Warnf("libcontainerd: makeUpgradeProof could not open %s", dir)
return
}
fis, err := f.Readdir(0)
if err != nil {
logrus.Warnf("libcontainerd: makeUpgradeProof could not read directory entries in %s", dir)
f.Close()
return
}
containerIds := make([]string, 0, len(fis))
for _, fi := range fis {
if fi.IsDir() {
containerIds = append(containerIds, fi.Name())
}
}
f.Close()
for _, id := range containerIds {
if err := v17_06_1.Upgrade(
filepath.Join(runcStateDir, id, runcStateFilename),
filepath.Join(r.stateDir, id, configFilename),
filepath.Join(dir, id, containerdInitDir, processFilename),
); err != nil {
logrus.Warnf("libcontainerd: could not upgrade state files during live restore for container %s: %v", id, err)
}
}
}
func (r *remote) UpdateOptions(options ...RemoteOption) error {
for _, option := range options {
if err := option.Apply(r); err != nil {

View File

@@ -105,6 +105,11 @@ func NewManager(config ManagerConfig) (*Manager, error) {
if err := os.MkdirAll(manager.tmpDir(), 0700); err != nil {
return nil, errors.Wrapf(err, "failed to mkdir %v", manager.tmpDir())
}
if err := setupRoot(manager.config.Root); err != nil {
return nil, err
}
var err error
manager.containerdClient, err = config.Executor.Client(manager) // todo: move to another struct
if err != nil {

View File

@@ -162,6 +162,13 @@ func shutdownPlugin(p *v2.Plugin, c *controller, containerdClient libcontainerd.
}
}
func setupRoot(root string) error {
if err := mount.MakePrivate(root); err != nil {
return errors.Wrap(err, "error setting plugin manager root to private")
}
return nil
}
func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
if !p.IsEnabled() {
return fmt.Errorf("plugin %s is already disabled", p.Name())
@@ -190,6 +197,7 @@ func (pm *Manager) Shutdown() {
shutdownPlugin(p, c, pm.containerdClient)
}
}
mount.Unmount(pm.config.Root)
}
func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest digest.Digest, blobsums []digest.Digest, tmpRootFSDir string, privileges *types.PluginPrivileges) (err error) {

View File

@@ -26,3 +26,5 @@ func (pm *Manager) restore(p *v2.Plugin) error {
// Shutdown plugins
func (pm *Manager) Shutdown() {
}
func setupRoot(root string) error { return nil }

View File

@@ -28,3 +28,5 @@ func (pm *Manager) restore(p *v2.Plugin) error {
// Shutdown plugins
func (pm *Manager) Shutdown() {
}
func setupRoot(root string) error { return nil }

View File

@@ -3,6 +3,7 @@ github.com/Azure/go-ansiterm 388960b655244e76e24c75f48631564eaefade62
github.com/Microsoft/hcsshim v0.5.25
github.com/Microsoft/go-winio v0.4.2
github.com/Sirupsen/logrus v0.11.0
github.com/crosbymichael/upgrade 3ee9eb41518034a2dfe45d8273297f309a9d94da
github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76
github.com/docker/libtrust 9cbd2a1374f46905c68a4eb3694a130610adc62a
github.com/go-check/check 4ed411733c5785b40214c70bce814c3a3a689609 https://github.com/cpuguy83/check.git
@@ -26,7 +27,7 @@ github.com/imdario/mergo 0.2.1
golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0
#get libnetwork packages
github.com/docker/libnetwork 6426d1e66f33c0b0c8bb135b7ee547447f54d043
github.com/docker/libnetwork 9647f993a81e404639592e8ed73693b48ec09c2e
github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
@@ -106,7 +107,7 @@ github.com/stevvooe/continuity cd7a8e21e2b6f84799f5dd4b65faf49c8d3ee02d
github.com/tonistiigi/fsutil 0ac4c11b053b9c5c7c47558f81f96c7100ce50fb
# cluster
github.com/docker/swarmkit 3e2dd3c0a76149b1620b42d28dd6ff48270404e5
github.com/docker/swarmkit 87c2a23c2da1fca31abe6161bc908061fb06643e
github.com/gogo/protobuf v0.4
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e
@@ -143,4 +144,4 @@ github.com/opencontainers/selinux v1.0.0-rc1
# git --git-dir ./go/.git --work-tree ./go checkout revert-prefix-ignore
# cp -a go/src/archive/tar ./vendor/archive/tar
# rm -rf ./go
# vndr
# vndr

View File

@@ -0,0 +1,3 @@
# How to generate
go generate ./template.go

View File

@@ -0,0 +1,29 @@
// DO NOT EDIT
// This file has been auto-generated with go generate.
package v17_06_1
import specs "github.com/opencontainers/runtime-spec/specs-go" // a45ba0989fc26c695fe166a49c45bb8b7618ab36 https://github.com/docker/runtime-spec
type ProcessState struct {
Terminal bool `json:"terminal,omitempty"`
ConsoleSize specs.Box `json:"consoleSize,omitempty"`
User specs.User `json:"user"`
Args []string `json:"args"`
Env []string `json:"env,omitempty"`
Cwd string `json:"cwd"`
Capabilities linuxCapabilities `json:"capabilities,omitempty" platform:"linux"`
Rlimits []specs.LinuxRlimit `json:"rlimits,omitempty" platform:"linux"`
NoNewPrivileges bool `json:"noNewPrivileges,omitempty" platform:"linux"`
ApparmorProfile string `json:"apparmorProfile,omitempty" platform:"linux"`
SelinuxLabel string `json:"selinuxLabel,omitempty" platform:"linux"`
Exec bool `json:"exec"`
Stdin string `json:"containerdStdin"`
Stdout string `json:"containerdStdout"`
Stderr string `json:"containerdStderr"`
RuntimeArgs []string `json:"runtimeArgs"`
NoPivotRoot bool `json:"noPivotRoot"`
Checkpoint string `json:"checkpoint"`
RootUID int `json:"rootUID"`
RootGID int `json:"rootGID"`
}

View File

@@ -0,0 +1,66 @@
// DO NOT EDIT
// This file has been auto-generated with go generate.
package v17_06_1
import specs "github.com/opencontainers/runtime-spec/specs-go" // a45ba0989fc26c695fe166a49c45bb8b7618ab36 https://github.com/docker/runtime-spec
type Spec struct {
Version string `json:"ociVersion"`
Platform specs.Platform `json:"platform"`
Process struct {
Terminal bool `json:"terminal,omitempty"`
ConsoleSize specs.Box `json:"consoleSize,omitempty"`
User specs.User `json:"user"`
Args []string `json:"args"`
Env []string `json:"env,omitempty"`
Cwd string `json:"cwd"`
Capabilities linuxCapabilities `json:"capabilities,omitempty" platform:"linux"`
Rlimits []specs.LinuxRlimit `json:"rlimits,omitempty" platform:"linux"`
NoNewPrivileges bool `json:"noNewPrivileges,omitempty" platform:"linux"`
ApparmorProfile string `json:"apparmorProfile,omitempty" platform:"linux"`
SelinuxLabel string `json:"selinuxLabel,omitempty" platform:"linux"`
} `json:"process"`
Root specs.Root `json:"root"`
Hostname string `json:"hostname,omitempty"`
Mounts []specs.Mount `json:"mounts,omitempty"`
Hooks *specs.Hooks `json:"hooks,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
Linux *struct {
UIDMappings []specs.LinuxIDMapping `json:"uidMappings,omitempty"`
GIDMappings []specs.LinuxIDMapping `json:"gidMappings,omitempty"`
Sysctl map[string]string `json:"sysctl,omitempty"`
Resources *struct {
Devices []specs.LinuxDeviceCgroup `json:"devices,omitempty"`
DisableOOMKiller *bool `json:"disableOOMKiller,omitempty"`
OOMScoreAdj *int `json:"oomScoreAdj,omitempty"`
Memory *struct {
Limit *int64 `json:"limit,omitempty"`
Reservation *int64 `json:"reservation,omitempty"`
Swap *int64 `json:"swap,omitempty"`
Kernel *int64 `json:"kernel,omitempty"`
KernelTCP *int64 `json:"kernelTCP,omitempty"`
Swappiness memorySwappiness `json:"swappiness,omitempty"`
} `json:"memory,omitempty"`
CPU *specs.LinuxCPU `json:"cpu,omitempty"`
Pids *specs.LinuxPids `json:"pids,omitempty"`
BlockIO *specs.LinuxBlockIO `json:"blockIO,omitempty"`
HugepageLimits []specs.LinuxHugepageLimit `json:"hugepageLimits,omitempty"`
Network *specs.LinuxNetwork `json:"network,omitempty"`
} `json:"resources,omitempty"`
CgroupsPath string `json:"cgroupsPath,omitempty"`
Namespaces []specs.LinuxNamespace `json:"namespaces,omitempty"`
Devices []specs.LinuxDevice `json:"devices,omitempty"`
Seccomp *struct {
DefaultAction specs.LinuxSeccompAction `json:"defaultAction"`
Architectures []specs.Arch `json:"architectures,omitempty"`
Syscalls linuxSyscalls `json:"syscalls"`
} `json:"seccomp,omitempty"`
RootfsPropagation string `json:"rootfsPropagation,omitempty"`
MaskedPaths []string `json:"maskedPaths,omitempty"`
ReadonlyPaths []string `json:"readonlyPaths,omitempty"`
MountLabel string `json:"mountLabel,omitempty"`
} `json:"linux,omitempty" platform:"linux"`
Solaris *specs.Solaris `json:"solaris,omitempty" platform:"solaris"`
Windows *specs.Windows `json:"windows,omitempty" platform:"windows"`
}

View File

@@ -0,0 +1,89 @@
// DO NOT EDIT
// This file has been auto-generated with go generate.
package v17_06_1
import (
"time"
"github.com/opencontainers/runc/libcontainer/configs" // 810190ceaa507aa2727d7ae6f4790c76ec150bd2 https://github.com/docker/runc
)
type State struct {
ID string `json:"id"`
InitProcessPid int `json:"init_process_pid"`
InitProcessStartTime string `json:"init_process_start"`
Created time.Time `json:"created"`
Config struct {
NoPivotRoot bool `json:"no_pivot_root"`
ParentDeathSignal int `json:"parent_death_signal"`
Rootfs string `json:"rootfs"`
Readonlyfs bool `json:"readonlyfs"`
RootPropagation int `json:"rootPropagation"`
Mounts []*configs.Mount `json:"mounts"`
Devices []*configs.Device `json:"devices"`
MountLabel string `json:"mount_label"`
Hostname string `json:"hostname"`
Namespaces configs.Namespaces `json:"namespaces"`
Capabilities linuxCapabilities `json:"capabilities"`
Networks []*configs.Network `json:"networks"`
Routes []*configs.Route `json:"routes"`
Cgroups *struct {
Name string `json:"name,omitempty"`
Parent string `json:"parent,omitempty"`
Path string `json:"path"`
ScopePrefix string `json:"scope_prefix"`
Paths map[string]string
AllowAllDevices *bool `json:"allow_all_devices,omitempty"`
AllowedDevices []*configs.Device `json:"allowed_devices,omitempty"`
DeniedDevices []*configs.Device `json:"denied_devices,omitempty"`
Devices []*configs.Device `json:"devices"`
Memory int64 `json:"memory"`
MemoryReservation int64 `json:"memory_reservation"`
MemorySwap int64 `json:"memory_swap"`
KernelMemory int64 `json:"kernel_memory"`
KernelMemoryTCP int64 `json:"kernel_memory_tcp"`
CpuShares uint64 `json:"cpu_shares"`
CpuQuota int64 `json:"cpu_quota"`
CpuPeriod uint64 `json:"cpu_period"`
CpuRtRuntime int64 `json:"cpu_rt_quota"`
CpuRtPeriod uint64 `json:"cpu_rt_period"`
CpusetCpus string `json:"cpuset_cpus"`
CpusetMems string `json:"cpuset_mems"`
PidsLimit int64 `json:"pids_limit"`
BlkioWeight uint16 `json:"blkio_weight"`
BlkioLeafWeight uint16 `json:"blkio_leaf_weight"`
BlkioWeightDevice []*configs.WeightDevice `json:"blkio_weight_device"`
BlkioThrottleReadBpsDevice []*configs.ThrottleDevice `json:"blkio_throttle_read_bps_device"`
BlkioThrottleWriteBpsDevice []*configs.ThrottleDevice `json:"blkio_throttle_write_bps_device"`
BlkioThrottleReadIOPSDevice []*configs.ThrottleDevice `json:"blkio_throttle_read_iops_device"`
BlkioThrottleWriteIOPSDevice []*configs.ThrottleDevice `json:"blkio_throttle_write_iops_device"`
Freezer configs.FreezerState `json:"freezer"`
HugetlbLimit []*configs.HugepageLimit `json:"hugetlb_limit"`
OomKillDisable bool `json:"oom_kill_disable"`
MemorySwappiness memorySwappiness `json:"memory_swappiness"`
NetPrioIfpriomap []*configs.IfPrioMap `json:"net_prio_ifpriomap"`
NetClsClassid uint32 `json:"net_cls_classid_u"`
} `json:"cgroups"`
AppArmorProfile string `json:"apparmor_profile,omitempty"`
ProcessLabel string `json:"process_label,omitempty"`
Rlimits []configs.Rlimit `json:"rlimits,omitempty"`
OomScoreAdj int `json:"oom_score_adj"`
UidMappings []configs.IDMap `json:"uid_mappings"`
GidMappings []configs.IDMap `json:"gid_mappings"`
MaskPaths []string `json:"mask_paths"`
ReadonlyPaths []string `json:"readonly_paths"`
Sysctl map[string]string `json:"sysctl"`
Seccomp *configs.Seccomp `json:"seccomp"`
NoNewPrivileges bool `json:"no_new_privileges,omitempty"`
Hooks *configs.Hooks
Version string `json:"version"`
Labels []string `json:"labels"`
NoNewKeyring bool `json:"no_new_keyring"`
Rootless bool `json:"rootless"`
} `json:"config"`
Rootless bool `json:"rootless"`
CgroupPaths map[string]string `json:"cgroup_paths"`
NamespacePaths map[configs.NamespaceType]string `json:"namespace_paths"`
ExternalDescriptors []string `json:"external_descriptors,omitempty"`
}

View File

@@ -0,0 +1,119 @@
package v17_06_1
import (
"bytes"
"encoding/json"
"fmt"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
type linuxSyscalls []linuxSyscall
type linuxSyscall struct {
specs.LinuxSyscall
}
func (ls *linuxSyscall) UnmarshalJSON(b []byte) error {
var t struct {
specs.LinuxSyscall
Name *string `json:"name,omitempty"`
}
if err := json.Unmarshal(b, &t); err != nil {
return err
}
ls.LinuxSyscall = t.LinuxSyscall
if t.Name != nil {
if ls.LinuxSyscall.Names != nil {
return fmt.Errorf("found incompatible 'name' and 'names' fields")
}
ls.LinuxSyscall.Names = []string{*t.Name}
t.Name = nil
}
return nil
}
// TODO: figure out how to omitempty when pointer is nil
type memorySwappiness struct {
V *uint64 `json:",omitempty"`
}
func (m memorySwappiness) String() string {
if m.V == nil {
return "<nil>"
}
return fmt.Sprintf("%d", *m.V)
}
var null = []byte("null")
func (m *memorySwappiness) MarshalJSON() ([]byte, error) {
if m.V == nil {
return null, nil
}
return []byte(fmt.Sprintf("%d", *m.V)), nil
}
func (m *memorySwappiness) UnmarshalJSON(b []byte) error {
if bytes.Compare(b, null) == 0 {
return nil
}
var n uint64
var i int64
err := json.Unmarshal(b, &i)
switch err.(type) {
case nil:
n = uint64(i)
case *json.UnmarshalTypeError:
// The only valid reason for accepting a uint64 that does not fit into an int64
// is for erroneous -1 values being converted to uint64.
// Nevertheless, try unmarshaling it and error out if it's not a number at all.
if err := json.Unmarshal(b, &n); err != nil {
return err
}
default:
return err
}
if n >= 0 && n <= 100 {
m.V = &n
} else {
m.V = nil
}
return nil
}
type linuxCapabilities struct {
V *specs.LinuxCapabilities
}
func (l *linuxCapabilities) MarshalJSON() ([]byte, error) {
return json.Marshal(l.V)
}
func (l *linuxCapabilities) UnmarshalJSON(b []byte) error {
if bytes.Compare(b, null) == 0 {
return nil
}
var s specs.LinuxCapabilities
err := json.Unmarshal(b, &s)
switch err.(type) {
case nil:
l.V = &s
case *json.UnmarshalTypeError:
var caps []string
err = json.Unmarshal(b, &caps)
if err != nil {
return err
}
// TODO: copy caps or not copy caps?
l.V = &specs.LinuxCapabilities{
Bounding: caps,
Effective: caps,
Inheritable: caps,
Permitted: caps,
Ambient: nil,
}
}
return err
}

View File

@@ -0,0 +1,63 @@
package v17_06_1
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"github.com/docker/docker/pkg/ioutils"
)
type file struct {
name string
x interface{}
buf bytes.Buffer
w io.WriteCloser
}
func Upgrade(runcState, containerdConfig, containerdProcess string) error {
files := []*file{
&file{name: runcState, x: new(State)},
&file{name: containerdConfig, x: new(Spec)},
&file{name: containerdProcess, x: new(ProcessState)},
}
for _, f := range files {
fd, err := os.Open(f.name)
if err != nil {
return err
}
defer fd.Close()
// error out if any of the files have issues being decoded
// before overwriting them, to prevent being in a mixed state.
if err := json.NewDecoder(fd).Decode(f.x); err != nil {
return err
}
// error out if any of the files have issues being encoded
// before overwriting them, to prevent being in a mixed state.
if err := json.NewEncoder(&f.buf).Encode(f.x); err != nil {
return err
}
fi, err := fd.Stat()
if err != nil {
return err
}
f.w, err = ioutils.NewAtomicFileWriter(f.name, fi.Mode())
if err != nil {
return err
}
defer f.w.Close()
}
var errs []string
for _, f := range files {
if _, err := f.w.Write(f.buf.Bytes()); err != nil {
errs = append(errs, fmt.Sprintf("error writing to %s: %v", f.name, err))
}
}
if errs != nil {
return fmt.Errorf(strings.Join(errs, ", "))
}
return nil
}

View File

@@ -0,0 +1,23 @@
# runtime-spec
github.com/opencontainers/runtime-spec a45ba0989fc26c695fe166a49c45bb8b7618ab36 https://github.com/docker/runtime-spec
# runc
github.com/opencontainers/runc 810190ceaa507aa2727d7ae6f4790c76ec150bd2 https://github.com/docker/runc
github.com/mrunalp/fileutils ed869b029674c0e9ce4c0dfa781405c2d9946d08
github.com/seccomp/libseccomp-golang 32f571b70023028bd57d9288c20efbcb237f3ce0
github.com/syndtr/gocapability e7cb7fa329f456b3855136a2642b197bad7366ba
github.com/golang/protobuf f7137ae6b19afbfd61a94b746fda3b3fe
github.com/docker/go-units v0.2.0
github.com/vishvananda/netlink 1e2e08e8a2dcdacaae3f14ac44c5c
github.com/docker/docker 0f5c9d301b9b1cca66b3ea0f9dec3b5317d3686d
github.com/opencontainers/selinux v1.0.0-rc1
github.com/coreos/go-systemd v14
github.com/coreos/pkg v3
github.com/godbus/dbus v3
# containerd
github.com/containerd/containerd 6e23458c129b551d5c9871e5174f6b1b7f6d1170 https://github.com/docker/containerd
golang.org/x/net 991d3e32f76f19ee6d9caadb3a22eae8d23315f7 https://github.com/golang/net.git
golang.org/x/sys d4feaf1a7e61e1d9e79e6c4e76c6349e9 https://github.com/golang/sys.git
github.com/Sirupsen/logrus v0.11.2

vendor/github.com/crosbymichael/upgrade/vendor.conf generated vendored Symbolic link
View File

@@ -0,0 +1 @@
v17_06_1/vendor.conf

View File

@@ -214,8 +214,8 @@ func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
listen := clusterProvider.GetListenAddress()
listenAddr, _, _ := net.SplitHostPort(listen)
logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v",
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d",
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().Daemon.NetworkControlPlaneMTU)
if advAddr != "" && agent == nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
logrus.Errorf("error in agentInit: %v", err)
@@ -286,12 +286,19 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)
nDB, err := networkdb.New(&networkdb.Config{
BindAddr: listenAddr,
AdvertiseAddr: advertiseAddr,
NodeName: nodeName,
Keys: keys,
})
netDBConf := networkdb.DefaultConfig()
netDBConf.NodeName = nodeName
netDBConf.BindAddr = listenAddr
netDBConf.AdvertiseAddr = advertiseAddr
netDBConf.Keys = keys
if c.Config().Daemon.NetworkControlPlaneMTU != 0 {
// Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
// To be on the safe side let's cut 100 bytes
netDBConf.PacketBufferSize = (c.Config().Daemon.NetworkControlPlaneMTU - 100)
logrus.Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
c.Config().Daemon.NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
}
nDB, err := networkdb.New(netDBConf)
if err != nil {
return err
@@ -383,15 +390,11 @@ func (c *controller) agentClose() {
agent.Lock()
for _, cancelFuncs := range agent.driverCancelFuncs {
for _, cancel := range cancelFuncs {
cancelList = append(cancelList, cancel)
}
cancelList = append(cancelList, cancelFuncs...)
}
// Add also the cancel functions for the network db
for _, cancel := range agent.coreCancelFuncs {
cancelList = append(cancelList, cancel)
}
cancelList = append(cancelList, agent.coreCancelFuncs...)
agent.Unlock()
for _, cancel := range cancelList {
@@ -738,11 +741,12 @@ func (n *network) addDriverWatches() {
return
}
agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool {
if nid == n.ID() {
agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
// skip the entries that are mark for deletion, this is safe because this function is
// called at initialization time so there is no state to delete
if nid == n.ID() && !deleted {
d.EventNotify(driverapi.Create, nid, table.name, key, value)
}
return false
})
}

View File

@@ -497,7 +497,10 @@ func getFirstAvailable(head *sequence, start uint64) (uint64, uint64, error) {
// Derive the this sequence offsets
byteOffset := byteStart - inBlockBytePos
bitOffset := inBlockBytePos*8 + bitStart
var firstOffset uint64
if current == head {
firstOffset = byteOffset
}
for current != nil {
if current.block != blockMAX {
bytePos, bitPos, err := current.getAvailableBit(bitOffset)
@@ -505,7 +508,8 @@ func getFirstAvailable(head *sequence, start uint64) (uint64, uint64, error) {
}
// Moving to next block: Reset bit offset.
bitOffset = 0
byteOffset += current.count * blockBytes
byteOffset += (current.count * blockBytes) - firstOffset
firstOffset = 0
current = current.next
}
return invalidPos, invalidPos, ErrNoBitAvailable

vendor/github.com/docker/libnetwork/common/caller.go generated vendored Normal file
View File

@@ -0,0 +1,29 @@
package common
import (
"runtime"
"strings"
)
func callerInfo(i int) string {
ptr, _, _, ok := runtime.Caller(i)
fName := "unknown"
if ok {
f := runtime.FuncForPC(ptr)
if f != nil {
// f.Name() is like: github.com/docker/libnetwork/common.MethodName
tmp := strings.Split(f.Name(), ".")
if len(tmp) > 0 {
fName = tmp[len(tmp)-1]
}
}
}
return fName
}
// CallerName returns the name of the function at the specified level
// level == 0 means current method name
func CallerName(level int) string {
return callerInfo(2 + level)
}
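A self-contained sketch of the same runtime.Caller pattern, useful to see what the helper returns; the local function names below are illustrative and not part of the vendored package:

package main

import (
	"fmt"
	"runtime"
	"strings"
)

// callerName returns the bare name of the calling function, skipping `level`
// additional stack frames, the same way common.CallerName does.
func callerName(level int) string {
	ptr, _, _, ok := runtime.Caller(1 + level)
	if !ok {
		return "unknown"
	}
	f := runtime.FuncForPC(ptr)
	if f == nil {
		return "unknown"
	}
	// f.Name() looks like "main.doWork"; keep only the last element.
	parts := strings.Split(f.Name(), ".")
	return parts[len(parts)-1]
}

func doWork() {
	fmt.Println(callerName(0)) // prints "doWork"
	fmt.Println(callerName(1)) // prints "main"
}

func main() {
	doWork()
}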


@@ -26,14 +26,15 @@ type Config struct {
// DaemonCfg represents libnetwork core configuration
type DaemonCfg struct {
Debug bool
Experimental bool
DataDir string
DefaultNetwork string
DefaultDriver string
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
Debug bool
Experimental bool
DataDir string
DefaultNetwork string
DefaultDriver string
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
NetworkControlPlaneMTU int
}
// ClusterCfg represents cluster configuration
@@ -221,6 +222,18 @@ func OptionExperimental(exp bool) Option {
}
}
// OptionNetworkControlPlaneMTU function returns an option setter for control plane MTU
func OptionNetworkControlPlaneMTU(exp int) Option {
return func(c *Config) {
logrus.Debugf("Network Control Plane MTU: %d", exp)
if exp < 1500 {
// if exp == 0 the value won't be used
logrus.Warnf("Received an MTU of %d; this value is very low and the network control plane can misbehave", exp)
}
c.Daemon.NetworkControlPlaneMTU = exp
}
}
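The setter above follows libnetwork's functional-options style. A minimal sketch of that pattern with hypothetical type and option names (not the library's own) is:

package main

import "fmt"

// config mimics the shape of libnetwork's Config/DaemonCfg pair.
type config struct {
	networkControlPlaneMTU int
}

// option is a setter applied by a ProcessOptions-style helper.
type option func(*config)

// optionNetworkControlPlaneMTU mirrors OptionNetworkControlPlaneMTU above.
func optionNetworkControlPlaneMTU(mtu int) option {
	return func(c *config) {
		// if mtu == 0 the value won't be used; warn on suspiciously low values
		if mtu < 1500 {
			fmt.Printf("warning: MTU %d is very low, the control plane can misbehave\n", mtu)
		}
		c.networkControlPlaneMTU = mtu
	}
}

func processOptions(c *config, opts ...option) {
	for _, o := range opts {
		o(c)
	}
}

func main() {
	var c config
	processOptions(&c, optionNetworkControlPlaneMTU(1200))
	fmt.Println(c.networkControlPlaneMTU) // 1200
}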
// ProcessOptions processes options and stores it in config
func (c *Config) ProcessOptions(options ...Option) {
for _, opt := range options {
@@ -232,10 +245,7 @@ func (c *Config) ProcessOptions(options ...Option) {
// IsValidName validates configuration objects supported by libnetwork
func IsValidName(name string) bool {
if strings.TrimSpace(name) == "" {
return false
}
return true
return strings.TrimSpace(name) != ""
}
// OptionLocalKVProvider function returns an option setter for kvstore provider


@@ -0,0 +1,133 @@
package diagnose
import (
"fmt"
"net"
"net/http"
"sync"
"github.com/Sirupsen/logrus"
)
// HTTPHandlerFunc is the signature of the handlers registered with the diagnose Server; the first argument is the context object passed at registration time
type HTTPHandlerFunc func(interface{}, http.ResponseWriter, *http.Request)
type httpHandlerCustom struct {
ctx interface{}
F func(interface{}, http.ResponseWriter, *http.Request)
}
// ServeHTTP dispatches the request to the wrapped handler together with its registered context
func (h httpHandlerCustom) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.F(h.ctx, w, r)
}
var diagPaths2Func = map[string]HTTPHandlerFunc{
"/": notImplemented,
"/help": help,
"/ready": ready,
}
// Server, when debug is enabled, exposes an HTTP endpoint that accepts diagnostic commands.
// This data structure is protected by the Agent mutex so it does not require an additional mutex here
type Server struct {
sk net.Listener
port int
mux *http.ServeMux
registeredHanders []string
sync.Mutex
}
// Init initializes the HTTP mux and registers the built-in handlers
func (n *Server) Init() {
n.mux = http.NewServeMux()
// Register local handlers
n.RegisterHandler(n, diagPaths2Func)
}
// RegisterHandler registers the passed handlers on the mux, binding each to the given context object
func (n *Server) RegisterHandler(ctx interface{}, hdlrs map[string]HTTPHandlerFunc) {
n.Lock()
defer n.Unlock()
for path, fun := range hdlrs {
n.mux.Handle(path, httpHandlerCustom{ctx, fun})
n.registeredHanders = append(n.registeredHanders, path)
}
}
// EnableDebug opens a TCP socket to debug the passed network DB
func (n *Server) EnableDebug(ip string, port int) {
n.Lock()
defer n.Unlock()
n.port = port
logrus.SetLevel(logrus.DebugLevel)
if n.sk != nil {
logrus.Infof("The server is already up and running")
return
}
logrus.Infof("Starting the server listening on %d for commands", port)
// Create the socket on the requested address and serve the mux from a
// separate goroutine so that EnableDebug does not block the caller.
var err error
n.sk, err = net.Listen("tcp", fmt.Sprintf("%s:%d", ip, port))
if err != nil {
logrus.Errorf("Failed to start the diagnose server: %v", err)
return
}
go http.Serve(n.sk, n.mux)
}
// DisableDebug stops the debug server and closes the TCP socket
func (n *Server) DisableDebug() {
n.Lock()
defer n.Unlock()
n.sk.Close()
n.sk = nil
}
// IsDebugEnable returns true when the debug is enabled
func (n *Server) IsDebugEnable() bool {
n.Lock()
defer n.Unlock()
return n.sk != nil
}
func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "URL path: %s no method implemented check /help\n", r.URL.Path)
}
func help(ctx interface{}, w http.ResponseWriter, r *http.Request) {
n, ok := ctx.(*Server)
if ok {
for _, path := range n.registeredHanders {
fmt.Fprintf(w, "%s\n", path)
}
}
}
func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "OK\n")
}
// DebugHTTPForm logs the parsed form values of the request at debug level
func DebugHTTPForm(r *http.Request) {
r.ParseForm()
for k, v := range r.Form {
logrus.Debugf("Form[%q] = %q\n", k, v)
}
}
// HTTPReplyError writes an error message and an optional usage string to the response
func HTTPReplyError(w http.ResponseWriter, message, usage string) {
fmt.Fprintf(w, "%s\n", message)
if usage != "" {
fmt.Fprintf(w, "Usage: %s\n", usage)
}
}
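A hedged usage sketch of the Server above: it registers one extra handler next to the built-in /help and /ready endpoints and then starts the debug listener. The import path matches the vendored file, while the /stats path, the stats context type, and the listen address are illustrative assumptions:

package main

import (
	"fmt"
	"net/http"

	"github.com/docker/libnetwork/diagnose"
)

// stats is the context object handed back to our handler on every request.
type stats struct{ requests int }

func dumpStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
	s, ok := ctx.(*stats)
	if !ok {
		fmt.Fprintln(w, "unexpected context type")
		return
	}
	s.requests++
	fmt.Fprintf(w, "requests served: %d\n", s.requests)
}

func main() {
	srv := &diagnose.Server{}
	srv.Init()
	// Attach a custom endpoint next to the built-in ones.
	srv.RegisterHandler(&stats{}, map[string]diagnose.HTTPHandlerFunc{
		"/stats": dumpStats,
	})
	srv.EnableDebug("127.0.0.1", 8000)
	select {} // keep the process alive; in this version of the package EnableDebug may also block on its own
}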


@@ -120,8 +120,7 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
}
}
d.peerDbAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac,
net.ParseIP(d.advertiseAddress), true)
d.peerAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true)
if err := d.checkEncryption(nid, nil, n.vxlanID(s), true, true); err != nil {
logrus.Warn(err)
@@ -205,7 +204,7 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
return
}
d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false)
d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false, false)
}
// Leave method is invoked when a Sandbox detaches from an endpoint.


@@ -12,6 +12,7 @@ import (
"strings"
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/reexec"
@@ -682,10 +683,12 @@ func (n *network) initSandbox(restore bool) error {
return fmt.Errorf("could not get network sandbox (oper %t): %v", restore, err)
}
// this is needed to let the peerAdd configure the sandbox
n.setSandbox(sbox)
if !restore {
n.driver.peerDbUpdateSandbox(n.id)
// Initialize the sandbox with all the peers previously received from networkdb
n.driver.initSandboxPeerDB(n.id)
}
var nlSock *nl.NetlinkSocket
@@ -705,6 +708,7 @@ func (n *network) initSandbox(restore bool) error {
}
func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
t := time.Now()
for {
msgs, err := nlSock.Receive()
if err != nil {
@@ -757,23 +761,52 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
continue
}
if !n.driver.isSerfAlive() {
continue
}
mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, ip)
if err != nil {
logrus.Errorf("could not resolve peer %q: %v", ip, err)
continue
}
if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil {
logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err)
if n.driver.isSerfAlive() {
mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, ip)
if err != nil {
logrus.Errorf("could not resolve peer %q: %v", ip, err)
continue
}
n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss, false)
} else {
// If the gc_thresh values are low, the kernel might knock off the neighbor entries.
// When we get an L3 miss, check if it is a valid peer and reprogram the neighbor
// entry again. Rate limit it to one attempt every 500ms, just in case a faulty
// container sends a flood of packets to invalid peers
if !l3Miss {
continue
}
if time.Since(t) > 500*time.Millisecond {
t = time.Now()
n.programNeighbor(ip)
}
}
}
}
}
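The 500 ms guard above is a plain time.Since rate limiter; a self-contained sketch of the same pattern, with the reprogramming work replaced by a stand-in function:

package main

import (
	"fmt"
	"time"
)

// rateLimited returns a wrapper that invokes fn at most once every interval,
// dropping calls that arrive too early — the same guard watchMiss applies to
// neighbor reprogramming.
func rateLimited(interval time.Duration) func(fn func()) {
	last := time.Now()
	return func(fn func()) {
		if time.Since(last) < interval {
			return // too soon, skip this attempt
		}
		last = time.Now()
		fn()
	}
}

func main() {
	reprogram := rateLimited(500 * time.Millisecond)
	for i := 0; i < 5; i++ {
		reprogram(func() { fmt.Println("reprogramming neighbor entry") })
		time.Sleep(200 * time.Millisecond)
	}
	// Only a subset of the five attempts gets through.
}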
func (n *network) programNeighbor(ip net.IP) {
peerMac, _, _, err := n.driver.peerDbSearch(n.id, ip)
if err != nil {
logrus.Errorf("Reprogramming on L3 miss failed for %s, no peer entry", ip)
return
}
s := n.getSubnetforIPAddr(ip)
if s == nil {
logrus.Errorf("Reprogramming on L3 miss failed for %s, not a valid subnet", ip)
return
}
sbox := n.sandbox()
if sbox == nil {
logrus.Errorf("Reprogramming on L3 miss failed for %s, overlay sandbox missing", ip)
return
}
if err := sbox.AddNeighbor(ip, peerMac, true, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil {
logrus.Errorf("Reprogramming on L3 miss failed for %s: %v", ip, err)
return
}
}
func (d *driver) addNetwork(n *network) {
d.Lock()
d.networks[n.id] = n
@@ -1052,6 +1085,15 @@ func (n *network) contains(ip net.IP) bool {
return false
}
func (n *network) getSubnetforIPAddr(ip net.IP) *subnet {
for _, s := range n.subnets {
if s.subnetIP.Contains(ip) {
return s
}
}
return nil
}
// getSubnetforIP returns the subnet to which the given IP belongs
func (n *network) getSubnetforIP(ip *net.IPNet) *subnet {
for _, s := range n.subnets {


@@ -120,15 +120,10 @@ func (d *driver) processEvent(u serf.UserEvent) {
switch action {
case "join":
if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
net.ParseIP(vtepStr), true, false, false); err != nil {
logrus.Errorf("Peer add failed in the driver: %v\n", err)
}
d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr),
true, false, false, false)
case "leave":
if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
net.ParseIP(vtepStr), true); err != nil {
logrus.Errorf("Peer delete failed in the driver: %v\n", err)
}
d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), true)
}
}


@@ -3,6 +3,7 @@ package overlay
//go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
import (
"context"
"fmt"
"net"
"sync"
@@ -50,6 +51,8 @@ type driver struct {
joinOnce sync.Once
localJoinOnce sync.Once
keys []*key
peerOpCh chan *peerOperation
peerOpCancel context.CancelFunc
sync.Mutex
}
@@ -64,10 +67,16 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
peerDb: peerNetworkMap{
mp: map[string]*peerMap{},
},
secMap: &encrMap{nodes: map[string][]*spi{}},
config: config,
secMap: &encrMap{nodes: map[string][]*spi{}},
config: config,
peerOpCh: make(chan *peerOperation),
}
// Launch the goroutine that processes peer operations
ctx, cancel := context.WithCancel(context.Background())
d.peerOpCancel = cancel
go d.peerOpRoutine(ctx, d.peerOpCh)
if data, ok := config[netlabel.GlobalKVClient]; ok {
var err error
dsc, ok := data.(discoverapi.DatastoreConfigData)
@@ -161,7 +170,7 @@ func (d *driver) restoreEndpoints() error {
}
n.incEndpointCount()
d.peerDbAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true)
d.peerAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true)
}
return nil
}
@@ -170,6 +179,11 @@ func (d *driver) restoreEndpoints() error {
func Fini(drv driverapi.Driver) {
d := drv.(*driver)
// Notify the peer goroutine to return
if d.peerOpCancel != nil {
d.peerOpCancel()
}
if d.exitCh != nil {
waitCh := make(chan struct{})


@@ -1,12 +1,14 @@
package overlay
import (
"context"
"fmt"
"net"
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/common"
)
const ovPeerTable = "overlay_peer_table"
@@ -59,8 +61,6 @@ func (pKey *peerKey) Scan(state fmt.ScanState, verb rune) error {
return nil
}
var peerDbWg sync.WaitGroup
func (d *driver) peerDbWalk(f func(string, *peerKey, *peerEntry) bool) error {
d.peerDb.Lock()
nids := []string{}
@@ -141,8 +141,6 @@ func (d *driver) peerDbSearch(nid string, peerIP net.IP) (net.HardwareAddr, net.
func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, isLocal bool) {
peerDbWg.Wait()
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
if !ok {
@@ -173,7 +171,6 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask
func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP) peerEntry {
peerDbWg.Wait()
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
@@ -206,55 +203,109 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM
return pEntry
}
func (d *driver) peerDbUpdateSandbox(nid string) {
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
if !ok {
d.peerDb.Unlock()
return
}
d.peerDb.Unlock()
// The overlay driver uses a lazy initialization approach: when a network is created
// and the driver is registered, the overlay does not allocate resources until the moment
// a sandbox is actually created.
// At the time of this call, which happens when a sandbox is initialized, it is possible that
// networkDB has already delivered events for peers available on remote nodes.
// These peers are saved into the peerDB, and this function is used to properly configure
// the network sandbox with all the peers that were previously notified.
// Note also that this method sends a single message on the channel, and the goroutine on the
// other side will atomically loop over the whole table of peers and program their state
// in one single atomic operation. This is fundamental to guarantee consistency and to avoid
// new peerAdd or peerDelete operations being reordered during the sandbox init.
func (d *driver) initSandboxPeerDB(nid string) {
d.peerInit(nid)
}
peerDbWg.Add(1)
type peerOperationType int32
var peerOps []func()
pMap.Lock()
for pKeyStr, pEntry := range pMap.mp {
var pKey peerKey
if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil {
logrus.Errorf("peer key scan failed: %v", err)
}
const (
peerOperationINIT peerOperationType = iota
peerOperationADD
peerOperationDELETE
)
if pEntry.isLocal {
continue
}
type peerOperation struct {
opType peerOperationType
networkID string
endpointID string
peerIP net.IP
peerIPMask net.IPMask
peerMac net.HardwareAddr
vtepIP net.IP
updateDB bool
l2Miss bool
l3Miss bool
localPeer bool
callerName string
}
// Go captures variables by reference. The pEntry could be
// pointing to the same memory location for every iteration. Make
// a copy of pEntry before capturing it in the following closure.
entry := pEntry
op := func() {
if err := d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask,
pKey.peerMac, entry.vtep,
false, false, false); err != nil {
logrus.Errorf("peerdbupdate in sandbox failed for ip %s and mac %s: %v",
pKey.peerIP, pKey.peerMac, err)
func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) {
var err error
for {
select {
case <-ctx.Done():
return
case op := <-ch:
switch op.opType {
case peerOperationINIT:
err = d.peerInitOp(op.networkID)
case peerOperationADD:
err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.updateDB, op.l2Miss, op.l3Miss, op.localPeer)
case peerOperationDELETE:
err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer)
}
if err != nil {
logrus.Warnf("Peer operation failed:%s op:%v", err, op)
}
}
peerOps = append(peerOps, op)
}
pMap.Unlock()
}
for _, op := range peerOps {
op()
func (d *driver) peerInit(nid string) {
callerName := common.CallerName(1)
d.peerOpCh <- &peerOperation{
opType: peerOperationINIT,
networkID: nid,
callerName: callerName,
}
}
peerDbWg.Done()
func (d *driver) peerInitOp(nid string) error {
return d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
// Local entries do not need to be added
if pEntry.isLocal {
return false
}
d.peerAddOp(nid, pEntry.eid, pKey.peerIP, pEntry.peerIPMask, pKey.peerMac, pEntry.vtep, false, false, false, false)
// return false to loop on all entries
return false
})
}
func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss bool) error {
peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, localPeer bool) {
callerName := common.CallerName(1)
d.peerOpCh <- &peerOperation{
opType: peerOperationADD,
networkID: nid,
endpointID: eid,
peerIP: peerIP,
peerIPMask: peerIPMask,
peerMac: peerMac,
vtepIP: vtep,
updateDB: updateDb,
l2Miss: l2Miss,
l3Miss: l3Miss,
localPeer: localPeer,
callerName: callerName,
}
}
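peerAdd and peerDelete now only enqueue an operation; a single goroutine drains the channel, so mutations are applied strictly in arrival order. A minimal sketch of that serialization pattern, using a simplified operation type rather than the driver's peerOperation:

package main

import (
	"context"
	"fmt"
)

type peerOp struct {
	kind string // "init", "add", or "delete"
	ip   string
}

// runPeerOps drains ch until ctx is cancelled, applying one operation at a
// time — the ordering guarantee the overlay driver relies on.
func runPeerOps(ctx context.Context, ch <-chan peerOp) {
	for {
		select {
		case <-ctx.Done():
			return
		case op := <-ch:
			fmt.Printf("applying %s for %s\n", op.kind, op.ip)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan peerOp)
	go runPeerOps(ctx, ch)

	ch <- peerOp{kind: "add", ip: "10.0.0.2"}
	ch <- peerOp{kind: "delete", ip: "10.0.0.2"}
	cancel() // mirrors d.peerOpCancel() in Fini
}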
func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, updateOnlyDB bool) error {
if err := validateID(nid, eid); err != nil {
return err
@@ -262,6 +313,9 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
if updateDb {
d.peerDbAdd(nid, eid, peerIP, peerIPMask, peerMac, vtep, false)
if updateOnlyDB {
return nil
}
}
n := d.network(nid)
@@ -271,6 +325,9 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
sbox := n.sandbox()
if sbox == nil {
// We hit this case for all the events that arrive before the sandbox
// is created. The peer has already been added into the database, and the sandbox
// init will configure all these peers from the database
return nil
}
@@ -311,6 +368,22 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
}
func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, updateDb bool) {
callerName := common.CallerName(1)
d.peerOpCh <- &peerOperation{
opType: peerOperationDELETE,
networkID: nid,
endpointID: eid,
peerIP: peerIP,
peerIPMask: peerIPMask,
peerMac: peerMac,
vtepIP: vtep,
updateDB: updateDb,
callerName: callerName,
}
}
func (d *driver) peerDeleteOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
if err := validateID(nid, eid); err != nil {


@@ -28,6 +28,9 @@ const (
// DNSServers of the network
DNSServers = "com.docker.network.windowsshim.dnsservers"
// MacPool of the network
MacPool = "com.docker.network.windowsshim.macpool"
// SourceMac of the network
SourceMac = "com.docker.network.windowsshim.sourcemac"


@@ -38,6 +38,7 @@ type networkConfiguration struct {
VLAN uint
VSID uint
DNSServers string
MacPools []hcsshim.MacPool
DNSSuffix string
SourceMac string
NetworkAdapterName string
@@ -168,6 +169,18 @@ func (d *driver) parseNetworkOptions(id string, genericOptions map[string]string
config.DNSSuffix = value
case DNSServers:
config.DNSServers = value
case MacPool:
config.MacPools = make([]hcsshim.MacPool, 0)
s := strings.Split(value, ",")
if len(s)%2 != 0 {
return nil, types.BadRequestErrorf("Invalid mac pool. You must specify both a start range and an end range")
}
for i := 0; i < len(s)-1; i += 2 {
config.MacPools = append(config.MacPools, hcsshim.MacPool{
StartMacAddress: s[i],
EndMacAddress: s[i+1],
})
}
case VLAN:
vlan, err := strconv.ParseUint(value, 10, 32)
if err != nil {
@@ -274,6 +287,7 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
Subnets: subnets,
DNSServerList: config.DNSServers,
DNSSuffix: config.DNSSuffix,
MacPools: config.MacPools,
SourceMac: config.SourceMac,
NetworkAdapterName: config.NetworkAdapterName,
}
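The MacPool option value is a comma-separated list of alternating start/end MAC addresses. A hedged sketch of the same pairing logic outside of HNS, with a stand-in struct instead of hcsshim.MacPool and example addresses chosen arbitrarily:

package main

import (
	"fmt"
	"strings"
)

type macPool struct {
	start, end string
}

// parseMacPools splits "start1,end1,start2,end2,..." into pairs, rejecting
// odd-length lists just like the Windows driver does.
func parseMacPools(value string) ([]macPool, error) {
	s := strings.Split(value, ",")
	if len(s)%2 != 0 {
		return nil, fmt.Errorf("invalid mac pool: specify both a start range and an end range")
	}
	pools := make([]macPool, 0, len(s)/2)
	for i := 0; i < len(s)-1; i += 2 {
		pools = append(pools, macPool{start: s[i], end: s[i+1]})
	}
	return pools, nil
}

func main() {
	pools, err := parseMacPools("00-15-5D-20-00-00,00-15-5D-20-00-FF")
	fmt.Println(pools, err) // [{00-15-5D-20-00-00 00-15-5D-20-00-FF}] <nil>
}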


@@ -16,5 +16,6 @@ func getInitializers(experimental bool) []initializer {
{windows.GetInit("l2bridge"), "l2bridge"},
{windows.GetInit("l2tunnel"), "l2tunnel"},
{windows.GetInit("nat"), "nat"},
{windows.GetInit("ics"), "ics"},
}
}


@@ -154,9 +154,7 @@ func (epi *endpointInterface) CopyTo(dstEpi *endpointInterface) error {
dstEpi.v6PoolID = epi.v6PoolID
if len(epi.llAddrs) != 0 {
dstEpi.llAddrs = make([]*net.IPNet, 0, len(epi.llAddrs))
for _, ll := range epi.llAddrs {
dstEpi.llAddrs = append(dstEpi.llAddrs, ll)
}
dstEpi.llAddrs = append(dstEpi.llAddrs, epi.llAddrs...)
}
for _, route := range epi.routes {
@@ -415,7 +413,7 @@ func (epj *endpointJoinInfo) UnmarshalJSON(b []byte) error {
return err
}
if v, ok := epMap["gw"]; ok {
epj.gw6 = net.ParseIP(v.(string))
epj.gw = net.ParseIP(v.(string))
}
if v, ok := epMap["gw6"]; ok {
epj.gw6 = net.ParseIP(v.(string))
@@ -444,6 +442,6 @@ func (epj *endpointJoinInfo) CopyTo(dstEpj *endpointJoinInfo) error {
dstEpj.driverTableEntries = make([]*tableEntry, len(epj.driverTableEntries))
copy(dstEpj.driverTableEntries, epj.driverTableEntries)
dstEpj.gw = types.GetIPCopy(epj.gw)
dstEpj.gw = types.GetIPCopy(epj.gw6)
dstEpj.gw6 = types.GetIPCopy(epj.gw6)
return nil
}


@@ -151,11 +151,11 @@ func ProgramChain(c *ChainInfo, bridgeName string, hairpinMode, enable bool) err
"-j", c.Name}
if !Exists(Nat, "PREROUTING", preroute...) && enable {
if err := c.Prerouting(Append, preroute...); err != nil {
return fmt.Errorf("Failed to inject docker in PREROUTING chain: %s", err)
return fmt.Errorf("Failed to inject %s in PREROUTING chain: %s", c.Name, err)
}
} else if Exists(Nat, "PREROUTING", preroute...) && !enable {
if err := c.Prerouting(Delete, preroute...); err != nil {
return fmt.Errorf("Failed to remove docker in PREROUTING chain: %s", err)
return fmt.Errorf("Failed to remove %s in PREROUTING chain: %s", c.Name, err)
}
}
output := []string{
@@ -167,11 +167,11 @@ func ProgramChain(c *ChainInfo, bridgeName string, hairpinMode, enable bool) err
}
if !Exists(Nat, "OUTPUT", output...) && enable {
if err := c.Output(Append, output...); err != nil {
return fmt.Errorf("Failed to inject docker in OUTPUT chain: %s", err)
return fmt.Errorf("Failed to inject %s in OUTPUT chain: %s", c.Name, err)
}
} else if Exists(Nat, "OUTPUT", output...) && !enable {
if err := c.Output(Delete, output...); err != nil {
return fmt.Errorf("Failed to inject docker in OUTPUT chain: %s", err)
return fmt.Errorf("Failed to inject %s in OUTPUT chain: %s", c.Name, err)
}
}
case Filter:


@@ -434,15 +434,11 @@ func (n *network) applyConfigurationTo(to *network) error {
}
if len(n.ipamV4Config) > 0 {
to.ipamV4Config = make([]*IpamConf, 0, len(n.ipamV4Config))
for _, v4conf := range n.ipamV4Config {
to.ipamV4Config = append(to.ipamV4Config, v4conf)
}
to.ipamV4Config = append(to.ipamV4Config, n.ipamV4Config...)
}
if len(n.ipamV6Config) > 0 {
to.ipamV6Config = make([]*IpamConf, 0, len(n.ipamV6Config))
for _, v6conf := range n.ipamV6Config {
to.ipamV6Config = append(to.ipamV6Config, v6conf)
}
to.ipamV6Config = append(to.ipamV6Config, n.ipamV6Config...)
}
if len(n.generic) > 0 {
to.generic = options.Generic{}
@@ -873,8 +869,7 @@ func (n *network) resolveDriver(name string, load bool) (driverapi.Driver, *driv
d, cap := c.drvRegistry.Driver(name)
if d == nil {
if load {
var err error
err = c.loadDriver(name)
err := c.loadDriver(name)
if err != nil {
return nil, nil, err
}
@@ -1451,11 +1446,7 @@ func (n *network) ipamAllocate() error {
}
err = n.ipamAllocateVersion(6, ipam)
if err != nil {
return err
}
return nil
return err
}
func (n *network) requestPoolHelper(ipam ipamapi.Ipam, addressSpace, preferredPool, subPool string, options map[string]string, v6 bool) (string, *net.IPNet, map[string]string, error) {
@@ -1654,9 +1645,7 @@ func (n *network) getIPInfo(ipVer int) []*IpamInfo {
}
l := make([]*IpamInfo, 0, len(info))
n.Lock()
for _, d := range info {
l = append(l, d)
}
l = append(l, info...)
n.Unlock()
return l
}
@@ -1870,7 +1859,7 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) {
// the docker network domain. If the network is not v6 enabled
// set ipv6Miss to filter the DNS query from going to external
// resolvers.
if ok && n.enableIPv6 == false {
if ok && !n.enableIPv6 {
ipv6Miss = true
}
ipSet, ok = sr.svcIPv6Map.Get(req)


@@ -29,7 +29,7 @@ func executeInCompartment(compartmentID uint32, x func()) {
func (n *network) startResolver() {
n.resolverOnce.Do(func() {
logrus.Debugf("Launching DNS server for network", n.Name())
logrus.Debugf("Launching DNS server for network %q", n.Name())
options := n.Info().DriverOptions()
hnsid := options[windows.HNSID]


@@ -114,7 +114,8 @@ type tableEventMessage struct {
}
func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
return false
otherm := other.(*tableEventMessage)
return m.tname == otherm.tname && m.id == otherm.id && m.key == otherm.key
}
func (m *tableEventMessage) Message() []byte {


@@ -98,10 +98,14 @@ func (nDB *NetworkDB) RemoveKey(key []byte) {
}
func (nDB *NetworkDB) clusterInit() error {
nDB.lastStatsTimestamp = time.Now()
nDB.lastHealthTimestamp = nDB.lastStatsTimestamp
config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName
config.BindAddr = nDB.config.BindAddr
config.AdvertiseAddr = nDB.config.AdvertiseAddr
config.UDPBufferSize = nDB.config.PacketBufferSize
if nDB.config.BindPort != 0 {
config.BindPort = nDB.config.BindPort
@@ -199,9 +203,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist
if _, err := mlist.Join(members); err != nil {
// Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
go nDB.retryJoin(members, nDB.stopCh)
return fmt.Errorf("could not join node to memberlist: %v", err)
}
@@ -287,13 +290,6 @@ func (nDB *NetworkDB) reconnectNode() {
return
}
// Update all the local table state to a new time to
// force update on the node we are trying to rejoin, just in
// case that node has these in deleting state still. This is
// facilitate fast convergence after recovering from a gossip
// failure.
nDB.updateLocalTableTime()
logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
nDB.bulkSync([]string{node.Name}, true)
}
@@ -310,12 +306,11 @@ func (nDB *NetworkDB) reapState() {
func (nDB *NetworkDB) reapNetworks() {
nDB.Lock()
for name, nn := range nDB.networks {
for _, nn := range nDB.networks {
for id, n := range nn {
if n.leaving {
if n.reapTime <= 0 {
delete(nn, id)
nDB.deleteNetworkNode(id, name)
continue
}
n.reapTime -= reapPeriod
@@ -373,11 +368,21 @@ func (nDB *NetworkDB) gossip() {
networkNodes[nid] = nDB.networkNodes[nid]
}
printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod
nDB.RUnlock()
if printHealth {
healthScore := nDB.memberlist.GetHealthScore()
if healthScore != 0 {
logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore)
}
nDB.lastHealthTimestamp = time.Now()
}
for nid, nodes := range networkNodes {
mNodes := nDB.mRandomNodes(3, nodes)
bytesAvail := udpSendBuf - compoundHeaderOverhead
bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead
nDB.RLock()
network, ok := thisNodeNetworks[nid]
@@ -398,6 +403,14 @@ func (nDB *NetworkDB) gossip() {
}
msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
// Collect stats and print the queue info; note this code is also here so that the queues remain visible even when they are empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}
if len(msgs) == 0 {
continue
}
@@ -415,11 +428,15 @@ func (nDB *NetworkDB) gossip() {
}
// Send the compound message
if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
if err := nDB.memberlist.SendBestEffort(&mnode.Node, compound); err != nil {
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
}
}
}
// Reset the stats
if printStats {
nDB.lastStatsTimestamp = time.Now()
}
}
func (nDB *NetworkDB) bulkSyncTables() {
@@ -590,7 +607,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
nDB.bulkSyncAckTbl[node] = ch
nDB.Unlock()
err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
err = nDB.memberlist.SendReliable(&mnode.Node, buf)
if err != nil {
nDB.Lock()
delete(nDB.bulkSyncAckTbl, node)
@@ -607,7 +624,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
case <-t.C:
logrus.Errorf("Bulk sync to node %s timed out", node)
case <-ch:
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
}
t.Stop()
}


@@ -104,6 +104,9 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
}
n = nDB.checkAndGetNode(nEvent)
if n == nil {
return false
}
nDB.purgeSameNode(n)
n.ltime = nEvent.LTime
@@ -111,9 +114,12 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
switch nEvent.Type {
case NodeEventTypeJoin:
nDB.Lock()
_, found := nDB.nodes[n.Name]
nDB.nodes[n.Name] = n
nDB.Unlock()
logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
if !found {
logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
}
return true
case NodeEventTypeLeave:
nDB.Lock()
@@ -127,25 +133,12 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
}
func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
var flushEntries bool
// Update our local clock if the received message has a newer time.
nDB.networkClock.Witness(nEvent.LTime)
nDB.Lock()
defer func() {
nDB.Unlock()
// When a node leaves a network on the last task removal cleanup the
// local entries for this network & node combination. When the tasks
// on a network are removed we could have missed the gossip updates.
// Not doing this cleanup can leave stale entries because bulksyncs
// from the node will no longer include this network state.
//
// deleteNodeNetworkEntries takes nDB lock.
if flushEntries {
nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
}
}()
defer nDB.Unlock()
if nEvent.NodeName == nDB.config.NodeName {
return false
@@ -173,10 +166,20 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
n.leaving = nEvent.Type == NetworkEventTypeLeave
if n.leaving {
n.reapTime = reapInterval
flushEntries = true
// The remote node is leaving the network, but not the gossip cluster.
// Mark all its entries as deleted; this guarantees that
// if some node bulk-syncs with us, the deleted state of
// these entries will be propagated.
nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
}
if nEvent.Type == NetworkEventTypeLeave {
nDB.deleteNetworkNode(nEvent.NetworkID, nEvent.NodeName)
} else {
nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
}
nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
return true
}
@@ -203,17 +206,22 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
nDB.RLock()
networks := nDB.networks[nDB.config.NodeName]
network, ok := networks[tEvent.NetworkID]
nDB.RUnlock()
if !ok || network.leaving {
return true
// Check if the owner of the event is still part of the network
nodes := nDB.networkNodes[tEvent.NetworkID]
var nodePresent bool
for _, node := range nodes {
if node == tEvent.NodeName {
nodePresent = true
break
}
}
e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we don't have the entry here nothing to do.
nDB.RUnlock()
if !ok || network.leaving || !nodePresent {
// I'm out of the network OR the event owner is no longer part of the network, so do not propagate
return false
}
e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
if err == nil {
// We have the latest state. Ignore the event
// since it is stale.
@@ -238,6 +246,11 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
nDB.Unlock()
if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we did not have the entry here, don't re-propagate
return true
}
var op opType
switch tEvent.Type {
case TableEventTypeCreate:
@@ -278,8 +291,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
return
}
// Do not rebroadcast a bulk sync
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync {
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
var err error
buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
if err != nil {


@@ -45,9 +45,12 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
var failed bool
logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
e.broadcastNodeEvent(mn.Addr, opDelete)
e.nDB.deleteNodeTableEntries(mn.Name)
e.nDB.deleteNetworkEntriesForNode(mn.Name)
// The node left or failed; delete all the entries created by it.
// If the node was temporarily down, deleting the entries guarantees that the CREATE events will be accepted.
// If the node instead left because it was going down, then it makes sense to just delete all its state.
e.nDB.Lock()
e.nDB.deleteNetworkEntriesForNode(mn.Name)
e.nDB.deleteNodeTableEntries(mn.Name)
if n, ok := e.nDB.nodes[mn.Name]; ok {
delete(e.nDB.nodes, mn.Name)
@@ -61,7 +64,6 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
if failed {
logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
}
}
func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {


@@ -3,10 +3,6 @@ package networkdb
import "github.com/gogo/protobuf/proto"
const (
// Max udp message size chosen to avoid network packet
// fragmentation.
udpSendBuf = 1400
// Compound message header overhead 1 byte(message type) + 4
// bytes (num messages)
compoundHeaderOverhead = 5


@@ -1,10 +1,11 @@
package networkdb
//go:generate protoc -I.:../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
//go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
import (
"fmt"
"net"
"os"
"strings"
"sync"
"time"
@@ -93,6 +94,12 @@ type NetworkDB struct {
// bootStrapIP is the list of IPs that can be used to bootstrap
// the gossip.
bootStrapIP []net.IP
// lastStatsTimestamp is the last timestamp when the stats got printed
lastStatsTimestamp time.Time
// lastHealthTimestamp is the last timestamp when the health score got printed
lastHealthTimestamp time.Time
}
// PeerInfo represents the peer (gossip cluster) nodes of a network
@@ -101,6 +108,11 @@ type PeerInfo struct {
IP string
}
// PeerClusterInfo represents the peer (gossip cluster) nodes
type PeerClusterInfo struct {
PeerInfo
}
type node struct {
memberlist.Node
ltime serf.LamportTime
@@ -126,6 +138,9 @@ type network struct {
// The broadcast queue for table event gossip. This is only
// initialized for this node's network attachment entries.
tableBroadcasts *memberlist.TransmitLimitedQueue
// Number of gossip messages sent related to this network during the last stats collection period
qMessagesSent int
}
// Config represents the configuration of the networkdb instance and
@@ -149,6 +164,21 @@ type Config struct {
// Keys to be added to the Keyring of the memberlist. Key at index
// 0 is the primary key
Keys [][]byte
// PacketBufferSize is the maximum number of bytes that memberlist will
// put in a packet (this will be for UDP packets by default with a NetTransport).
// A safe value for this is typically 1400 bytes (which is the default). However,
// depending on your network's MTU (Maximum Transmission Unit) you may
// be able to increase this to get more content into each gossip packet.
PacketBufferSize int
// StatsPrintPeriod is the period used to print queue stats.
// Default is 5min.
StatsPrintPeriod time.Duration
// HealthPrintPeriod is the period used to print the health score.
// Default is 1min.
HealthPrintPeriod time.Duration
}
// entry defines a table entry
@@ -171,6 +201,18 @@ type entry struct {
reapTime time.Duration
}
// DefaultConfig returns a NetworkDB config with default values
func DefaultConfig() *Config {
hostname, _ := os.Hostname()
return &Config{
NodeName: hostname,
BindAddr: "0.0.0.0",
PacketBufferSize: 1400,
StatsPrintPeriod: 5 * time.Minute,
HealthPrintPeriod: 1 * time.Minute,
}
}
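Putting this together with the agent change earlier, callers now start from DefaultConfig and override only what they need. A hedged sketch of that usage; the import path is the vendored one, while the node name, bind address, and buffer size are arbitrary:

package main

import (
	"log"

	"github.com/docker/libnetwork/networkdb"
)

func main() {
	conf := networkdb.DefaultConfig() // hostname, 0.0.0.0, 1400-byte packets, 5m/1m print periods
	conf.NodeName = "node-1"
	conf.BindAddr = "127.0.0.1"
	conf.PacketBufferSize = 1400 // raise this when the control-plane MTU allows it

	nDB, err := networkdb.New(conf)
	if err != nil {
		log.Fatal(err)
	}
	defer nDB.Close()
}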
// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
@@ -200,6 +242,7 @@ func New(c *Config) (*NetworkDB, error) {
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
nDB.Lock()
nDB.bootStrapIP = make([]net.IP, 0, len(members))
for _, m := range members {
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
}
@@ -215,6 +258,20 @@ func (nDB *NetworkDB) Close() {
}
}
// ClusterPeers returns all the gossip cluster peers.
func (nDB *NetworkDB) ClusterPeers() []PeerInfo {
nDB.RLock()
defer nDB.RUnlock()
peers := make([]PeerInfo, 0, len(nDB.nodes))
for _, node := range nDB.nodes {
peers = append(peers, PeerInfo{
Name: node.Name,
IP: node.Node.Addr.String(),
})
}
return peers
}
// Peers returns the gossip peers for a given network.
func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
nDB.RLock()
@@ -361,7 +418,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
}
func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
nDB.Lock()
for nid, nodes := range nDB.networkNodes {
updatedNodes := make([]string, 0, len(nodes))
for _, node := range nodes {
@@ -376,11 +432,25 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
}
delete(nDB.networks, deletedNode)
nDB.Unlock()
}
// deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes:
// 1) when a notification arrives that a node is leaving the network
// - Walk all the network entries and mark the leaving node's entries for deletion.
// These will be garbage collected when the reap timer expires.
// 2) when the local node is leaving the network
// - Walk all the network entries:
// A) if the entry is owned by the local node
// then mark it for deletion. This ensures that a node which has not
// yet received the notification that the local node is leaving will still be aware
// of the entries to be deleted.
// B) if the entry is owned by a remote node, then we can safely delete it. This
// ensures that if we rejoin this network, we will accept the CREATE events for
// entries owned by remote nodes and notify the application.
func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
nDB.Lock()
// Indicates if the delete is triggered for the local node
isNodeLocal := node == nDB.config.NodeName
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
func(path string, v interface{}) bool {
oldEntry := v.(*entry)
@@ -389,7 +459,15 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
tname := params[1]
key := params[2]
if oldEntry.node != node {
// If the entry is owned by a remote node and this node is not leaving the network
if oldEntry.node != node && !isNodeLocal {
// Don't do anything because the event is triggered for a node that does not own this entry
return false
}
// If this entry is already marked for deletion and this node is not leaving the network
if oldEntry.deleting && !isNodeLocal {
// Don't do anything; this entry will be garbage collected using the old reapTime
return false
}
@@ -401,17 +479,29 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
reapTime: reapInterval,
}
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
// we arrived at this point in 2 cases:
// 1) this entry is owned by the node that is leaving the network
// 2) the local node is leaving the network
if oldEntry.node == node {
if isNodeLocal {
// TODO fcrisciani: this can be removed if there is no way to leave the network
// without doing a delete of all the objects
entry.ltime++
}
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
} else {
// the local node is leaving the network, all the entries of remote nodes can be safely removed
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
}
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
return false
})
nDB.Unlock()
}
func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nDB.Lock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
oldEntry := v.(*entry)
if oldEntry.node != node {
@@ -423,27 +513,18 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nid := params[1]
key := params[2]
entry := &entry{
ltime: oldEntry.ltime,
node: node,
value: oldEntry.value,
deleting: true,
reapTime: reapInterval,
}
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
return false
})
nDB.Unlock()
}
// WalkTable walks a single table in NetworkDB and invokes the passed
// function for each entry in the table, passing the network, key,
// value, and a flag indicating whether the entry is marked for deletion.
// The walk stops if the passed function returns true.
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
nDB.RLock()
values := make(map[string]interface{})
nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
@@ -456,7 +537,7 @@ func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bo
params := strings.Split(k[1:], "/")
nid := params[1]
key := params[2]
if fn(nid, key, v.(*entry).value) {
if fn(nid, key, v.(*entry).value, v.(*entry).deleting) {
return nil
}
}
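The walker now receives a fourth argument reporting whether the entry is tombstoned, which lets callers skip entries that are marked for deletion, as the agent does when priming driver watches. A hedged end-to-end sketch; the table name, network ID, and key are placeholders:

package main

import (
	"fmt"
	"log"

	"github.com/docker/libnetwork/networkdb"
)

func main() {
	conf := networkdb.DefaultConfig()
	conf.NodeName = "walker-demo"
	conf.BindAddr = "127.0.0.1"

	nDB, err := networkdb.New(conf)
	if err != nil {
		log.Fatal(err)
	}
	defer nDB.Close()

	if err := nDB.JoinNetwork("net1"); err != nil {
		log.Fatal(err)
	}
	if err := nDB.CreateEntry("endpoint_table", "net1", "ep1", []byte("payload")); err != nil {
		log.Fatal(err)
	}

	// Skip tombstoned entries, mirroring addDriverWatches above.
	nDB.WalkTable("endpoint_table", func(nid, key string, value []byte, deleted bool) bool {
		if !deleted {
			fmt.Printf("%s/%s = %s\n", nid, key, value)
		}
		return false // returning true would stop the walk early
	})
}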
@@ -481,13 +562,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
nDB.RLock()
num := len(nDB.networkNodes[nid])
nDB.RUnlock()
return num
defer nDB.RUnlock()
return len(nDB.networkNodes[nid])
},
RetransmitMult: 4,
}
nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
nDB.addNetworkNode(nid, nDB.config.NodeName)
networkNodes := nDB.networkNodes[nid]
nDB.Unlock()
@@ -517,35 +597,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
nDB.Lock()
defer nDB.Unlock()
var (
paths []string
entries []*entry
)
nwWalker := func(path string, v interface{}) bool {
entry, ok := v.(*entry)
if !ok {
return false
}
paths = append(paths, path)
entries = append(entries, entry)
return false
}
// Remove myself from the list of the nodes participating in the network
nDB.deleteNetworkNode(nid, nDB.config.NodeName)
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker)
for _, path := range paths {
params := strings.Split(path[1:], "/")
tname := params[1]
key := params[2]
if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}
if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
}
// Update all the local entries marking them for deletion and delete all the remote entries
nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeName)
nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
if !ok {
@@ -558,6 +615,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
}
n.ltime = ltime
n.reapTime = reapInterval
n.leaving = true
return nil
}
@@ -580,7 +638,10 @@ func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
// passed network. Caller should hold the NetworkDB lock while calling
// this
func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
nodes := nDB.networkNodes[nid]
nodes, ok := nDB.networkNodes[nid]
if !ok || len(nodes) == 0 {
return
}
newNodes := make([]string, 0, len(nodes)-1)
for _, name := range nodes {
if name == nodeName {
@@ -618,27 +679,3 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
n.ltime = ltime
}
}
func (nDB *NetworkDB) updateLocalTableTime() {
nDB.Lock()
defer nDB.Unlock()
ltime := nDB.tableClock.Increment()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry := v.(*entry)
if entry.node != nDB.config.NodeName {
return false
}
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]
entry.ltime = ltime
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
return false
})
}


@@ -0,0 +1,242 @@
package networkdb
import (
"fmt"
"net/http"
"strings"
"github.com/docker/libnetwork/diagnose"
)
const (
missingParameter = "missing parameter"
)
// NetDbPaths2Func maps the diagnostic HTTP paths to their NetworkDB handlers
var NetDbPaths2Func = map[string]diagnose.HTTPHandlerFunc{
"/join": dbJoin,
"/networkpeers": dbPeers,
"/clusterpeers": dbClusterPeers,
"/joinnetwork": dbJoinNetwork,
"/leavenetwork": dbLeaveNetwork,
"/createentry": dbCreateEntry,
"/updateentry": dbUpdateEntry,
"/deleteentry": dbDeleteEntry,
"/getentry": dbGetEntry,
"/gettable": dbGetTable,
}
func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["members"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?members=ip1,ip2,...", r.URL.Path))
return
}
nDB, ok := ctx.(*NetworkDB)
if ok {
err := nDB.Join(strings.Split(r.Form["members"][0], ","))
if err != nil {
fmt.Fprintf(w, "%s error in the DB join %s\n", r.URL.Path, err)
return
}
fmt.Fprintf(w, "OK\n")
}
}
func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["nid"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
return
}
nDB, ok := ctx.(*NetworkDB)
if ok {
peers := nDB.Peers(r.Form["nid"][0])
fmt.Fprintf(w, "Network:%s Total peers: %d\n", r.Form["nid"], len(peers))
for i, peerInfo := range peers {
fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP)
}
}
}
func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
nDB, ok := ctx.(*NetworkDB)
if ok {
peers := nDB.ClusterPeers()
fmt.Fprintf(w, "Total peers: %d\n", len(peers))
for i, peerInfo := range peers {
fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP)
}
}
}
func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["tname"]) < 1 ||
len(r.Form["nid"]) < 1 ||
len(r.Form["key"]) < 1 ||
len(r.Form["value"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
return
}
tname := r.Form["tname"][0]
nid := r.Form["nid"][0]
key := r.Form["key"][0]
value := r.Form["value"][0]
nDB, ok := ctx.(*NetworkDB)
if ok {
if err := nDB.CreateEntry(tname, nid, key, []byte(value)); err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
return
}
fmt.Fprintf(w, "OK\n")
}
}
func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["tname"]) < 1 ||
len(r.Form["nid"]) < 1 ||
len(r.Form["key"]) < 1 ||
len(r.Form["value"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
return
}
tname := r.Form["tname"][0]
nid := r.Form["nid"][0]
key := r.Form["key"][0]
value := r.Form["value"][0]
nDB, ok := ctx.(*NetworkDB)
if ok {
if err := nDB.UpdateEntry(tname, nid, key, []byte(value)); err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
return
}
fmt.Fprintf(w, "OK\n")
}
}
func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["tname"]) < 1 ||
len(r.Form["nid"]) < 1 ||
len(r.Form["key"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
return
}
tname := r.Form["tname"][0]
nid := r.Form["nid"][0]
key := r.Form["key"][0]
nDB, ok := ctx.(*NetworkDB)
if ok {
err := nDB.DeleteEntry(tname, nid, key)
if err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
return
}
fmt.Fprintf(w, "OK\n")
}
}
func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["tname"]) < 1 ||
len(r.Form["nid"]) < 1 ||
len(r.Form["key"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
return
}
tname := r.Form["tname"][0]
nid := r.Form["nid"][0]
key := r.Form["key"][0]
nDB, ok := ctx.(*NetworkDB)
if ok {
value, err := nDB.GetEntry(tname, nid, key)
if err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
return
}
fmt.Fprintf(w, "key:`%s` value:`%s`\n", key, string(value))
}
}
func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["nid"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
return
}
nid := r.Form["nid"][0]
nDB, ok := ctx.(*NetworkDB)
if ok {
if err := nDB.JoinNetwork(nid); err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
return
}
fmt.Fprintf(w, "OK\n")
}
}
func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["nid"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
return
}
nid := r.Form["nid"][0]
nDB, ok := ctx.(*NetworkDB)
if ok {
if err := nDB.LeaveNetwork(nid); err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
return
}
fmt.Fprintf(w, "OK\n")
}
}
func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["tname"]) < 1 ||
len(r.Form["nid"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id", r.URL.Path))
return
}
tname := r.Form["tname"][0]
nid := r.Form["nid"][0]
nDB, ok := ctx.(*NetworkDB)
if ok {
table := nDB.GetTableByNetwork(tname, nid)
fmt.Fprintf(w, "total elements: %d\n", len(table))
i := 0
for k, v := range table {
fmt.Fprintf(w, "%d) k:`%s` -> v:`%s`\n", i, k, string(v.([]byte)))
i++
}
}
}
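These handlers read their parameters through r.ParseForm, so they can be exercised with plain GET requests carrying a query string. A hedged client-side sketch, assuming the NetworkDB paths have been registered on a diagnose server reachable at 127.0.0.1:8000 and using placeholder table and network IDs:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func get(url string) {
	resp, err := http.Get(url)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Printf("%s -> %s", url, body)
}

func main() {
	base := "http://127.0.0.1:8000"
	get(base + "/help")                                        // list registered paths
	get(base + "/joinnetwork?nid=net1")                        // join a network in networkdb
	get(base + "/createentry?tname=t1&nid=net1&key=k&value=v") // create a table entry
	get(base + "/gettable?tname=t1&nid=net1")                  // dump the table
}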


@@ -91,9 +91,7 @@ func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr,
if nh.linkDst != "" {
nlnh.LinkIndex = iface.Attrs().Index
}
if err := nlh.NeighDel(nlnh); err != nil {
logrus.Warnf("Deleting bridge mac mac %s failed, %v", dstMac, err)
}
nlh.NeighDel(nlnh)
}
}


@@ -446,7 +446,7 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
defer co.Close()
// limits the number of outstanding concurrent queries.
if r.forwardQueryStart() == false {
if !r.forwardQueryStart() {
old := r.tStamp
r.tStamp = time.Now()
if r.tStamp.Sub(old) > logInterval {


@@ -621,7 +621,7 @@ func (sb *sandbox) resolveName(req string, networkName string, epList []*endpoin
func (sb *sandbox) SetKey(basePath string) error {
start := time.Now()
defer func() {
logrus.Debugf("sandbox set key processing took %s for container %s", time.Now().Sub(start), sb.ContainerID())
logrus.Debugf("sandbox set key processing took %s for container %s", time.Since(start), sb.ContainerID())
}()
if basePath == "" {
@@ -773,9 +773,7 @@ func (sb *sandbox) restoreOslSandbox() error {
}
Ifaces[fmt.Sprintf("%s+%s", i.srcName, i.dstPrefix)] = ifaceOptions
if joinInfo != nil {
for _, r := range joinInfo.StaticRoutes {
routes = append(routes, r)
}
routes = append(routes, joinInfo.StaticRoutes...)
}
if ep.needResolver() {
sb.startResolver(true)
@@ -789,11 +787,7 @@ func (sb *sandbox) restoreOslSandbox() error {
// restore osl sandbox
err := sb.osSbox.Restore(Ifaces, routes, gwep.joinInfo.gw, gwep.joinInfo.gw6)
if err != nil {
return err
}
return nil
return err
}
func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
@@ -958,9 +952,7 @@ func (sb *sandbox) joinLeaveStart() {
joinLeaveDone := sb.joinLeaveDone
sb.Unlock()
select {
case <-joinLeaveDone:
}
<-joinLeaveDone
sb.Lock()
}


@@ -52,7 +52,6 @@ func processSetKeyReexec() {
controllerID := os.Args[2]
err = SetExternalKey(controllerID, containerID, fmt.Sprintf("/proc/%d/ns/net", state.Pid))
return
}
// SetExternalKey provides a convenient way to set an External key to a sandbox


@@ -115,9 +115,7 @@ func (sbs *sbState) CopyTo(o datastore.KVObject) error {
dstSbs.dbExists = sbs.dbExists
dstSbs.EpPriority = sbs.EpPriority
for _, eps := range sbs.Eps {
dstSbs.Eps = append(dstSbs.Eps, eps)
}
dstSbs.Eps = append(dstSbs.Eps, sbs.Eps...)
if len(sbs.ExtDNS2) > 0 {
for _, dns := range sbs.ExtDNS2 {


@@ -372,6 +372,7 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro
if err := iptables.RawCombinedOutput("-I", "FORWARD", "-j", ingressChain); err != nil {
return fmt.Errorf("failed to add jump rule to %s in filter table forward chain: %v", ingressChain, err)
}
arrangeUserFilterRule()
}
oifName, err := findOIFName(gwIP)
@@ -438,7 +439,9 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro
return nil
}
// In the filter table FORWARD chain first rule should be to jump to INGRESS-CHAIN
// In the filter table FORWARD chain the first rule should be to jump to
// DOCKER-USER so the user is able to filter packets first.
// The second rule should be a jump to INGRESS-CHAIN.
// This chain has the rules to allow access to the published ports for swarm tasks
// from local bridge networks and docker_gwbridge (i.e. tasks on other swarm networks)
func arrangeIngressFilterRule() {


@@ -57,12 +57,12 @@ func New(store *store.MemoryStore) *LogBroker {
}
}
// Run the log broker
func (lb *LogBroker) Run(ctx context.Context) error {
// Start starts the log broker
func (lb *LogBroker) Start(ctx context.Context) error {
lb.mu.Lock()
defer lb.mu.Unlock()
if lb.cancelAll != nil {
lb.mu.Unlock()
return errAlreadyRunning
}
@@ -71,12 +71,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
lb.subscriptionQueue = watch.NewQueue()
lb.registeredSubscriptions = make(map[string]*subscription)
lb.subscriptionsByNode = make(map[string]map[*subscription]struct{})
lb.mu.Unlock()
select {
case <-lb.pctx.Done():
return lb.pctx.Err()
}
return nil
}
// Stop stops the log broker
@@ -234,8 +229,15 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
return err
}
lb.mu.Lock()
pctx := lb.pctx
lb.mu.Unlock()
if pctx == nil {
return errNotRunning
}
subscription := lb.newSubscription(request.Selector, request.Options)
subscription.Run(lb.pctx)
subscription.Run(pctx)
defer subscription.Stop()
log := log.G(ctx).WithFields(
@@ -257,8 +259,8 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
select {
case <-ctx.Done():
return ctx.Err()
case <-lb.pctx.Done():
return lb.pctx.Err()
case <-pctx.Done():
return pctx.Err()
case event := <-publishCh:
publish := event.(*logMessage)
if publish.completed {
@@ -308,6 +310,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
return err
}
lb.mu.Lock()
pctx := lb.pctx
lb.mu.Unlock()
if pctx == nil {
return errNotRunning
}
lb.nodeConnected(remote.NodeID)
defer lb.nodeDisconnected(remote.NodeID)
@@ -329,7 +338,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
select {
case <-stream.Context().Done():
return stream.Context().Err()
case <-lb.pctx.Done():
case <-pctx.Done():
return nil
default:
}
@@ -362,7 +371,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
}
case <-stream.Context().Done():
return stream.Context().Err()
case <-lb.pctx.Done():
case <-pctx.Done():
return nil
}
}
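The pattern above snapshots the broker's parent context under the mutex before using it, so a nil or replaced context cannot race the check. A minimal sketch of that guard, with hypothetical names:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errNotRunning = errors.New("broker is not running")

type broker struct {
	mu   sync.Mutex
	pctx context.Context
}

func (b *broker) start(ctx context.Context) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pctx = ctx
}

// subscribe copies pctx under the lock and then only uses the local copy, so
// the field can be cleared or replaced concurrently without a data race.
func (b *broker) subscribe() error {
	b.mu.Lock()
	pctx := b.pctx
	b.mu.Unlock()
	if pctx == nil {
		return errNotRunning
	}
	select {
	case <-pctx.Done():
		return pctx.Err()
	default:
		return nil
	}
}

func main() {
	b := &broker{}
	fmt.Println(b.subscribe()) // broker is not running
	b.start(context.Background())
	fmt.Println(b.subscribe()) // <nil>
}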


@@ -130,6 +130,7 @@ type Manager struct {
caserver *ca.Server
dispatcher *dispatcher.Dispatcher
logbroker *logbroker.LogBroker
watchServer *watchapi.Server
replicatedOrchestrator *replicated.Orchestrator
globalOrchestrator *global.Orchestrator
taskReaper *taskreaper.TaskReaper
@@ -221,6 +222,7 @@ func New(config *Config) (*Manager, error) {
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths),
dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig(), drivers.New(config.PluginGetter)),
logbroker: logbroker.New(raftNode.MemoryStore()),
watchServer: watchapi.NewServer(raftNode.MemoryStore()),
server: grpc.NewServer(opts...),
localserver: grpc.NewServer(opts...),
raftNode: raftNode,
@@ -398,13 +400,12 @@ func (m *Manager) Run(parent context.Context) error {
}
baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.caserver, m.config.PluginGetter)
baseWatchAPI := watchapi.NewServer(m.raftNode.MemoryStore())
baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
healthServer := health.NewHealthServer()
localHealthServer := health.NewHealthServer()
authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(baseWatchAPI, authorize)
authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
@@ -477,7 +478,7 @@ func (m *Manager) Run(parent context.Context) error {
grpc_prometheus.Register(m.server)
api.RegisterControlServer(m.localserver, localProxyControlAPI)
api.RegisterWatchServer(m.localserver, baseWatchAPI)
api.RegisterWatchServer(m.localserver, m.watchServer)
api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
api.RegisterHealthServer(m.localserver, localHealthServer)
api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
@@ -490,6 +491,10 @@ func (m *Manager) Run(parent context.Context) error {
healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)
if err := m.watchServer.Start(ctx); err != nil {
log.G(ctx).WithError(err).Error("watch server failed to start")
}
go m.serveListener(ctx, m.remoteListener)
go m.serveListener(ctx, m.controlListener)
@@ -565,8 +570,8 @@ func (m *Manager) Run(parent context.Context) error {
const stopTimeout = 8 * time.Second
// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the scheduler. If clearData is set, the
// raft logs, snapshots, and keys will be erased.
// active RPCs as well as stopping the manager's subsystems. If clearData is
// set, the raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
log.G(ctx).Info("Stopping manager")
// It's not safe to start shutting down while the manager is still
@@ -600,6 +605,7 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) {
m.dispatcher.Stop()
m.logbroker.Stop()
m.watchServer.Stop()
m.caserver.Stop()
if m.allocator != nil {
@@ -1001,11 +1007,9 @@ func (m *Manager) becomeLeader(ctx context.Context) {
}
}(m.dispatcher)
go func(lb *logbroker.LogBroker) {
if err := lb.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("LogBroker exited with an error")
}
}(m.logbroker)
if err := m.logbroker.Start(ctx); err != nil {
log.G(ctx).WithError(err).Error("LogBroker failed to start")
}
go func(server *ca.Server) {
if err := server.Run(ctx); err != nil {

View File

@@ -169,12 +169,6 @@ func (g *Orchestrator) Run(ctx context.Context) error {
delete(g.nodes, v.Node.ID)
case api.EventUpdateTask:
g.handleTaskChange(ctx, v.Task)
case api.EventDeleteTask:
// CLI allows deleting task
if _, exists := g.globalServices[v.Task.ServiceID]; !exists {
continue
}
g.reconcileServicesOneNode(ctx, []string{v.Task.ServiceID}, v.Task.NodeID)
}
case <-g.stopChan:
return nil
@@ -216,7 +210,7 @@ func (g *Orchestrator) handleTaskChange(ctx context.Context, t *api.Task) {
if _, exists := g.globalServices[t.ServiceID]; !exists {
return
}
// if a task's DesiredState has past running, which
// if a task's DesiredState has passed running, it
// means the task has been processed
if t.DesiredState > api.TaskStateRunning {
return
@@ -264,7 +258,6 @@ func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node,
}
func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []string) {
nodeCompleted := make(map[string]map[string]struct{})
nodeTasks := make(map[string]map[string][]*api.Task)
g.store.View(func(tx store.ReadTx) {
@@ -275,8 +268,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
continue
}
// a node may have completed this service
nodeCompleted[serviceID] = make(map[string]struct{})
// nodeID -> task list
nodeTasks[serviceID] = make(map[string][]*api.Task)
@@ -284,11 +275,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
if t.DesiredState <= api.TaskStateRunning {
// Collect all running instances of this service
nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
} else {
// for finished tasks, check restartPolicy
if isTaskCompleted(t, orchestrator.RestartCondition(t)) {
nodeCompleted[serviceID][t.NodeID] = struct{}{}
}
}
}
}
@@ -311,9 +297,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
ntasks := nodeTasks[serviceID][nodeID]
delete(nodeTasks[serviceID], nodeID)
// if restart policy considers this node has finished its task
// it should remove all running tasks
if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
if !meetsConstraints {
g.shutdownTasks(ctx, batch, ntasks)
continue
}
@@ -400,8 +384,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
return
}
// whether each service has completed on the node
completed := make(map[string]bool)
// tasks by service
tasks := make(map[string][]*api.Task)
@@ -425,10 +407,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
}
if t.DesiredState <= api.TaskStateRunning {
tasks[serviceID] = append(tasks[serviceID], t)
} else {
if isTaskCompleted(t, orchestrator.RestartCondition(t)) {
completed[serviceID] = true
}
}
}
}
@@ -444,13 +422,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
continue
}
// if restart policy considers this node has finished its task
// it should remove all running tasks
if completed[serviceID] {
g.shutdownTasks(ctx, batch, tasks[serviceID])
continue
}
if node.Spec.Availability == api.NodeAvailabilityPause {
// the node is paused, so we won't add or update tasks
continue

View File

@@ -30,6 +30,13 @@ type instanceRestartInfo struct {
// Restart.MaxAttempts and Restart.Window are both
// nonzero.
restartedInstances *list.List
// Why is specVersion in this structure and not in the map key? While
// putting it in the key would be a very simple solution, it wouldn't
// be easy to clean up map entries corresponding to old specVersions.
// Making the key version-agnostic and clearing the value whenever the
// version changes avoids the issue of stale map entries for old
// versions.
specVersion api.Version
}
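A compact illustration of the trade-off the comment above describes: keep the map key version-agnostic and reset the stored value whenever the task's spec version changes, so counters for old versions never linger as stale entries. Types and names here are simplified stand-ins, not the swarmkit ones.

// Illustrative sketch of the "version in the value, not the key" design.
package main

import "fmt"

type version struct{ index uint64 }

type restartInfo struct {
	specVersion   version
	totalRestarts uint64
}

type instanceKey struct {
	serviceID string
	slot      uint64
}

func record(history map[instanceKey]*restartInfo, key instanceKey, v version) {
	info := history[key]
	if info == nil {
		info = &restartInfo{}
		history[key] = info
	}
	if info.specVersion != v {
		// New spec version: old failures should not count against it,
		// so reset the value in place instead of growing the key space.
		*info = restartInfo{specVersion: v}
	}
	info.totalRestarts++
}

func main() {
	history := make(map[instanceKey]*restartInfo)
	key := instanceKey{serviceID: "svc1", slot: 1}

	record(history, key, version{1})
	record(history, key, version{1})
	record(history, key, version{2}) // service updated: counter starts over

	fmt.Println(history[key].totalRestarts) // 1
}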
type delayedStart struct {
@@ -54,8 +61,7 @@ type Supervisor struct {
mu sync.Mutex
store *store.MemoryStore
delays map[string]*delayedStart
history map[instanceTuple]*instanceRestartInfo
historyByService map[string]map[instanceTuple]struct{}
historyByService map[string]map[instanceTuple]*instanceRestartInfo
TaskTimeout time.Duration
}
@@ -64,8 +70,7 @@ func NewSupervisor(store *store.MemoryStore) *Supervisor {
return &Supervisor{
store: store,
delays: make(map[string]*delayedStart),
history: make(map[instanceTuple]*instanceRestartInfo),
historyByService: make(map[string]map[instanceTuple]struct{}),
historyByService: make(map[string]map[instanceTuple]*instanceRestartInfo),
TaskTimeout: defaultOldTaskTimeout,
}
}
@@ -214,8 +219,8 @@ func (r *Supervisor) shouldRestart(ctx context.Context, t *api.Task, service *ap
r.mu.Lock()
defer r.mu.Unlock()
restartInfo := r.history[instanceTuple]
if restartInfo == nil {
restartInfo := r.historyByService[t.ServiceID][instanceTuple]
if restartInfo == nil || (t.SpecVersion != nil && *t.SpecVersion != restartInfo.specVersion) {
return true
}
@@ -268,17 +273,26 @@ func (r *Supervisor) recordRestartHistory(restartTask *api.Task) {
r.mu.Lock()
defer r.mu.Unlock()
if r.history[tuple] == nil {
r.history[tuple] = &instanceRestartInfo{}
}
restartInfo := r.history[tuple]
restartInfo.totalRestarts++
if r.historyByService[restartTask.ServiceID] == nil {
r.historyByService[restartTask.ServiceID] = make(map[instanceTuple]struct{})
r.historyByService[restartTask.ServiceID] = make(map[instanceTuple]*instanceRestartInfo)
}
r.historyByService[restartTask.ServiceID][tuple] = struct{}{}
if r.historyByService[restartTask.ServiceID][tuple] == nil {
r.historyByService[restartTask.ServiceID][tuple] = &instanceRestartInfo{}
}
restartInfo := r.historyByService[restartTask.ServiceID][tuple]
if restartTask.SpecVersion != nil && *restartTask.SpecVersion != restartInfo.specVersion {
// This task has a different SpecVersion from the one we're
// tracking. Most likely, the service was updated. Past failures
// shouldn't count against the new service definition, so clear
// the history for this instance.
*restartInfo = instanceRestartInfo{
specVersion: *restartTask.SpecVersion,
}
}
restartInfo.totalRestarts++
if restartTask.Spec.Restart.Window != nil && (restartTask.Spec.Restart.Window.Seconds != 0 || restartTask.Spec.Restart.Window.Nanos != 0) {
if restartInfo.restartedInstances == nil {
@@ -432,16 +446,6 @@ func (r *Supervisor) CancelAll() {
// ClearServiceHistory forgets restart history related to a given service ID.
func (r *Supervisor) ClearServiceHistory(serviceID string) {
r.mu.Lock()
defer r.mu.Unlock()
tuples := r.historyByService[serviceID]
if tuples == nil {
return
}
delete(r.historyByService, serviceID)
for t := range tuples {
delete(r.history, t)
}
r.mu.Unlock()
}
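One consequence of folding everything into historyByService, as shown above, is that forgetting a service's restart history becomes a single map delete; there is no separate flat history map left to walk and keep in sync. A small stand-in illustration (names are not the swarmkit ones):

// Sketch of the simplified ClearServiceHistory bookkeeping.
package main

import "fmt"

type tuple struct {
	serviceID string
	slot      uint64
}

type info struct{ totalRestarts uint64 }

func main() {
	historyByService := map[string]map[tuple]*info{
		"svc1": {
			{serviceID: "svc1", slot: 1}: {totalRestarts: 3},
			{serviceID: "svc1", slot: 2}: {totalRestarts: 1},
		},
	}

	// Forget everything recorded for svc1 in one step; no second map to clean.
	delete(historyByService, "svc1")

	fmt.Println(len(historyByService)) // 0
}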

View File

@@ -601,7 +601,9 @@ func (u *Updater) rollbackUpdate(ctx context.Context, serviceID, message string)
return errors.New("cannot roll back service because no previous spec is available")
}
service.Spec = *service.PreviousSpec
service.SpecVersion = service.PreviousSpecVersion.Copy()
service.PreviousSpec = nil
service.PreviousSpecVersion = nil
return store.UpdateService(tx, service)
})

View File

@@ -1,12 +1,24 @@
package watchapi
import (
"errors"
"sync"
"github.com/docker/swarmkit/manager/state/store"
"golang.org/x/net/context"
)
var (
errAlreadyRunning = errors.New("broker is already running")
errNotRunning = errors.New("broker is not running")
)
// Server is the store API gRPC server.
type Server struct {
store *store.MemoryStore
store *store.MemoryStore
mu sync.Mutex
pctx context.Context
cancelAll func()
}
// NewServer creates a store API server.
@@ -15,3 +27,30 @@ func NewServer(store *store.MemoryStore) *Server {
store: store,
}
}
// Start starts the watch server.
func (s *Server) Start(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.cancelAll != nil {
return errAlreadyRunning
}
s.pctx, s.cancelAll = context.WithCancel(ctx)
return nil
}
// Stop stops the watch server.
func (s *Server) Stop() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.cancelAll == nil {
return errNotRunning
}
s.cancelAll()
s.cancelAll = nil
return nil
}
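A caller-side sketch of the lifecycle guards these methods add: a second Start reports errAlreadyRunning, and Stop without a prior Start reports errNotRunning. The type below is a stand-in that mirrors the guarded fields for illustration only.

// Illustration only; not the swarmkit watchapi.Server.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var (
	errAlreadyRunning = errors.New("broker is already running")
	errNotRunning     = errors.New("broker is not running")
)

type watchServer struct {
	mu        sync.Mutex
	pctx      context.Context
	cancelAll func()
}

func (s *watchServer) Start(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cancelAll != nil {
		return errAlreadyRunning
	}
	s.pctx, s.cancelAll = context.WithCancel(ctx)
	return nil
}

func (s *watchServer) Stop() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cancelAll == nil {
		return errNotRunning
	}
	s.cancelAll()
	s.cancelAll = nil
	return nil
}

func main() {
	s := &watchServer{}
	fmt.Println(s.Stop())                      // broker is not running
	fmt.Println(s.Start(context.Background())) // <nil>
	fmt.Println(s.Start(context.Background())) // broker is already running
	fmt.Println(s.Stop())                      // <nil>
}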

View File

@@ -17,6 +17,13 @@ import (
func (s *Server) Watch(request *api.WatchRequest, stream api.Watch_WatchServer) error {
ctx := stream.Context()
s.mu.Lock()
pctx := s.pctx
s.mu.Unlock()
if pctx == nil {
return errNotRunning
}
watchArgs, err := api.ConvertWatchArgs(request.Entries)
if err != nil {
return grpc.Errorf(codes.InvalidArgument, "%s", err.Error())
@@ -39,6 +46,8 @@ func (s *Server) Watch(request *api.WatchRequest, stream api.Watch_WatchServer)
select {
case <-ctx.Done():
return ctx.Err()
case <-pctx.Done():
return pctx.Err()
case event := <-watch:
if commitEvent, ok := event.(state.EventCommit); ok && len(events) > 0 {
if err := stream.Send(&api.WatchMessage{Events: events, Version: commitEvent.Version}); err != nil {