Compare commits


40 Commits

Author SHA1 Message Date
Paweł Gronowski
265f709647 Merge pull request #50247 from vvoland/50245-28.x
[28.x backport] docs: cut api docs for v1.51
2025-06-20 16:22:35 +00:00
Paweł Gronowski
b2a9318a1e docs: cut api docs for v1.51
Used by the upcoming 28.3.0 release

Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
(cherry picked from commit ef50844a0b)
Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
2025-06-20 18:16:33 +02:00
Sebastiaan van Stijn
b3e2e22b2a Merge pull request #50244 from vvoland/50177-28.x
[28.x backport] gha: lower timeouts on "build" and "merge" steps
2025-06-20 17:37:41 +02:00
Sebastiaan van Stijn
c571cd8513 Merge pull request #50243 from vvoland/50238-28.x
[28.x backport] vendor: update buildkit to v0.23.1
2025-06-20 17:36:18 +02:00
Sebastiaan van Stijn
8c713c1af4 gha: lower timeouts on "build" and "merge" steps
We had some runs time out after 120 minutes; the expected duration is much
lower than that, so let's lower the timeouts to make actions fail faster.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
(cherry picked from commit 0a30b98447)
Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
2025-06-20 16:29:08 +02:00
Paweł Gronowski
539c115023 Merge pull request #50240 from thaJeztah/28.x_backport_validate_mirrors
[28.x backport] daemon/config: Validate: add missing validation for registry mirrors and improve errors
2025-06-20 14:16:09 +00:00
CrazyMax
8e7ea470cf vendor: update buildkit to v0.23.1
Signed-off-by: CrazyMax <1951866+crazy-max@users.noreply.github.com>
(cherry picked from commit 5a02e7f4e3)
Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
2025-06-20 16:05:04 +02:00
Jonathan A. Sternberg
222baf4ccb vendor: github.com/moby/buildkit v0.23.0
Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
(cherry picked from commit e43968d7ed)
Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
2025-06-20 16:04:30 +02:00
Paweł Gronowski
1627e828d7 Merge pull request #50241 from thaJeztah/28.x_backport_update_cgroups
[28.x backport] vendor: github.com/opencontainers/cgroups v0.0.3
2025-06-20 14:00:46 +00:00
Paweł Gronowski
4070ebda88 Merge pull request #50242 from thaJeztah/28.x_backport_fix_event_ordering
[28.x backport] daemon: containerStop: fix ordering of "stop" and "die" events
2025-06-20 13:38:31 +00:00
Paweł Gronowski
b613ac489e Merge pull request #50239 from vvoland/50237-28.x
[28.x backport] Update containerd to v2.1.3
2025-06-20 11:36:57 +00:00
Sebastiaan van Stijn
0e0ca09ddc daemon: containerStop: fix ordering of "stop" and "die" events
Commit 8e6cd44ce4 added synchronisation to
wait for the container's status to be updated in memory. However, since
952902efbc, a defer was used to produce
the container's "stop" event.

As a result of the synchronisation that was added, the "die" event would
now be produced before the "stop" event.

This patch moves the locking inside the defer to restore the previous
behavior.

Unfortunately, the order of events is still not guaranteed, because events
are emitted from multiple goroutines that don't have synchronisation between
them; this is something to look at in follow-ups. This patch keeps the status
quo and should preserve the old behavior, which was "more" correct in most
cases.
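
The mechanism can be illustrated with a small, self-contained Go program; this
is only a sketch of the ordering problem, not the daemon code:

    // Sketch (not the daemon code): a goroutine standing in for the exit
    // handler holds a lock while it emits "die"; whether the stopping
    // function waits for that lock in its body, or inside its defer after
    // emitting "stop", decides which event is printed first.
    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func run(lockInBody bool) {
        var mu sync.Mutex
        mu.Lock() // held until the "exit handler" below is done

        done := make(chan struct{})
        go func() { // plays the role of the container exit handler
            time.Sleep(50 * time.Millisecond)
            fmt.Println("  die event (exit handler)")
            mu.Unlock()
            close(done)
        }()

        func() {
            defer func() {
                fmt.Println("  stop event (deferred)")
                if !lockInBody {
                    mu.Lock() // fixed variant: wait for the handler after emitting "stop"
                    defer mu.Unlock()
                }
            }()
            if lockInBody {
                mu.Lock() // regressed variant: the body waits before the defer can run
                defer mu.Unlock()
            }
        }()
        <-done
    }

    func main() {
        fmt.Println("lock in the body (die before stop):")
        run(true)
        fmt.Println("lock inside the defer (stop before die):")
        run(false)
    }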

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
(cherry picked from commit 062082ec9b)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2025-06-20 13:29:19 +02:00
Sebastiaan van Stijn
e62b0e2234 vendor: github.com/opencontainers/cgroups v0.0.3
- ConvertCPUSharesToCgroupV2Value: improve
- Add .github/dependabot.yml
- Remove annotations from Resources (fixes a regression introduced in v0.0.2)

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
(cherry picked from commit a90da2edc3)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2025-06-20 13:26:46 +02:00
Sebastiaan van Stijn
06ab9cd1ed daemon/config: Validate: add missing validation for registry mirrors
Validation of registry mirrors was performed during daemon startup,
but after the config-file was validated. As a result, the `--validate`
option would incorrectly print that the configuration was valid, but
the daemon would fail to start;

    echo '{"registry-mirrors":["example.com"]}' > my-config.json
    dockerd --config-file ./my-config.json --validate
    configuration OK

    dockerd --config-file ./my-config.json
    # ...
    failed to start daemon: invalid mirror: no scheme specified for "example.com": must use either 'https://' or 'http://'

With this patch applied, validation is also performed as part of the
daemon config validation;

    echo '{"registry-mirrors":["example.com"]}' > my-config.json
    dockerd --config-file ./my-config.json --validate
    unable to configure the Docker daemon with file ./my-config.json: merged configuration validation from file and command line flags failed: invalid mirror: no scheme specified for "example.com": must use either 'https://' or 'http://'

    # fix the invalid config
    echo '{"registry-mirrors":["https://example.com"]}' > my-config.json
    dockerd --config-file ./my-config.json --validate
    configuration OK
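
For reference, the daemon-side change (see the daemon/config hunk further down
in this comparison) boils down to running every configured mirror through
registry.ValidateMirror as part of Validate; condensed:

    for _, mirror := range config.ServiceOptions.Mirrors {
        if _, err := registry.ValidateMirror(mirror); err != nil {
            return err
        }
    }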

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
(cherry picked from commit 1d8545d60c)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2025-06-20 13:20:27 +02:00
Sebastiaan van Stijn
97aa4e8550 registry: ValidateMirror: improve validation for missing schemes
Before this patch, a missing scheme would sometimes produce a confusing
error message. If no scheme was specified at all, an empty "" would be
included in the message;

    echo '{"registry-mirrors":["example.com"]}' > my-config.json
    dockerd --config-file ./my-config.json
    # ...
    failed to start daemon: invalid mirror: unsupported scheme "" in "example.com"

If a scheme was missing, but a port was included, the hostname would be
printed as the scheme;

    echo '{"registry-mirrors":["example.com:8080"]}' > my-config.json
    dockerd --config-file ./my-config.json
    # ...
    failed to start daemon: invalid mirror: unsupported scheme "example.com" in "example.com:8080"

With this patch applied, the error messages are slightly more user-friendly;

    echo '{"registry-mirrors":["example.com"]}' > my-config.json
    dockerd --config-file ./my-config.json
    # ...
    failed to start daemon: invalid mirror: no scheme specified for "example.com": must use either 'https://' or 'http://'

    echo '{"registry-mirrors":["example.com:8080"]}' > my-config.json
    dockerd --config-file ./my-config.json
    # ...
    failed to start daemon: invalid mirror: no scheme specified for "example.com:8080": must use either 'https://' or 'http://'
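
The messages above can be approximated with a small, standalone check; this is
only a sketch of the idea, not the actual registry.ValidateMirror
implementation:

    package main

    import (
        "fmt"
        "net/url"
        "strings"
    )

    func validateMirrorScheme(mirror string) error {
        if !strings.Contains(mirror, "://") {
            // Catches both "example.com" and "example.com:8080"; url.Parse
            // would otherwise report "" or "example.com" as the scheme.
            return fmt.Errorf("invalid mirror: no scheme specified for %q: must use either 'https://' or 'http://'", mirror)
        }
        u, err := url.Parse(mirror)
        if err != nil {
            return fmt.Errorf("invalid mirror: %v", err)
        }
        if u.Scheme != "https" && u.Scheme != "http" {
            return fmt.Errorf("invalid mirror: unsupported scheme %q in %q: must use either 'https://' or 'http://'", u.Scheme, mirror)
        }
        return nil
    }

    func main() {
        for _, m := range []string{"example.com", "example.com:8080", "ftp://example.com", "https://example.com"} {
            fmt.Println(m, "->", validateMirrorScheme(m))
        }
    }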

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
(cherry picked from commit 307c18598d)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2025-06-20 13:20:26 +02:00
Derek McGowan
e18a9c95b8 Update containerd to v2.1.3
Fixes various issues with pulling from registries

Signed-off-by: Derek McGowan <derek@mcg.dev>
(cherry picked from commit b466c35da1)
Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
2025-06-20 11:28:31 +02:00
Sebastiaan van Stijn
b959bebdfc Merge pull request #50219 from thaJeztah/28.x_backport_deprecate_execconfig_detach
[28.x backport] api/types/container: deprecate ExecOptions.Detach
2025-06-18 23:03:00 +02:00
Sebastiaan van Stijn
02ade1a34c Merge pull request #50210 from thaJeztah/28.x_backport_pkg_idtools_deprecate
[28.x backport] pkg/idtools: deprecate IdentityMapping, Identity.Chown
2025-06-18 23:02:12 +02:00
Paweł Gronowski
106c4b0af6 Merge pull request #50211 from thaJeztah/28.x_backport_bump_swarmkit
[28.x backport] vendor: github.com/moby/swarmkit/v2 v2.0.0
2025-06-17 16:18:54 +00:00
Sebastiaan van Stijn
54d2eee6d6 Merge pull request #50217 from thaJeztah/28.x_backport_update-buildkit-v0.23.0-rc2
[28.x backport] vendor: update buildkit to v0.23.0-rc2
2025-06-17 15:10:47 +02:00
Sebastiaan van Stijn
09fef2b26e api/types/container: deprecate ExecOptions.Detach
This field was added in 5130fe5d38 to serve as an intermediate when parsing
CLI flags (through `runconfig.ParseExec`) in c786a8ee5e.

Commit 9d9dff3d0d rewrote the CLI to use
Cobra, and as part of this introduced a separate `execOptions` type in
`api/client/container`.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
(cherry picked from commit 0c182d4d57)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2025-06-17 13:10:50 +02:00
Tonis Tiigi
44c8cd2e8f vendor: update buildkit to v0.23.0-rc2
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
(cherry picked from commit 1289519b03)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2025-06-17 11:36:27 +02:00
Cory Snider
78b6204f9e vendor: github.com/moby/swarmkit/v2 v2.0.0
Use the tagged version instead of the v2.0.0-20250613170222-a45be3cac15c
pseudo-version. The referenced commit has not changed.

Signed-off-by: Cory Snider <csnider@mirantis.com>
(cherry picked from commit c3ac979ecf)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2025-06-17 11:23:57 +02:00
Cory Snider
cf98237186 vendor: github.com/moby/swarmkit/v2 v2.0.0-20250613170222-a45be3cac15c
- fix task scheduler infinite loop

full diff: 8c19597365...a45be3cac1

Signed-off-by: Cory Snider <csnider@mirantis.com>
(cherry picked from commit 2d60b8eacd)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2025-06-16 17:53:26 +02:00
Sebastiaan van Stijn
fd96b01b0e pkg/idtools: deprecate IdentityMapping, Identity.Chown
The IdentityMapping and Identity types are still used internally, but
should be considered transitional.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
(cherry picked from commit b7ef527bdc)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2025-06-16 17:40:11 +02:00
Paweł Gronowski
6a1fb46d48 Merge pull request #50169 from robmry/revert_overlay_refactoring
[28.x]: Revert overlay bug fixes / refactoring
2025-06-13 15:49:07 +00:00
Rob Murray
7acb079403 Revert "libn/networkdb: don't exceed broadcast size limit"
This reverts commit dacf445614.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:49 +01:00
Rob Murray
0df31cf585 Revert "libn/networkdb: fix data race in GetTableByNetwork"
This reverts commit ec65f2d21b.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:48 +01:00
Rob Murray
83b2fc245d Revert "Fix possible overlapping IPs when ingressNA == nil"
This reverts commit 56ad941564.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:46 +01:00
Rob Murray
e079583ab4 Revert "libnetwork/networkdb: use correct index in GetTableByNetwork"
This reverts commit d5c370dee6.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:45 +01:00
Rob Murray
cfd5e5e4d4 Revert "libn/networkdb: b'cast watch events from local POV"
This reverts commit c68671d908.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:44 +01:00
Rob Murray
576cf73add Revert "libn/networkdb: record tombstones for all deletes"
This reverts commit ada8bc3695.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:43 +01:00
Rob Murray
2297ae3e64 Revert "libn/networkdb: Watch() without race conditions"
This reverts commit a3aea15257.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:41 +01:00
Rob Murray
cc60ec8d3c Revert "libn/networkdb: stop table events from racing network leaves"
This reverts commit 270a4d41dc.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:40 +01:00
Rob Murray
b5b349dbd6 Revert "libn/osl: drop unused AddNeighbor force parameter"
This reverts commit 3bdf99d127.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:39 +01:00
Rob Murray
35916f0869 Revert "libn/osl: refactor func (*Namespace) AddNeighbor"
This reverts commit b6d76eb572.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:38 +01:00
Rob Murray
3eb59ba5a2 Revert "libnetwork/osl: remove superfluous locks in Namespace"
This reverts commit 9866738736.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:37 +01:00
Rob Murray
5d6ae34753 Revert "libnetwork/osl: stop tracking neighbor entries"
This reverts commit 0d6e7cd983.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:36 +01:00
Rob Murray
ea818a7f6f Revert "libnetwork/internal/setmatrix: make keys generic"
This reverts commit 0317f773a6.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:33 +01:00
Rob Murray
78ccc20545 Revert "libn/d/overlay: use netip types more"
This reverts commit d188df0039.

Signed-off-by: Rob Murray <rob.murray@docker.com>
2025-06-11 12:05:26 +01:00
50 changed files with 14119 additions and 862 deletions

View File

@@ -91,7 +91,7 @@ jobs:
build:
runs-on: ubuntu-24.04
timeout-minutes: 120 # guardrails timeout for the whole job
timeout-minutes: 20 # guardrails timeout for the whole job
needs:
- validate-dco
- prepare
@@ -167,7 +167,7 @@ jobs:
merge:
runs-on: ubuntu-24.04
timeout-minutes: 120 # guardrails timeout for the whole job
timeout-minutes: 40 # guardrails timeout for the whole job
needs:
- build
if: always() && !contains(needs.*.result, 'failure') && !contains(needs.*.result, 'cancelled') && github.event_name != 'pull_request' && github.repository == 'moby/moby'

View File

@@ -304,7 +304,7 @@ linters:
- staticcheck
# FIXME(thaJeztah): ignoring these transitional utilities until BuildKit is vendored with https://github.com/moby/moby/pull/49743
- text: "SA1019: idtools\\.(ToUserIdentityMapping|FromUserIdentityMapping) is deprecated"
- text: "SA1019: idtools\\.(ToUserIdentityMapping|FromUserIdentityMapping|IdentityMapping) is deprecated"
linters:
- staticcheck

View File

@@ -18,11 +18,13 @@ type ExecOptions struct {
AttachStdin bool // Attach the standard input, makes possible user interaction
AttachStderr bool // Attach the standard error
AttachStdout bool // Attach the standard output
Detach bool // Execute in detach mode
DetachKeys string // Escape keys for detach
Env []string // Environment variables
WorkingDir string // Working directory
Cmd []string // Execution commands and args
// Deprecated: the Detach field is not used, and will be removed in a future release.
Detach bool
}
// ExecStartOptions is a temp struct used by execStart

View File

@@ -213,35 +213,36 @@ func (e *executor) Configure(ctx context.Context, node *api.Node) error {
if ingressNA == nil {
e.backend.ReleaseIngress()
} else {
options := network.CreateOptions{
Driver: ingressNA.Network.DriverState.Name,
IPAM: &network.IPAM{
Driver: ingressNA.Network.IPAM.Driver.Name,
},
Options: ingressNA.Network.DriverState.Options,
Ingress: true,
}
return e.backend.GetAttachmentStore().ResetAttachments(attachments)
}
for _, ic := range ingressNA.Network.IPAM.Configs {
c := network.IPAMConfig{
Subnet: ic.Subnet,
IPRange: ic.Range,
Gateway: ic.Gateway,
}
options.IPAM.Config = append(options.IPAM.Config, c)
}
options := network.CreateOptions{
Driver: ingressNA.Network.DriverState.Name,
IPAM: &network.IPAM{
Driver: ingressNA.Network.IPAM.Driver.Name,
},
Options: ingressNA.Network.DriverState.Options,
Ingress: true,
}
_, err := e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
ID: ingressNA.Network.ID,
CreateRequest: network.CreateRequest{
Name: ingressNA.Network.Spec.Annotations.Name,
CreateOptions: options,
},
}, ingressNA.Addresses[0])
if err != nil {
return err
for _, ic := range ingressNA.Network.IPAM.Configs {
c := network.IPAMConfig{
Subnet: ic.Subnet,
IPRange: ic.Range,
Gateway: ic.Gateway,
}
options.IPAM.Config = append(options.IPAM.Config, c)
}
_, err := e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
ID: ingressNA.Network.ID,
CreateRequest: network.CreateRequest{
Name: ingressNA.Network.Spec.Annotations.Name,
CreateOptions: options,
},
}, ingressNA.Addresses[0])
if err != nil {
return err
}
var (

View File

@@ -748,6 +748,12 @@ func Validate(config *Config) error {
}
}
for _, mirror := range config.ServiceOptions.Mirrors {
if _, err := registry.ValidateMirror(mirror); err != nil {
return err
}
}
if config.CorsHeaders != "" {
// TODO(thaJeztah): option is used to produce error when used; remove in next release
return errors.New(`DEPRECATED: The "api-cors-header" config parameter and the dockerd "--api-cors-header" option have been removed; use a reverse proxy if you need CORS headers`)

View File

@@ -14,6 +14,7 @@ import (
"github.com/docker/docker/api"
"github.com/docker/docker/libnetwork/ipamutils"
"github.com/docker/docker/opts"
"github.com/docker/docker/registry"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/spf13/pflag"
@@ -428,6 +429,17 @@ func TestValidateConfigurationErrors(t *testing.T) {
platform: "windows",
expectedErr: "invalid exec-opt (native.cgroupdriver=systemd): option 'native.cgroupdriver' is only supported on linux",
},
{
name: "invalid mirror",
config: &Config{
CommonConfig: CommonConfig{
ServiceOptions: registry.ServiceOptions{
Mirrors: []string{"ftp://example.com"},
},
},
},
expectedErr: `invalid mirror: unsupported scheme "ftp" in "ftp://example.com": must use either 'https://' or 'http://'`,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

View File

@@ -74,6 +74,9 @@ func (daemon *Daemon) containerStop(ctx context.Context, ctr *container.Containe
defer func() {
if retErr == nil {
daemon.LogContainerEvent(ctr, events.ActionStop)
// Ensure container status changes are committed by handler of container exit before returning control to the caller
ctr.Lock()
defer ctr.Unlock()
}
}()
@@ -93,9 +96,6 @@ func (daemon *Daemon) containerStop(ctx context.Context, ctr *container.Containe
defer cancel()
if status := <-ctr.Wait(subCtx, containertypes.WaitConditionNotRunning); status.Err() == nil {
// Ensure container status changes are committed by handler of container exit before returning control to the caller
ctr.Lock()
defer ctr.Unlock()
// container did exit, so ignore any previous errors and return
return nil
}
@@ -125,9 +125,5 @@ func (daemon *Daemon) containerStop(ctx context.Context, ctr *container.Containe
// container did exit, so ignore previous errors and continue
}
// Ensure container status changes are committed by handler of container exit before returning control to the caller
ctr.Lock()
defer ctr.Unlock()
return nil
}

docs/api/v1.51.yaml (new file, 13430 lines)

File diff suppressed because it is too large

View File

@@ -127,9 +127,8 @@ func TestExecResize(t *testing.T) {
cmd = []string{"sleep", "240"}
}
resp, err := apiClient.ContainerExecCreate(ctx, cID, containertypes.ExecOptions{
Tty: true, // Windows requires a TTY for the resize to work, otherwise fails with "is not a tty: failed precondition", see https://github.com/moby/moby/pull/48665#issuecomment-2412530345
Detach: true,
Cmd: cmd,
Tty: true, // Windows requires a TTY for the resize to work, otherwise fails with "is not a tty: failed precondition", see https://github.com/moby/moby/pull/48665#issuecomment-2412530345
Cmd: cmd,
})
assert.NilError(t, err)
execID := resp.ID

View File

@@ -777,6 +777,23 @@ func (n *Network) addDriverWatches() {
agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
agent.mu.Unlock()
go c.handleTableEvents(ch, n.handleDriverTableEvent)
d, err := n.driver(false)
if err != nil {
log.G(context.TODO()).Errorf("Could not resolve driver %s while walking driver table: %v", n.networkType, err)
return
}
err = agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
// skip the entries that are mark for deletion, this is safe because this function is
// called at initialization time so there is no state to delete
if nid == n.ID() && !deleted {
d.EventNotify(driverapi.Create, nid, table.name, key, value)
}
return false
})
if err != nil {
log.G(context.TODO()).WithError(err).Warn("Error while walking networkdb")
}
}
}
@@ -894,7 +911,7 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
err := proto.Unmarshal(value, &epRec)
if err != nil {
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value")
log.G(context.TODO()).Errorf("Failed to unmarshal service table value: %v", err)
return
}
@@ -907,54 +924,53 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
serviceAliases := epRec.Aliases
taskAliases := epRec.TaskAliases
logger := log.G(context.TODO()).WithFields(log.Fields{
"nid": nid,
"eid": eid,
"T": fmt.Sprintf("%T", ev),
"R": epRec,
})
if containerName == "" || ip == nil {
logger.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
log.G(context.TODO()).Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
return
}
logger.Debug("handleEpTableEvent")
switch ev.(type) {
case networkdb.CreateEvent, networkdb.UpdateEvent:
case networkdb.CreateEvent:
log.G(context.TODO()).Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if epRec.ServiceDisabled {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
logger.WithError(err).Error("failed disabling service binding")
return
}
} else {
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logger.WithError(err).Error("failed adding service binding")
return
}
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
log.G(context.TODO()).Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logger.WithError(err).Errorf("failed adding container name resolution")
log.G(context.TODO()).Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
case networkdb.DeleteEvent:
log.G(context.TODO()).Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
logger.WithError(err).Error("failed removing service binding")
log.G(context.TODO()).Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logger.WithError(err).Errorf("failed removing container name resolution")
log.G(context.TODO()).Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
case networkdb.UpdateEvent:
log.G(context.TODO()).Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
// We currently should only get these to inform us that an endpoint
// is disabled. Report if otherwise.
if svcID == "" || !epRec.ServiceDisabled {
log.G(context.TODO()).Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
return
}
// This is a remote task that is part of a service that is now disabled
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
log.G(context.TODO()).Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
}
}

View File

@@ -11,7 +11,6 @@ import (
"fmt"
"hash/fnv"
"net"
"net/netip"
"strconv"
"sync"
"syscall"
@@ -92,7 +91,7 @@ func (s *spi) String() string {
}
type encrMap struct {
nodes map[netip.Addr][]*spi
nodes map[string][]*spi
sync.Mutex
}
@@ -102,7 +101,7 @@ func (e *encrMap) String() string {
b := new(bytes.Buffer)
for k, v := range e.nodes {
b.WriteString("\n")
b.WriteString(k.String())
b.WriteString(k)
b.WriteString(":")
b.WriteString("[")
for _, s := range v {
@@ -114,7 +113,7 @@ func (e *encrMap) String() string {
return b.String()
}
func (d *driver) checkEncryption(nid string, rIP netip.Addr, isLocal, add bool) error {
func (d *driver) checkEncryption(nid string, rIP net.IP, isLocal, add bool) error {
log.G(context.TODO()).Debugf("checkEncryption(%.7s, %v, %t)", nid, rIP, isLocal)
n := d.network(nid)
@@ -128,13 +127,13 @@ func (d *driver) checkEncryption(nid string, rIP netip.Addr, isLocal, add bool)
lIP := d.bindAddress
aIP := d.advertiseAddress
nodes := map[netip.Addr]struct{}{}
nodes := map[string]net.IP{}
switch {
case isLocal:
if err := d.peerDbNetworkWalk(nid, func(_ netip.Addr, _ net.HardwareAddr, pEntry *peerEntry) bool {
if aIP != pEntry.vtep {
nodes[pEntry.vtep] = struct{}{}
if err := d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
if !aIP.Equal(pEntry.vtep) {
nodes[pEntry.vtep.String()] = pEntry.vtep
}
return false
}); err != nil {
@@ -142,14 +141,14 @@ func (d *driver) checkEncryption(nid string, rIP netip.Addr, isLocal, add bool)
}
default:
if len(d.network(nid).endpoints) > 0 {
nodes[rIP] = struct{}{}
nodes[rIP.String()] = rIP
}
}
log.G(context.TODO()).Debugf("List of nodes: %s", nodes)
if add {
for rIP := range nodes {
for _, rIP := range nodes {
if err := setupEncryption(lIP, aIP, rIP, d.secMap, d.keys); err != nil {
log.G(context.TODO()).Warnf("Failed to program network encryption between %s and %s: %v", lIP, rIP, err)
}
@@ -167,18 +166,19 @@ func (d *driver) checkEncryption(nid string, rIP netip.Addr, isLocal, add bool)
// setupEncryption programs the encryption parameters for secure communication
// between the local node and a remote node.
func setupEncryption(localIP, advIP, remoteIP netip.Addr, em *encrMap, keys []*key) error {
func setupEncryption(localIP, advIP, remoteIP net.IP, em *encrMap, keys []*key) error {
log.G(context.TODO()).Debugf("Programming encryption between %s and %s", localIP, remoteIP)
rIPs := remoteIP.String()
indices := make([]*spi, 0, len(keys))
for i, k := range keys {
spis := &spi{buildSPI(advIP.AsSlice(), remoteIP.AsSlice(), k.tag), buildSPI(remoteIP.AsSlice(), advIP.AsSlice(), k.tag)}
spis := &spi{buildSPI(advIP, remoteIP, k.tag), buildSPI(remoteIP, advIP, k.tag)}
dir := reverse
if i == 0 {
dir = bidir
}
fSA, rSA, err := programSA(localIP.AsSlice(), remoteIP.AsSlice(), spis, k, dir, true)
fSA, rSA, err := programSA(localIP, remoteIP, spis, k, dir, true)
if err != nil {
log.G(context.TODO()).Warn(err)
}
@@ -193,15 +193,15 @@ func setupEncryption(localIP, advIP, remoteIP netip.Addr, em *encrMap, keys []*k
}
em.Lock()
em.nodes[remoteIP] = indices
em.nodes[rIPs] = indices
em.Unlock()
return nil
}
func removeEncryption(localIP, remoteIP netip.Addr, em *encrMap) error {
func removeEncryption(localIP, remoteIP net.IP, em *encrMap) error {
em.Lock()
indices, ok := em.nodes[remoteIP]
indices, ok := em.nodes[remoteIP.String()]
em.Unlock()
if !ok {
return nil
@@ -211,7 +211,7 @@ func removeEncryption(localIP, remoteIP netip.Addr, em *encrMap) error {
if i == 0 {
dir = bidir
}
fSA, rSA, err := programSA(localIP.AsSlice(), remoteIP.AsSlice(), idxs, nil, dir, false)
fSA, rSA, err := programSA(localIP, remoteIP, idxs, nil, dir, false)
if err != nil {
log.G(context.TODO()).Warn(err)
}
@@ -478,7 +478,7 @@ func buildAeadAlgo(k *key, s int) *netlink.XfrmStateAlgo {
}
}
func (d *driver) secMapWalk(f func(netip.Addr, []*spi) ([]*spi, bool)) error {
func (d *driver) secMapWalk(f func(string, []*spi) ([]*spi, bool)) error {
d.secMap.Lock()
for node, indices := range d.secMap.nodes {
idxs, stop := f(node, indices)
@@ -499,7 +499,7 @@ func (d *driver) setKeys(keys []*key) error {
// Accept the encryption keys and clear any stale encryption map
d.Lock()
d.keys = keys
d.secMap = &encrMap{nodes: map[netip.Addr][]*spi{}}
d.secMap = &encrMap{nodes: map[string][]*spi{}}
d.Unlock()
log.G(context.TODO()).Debugf("Initial encryption keys: %v", keys)
return nil
@@ -548,8 +548,9 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
return types.InvalidParameterErrorf("attempting to both make a key (index %d) primary and delete it", priIdx)
}
d.secMapWalk(func(rIP netip.Addr, spis []*spi) ([]*spi, bool) {
return updateNodeKey(lIP.AsSlice(), aIP.AsSlice(), rIP.AsSlice(), spis, d.keys, newIdx, priIdx, delIdx), false
d.secMapWalk(func(rIPs string, spis []*spi) ([]*spi, bool) {
rIP := net.ParseIP(rIPs)
return updateNodeKey(lIP, aIP, rIP, spis, d.keys, newIdx, priIdx, delIdx), false
})
// swap primary

View File

@@ -6,12 +6,10 @@ import (
"context"
"fmt"
"net"
"net/netip"
"syscall"
"github.com/containerd/log"
"github.com/docker/docker/libnetwork/driverapi"
"github.com/docker/docker/libnetwork/internal/netiputil"
"github.com/docker/docker/libnetwork/netlabel"
"github.com/docker/docker/libnetwork/ns"
"github.com/docker/docker/libnetwork/osl"
@@ -107,7 +105,7 @@ func (d *driver) Join(ctx context.Context, nid, eid string, sboxKey string, jinf
if sub == s {
continue
}
if err = jinfo.AddStaticRoute(netiputil.ToIPNet(sub.subnetIP), types.NEXTHOP, s.gwIP.Addr().AsSlice()); err != nil {
if err = jinfo.AddStaticRoute(sub.subnetIP, types.NEXTHOP, s.gwIP.IP); err != nil {
log.G(ctx).Errorf("Adding subnet %s static route in network %q failed\n", s.subnetIP, n.id)
}
}
@@ -119,9 +117,9 @@ func (d *driver) Join(ctx context.Context, nid, eid string, sboxKey string, jinf
}
}
d.peerAdd(nid, eid, ep.addr, ep.mac, d.advertiseAddress, true)
d.peerAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, d.advertiseAddress, true)
if err = d.checkEncryption(nid, netip.Addr{}, true, true); err != nil {
if err = d.checkEncryption(nid, nil, true, true); err != nil {
log.G(ctx).Warn(err)
}
@@ -174,34 +172,34 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
// Ignore local peers. We already know about them and they
// should not be added to vxlan fdb.
if addr, _ := netip.ParseAddr(peer.TunnelEndpointIP); addr == d.advertiseAddress {
if net.ParseIP(peer.TunnelEndpointIP).Equal(d.advertiseAddress) {
return
}
addr, err := netip.ParsePrefix(peer.EndpointIP)
addr, err := types.ParseCIDR(peer.EndpointIP)
if err != nil {
log.G(context.TODO()).WithError(err).Errorf("Invalid peer IP %s received in event notify", peer.EndpointIP)
log.G(context.TODO()).Errorf("Invalid peer IP %s received in event notify", peer.EndpointIP)
return
}
mac, err := net.ParseMAC(peer.EndpointMAC)
if err != nil {
log.G(context.TODO()).WithError(err).Errorf("Invalid mac %s received in event notify", peer.EndpointMAC)
log.G(context.TODO()).Errorf("Invalid mac %s received in event notify", peer.EndpointMAC)
return
}
vtep, err := netip.ParseAddr(peer.TunnelEndpointIP)
if err != nil {
log.G(context.TODO()).WithError(err).Errorf("Invalid VTEP %s received in event notify", peer.TunnelEndpointIP)
vtep := net.ParseIP(peer.TunnelEndpointIP)
if vtep == nil {
log.G(context.TODO()).Errorf("Invalid VTEP %s received in event notify", peer.TunnelEndpointIP)
return
}
if etype == driverapi.Delete {
d.peerDelete(nid, eid, addr, mac, vtep, false)
d.peerDelete(nid, eid, addr.IP, addr.Mask, mac, vtep, false)
return
}
d.peerAdd(nid, eid, addr, mac, vtep, false)
d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, false)
}
// Leave method is invoked when a Sandbox detaches from an endpoint.
@@ -221,7 +219,7 @@ func (d *driver) Leave(nid, eid string) error {
return types.InternalMaskableErrorf("could not find endpoint with id %s", eid)
}
d.peerDelete(nid, eid, ep.addr, ep.mac, d.advertiseAddress, true)
d.peerDelete(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, d.advertiseAddress, true)
n.leaveSandbox()

View File

@@ -6,11 +6,9 @@ import (
"context"
"fmt"
"net"
"net/netip"
"github.com/containerd/log"
"github.com/docker/docker/libnetwork/driverapi"
"github.com/docker/docker/libnetwork/internal/netiputil"
"github.com/docker/docker/libnetwork/netutils"
"github.com/docker/docker/libnetwork/ns"
)
@@ -22,7 +20,7 @@ type endpoint struct {
nid string
ifName string
mac net.HardwareAddr
addr netip.Prefix
addr *net.IPNet
}
func (n *network) endpoint(eid string) *endpoint {
@@ -63,13 +61,12 @@ func (d *driver) CreateEndpoint(_ context.Context, nid, eid string, ifInfo drive
}
ep := &endpoint{
id: eid,
nid: n.id,
mac: ifInfo.MacAddress(),
id: eid,
nid: n.id,
addr: ifInfo.Address(),
mac: ifInfo.MacAddress(),
}
var ok bool
ep.addr, ok = netiputil.ToPrefix(ifInfo.Address())
if !ok {
if ep.addr == nil {
return fmt.Errorf("create endpoint was not passed interface IP address")
}
@@ -78,7 +75,7 @@ func (d *driver) CreateEndpoint(_ context.Context, nid, eid string, ifInfo drive
}
if ep.mac == nil {
ep.mac = netutils.GenerateMACFromIP(ep.addr.Addr().AsSlice())
ep.mac = netutils.GenerateMACFromIP(ep.addr.IP)
if err := ifInfo.SetMacAddress(ep.mac); err != nil {
return err
}

View File

@@ -6,7 +6,7 @@ import (
"context"
"errors"
"fmt"
"net/netip"
"net"
"os"
"path/filepath"
"runtime"
@@ -18,7 +18,6 @@ import (
"github.com/docker/docker/internal/nlwrap"
"github.com/docker/docker/libnetwork/driverapi"
"github.com/docker/docker/libnetwork/drivers/overlay/overlayutils"
"github.com/docker/docker/libnetwork/internal/netiputil"
"github.com/docker/docker/libnetwork/netlabel"
"github.com/docker/docker/libnetwork/ns"
"github.com/docker/docker/libnetwork/osl"
@@ -43,8 +42,8 @@ type subnet struct {
brName string
vni uint32
initErr error
subnetIP netip.Prefix
gwIP netip.Prefix
subnetIP *net.IPNet
gwIP *net.IPNet
}
type network struct {
@@ -139,9 +138,11 @@ func (d *driver) CreateNetwork(ctx context.Context, id string, option map[string
}
for i, ipd := range ipV4Data {
s := &subnet{vni: vnis[i]}
s.subnetIP, _ = netiputil.ToPrefix(ipd.Pool)
s.gwIP, _ = netiputil.ToPrefix(ipd.Gateway)
s := &subnet{
subnetIP: ipd.Pool,
gwIP: ipd.Gateway,
vni: vnis[i],
}
n.subnets = append(n.subnets, s)
}
@@ -426,7 +427,7 @@ func (n *network) setupSubnetSandbox(s *subnet, brName, vxlanName string) error
// create a bridge and vxlan device for this subnet and move it to the sandbox
sbox := n.sbox
if err := sbox.AddInterface(context.TODO(), brName, "br", "", osl.WithIPv4Address(netiputil.ToIPNet(s.gwIP)), osl.WithIsBridge(true)); err != nil {
if err := sbox.AddInterface(context.TODO(), brName, "br", "", osl.WithIPv4Address(s.gwIP), osl.WithIsBridge(true)); err != nil {
return fmt.Errorf("bridge creation in sandbox failed for subnet %q: %v", s.subnetIP.String(), err)
}
@@ -613,13 +614,15 @@ func (n *network) sandbox() *osl.Namespace {
}
// getSubnetforIP returns the subnet to which the given IP belongs
func (n *network) getSubnetforIP(ip netip.Prefix) *subnet {
func (n *network) getSubnetforIP(ip *net.IPNet) *subnet {
for _, s := range n.subnets {
// first check if the mask lengths are the same
if s.subnetIP.Bits() != ip.Bits() {
i, _ := s.subnetIP.Mask.Size()
j, _ := ip.Mask.Size()
if i != j {
continue
}
if s.subnetIP.Contains(ip.Addr()) {
if s.subnetIP.Contains(ip.IP) {
return s
}
}

View File

@@ -7,7 +7,7 @@ package overlay
import (
"context"
"fmt"
"net/netip"
"net"
"sync"
"github.com/containerd/log"
@@ -28,7 +28,7 @@ const (
var _ discoverapi.Discover = (*driver)(nil)
type driver struct {
bindAddress, advertiseAddress netip.Addr
bindAddress, advertiseAddress net.IP
config map[string]interface{}
peerDb peerNetworkMap
@@ -48,7 +48,7 @@ func Register(r driverapi.Registerer, config map[string]interface{}) error {
peerDb: peerNetworkMap{
mp: map[string]*peerMap{},
},
secMap: &encrMap{nodes: map[netip.Addr][]*spi{}},
secMap: &encrMap{nodes: map[string][]*spi{}},
config: config,
}
return r.RegisterDriver(NetworkType, d, driverapi.Capability{
@@ -78,17 +78,16 @@ func (d *driver) isIPv6Transport() (bool, error) {
// from the address family of our own advertise address. This is a
// reasonable inference to make as Linux VXLAN links do not support
// mixed-address-family remote peers.
if !d.advertiseAddress.IsValid() {
if d.advertiseAddress == nil {
return false, fmt.Errorf("overlay: cannot determine address family of transport: the local data-plane address is not currently known")
}
return d.advertiseAddress.Is6(), nil
return d.advertiseAddress.To4() == nil, nil
}
func (d *driver) nodeJoin(data discoverapi.NodeDiscoveryData) error {
if data.Self {
advAddr, _ := netip.ParseAddr(data.Address)
bindAddr, _ := netip.ParseAddr(data.BindAddress)
if !advAddr.IsValid() {
advAddr, bindAddr := net.ParseIP(data.Address), net.ParseIP(data.BindAddress)
if advAddr == nil {
return fmt.Errorf("invalid discovery data")
}
d.Lock()

View File

@@ -7,7 +7,6 @@ import (
"context"
"fmt"
"net"
"net/netip"
"sync"
"syscall"
@@ -18,16 +17,52 @@ import (
const ovPeerTable = "overlay_peer_table"
type peerKey struct {
peerIP net.IP
peerMac net.HardwareAddr
}
type peerEntry struct {
eid string
vtep netip.Addr
prefixBits int // number of 1-bits in network mask of peerIP
vtep net.IP
peerIPMask net.IPMask
isLocal bool
}
func (p *peerEntry) MarshalDB() peerEntryDB {
ones, bits := p.peerIPMask.Size()
return peerEntryDB{
eid: p.eid,
vtep: p.vtep.String(),
peerIPMaskOnes: ones,
peerIPMaskBits: bits,
isLocal: p.isLocal,
}
}
// This the structure saved into the set (SetMatrix), due to the implementation of it
// the value inserted in the set has to be Hashable so the []byte had to be converted into
// strings
type peerEntryDB struct {
eid string
vtep string
peerIPMaskOnes int
peerIPMaskBits int
isLocal bool
}
func (p *peerEntryDB) UnMarshalDB() peerEntry {
return peerEntry{
eid: p.eid,
vtep: net.ParseIP(p.vtep),
peerIPMask: net.CIDRMask(p.peerIPMaskOnes, p.peerIPMaskBits),
isLocal: p.isLocal,
}
}
type peerMap struct {
// set of peerEntry, note the values have to be objects and not pointers to maintain the proper equality checks
mp setmatrix.SetMatrix[ipmac, peerEntry]
mp setmatrix.SetMatrix[peerEntryDB]
sync.Mutex
}
@@ -37,7 +72,28 @@ type peerNetworkMap struct {
sync.Mutex
}
func (d *driver) peerDbWalk(f func(string, netip.Addr, net.HardwareAddr, *peerEntry) bool) error {
func (pKey peerKey) String() string {
return fmt.Sprintf("%s %s", pKey.peerIP, pKey.peerMac)
}
func (pKey *peerKey) Scan(state fmt.ScanState, verb rune) error {
ipB, err := state.Token(true, nil)
if err != nil {
return err
}
pKey.peerIP = net.ParseIP(string(ipB))
macB, err := state.Token(true, nil)
if err != nil {
return err
}
pKey.peerMac, err = net.ParseMAC(string(macB))
return err
}
func (d *driver) peerDbWalk(f func(string, *peerKey, *peerEntry) bool) error {
d.peerDb.Lock()
nids := []string{}
for nid := range d.peerDb.mp {
@@ -46,14 +102,14 @@ func (d *driver) peerDbWalk(f func(string, netip.Addr, net.HardwareAddr, *peerEn
d.peerDb.Unlock()
for _, nid := range nids {
d.peerDbNetworkWalk(nid, func(peerIP netip.Addr, peerMac net.HardwareAddr, pEntry *peerEntry) bool {
return f(nid, peerIP, peerMac, pEntry)
d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
return f(nid, pKey, pEntry)
})
}
return nil
}
func (d *driver) peerDbNetworkWalk(nid string, f func(netip.Addr, net.HardwareAddr, *peerEntry) bool) error {
func (d *driver) peerDbNetworkWalk(nid string, f func(*peerKey, *peerEntry) bool) error {
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
d.peerDb.Unlock()
@@ -62,18 +118,22 @@ func (d *driver) peerDbNetworkWalk(nid string, f func(netip.Addr, net.HardwareAd
return nil
}
mp := map[ipmac]peerEntry{}
mp := map[string]peerEntry{}
pMap.Lock()
for _, pKey := range pMap.mp.Keys() {
entryDBList, ok := pMap.mp.Get(pKey)
for _, pKeyStr := range pMap.mp.Keys() {
entryDBList, ok := pMap.mp.Get(pKeyStr)
if ok {
mp[pKey] = entryDBList[0]
mp[pKeyStr] = entryDBList[0].UnMarshalDB()
}
}
pMap.Unlock()
for pKey, pEntry := range mp {
if f(pKey.ip, pKey.mac.HardwareAddr(), &pEntry) {
for pKeyStr, pEntry := range mp {
var pKey peerKey
if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil {
log.G(context.TODO()).Warnf("Peer key scan on network %s failed: %v", nid, err)
}
if f(&pKey, &pEntry) {
return nil
}
}
@@ -81,14 +141,12 @@ func (d *driver) peerDbNetworkWalk(nid string, f func(netip.Addr, net.HardwareAd
return nil
}
func (d *driver) peerDbSearch(nid string, peerIP netip.Addr) (netip.Addr, net.HardwareAddr, *peerEntry, error) {
var peerIPMatched netip.Addr
var peerMacMatched net.HardwareAddr
func (d *driver) peerDbSearch(nid string, peerIP net.IP) (*peerKey, *peerEntry, error) {
var pKeyMatched *peerKey
var pEntryMatched *peerEntry
err := d.peerDbNetworkWalk(nid, func(ip netip.Addr, mac net.HardwareAddr, pEntry *peerEntry) bool {
if ip == peerIP {
peerIPMatched = ip
peerMacMatched = mac
err := d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
if pKey.peerIP.Equal(peerIP) {
pKeyMatched = pKey
pEntryMatched = pEntry
return true
}
@@ -96,17 +154,17 @@ func (d *driver) peerDbSearch(nid string, peerIP netip.Addr) (netip.Addr, net.Ha
return false
})
if err != nil {
return netip.Addr{}, nil, nil, fmt.Errorf("peerdb search for peer ip %q failed: %v", peerIP, err)
return nil, nil, fmt.Errorf("peerdb search for peer ip %q failed: %v", peerIP, err)
}
if !peerIPMatched.IsValid() || pEntryMatched == nil {
return netip.Addr{}, nil, nil, fmt.Errorf("peer ip %q not found in peerdb", peerIP)
if pKeyMatched == nil || pEntryMatched == nil {
return nil, nil, fmt.Errorf("peer ip %q not found in peerdb", peerIP)
}
return peerIPMatched, peerMacMatched, pEntryMatched, nil
return pKeyMatched, pEntryMatched, nil
}
func (d *driver) peerDbAdd(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, isLocal bool) (bool, int) {
func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, isLocal bool) (bool, int) {
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
if !ok {
@@ -115,27 +173,30 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP netip.Prefix, peerMac net.Har
}
d.peerDb.Unlock()
pKey := ipmacOf(peerIP.Addr(), peerMac)
pKey := peerKey{
peerIP: peerIP,
peerMac: peerMac,
}
pEntry := peerEntry{
eid: eid,
vtep: vtep,
prefixBits: peerIP.Bits(),
peerIPMask: peerIPMask,
isLocal: isLocal,
}
pMap.Lock()
defer pMap.Unlock()
b, i := pMap.mp.Insert(pKey, pEntry)
b, i := pMap.mp.Insert(pKey.String(), pEntry.MarshalDB())
if i != 1 {
// Transient case, there is more than one endpoint that is using the same IP,MAC pair
s, _ := pMap.mp.String(pKey)
s, _ := pMap.mp.String(pKey.String())
log.G(context.TODO()).Warnf("peerDbAdd transient condition - Key:%s cardinality:%d db state:%s", pKey.String(), i, s)
}
return b, i
}
func (d *driver) peerDbDelete(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, isLocal bool) (bool, int) {
func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, isLocal bool) (bool, int) {
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
if !ok {
@@ -144,22 +205,25 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP netip.Prefix, peerMac net.
}
d.peerDb.Unlock()
pKey := ipmacOf(peerIP.Addr(), peerMac)
pKey := peerKey{
peerIP: peerIP,
peerMac: peerMac,
}
pEntry := peerEntry{
eid: eid,
vtep: vtep,
prefixBits: peerIP.Bits(),
peerIPMask: peerIPMask,
isLocal: isLocal,
}
pMap.Lock()
defer pMap.Unlock()
b, i := pMap.mp.Remove(pKey, pEntry)
b, i := pMap.mp.Remove(pKey.String(), pEntry.MarshalDB())
if i != 0 {
// Transient case, there is more than one endpoint that is using the same IP,MAC pair
s, _ := pMap.mp.String(pKey)
log.G(context.TODO()).Warnf("peerDbDelete transient condition - Key:%s cardinality:%d db state:%s", pKey, i, s)
s, _ := pMap.mp.String(pKey.String())
log.G(context.TODO()).Warnf("peerDbDelete transient condition - Key:%s cardinality:%d db state:%s", pKey.String(), i, s)
}
return b, i
}
@@ -184,28 +248,28 @@ func (d *driver) initSandboxPeerDB(nid string) {
}
func (d *driver) peerInitOp(nid string) error {
return d.peerDbNetworkWalk(nid, func(peerIP netip.Addr, peerMac net.HardwareAddr, pEntry *peerEntry) bool {
return d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
// Local entries do not need to be added
if pEntry.isLocal {
return false
}
d.peerAddOp(nid, pEntry.eid, netip.PrefixFrom(peerIP, pEntry.prefixBits), peerMac, pEntry.vtep, false, pEntry.isLocal)
d.peerAddOp(nid, pEntry.eid, pKey.peerIP, pEntry.peerIPMask, pKey.peerMac, pEntry.vtep, false, pEntry.isLocal)
// return false to loop on all entries
return false
})
}
func (d *driver) peerAdd(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, localPeer bool) {
func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, localPeer bool) {
d.peerOpMu.Lock()
defer d.peerOpMu.Unlock()
err := d.peerAddOp(nid, eid, peerIP, peerMac, vtep, true, localPeer)
err := d.peerAddOp(nid, eid, peerIP, peerIPMask, peerMac, vtep, true, localPeer)
if err != nil {
log.G(context.TODO()).WithError(err).Warn("Peer add operation failed")
}
}
func (d *driver) peerAddOp(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, updateDB, localPeer bool) error {
func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, updateDB, localPeer bool) error {
if err := validateID(nid, eid); err != nil {
return err
}
@@ -213,7 +277,7 @@ func (d *driver) peerAddOp(nid, eid string, peerIP netip.Prefix, peerMac net.Har
var dbEntries int
var inserted bool
if updateDB {
inserted, dbEntries = d.peerDbAdd(nid, eid, peerIP, peerMac, vtep, localPeer)
inserted, dbEntries = d.peerDbAdd(nid, eid, peerIP, peerIPMask, peerMac, vtep, localPeer)
if !inserted {
log.G(context.TODO()).Warnf("Entry already present in db: nid:%s eid:%s peerIP:%v peerMac:%v isLocal:%t vtep:%v",
nid, eid, peerIP, peerMac, localPeer, vtep)
@@ -238,9 +302,14 @@ func (d *driver) peerAddOp(nid, eid string, peerIP netip.Prefix, peerMac net.Har
return nil
}
s := n.getSubnetforIP(peerIP)
IP := &net.IPNet{
IP: peerIP,
Mask: peerIPMask,
}
s := n.getSubnetforIP(IP)
if s == nil {
return fmt.Errorf("couldn't find the subnet %q in network %q", peerIP.String(), n.id)
return fmt.Errorf("couldn't find the subnet %q in network %q", IP.String(), n.id)
}
if err := n.joinSandbox(s, false); err != nil {
@@ -252,7 +321,7 @@ func (d *driver) peerAddOp(nid, eid string, peerIP netip.Prefix, peerMac net.Har
}
// Add neighbor entry for the peer IP
if err := sbox.AddNeighbor(peerIP.Addr().AsSlice(), peerMac, osl.WithLinkName(s.vxlanName)); err != nil {
if err := sbox.AddNeighbor(peerIP, peerMac, false, osl.WithLinkName(s.vxlanName)); err != nil {
if _, ok := err.(osl.NeighborSearchError); ok && dbEntries > 1 {
// We are in the transient case so only the first configuration is programmed into the kernel
// Upon deletion if the active configuration is deleted the next one from the database will be restored
@@ -263,28 +332,28 @@ func (d *driver) peerAddOp(nid, eid string, peerIP netip.Prefix, peerMac net.Har
}
// Add fdb entry to the bridge for the peer mac
if err := sbox.AddNeighbor(vtep.AsSlice(), peerMac, osl.WithLinkName(s.vxlanName), osl.WithFamily(syscall.AF_BRIDGE)); err != nil {
if err := sbox.AddNeighbor(vtep, peerMac, false, osl.WithLinkName(s.vxlanName), osl.WithFamily(syscall.AF_BRIDGE)); err != nil {
return fmt.Errorf("could not add fdb entry for nid:%s eid:%s into the sandbox:%v", nid, eid, err)
}
return nil
}
func (d *driver) peerDelete(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, localPeer bool) {
func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, localPeer bool) {
d.peerOpMu.Lock()
defer d.peerOpMu.Unlock()
err := d.peerDeleteOp(nid, eid, peerIP, peerMac, vtep, localPeer)
err := d.peerDeleteOp(nid, eid, peerIP, peerIPMask, peerMac, vtep, localPeer)
if err != nil {
log.G(context.TODO()).WithError(err).Warn("Peer delete operation failed")
}
}
func (d *driver) peerDeleteOp(nid, eid string, peerIP netip.Prefix, peerMac net.HardwareAddr, vtep netip.Addr, localPeer bool) error {
func (d *driver) peerDeleteOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, localPeer bool) error {
if err := validateID(nid, eid); err != nil {
return err
}
deleted, dbEntries := d.peerDbDelete(nid, eid, peerIP, peerMac, vtep, localPeer)
deleted, dbEntries := d.peerDbDelete(nid, eid, peerIP, peerIPMask, peerMac, vtep, localPeer)
if !deleted {
log.G(context.TODO()).Warnf("Entry was not in db: nid:%s eid:%s peerIP:%v peerMac:%v isLocal:%t vtep:%v",
nid, eid, peerIP, peerMac, localPeer, vtep)
@@ -306,12 +375,8 @@ func (d *driver) peerDeleteOp(nid, eid string, peerIP netip.Prefix, peerMac net.
// Local peers do not have any local configuration to delete
if !localPeer {
s := n.getSubnetforIP(peerIP)
if s == nil {
return fmt.Errorf("could not find the subnet %q in network %q", peerIP.String(), n.id)
}
// Remove fdb entry to the bridge for the peer mac
if err := sbox.DeleteNeighbor(vtep.AsSlice(), peerMac, osl.WithLinkName(s.vxlanName)); err != nil {
if err := sbox.DeleteNeighbor(vtep, peerMac); err != nil {
if _, ok := err.(osl.NeighborSearchError); ok && dbEntries > 0 {
// We fall in here if there is a transient state and if the neighbor that is being deleted
// was never been configured into the kernel (we allow only 1 configuration at the time per <ip,mac> mapping)
@@ -321,7 +386,7 @@ func (d *driver) peerDeleteOp(nid, eid string, peerIP netip.Prefix, peerMac net.
}
// Delete neighbor entry for the peer IP
if err := sbox.DeleteNeighbor(peerIP.Addr().AsSlice(), peerMac, osl.WithLinkName(s.vxlanName), osl.WithFamily(syscall.AF_BRIDGE)); err != nil {
if err := sbox.DeleteNeighbor(peerIP, peerMac); err != nil {
return fmt.Errorf("could not delete neighbor entry for nid:%s eid:%s into the sandbox:%v", nid, eid, err)
}
}
@@ -333,12 +398,12 @@ func (d *driver) peerDeleteOp(nid, eid string, peerIP netip.Prefix, peerMac net.
// If there is still an entry into the database and the deletion went through without errors means that there is now no
// configuration active in the kernel.
// Restore one configuration for the <ip,mac> directly from the database, note that is guaranteed that there is one
peerIPAddr, peerMac, peerEntry, err := d.peerDbSearch(nid, peerIP.Addr())
peerKey, peerEntry, err := d.peerDbSearch(nid, peerIP)
if err != nil {
log.G(context.TODO()).Errorf("peerDeleteOp unable to restore a configuration for nid:%s ip:%v mac:%v err:%s", nid, peerIP, peerMac, err)
return err
}
return d.peerAddOp(nid, peerEntry.eid, netip.PrefixFrom(peerIPAddr, peerEntry.prefixBits), peerMac, peerEntry.vtep, false, peerEntry.isLocal)
return d.peerAddOp(nid, peerEntry.eid, peerIP, peerEntry.peerIPMask, peerKey.peerMac, peerEntry.vtep, false, peerEntry.isLocal)
}
func (d *driver) peerFlush(nid string) {
@@ -361,7 +426,7 @@ func (d *driver) peerFlushOp(nid string) error {
}
func (d *driver) peerDBUpdateSelf() {
d.peerDbWalk(func(nid string, _ netip.Addr, _ net.HardwareAddr, pEntry *peerEntry) bool {
d.peerDbWalk(func(nid string, pkey *peerKey, pEntry *peerEntry) bool {
if pEntry.isLocal {
pEntry.vtep = d.advertiseAddress
}

View File

@@ -0,0 +1,32 @@
//go:build linux
package overlay
import (
"net"
"testing"
)
func TestPeerMarshal(t *testing.T) {
_, ipNet, _ := net.ParseCIDR("192.168.0.1/24")
p := &peerEntry{
eid: "eid",
isLocal: true,
peerIPMask: ipNet.Mask,
vtep: ipNet.IP,
}
entryDB := p.MarshalDB()
x := entryDB.UnMarshalDB()
if x.eid != p.eid {
t.Fatalf("Incorrect Unmarshalling for eid: %v != %v", x.eid, p.eid)
}
if x.isLocal != p.isLocal {
t.Fatalf("Incorrect Unmarshalling for isLocal: %v != %v", x.isLocal, p.isLocal)
}
if x.peerIPMask.String() != p.peerIPMask.String() {
t.Fatalf("Incorrect Unmarshalling for eid: %v != %v", x.peerIPMask, p.peerIPMask)
}
if x.vtep.String() != p.vtep.String() {
t.Fatalf("Incorrect Unmarshalling for eid: %v != %v", x.vtep, p.vtep)
}
}

View File

@@ -1,52 +0,0 @@
package overlay
// Handy utility types for making unhashable values hashable.
import (
"net"
"net/netip"
)
// macAddr is a hashable encoding of a MAC address.
type macAddr uint64
// macAddrOf converts a net.HardwareAddr to a macAddr.
func macAddrOf(mac net.HardwareAddr) macAddr {
if len(mac) != 6 {
return 0
}
return macAddr(mac[0])<<40 | macAddr(mac[1])<<32 | macAddr(mac[2])<<24 |
macAddr(mac[3])<<16 | macAddr(mac[4])<<8 | macAddr(mac[5])
}
// HardwareAddr converts a macAddr back to a net.HardwareAddr.
func (p macAddr) HardwareAddr() net.HardwareAddr {
mac := [6]byte{
byte(p >> 40), byte(p >> 32), byte(p >> 24),
byte(p >> 16), byte(p >> 8), byte(p),
}
return mac[:]
}
// String returns p.HardwareAddr().String().
func (p macAddr) String() string {
return p.HardwareAddr().String()
}
// ipmac is a hashable tuple of an IP address and a MAC address suitable for use as a map key.
type ipmac struct {
ip netip.Addr
mac macAddr
}
// ipmacOf is a convenience constructor for creating an ipmac from a [net.HardwareAddr].
func ipmacOf(ip netip.Addr, mac net.HardwareAddr) ipmac {
return ipmac{
ip: ip,
mac: macAddrOf(mac),
}
}
func (i ipmac) String() string {
return i.ip.String() + " " + i.mac.String()
}

View File

@@ -1,29 +0,0 @@
package overlay
import (
"net"
"net/netip"
"testing"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
func TestMACAddrOf(t *testing.T) {
want := net.HardwareAddr{0x01, 0x02, 0x03, 0x04, 0x05, 0x06}
assert.DeepEqual(t, macAddrOf(want).HardwareAddr(), want)
}
func TestIPMACOf(t *testing.T) {
assert.Check(t, is.Equal(ipmacOf(netip.Addr{}, nil), ipmac{}))
assert.Check(t, is.Equal(
ipmacOf(
netip.MustParseAddr("11.22.33.44"),
net.HardwareAddr{0x01, 0x02, 0x03, 0x04, 0x05, 0x06},
),
ipmac{
ip: netip.MustParseAddr("11.22.33.44"),
mac: macAddrOf(net.HardwareAddr{0x01, 0x02, 0x03, 0x04, 0x05, 0x06}),
},
))
}

View File

@@ -13,14 +13,14 @@ import (
// The zero value is an empty set matrix ready to use.
//
// SetMatrix values are safe for concurrent use.
type SetMatrix[K, V comparable] struct {
matrix map[K]mapset.Set[V]
type SetMatrix[T comparable] struct {
matrix map[string]mapset.Set[T]
mu sync.Mutex
}
// Get returns the members of the set for a specific key as a slice.
func (s *SetMatrix[K, V]) Get(key K) ([]V, bool) {
func (s *SetMatrix[T]) Get(key string) ([]T, bool) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
@@ -31,7 +31,7 @@ func (s *SetMatrix[K, V]) Get(key K) ([]V, bool) {
}
// Contains is used to verify if an element is in a set for a specific key.
func (s *SetMatrix[K, V]) Contains(key K, value V) (containsElement, setExists bool) {
func (s *SetMatrix[T]) Contains(key string, value T) (containsElement, setExists bool) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
@@ -43,13 +43,13 @@ func (s *SetMatrix[K, V]) Contains(key K, value V) (containsElement, setExists b
// Insert inserts the value in the set of a key and returns whether the value is
// inserted (was not already in the set) and the number of elements in the set.
func (s *SetMatrix[K, V]) Insert(key K, value V) (inserted bool, cardinality int) {
func (s *SetMatrix[T]) Insert(key string, value T) (inserted bool, cardinality int) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
if !ok {
if s.matrix == nil {
s.matrix = make(map[K]mapset.Set[V])
s.matrix = make(map[string]mapset.Set[T])
}
s.matrix[key] = mapset.NewThreadUnsafeSet(value)
return true, 1
@@ -59,7 +59,7 @@ func (s *SetMatrix[K, V]) Insert(key K, value V) (inserted bool, cardinality int
}
// Remove removes the value in the set for a specific key.
func (s *SetMatrix[K, V]) Remove(key K, value V) (removed bool, cardinality int) {
func (s *SetMatrix[T]) Remove(key string, value T) (removed bool, cardinality int) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
@@ -80,7 +80,7 @@ func (s *SetMatrix[K, V]) Remove(key K, value V) (removed bool, cardinality int)
}
// Cardinality returns the number of elements in the set for a key.
func (s *SetMatrix[K, V]) Cardinality(key K) (cardinality int, ok bool) {
func (s *SetMatrix[T]) Cardinality(key string) (cardinality int, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
@@ -93,7 +93,7 @@ func (s *SetMatrix[K, V]) Cardinality(key K) (cardinality int, ok bool) {
// String returns the string version of the set.
// The empty string is returned if there is no set for key.
func (s *SetMatrix[K, V]) String(key K) (v string, ok bool) {
func (s *SetMatrix[T]) String(key string) (v string, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
set, ok := s.matrix[key]
@@ -104,10 +104,10 @@ func (s *SetMatrix[K, V]) String(key K) (v string, ok bool) {
}
// Keys returns all the keys in the map.
func (s *SetMatrix[K, V]) Keys() []K {
func (s *SetMatrix[T]) Keys() []string {
s.mu.Lock()
defer s.mu.Unlock()
keys := make([]K, 0, len(s.matrix))
keys := make([]string, 0, len(s.matrix))
for k := range s.matrix {
keys = append(keys, k)
}
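
As an aside, a minimal usage sketch of the single-type-parameter form shown in this hunk; the setmatrix package reference, the key, and the values are illustrative assumptions, not taken from the diff:

	var m setmatrix.SetMatrix[string] // zero value is ready to use; keys are always strings

	inserted, n := m.Insert("svc1", "10.0.0.2") // true, 1
	inserted, n = m.Insert("svc1", "10.0.0.2")  // false, 1: the value was already in the set
	_, _ = inserted, n

	if present, ok := m.Contains("svc1", "10.0.0.2"); ok && present {
		m.Remove("svc1", "10.0.0.2")
	}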

View File

@@ -9,7 +9,7 @@ import (
)
func TestSetSerialInsertDelete(t *testing.T) {
var s SetMatrix[string, string]
var s SetMatrix[string]
b, i := s.Insert("a", "1")
if !b || i != 1 {
@@ -135,7 +135,7 @@ func TestSetSerialInsertDelete(t *testing.T) {
}
}
func insertDeleteRotuine(ctx context.Context, endCh chan int, s *SetMatrix[string, string], key, value string) {
func insertDeleteRotuine(ctx context.Context, endCh chan int, s *SetMatrix[string], key, value string) {
for {
select {
case <-ctx.Done():
@@ -158,7 +158,7 @@ func insertDeleteRotuine(ctx context.Context, endCh chan int, s *SetMatrix[strin
}
func TestSetParallelInsertDelete(t *testing.T) {
var s SetMatrix[string, string]
var s SetMatrix[string]
parallelRoutines := 6
endCh := make(chan int)
// Let the routines running and competing for 10s

View File

@@ -457,7 +457,7 @@ func getSvcRecords(t *testing.T, n *Network, key string) (addrs []netip.Addr, fo
sr, ok := n.ctrlr.svcRecords[n.id]
assert.Assert(t, ok)
lookup := func(svcMap *setmatrix.SetMatrix[string, svcMapEntry]) bool {
lookup := func(svcMap *setmatrix.SetMatrix[svcMapEntry]) bool {
mapEntryList, ok := svcMap.Get(key)
if !ok {
return false

View File

@@ -57,9 +57,9 @@ type svcMapEntry struct {
}
type svcInfo struct {
svcMap setmatrix.SetMatrix[string, svcMapEntry]
svcIPv6Map setmatrix.SetMatrix[string, svcMapEntry]
ipMap setmatrix.SetMatrix[string, ipInfo]
svcMap setmatrix.SetMatrix[svcMapEntry]
svcIPv6Map setmatrix.SetMatrix[svcMapEntry]
ipMap setmatrix.SetMatrix[ipInfo]
service map[string][]servicePorts
}
@@ -1370,7 +1370,7 @@ func (n *Network) updateSvcRecord(ctx context.Context, ep *Endpoint, isAdd bool)
}
}
func addIPToName(ipMap *setmatrix.SetMatrix[string, ipInfo], name, serviceID string, ip net.IP) {
func addIPToName(ipMap *setmatrix.SetMatrix[ipInfo], name, serviceID string, ip net.IP) {
reverseIP := netutils.ReverseIP(ip.String())
ipMap.Insert(reverseIP, ipInfo{
name: name,
@@ -1378,7 +1378,7 @@ func addIPToName(ipMap *setmatrix.SetMatrix[string, ipInfo], name, serviceID str
})
}
func delIPToName(ipMap *setmatrix.SetMatrix[string, ipInfo], name, serviceID string, ip net.IP) {
func delIPToName(ipMap *setmatrix.SetMatrix[ipInfo], name, serviceID string, ip net.IP) {
reverseIP := netutils.ReverseIP(ip.String())
ipMap.Remove(reverseIP, ipInfo{
name: name,
@@ -1386,7 +1386,7 @@ func delIPToName(ipMap *setmatrix.SetMatrix[string, ipInfo], name, serviceID str
})
}
func addNameToIP(svcMap *setmatrix.SetMatrix[string, svcMapEntry], name, serviceID string, epIP net.IP) {
func addNameToIP(svcMap *setmatrix.SetMatrix[svcMapEntry], name, serviceID string, epIP net.IP) {
// Since DNS name resolution is case-insensitive, Use the lower-case form
// of the name as the key into svcMap
lowerCaseName := strings.ToLower(name)
@@ -1396,7 +1396,7 @@ func addNameToIP(svcMap *setmatrix.SetMatrix[string, svcMapEntry], name, service
})
}
func delNameToIP(svcMap *setmatrix.SetMatrix[string, svcMapEntry], name, serviceID string, epIP net.IP) {
func delNameToIP(svcMap *setmatrix.SetMatrix[svcMapEntry], name, serviceID string, epIP net.IP) {
lowerCaseName := strings.ToLower(name)
svcMap.Remove(lowerCaseName, svcMapEntry{
ip: epIP.String(),

View File

@@ -148,13 +148,8 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
// Update our local clock if the received messages has newer time.
nDB.tableClock.Witness(tEvent.LTime)
nDB.Lock()
// Hold the lock until after we broadcast the event to watchers so that
// the new watch receives either the synthesized event or the event we
// broadcast, never both.
defer nDB.Unlock()
// Ignore the table events for networks that are in the process of going away
nDB.RLock()
networks := nDB.networks[nDB.config.NodeID]
network, ok := networks[tEvent.NetworkID]
// Check if the owner of the event is still part of the network
@@ -166,24 +161,33 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
break
}
}
nDB.RUnlock()
if !ok || network.leaving || !nodePresent {
// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
return false
}
var entryPresent bool
prev, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
nDB.Lock()
e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
if err == nil {
entryPresent = true
// We have the latest state. Ignore the event
// since it is stale.
if prev.ltime >= tEvent.LTime {
if e.ltime >= tEvent.LTime {
nDB.Unlock()
return false
}
} else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
nDB.Unlock()
// We don't know the entry, the entry is being deleted and the message is an async message
// In this case the safest approach is to ignore it, it is possible that the queue grew so much to
// exceed the garbage collection time (the residual reap time that is in the message is not being
// updated, to avoid inserting too many messages in the queue).
// Instead the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time
return false
}
e := &entry{
e = &entry{
ltime: tEvent.LTime,
node: tEvent.NodeName,
value: tEvent.Value,
@@ -200,55 +204,35 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
e.reapTime = nDB.config.reapEntryInterval
}
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
nDB.Unlock()
if !entryPresent && tEvent.Type == TableEventTypeDelete {
// We will rebroadcast the message for an unknown entry if all the conditions are met:
// 1) the message was received from a bulk sync
// 2) we had already synced this network (during the network join)
// 3) the residual reapTime is higher than 1/6 of the total reapTime.
//
// If the residual reapTime is lower or equal to 1/6 of the total reapTime
// don't bother broadcasting it around as most likely the cluster is already aware of it.
// This also reduces the possibility that deletion of entries close to their garbage collection
// ends up circling around forever.
//
// The safest approach is to not rebroadcast async messages for unknown entries.
// It is possible that the queue grew so much to exceed the garbage collection time
// (the residual reap time that is in the message is not being updated, to avoid
// inserting too many messages in the queue).
if err != nil && tEvent.Type == TableEventTypeDelete {
// Again we don't know the entry but this is coming from a TCP sync so the message body is up to date.
// We had saved the state so to speed up convergence and be able to avoid accepting create events.
// Now we will rebroadcast the message if 2 conditions are met:
// 1) we had already synced this network (during the network join)
// 2) the residual reapTime is higher than 1/6 of the total reapTime.
// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
// most likely the cluster is already aware of it
// This also reduce the possibility that deletion of entries close to their garbage collection ends up circling around
// forever
// log.G(ctx).Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
return isBulkSync && network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
return network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
}
var op opType
value := tEvent.Value
switch tEvent.Type {
case TableEventTypeCreate, TableEventTypeUpdate:
// Gossip messages could arrive out-of-order so it is possible
// for an entry's UPDATE event to be received before its CREATE
// event. The local watchers should not need to care about such
// nuances. Broadcast events to watchers based only on what
// changed in the local NetworkDB state.
case TableEventTypeCreate:
op = opCreate
if entryPresent && !prev.deleting {
op = opUpdate
}
case TableEventTypeUpdate:
op = opUpdate
case TableEventTypeDelete:
if !entryPresent || prev.deleting {
goto SkipBroadcast
}
op = opDelete
// Broadcast the value most recently observed by watchers,
// which may be different from the value in the DELETE event
// (e.g. if the DELETE event was received out-of-order).
value = prev.value
default:
// TODO(thaJeztah): make switch exhaustive; add networkdb.TableEventTypeInvalid
}
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, value))
SkipBroadcast:
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
return network.inSync
}
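
A hedged numeric illustration of the rebroadcast gate described in the comments above; the 30-minute interval is an assumption chosen for the example, not a value taken from this hunk:

	inSync := true                        // this network finished its join-time bulk sync
	reapEntryInterval := 30 * time.Minute // assumed for the example only
	residual := 7 * time.Minute           // residual reap time carried in the message
	rebroadcast := inSync && residual > reapEntryInterval/6 // true: 7m > 5m
	_ = rebroadcast

Once the residual reap time drops to one sixth of the interval or below, the deletion is assumed to be old news for the cluster and is dropped rather than rebroadcast.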
@@ -424,12 +408,7 @@ func (d *delegate) NotifyMsg(buf []byte) {
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
for _, m := range msgs {
limit -= overhead + len(m)
}
if limit > 0 {
msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
}
msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
return msgs
}

View File

@@ -252,27 +252,14 @@ func DefaultConfig() *Config {
// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
nDB := new(c)
log.G(context.TODO()).Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
if err := nDB.clusterInit(); err != nil {
return nil, err
}
return nDB, nil
}
func new(c *Config) *NetworkDB {
// The garbage collection logic for entries leverage the presence of the network.
// For this reason the expiration time of the network is put slightly higher than the entry expiration so that
// there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod
return &NetworkDB{
config: c,
indexes: map[int]*iradix.Tree[*entry]{
byTable: iradix.New[*entry](),
byNetwork: iradix.New[*entry](),
},
nDB := &NetworkDB{
config: c,
indexes: make(map[int]*iradix.Tree[*entry]),
networks: make(map[string]map[string]*network),
nodes: make(map[string]*node),
failedNodes: make(map[string]*node),
@@ -281,6 +268,16 @@ func new(c *Config) *NetworkDB {
bulkSyncAckTbl: make(map[string]chan struct{}),
broadcaster: events.NewBroadcaster(),
}
nDB.indexes[byTable] = iradix.New[*entry]()
nDB.indexes[byNetwork] = iradix.New[*entry]()
log.G(context.TODO()).Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
if err := nDB.clusterInit(); err != nil {
return nil, err
}
return nDB, nil
}
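
The comment above keeps the network expiry slightly longer than the entry expiry; a hedged numeric illustration (the 30-minute and 5-second figures are assumptions for the example, not values from this hunk):

	reapEntryInterval := 30 * time.Minute
	reapPeriod := 5 * time.Second                           // one garbage-collection pass (assumed)
	reapNetworkInterval := reapEntryInterval + 5*reapPeriod // 30m25s: five spare passes for entries
	_ = reapNetworkInterval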
// Join joins this NetworkDB instance with a list of peer NetworkDB
@@ -432,11 +429,8 @@ type TableElem struct {
// GetTableByNetwork walks the networkdb by the give table and network id and
// returns a map of keys and values
func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem {
nDB.RLock()
root := nDB.indexes[byTable].Root()
nDB.RUnlock()
entries := make(map[string]*TableElem)
root.WalkPrefix([]byte(fmt.Sprintf("/%s/%s", tname, nid)), func(k []byte, v *entry) bool {
nDB.indexes[byTable].Root().WalkPrefix([]byte(fmt.Sprintf("/%s/%s", tname, nid)), func(k []byte, v *entry) bool {
if v.deleting {
return false
}
@@ -591,14 +585,21 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
// value. The walk stops if the passed function returns a true.
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
nDB.RLock()
root := nDB.indexes[byTable].Root()
values := make(map[string]*entry)
nDB.indexes[byTable].Root().WalkPrefix([]byte("/"+tname), func(path []byte, v *entry) bool {
values[string(path)] = v
return false
})
nDB.RUnlock()
root.WalkPrefix([]byte("/"+tname), func(path []byte, v *entry) bool {
params := strings.Split(string(path[1:]), "/")
for k, v := range values {
params := strings.Split(k[1:], "/")
nid := params[1]
key := params[2]
return fn(nid, key, v.value, v.deleting)
})
if fn(nid, key, v.value, v.deleting) {
return nil
}
}
return nil
}

View File

@@ -430,22 +430,6 @@ func TestNetworkDBCRUDMediumCluster(t *testing.T) {
dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
// Shake out any data races.
done := make(chan struct{})
defer close(done)
for _, db := range dbs {
go func(db *NetworkDB) {
for {
select {
case <-done:
return
default:
}
_ = db.GetTableByNetwork("test_table", "network1")
}
}(db)
}
for i := 0; i < n; i++ {
for j := 0; j < n; j++ {
if i == j {

View File

@@ -2,7 +2,6 @@ package networkdb
import (
"net"
"strings"
"github.com/docker/go-events"
)
@@ -43,8 +42,7 @@ type DeleteEvent event
// network or any combination of the tuple. If any of the
// filter is an empty string it acts as a wildcard for that
// field. Watch returns a channel of events, where the events will be
// sent. The watch channel is initialized with synthetic create events for all
// the existing table entries not owned by this node which match the filters.
// sent.
func (nDB *NetworkDB) Watch(tname, nid string) (*events.Channel, func()) {
var matcher events.Matcher
@@ -79,45 +77,6 @@ func (nDB *NetworkDB) Watch(tname, nid string) (*events.Channel, func()) {
sink = events.NewFilter(sink, matcher)
}
// Synthesize events for all the existing table entries not owned by
// this node so that the watcher receives all state without racing with
// any concurrent mutations to the table.
nDB.RLock()
defer nDB.RUnlock()
if tname == "" {
var prefix []byte
if nid != "" {
prefix = []byte("/" + nid + "/")
} else {
prefix = []byte("/")
}
nDB.indexes[byNetwork].Root().WalkPrefix(prefix, func(path []byte, v *entry) bool {
if !v.deleting && v.node != nDB.config.NodeID {
tuple := strings.SplitN(string(path[1:]), "/", 3)
if len(tuple) == 3 {
entryNid, entryTname, key := tuple[0], tuple[1], tuple[2]
sink.Write(makeEvent(opCreate, entryTname, entryNid, key, v.value))
}
}
return false
})
} else {
prefix := []byte("/" + tname + "/")
if nid != "" {
prefix = append(prefix, []byte(nid+"/")...)
}
nDB.indexes[byTable].Root().WalkPrefix(prefix, func(path []byte, v *entry) bool {
if !v.deleting && v.node != nDB.config.NodeID {
tuple := strings.SplitN(string(path[1:]), "/", 3)
if len(tuple) == 3 {
entryTname, entryNid, key := tuple[0], tuple[1], tuple[2]
sink.Write(makeEvent(opCreate, entryTname, entryNid, key, v.value))
}
}
return false
})
}
nDB.broadcaster.Add(sink)
return ch, func() {
nDB.broadcaster.Remove(sink)

View File

@@ -1,273 +0,0 @@
package networkdb
import (
"net"
"testing"
"time"
"github.com/docker/go-events"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
func TestWatch_out_of_order(t *testing.T) {
nDB := new(DefaultConfig())
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
assert.Assert(t, nDB.JoinNetwork("network1"))
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
Name: "node1",
Addr: net.IPv4(1, 2, 3, 4),
})
d := &delegate{nDB}
msgs := messageBuffer{t: t}
appendTableEvent := tableEventHelper(&msgs, "node1", "network1", "table1")
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: 1,
NodeName: "node1",
NetworkID: "network1",
})
appendTableEvent(1, TableEventTypeCreate, "tombstone1", []byte("a"))
appendTableEvent(2, TableEventTypeDelete, "tombstone1", []byte("b"))
appendTableEvent(3, TableEventTypeCreate, "key1", []byte("value1"))
d.NotifyMsg(msgs.Compound())
msgs.Reset()
nDB.CreateEntry("table1", "network1", "local1", []byte("should not see me in watch events"))
watch, cancel := nDB.Watch("table1", "network1")
defer cancel()
got := drainChannel(watch.C)
assert.Check(t, is.DeepEqual(got, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key1", Value: []byte("value1")}),
}))
// Receive events from node1, with events not received or received out of order
// Create, (hidden update), delete
appendTableEvent(4, TableEventTypeCreate, "key2", []byte("a"))
appendTableEvent(6, TableEventTypeDelete, "key2", []byte("b"))
// (Hidden recreate), delete
appendTableEvent(8, TableEventTypeDelete, "key2", []byte("c"))
// (Hidden recreate), update
appendTableEvent(10, TableEventTypeUpdate, "key2", []byte("d"))
// Update, create
appendTableEvent(11, TableEventTypeUpdate, "key3", []byte("b"))
appendTableEvent(10, TableEventTypeCreate, "key3", []byte("a"))
// (Hidden create), update, update
appendTableEvent(13, TableEventTypeUpdate, "key4", []byte("b"))
appendTableEvent(14, TableEventTypeUpdate, "key4", []byte("c"))
// Delete, create
appendTableEvent(16, TableEventTypeDelete, "key5", []byte("a"))
appendTableEvent(15, TableEventTypeCreate, "key5", []byte("a"))
// (Hidden recreate), delete
appendTableEvent(18, TableEventTypeDelete, "key5", []byte("b"))
d.NotifyMsg(msgs.Compound())
msgs.Reset()
got = drainChannel(watch.C)
assert.Check(t, is.DeepEqual(got, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("a")}),
// Delete value should match last observed value,
// irrespective of the content of the delete event over the wire.
DeleteEvent(event{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("a")}),
// Updates to previously-deleted keys should be observed as creates.
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("d")}),
// Out-of-order update events should be observed as creates.
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key3", Value: []byte("b")}),
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key4", Value: []byte("b")}),
UpdateEvent(event{Table: "table1", NetworkID: "network1", Key: "key4", Value: []byte("c")}),
// key5 should not appear in the events.
}))
}
func TestWatch_filters(t *testing.T) {
nDB := new(DefaultConfig())
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
assert.Assert(t, nDB.JoinNetwork("network1"))
assert.Assert(t, nDB.JoinNetwork("network2"))
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
Name: "node1",
Addr: net.IPv4(1, 2, 3, 4),
})
var ltime serf.LamportClock
msgs := messageBuffer{t: t}
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: "network1",
})
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: "network2",
})
for _, nid := range []string{"network1", "network2"} {
for _, tname := range []string{"table1", "table2"} {
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeCreate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".dead",
Value: []byte("deaddead"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeDelete,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".dead",
Value: []byte("deaddead"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeCreate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".update",
Value: []byte("initial"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeCreate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname,
Value: []byte("a"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeUpdate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".update",
Value: []byte("updated"),
})
}
}
(&delegate{nDB}).NotifyMsg(msgs.Compound())
watchAll, cancel := nDB.Watch("", "")
defer cancel()
watchNetwork1Tables, cancel := nDB.Watch("", "network1")
defer cancel()
watchTable1AllNetworks, cancel := nDB.Watch("table1", "")
defer cancel()
watchTable1Network1, cancel := nDB.Watch("table1", "network1")
defer cancel()
var gotAll, gotNetwork1Tables, gotTable1AllNetworks, gotTable1Network1 []events.Event
L:
for {
select {
case ev := <-watchAll.C:
gotAll = append(gotAll, ev)
case ev := <-watchNetwork1Tables.C:
gotNetwork1Tables = append(gotNetwork1Tables, ev)
case ev := <-watchTable1AllNetworks.C:
gotTable1AllNetworks = append(gotTable1AllNetworks, ev)
case ev := <-watchTable1Network1.C:
gotTable1Network1 = append(gotTable1Network1, ev)
case <-time.After(time.Second):
break L
}
}
assert.Check(t, is.DeepEqual(gotAll, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")}),
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")}),
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")}),
CreateEvent(event{Table: "table2", NetworkID: "network2", Key: "network2.table2", Value: []byte("a")}),
CreateEvent(event{Table: "table2", NetworkID: "network2", Key: "network2.table2.update", Value: []byte("updated")}),
}))
assert.Check(t, is.DeepEqual(gotNetwork1Tables, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")}),
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")}),
}))
assert.Check(t, is.DeepEqual(gotTable1AllNetworks, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")}),
}))
assert.Check(t, is.DeepEqual(gotTable1Network1, []events.Event{
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
}))
}
func drainChannel(ch <-chan events.Event) []events.Event {
var events []events.Event
for {
select {
case ev := <-ch:
events = append(events, ev)
case <-time.After(time.Second):
return events
}
}
}
type messageBuffer struct {
t *testing.T
msgs [][]byte
}
func (mb *messageBuffer) Append(typ MessageType, msg any) {
mb.t.Helper()
buf, err := encodeMessage(typ, msg)
if err != nil {
mb.t.Fatalf("failed to encode message: %v", err)
}
mb.msgs = append(mb.msgs, buf)
}
func (mb *messageBuffer) Compound() []byte {
return makeCompoundMessage(mb.msgs)
}
func (mb *messageBuffer) Reset() {
mb.msgs = nil
}
func tableEventHelper(mb *messageBuffer, nodeName, networkID, tableName string) func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) {
return func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) {
mb.t.Helper()
mb.Append(MessageTypeTableEvent, &TableEvent{
Type: typ,
LTime: ltime,
NodeName: nodeName,
NetworkID: networkID,
TableName: tableName,
Key: key,
Value: value,
})
}
}

View File

@@ -804,20 +804,24 @@ func (n *Namespace) prepAdvertiseAddrs(ctx context.Context, i *Interface, ifInde
// original name and moving it out of the sandbox.
func (n *Namespace) RemoveInterface(i *Interface) error {
close(i.stopCh)
n.mu.Lock()
isDefault := n.isDefault
nlh := n.nlHandle
n.mu.Unlock()
// Find the network interface identified by the DstName attribute.
iface, err := n.nlHandle.LinkByName(i.DstName())
iface, err := nlh.LinkByName(i.DstName())
if err != nil {
return err
}
// Down the interface before configuring
if err := n.nlHandle.LinkSetDown(iface); err != nil {
if err := nlh.LinkSetDown(iface); err != nil {
return err
}
// TODO(aker): Why are we doing this? This would fail if the initial interface set up failed before the "dest interface" was moved into its own namespace; see https://github.com/moby/moby/pull/46315/commits/108595c2fe852a5264b78e96f9e63cda284990a6#r1331253578
err = n.nlHandle.LinkSetName(iface, i.SrcName())
err = nlh.LinkSetName(iface, i.SrcName())
if err != nil {
log.G(context.TODO()).Debugf("LinkSetName failed for interface %s: %v", i.SrcName(), err)
return err
@@ -825,13 +829,13 @@ func (n *Namespace) RemoveInterface(i *Interface) error {
// if it is a bridge just delete it.
if i.Bridge() {
if err := n.nlHandle.LinkDel(iface); err != nil {
if err := nlh.LinkDel(iface); err != nil {
return fmt.Errorf("failed deleting bridge %q: %v", i.SrcName(), err)
}
} else if !n.isDefault {
} else if !isDefault {
// Move the network interface to caller namespace.
// TODO(aker): What's this really doing? There are no calls to LinkDel in this package: is this code really used? (Interface.Remove() has 3 callers); see https://github.com/moby/moby/pull/46315/commits/108595c2fe852a5264b78e96f9e63cda284990a6#r1331265335
if err := n.nlHandle.LinkSetNsFd(iface, ns.ParseHandlerInt()); err != nil {
if err := nlh.LinkSetNsFd(iface, ns.ParseHandlerInt()); err != nil {
log.G(context.TODO()).Debugf("LinkSetNsFd failed for interface %s: %v", i.SrcName(), err)
return err
}
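
The variant above that snapshots isDefault and the netlink handle shows a recurring pattern in this package: read shared fields once under the mutex, then run the slow netlink syscalls without holding it. A generic sketch with illustrative names only, not moby code:

	type sandbox struct {
		mu        sync.Mutex
		nlHandle  *netlink.Handle
		isDefault bool
	}

	func (s *sandbox) downLink(name string) error {
		s.mu.Lock()
		nlh, isDefault := s.nlHandle, s.isDefault // snapshot under the lock
		s.mu.Unlock()

		link, err := nlh.LinkByName(name) // netlink calls run lock-free
		if err != nil {
			return err
		}
		if isDefault {
			return nil // nothing to move out of the host namespace
		}
		return nlh.LinkSetDown(link)
	}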

View File

@@ -232,6 +232,7 @@ type Namespace struct {
defRoute4SrcName string
defRoute6SrcName string
staticRoutes []*types.StaticRoute
neighbors []*neigh
isDefault bool // isDefault is true when Namespace represents the host network namespace. It is safe to access it concurrently.
ipv6LoEnabledOnce sync.Once
ipv6LoEnabledCached bool

View File

@@ -1,12 +1,12 @@
package osl
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"os"
"strings"
"github.com/containerd/log"
"github.com/vishvananda/netlink"
@@ -14,115 +14,145 @@ import (
// NeighborSearchError indicates that the neighbor is already present
type NeighborSearchError struct {
ip net.IP
mac net.HardwareAddr
linkName string
present bool
ip net.IP
mac net.HardwareAddr
present bool
}
func (n NeighborSearchError) Error() string {
var b strings.Builder
b.WriteString("neighbor entry ")
if n.present {
b.WriteString("already exists ")
} else {
b.WriteString("not found ")
}
b.WriteString("for IP ")
b.WriteString(n.ip.String())
b.WriteString(", mac ")
b.WriteString(n.mac.String())
if n.linkName != "" {
b.WriteString(", link ")
b.WriteString(n.linkName)
}
return b.String()
return fmt.Sprintf("Search neighbor failed for IP %v, mac %v, present in db:%t", n.ip, n.mac, n.present)
}
// DeleteNeighbor deletes a neighbor entry from the sandbox.
//
// To delete an entry inserted by [AddNeighbor] the caller must provide the same
// parameters used to add it.
func (n *Namespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, options ...NeighOption) error {
nlnh, linkName, err := n.nlNeigh(dstIP, dstMac, options...)
if err != nil {
return err
type neigh struct {
dstIP net.IP
dstMac net.HardwareAddr
linkName string
linkDst string
family int
}
func (n *Namespace) findNeighbor(dstIP net.IP, dstMac net.HardwareAddr) *neigh {
n.mu.Lock()
defer n.mu.Unlock()
for _, nh := range n.neighbors {
if nh.dstIP.Equal(dstIP) && bytes.Equal(nh.dstMac, dstMac) {
return nh
}
}
if err := n.nlHandle.NeighDel(nlnh); err != nil {
log.G(context.TODO()).WithFields(log.Fields{
"ip": dstIP,
"mac": dstMac,
"ifc": linkName,
"error": err,
}).Warn("error deleting neighbor entry")
if errors.Is(err, os.ErrNotExist) {
return NeighborSearchError{dstIP, dstMac, linkName, false}
return nil
}
// DeleteNeighbor deletes neighbor entry from the sandbox.
func (n *Namespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr) error {
nh := n.findNeighbor(dstIP, dstMac)
if nh == nil {
return NeighborSearchError{dstIP, dstMac, false}
}
n.mu.Lock()
nlh := n.nlHandle
n.mu.Unlock()
var linkIndex int
if nh.linkDst != "" {
iface, err := nlh.LinkByName(nh.linkDst)
if err != nil {
return fmt.Errorf("could not find interface with destination name %s: %v", nh.linkDst, err)
}
return fmt.Errorf("could not delete neighbor %+v: %w", nlnh, err)
linkIndex = iface.Attrs().Index
}
nlnh := &netlink.Neigh{
LinkIndex: linkIndex,
IP: dstIP,
State: netlink.NUD_PERMANENT,
Family: nh.family,
}
if nh.family > 0 {
nlnh.HardwareAddr = dstMac
nlnh.Flags = netlink.NTF_SELF
}
// If the kernel deletion fails for the neighbor entry still remove it
// from the namespace cache, otherwise kernel update can fail if the
// neighbor moves back to the same host again.
if err := nlh.NeighDel(nlnh); err != nil && !errors.Is(err, os.ErrNotExist) {
log.G(context.TODO()).Warnf("Deleting neighbor IP %s, mac %s failed, %v", dstIP, dstMac, err)
}
// Delete the dynamic entry in the bridge
if nlnh.Family > 0 {
nlnh.Flags = netlink.NTF_MASTER
if err := n.nlHandle.NeighDel(nlnh); err != nil && !errors.Is(err, os.ErrNotExist) {
log.G(context.TODO()).WithFields(log.Fields{
"ip": dstIP,
"mac": dstMac,
"ifc": linkName,
"error": err,
}).Warn("error deleting dynamic neighbor entry")
if nh.family > 0 {
if err := nlh.NeighDel(&netlink.Neigh{
LinkIndex: linkIndex,
IP: dstIP,
Family: nh.family,
HardwareAddr: dstMac,
Flags: netlink.NTF_MASTER,
}); err != nil && !errors.Is(err, os.ErrNotExist) {
log.G(context.TODO()).WithError(err).Warn("error while deleting neighbor entry")
}
}
log.G(context.TODO()).WithFields(log.Fields{
"ip": dstIP,
"mac": dstMac,
"ifc": linkName,
}).Debug("Neighbor entry deleted")
n.mu.Lock()
for i, neighbor := range n.neighbors {
if neighbor.dstIP.Equal(dstIP) && bytes.Equal(neighbor.dstMac, dstMac) {
n.neighbors = append(n.neighbors[:i], n.neighbors[i+1:]...)
break
}
}
n.mu.Unlock()
log.G(context.TODO()).Debugf("Neighbor entry deleted for IP %v, mac %v", dstIP, dstMac)
return nil
}
// AddNeighbor adds a neighbor entry into the sandbox.
func (n *Namespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, options ...NeighOption) error {
nlnh, linkName, err := n.nlNeigh(dstIP, dstMac, options...)
if err != nil {
return err
}
func (n *Namespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, force bool, options ...NeighOption) error {
var (
iface netlink.Link
err error
neighborAlreadyPresent bool
)
if err := n.nlHandle.NeighAdd(nlnh); err != nil {
if errors.Is(err, os.ErrExist) {
log.G(context.TODO()).WithFields(log.Fields{
"ip": dstIP,
"mac": dstMac,
"ifc": linkName,
"neigh": fmt.Sprintf("%+v", nlnh),
}).Warn("Neighbor entry already present")
return NeighborSearchError{dstIP, dstMac, linkName, true}
} else {
return fmt.Errorf("could not add neighbor entry %+v: %w", nlnh, err)
// If the namespace already has the neighbor entry but the AddNeighbor is called
// because of a miss notification (force flag) program the kernel anyway.
nh := n.findNeighbor(dstIP, dstMac)
if nh != nil {
neighborAlreadyPresent = true
log.G(context.TODO()).Warnf("Neighbor entry already present for IP %v, mac %v neighbor:%+v forceUpdate:%t", dstIP, dstMac, nh, force)
if !force {
return NeighborSearchError{dstIP, dstMac, true}
}
}
log.G(context.TODO()).WithFields(log.Fields{
"ip": dstIP,
"mac": dstMac,
"ifc": linkName,
}).Debug("Neighbor entry added")
nh = &neigh{
dstIP: dstIP,
dstMac: dstMac,
}
return nil
}
type neigh struct {
linkName string
family int
}
func (n *Namespace) nlNeigh(dstIP net.IP, dstMac net.HardwareAddr, options ...NeighOption) (*netlink.Neigh, string, error) {
var nh neigh
nh.processNeighOptions(options...)
if nh.linkName != "" {
nh.linkDst = n.findDst(nh.linkName, false)
if nh.linkDst == "" {
return fmt.Errorf("could not find the interface with name %s", nh.linkName)
}
}
n.mu.Lock()
nlh := n.nlHandle
n.mu.Unlock()
if nh.linkDst != "" {
iface, err = nlh.LinkByName(nh.linkDst)
if err != nil {
return fmt.Errorf("could not find interface with destination name %s: %v", nh.linkDst, err)
}
}
nlnh := &netlink.Neigh{
IP: dstIP,
HardwareAddr: dstMac,
@@ -134,17 +164,22 @@ func (n *Namespace) nlNeigh(dstIP net.IP, dstMac net.HardwareAddr, options ...Ne
nlnh.Flags = netlink.NTF_SELF
}
if nh.linkName != "" {
linkDst := n.findDst(nh.linkName, false)
if linkDst == "" {
return nil, nh.linkName, fmt.Errorf("could not find the interface with name %s", nh.linkName)
}
iface, err := n.nlHandle.LinkByName(linkDst)
if err != nil {
return nil, nh.linkName, fmt.Errorf("could not find interface with destination name %s: %w", linkDst, err)
}
if nh.linkDst != "" {
nlnh.LinkIndex = iface.Attrs().Index
}
return nlnh, nh.linkName, nil
if err := nlh.NeighSet(nlnh); err != nil {
return fmt.Errorf("could not add neighbor entry:%+v error:%v", nlnh, err)
}
if neighborAlreadyPresent {
return nil
}
n.mu.Lock()
n.neighbors = append(n.neighbors, nh)
n.mu.Unlock()
log.G(context.TODO()).Debugf("Neighbor entry added for IP:%v, mac:%v on ifc:%s", dstIP, dstMac, nh.linkName)
return nil
}
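
A hedged sketch of how the two neighbor flavours distinguished above translate into a netlink entry: an L3 (arp/ndp) entry versus an fdb entry when a non-zero family is supplied. The variable names and the AF_BRIDGE example are assumptions, not taken from this hunk:

	nlnh := &netlink.Neigh{
		LinkIndex:    linkIndex, // interface resolved earlier via LinkByName
		IP:           dstIP,
		HardwareAddr: dstMac,
		State:        netlink.NUD_PERMANENT,
	}
	if family > 0 { // e.g. unix.AF_BRIDGE for vxlan fdb entries
		nlnh.Family = family
		nlnh.Flags = netlink.NTF_SELF // program the entry on the device itself
	}
	// NeighSet upserts, which is what lets a forced re-add after a miss notification succeed.
	err := nlh.NeighSet(nlnh)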

View File

@@ -57,7 +57,7 @@ type service struct {
// associated with it. At stable state the endpoint ID expected is 1
// but during transition and service change it is possible to have
// temporary more than 1
ipToEndpoint setmatrix.SetMatrix[string, string]
ipToEndpoint setmatrix.SetMatrix[string]
deleted bool

View File

@@ -108,12 +108,16 @@ type Identity struct {
}
// Chown changes the numeric uid and gid of the named file to id.UID and id.GID.
//
// Deprecated: this method is deprecated and will be removed in the next release.
func (id Identity) Chown(name string) error {
return os.Chown(name, id.UID, id.GID)
}
// IdentityMapping contains a mappings of UIDs and GIDs.
// The zero value represents an empty mapping.
//
// Deprecated: this type is deprecated and will be removed in the next release.
type IdentityMapping struct {
UIDMaps []IDMap `json:"UIDMaps"`
GIDMaps []IDMap `json:"GIDMaps"`

View File

@@ -298,12 +298,17 @@ func isCIDRMatch(cidrs []*registry.NetIPNet, URLHost string) bool {
//
// It is used by the daemon to validate the daemon configuration.
func ValidateMirror(mirrorURL string) (string, error) {
// Fast path for missing scheme, as url.Parse splits by ":", which can
// cause the hostname to be considered the "scheme" when using "hostname:port".
if scheme, _, ok := strings.Cut(mirrorURL, "://"); !ok || scheme == "" {
return "", invalidParamf("invalid mirror: no scheme specified for %q: must use either 'https://' or 'http://'", mirrorURL)
}
uri, err := url.Parse(mirrorURL)
if err != nil {
return "", invalidParamWrapf(err, "invalid mirror: %q is not a valid URI", mirrorURL)
}
if uri.Scheme != "http" && uri.Scheme != "https" {
return "", invalidParamf("invalid mirror: unsupported scheme %q in %q", uri.Scheme, uri)
return "", invalidParamf("invalid mirror: unsupported scheme %q in %q: must use either 'https://' or 'http://'", uri.Scheme, uri)
}
if uri.RawQuery != "" || uri.Fragment != "" {
return "", invalidParamf("invalid mirror: query or fragment at end of the URI %q", uri)

View File

@@ -77,9 +77,17 @@ func TestValidateMirror(t *testing.T) {
input: "!invalid!://%as%",
expectedErr: `invalid mirror: "!invalid!://%as%" is not a valid URI: parse "!invalid!://%as%": first path segment in URL cannot contain colon`,
},
{
input: "mirror-1.example.com",
expectedErr: `invalid mirror: no scheme specified for "mirror-1.example.com": must use either 'https://' or 'http://'`,
},
{
input: "mirror-1.example.com:5000",
expectedErr: `invalid mirror: no scheme specified for "mirror-1.example.com:5000": must use either 'https://' or 'http://'`,
},
{
input: "ftp://mirror-1.example.com",
expectedErr: `invalid mirror: unsupported scheme "ftp" in "ftp://mirror-1.example.com"`,
expectedErr: `invalid mirror: unsupported scheme "ftp" in "ftp://mirror-1.example.com": must use either 'https://' or 'http://'`,
},
{
input: "http://mirror-1.example.com/?q=foo",
@@ -235,7 +243,7 @@ func TestNewServiceConfig(t *testing.T) {
opts: ServiceOptions{
Mirrors: []string{"example.com:5000"},
},
errStr: `invalid mirror: unsupported scheme "example.com" in "example.com:5000"`,
errStr: `invalid mirror: no scheme specified for "example.com:5000": must use either 'https://' or 'http://'`,
},
{
doc: "valid mirror",

View File

@@ -27,7 +27,7 @@ require (
github.com/cloudflare/cfssl v1.6.4
github.com/containerd/cgroups/v3 v3.0.5
github.com/containerd/containerd/api v1.9.0
github.com/containerd/containerd/v2 v2.1.2
github.com/containerd/containerd/v2 v2.1.3
github.com/containerd/continuity v0.4.5
github.com/containerd/errdefs v1.0.0
github.com/containerd/errdefs/pkg v0.3.0
@@ -62,14 +62,14 @@ require (
github.com/miekg/dns v1.1.66
github.com/mistifyio/go-zfs/v3 v3.0.1
github.com/mitchellh/copystructure v1.2.0
github.com/moby/buildkit v0.23.0-rc1
github.com/moby/buildkit v0.23.1
github.com/moby/docker-image-spec v1.3.1
github.com/moby/go-archive v0.1.0
github.com/moby/ipvs v1.1.0
github.com/moby/locker v1.0.1
github.com/moby/patternmatcher v0.6.0
github.com/moby/pubsub v1.0.0
github.com/moby/swarmkit/v2 v2.0.0-20250103191802-8c1959736554
github.com/moby/swarmkit/v2 v2.0.0
github.com/moby/sys/atomicwriter v0.1.0
github.com/moby/sys/mount v0.3.4
github.com/moby/sys/mountinfo v0.7.2
@@ -81,7 +81,7 @@ require (
github.com/moby/sys/userns v0.1.0
github.com/moby/term v0.5.2
github.com/morikuni/aec v1.0.0
github.com/opencontainers/cgroups v0.0.2
github.com/opencontainers/cgroups v0.0.3
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.1
github.com/opencontainers/runtime-spec v1.2.1

View File

@@ -125,8 +125,8 @@ github.com/containerd/console v1.0.5 h1:R0ymNeydRqH2DmakFNdmjR2k0t7UPuiOV/N/27/q
github.com/containerd/console v1.0.5/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk=
github.com/containerd/containerd/api v1.9.0 h1:HZ/licowTRazus+wt9fM6r/9BQO7S0vD5lMcWspGIg0=
github.com/containerd/containerd/api v1.9.0/go.mod h1:GhghKFmTR3hNtyznBoQ0EMWr9ju5AqHjcZPsSpTKutI=
github.com/containerd/containerd/v2 v2.1.2 h1:4ZQxB+FVYmwXZgpBcKfar6ieppm3KC5C6FRKvtJ6DRU=
github.com/containerd/containerd/v2 v2.1.2/go.mod h1:8C5QV9djwsYDNhxfTCFjWtTBZrqjditQ4/ghHSYjnHM=
github.com/containerd/containerd/v2 v2.1.3 h1:eMD2SLcIQPdMlnlNF6fatlrlRLAeDaiGPGwmRKLZKNs=
github.com/containerd/containerd/v2 v2.1.3/go.mod h1:8C5QV9djwsYDNhxfTCFjWtTBZrqjditQ4/ghHSYjnHM=
github.com/containerd/continuity v0.4.5 h1:ZRoN1sXq9u7V6QoHMcVWGhOwDFqZ4B9i5H6un1Wh0x4=
github.com/containerd/continuity v0.4.5/go.mod h1:/lNJvtJKUQStBzpVQ1+rasXO1LAWtUQssk28EZvJ3nE=
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=
@@ -384,8 +384,8 @@ github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:F
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mndrix/tap-go v0.0.0-20171203230836-629fa407e90b/go.mod h1:pzzDgJWZ34fGzaAZGFW22KVZDfyrYW+QABMrWnJBnSs=
github.com/moby/buildkit v0.23.0-rc1 h1:RIAEITsycLbXUt//rEPEfZUFnKUcm1cvpuWOfOidiWU=
github.com/moby/buildkit v0.23.0-rc1/go.mod h1:v5jMDvQgUyidk3wu3NvVAAd5JJo83nfet9Gf/o0+EAQ=
github.com/moby/buildkit v0.23.1 h1:CZtFmPRF+IFG1C8QfPnktGO1Dzzt5JSwtQ5eDqIh+ag=
github.com/moby/buildkit v0.23.1/go.mod h1:keNXljNmKX1T0AtM0bMObc8OV6mA9cOuquVbPcRpU/Y=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/go-archive v0.1.0 h1:Kk/5rdW/g+H8NHdJW2gsXyZ7UnzvJNOy6VKJqueWdcQ=
@@ -398,8 +398,8 @@ github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkV
github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc=
github.com/moby/pubsub v1.0.0 h1:jkp/imWsmJz2f6LyFsk7EkVeN2HxR/HTTOY8kHrsxfA=
github.com/moby/pubsub v1.0.0/go.mod h1:bXSO+3h5MNXXCaEG+6/NlAIk7MMZbySZlnB+cUQhKKc=
github.com/moby/swarmkit/v2 v2.0.0-20250103191802-8c1959736554 h1:DMHJbgyNZWyrPKYjCYt2IxEO7KA0eSd4fo6KQsv2W84=
github.com/moby/swarmkit/v2 v2.0.0-20250103191802-8c1959736554/go.mod h1:mTTGIAz/59OGZR5Qe+QByIe3Nxc+sSuJkrsStFhr6Lg=
github.com/moby/swarmkit/v2 v2.0.0 h1:jkWQKQaJ4ltA61/mC9UdPe1McLma55RUcacTO+pPweY=
github.com/moby/swarmkit/v2 v2.0.0/go.mod h1:mTTGIAz/59OGZR5Qe+QByIe3Nxc+sSuJkrsStFhr6Lg=
github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw=
github.com/moby/sys/atomicwriter v0.1.0/go.mod h1:Ul8oqv2ZMNHOceF643P6FKPXeCmYtlQMvpizfsSoaWs=
github.com/moby/sys/mount v0.3.4 h1:yn5jq4STPztkkzSKpZkLcmjue+bZJ0u2AuQY1iNI1Ww=
@@ -440,8 +440,8 @@ github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOT
github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y=
github.com/onsi/gomega v1.37.0/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opencontainers/cgroups v0.0.2 h1:A+mAPPMfgKNCEZUUtibESFx06uvhAmvo8sSz3Abwk7o=
github.com/opencontainers/cgroups v0.0.2/go.mod h1:s8lktyhlGUqM7OSRL5P7eAW6Wb+kWPNvt4qvVfzA5vs=
github.com/opencontainers/cgroups v0.0.3 h1:Jc9dWh/0YLGjdy6J/9Ln8NM5BfTA4W2BY0GMozy3aDU=
github.com/opencontainers/cgroups v0.0.3/go.mod h1:s8lktyhlGUqM7OSRL5P7eAW6Wb+kWPNvt4qvVfzA5vs=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=

View File

@@ -200,7 +200,7 @@ func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected dig
}
if size != 0 && copied < size-ws.Offset {
// Short writes would return its own error, this indicates a read failure
return fmt.Errorf("failed to read expected number of bytes: %w", io.ErrUnexpectedEOF)
return fmt.Errorf("short read: expected %d bytes but got %d: %w", size-ws.Offset, copied, io.ErrUnexpectedEOF)
}
if err := cw.Commit(ctx, size, expected, opts...); err != nil {
if errors.Is(err, ErrReset) {

View File

@@ -18,8 +18,12 @@ package docker
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
remoteerrors "github.com/containerd/containerd/v2/core/remotes/errors"
)
// ErrorCoder is the base interface for ErrorCode and Error allowing
@@ -281,3 +285,21 @@ func (errs *Errors) UnmarshalJSON(data []byte) error {
*errs = newErrs
return nil
}
func unexpectedResponseErr(resp *http.Response) (retErr error) {
retErr = remoteerrors.NewUnexpectedStatusErr(resp)
// Decode registry error if provided
if rerr := retErr.(remoteerrors.ErrUnexpectedStatus); len(rerr.Body) > 0 {
var registryErr Errors
if err := json.Unmarshal(rerr.Body, &registryErr); err == nil && registryErr.Len() > 0 {
// Join the unexpected error with the typed errors, when printed it will
// show the unexpected error message and the registry errors. The body
// is always excluded from the unexpected error message. This also allows
// clients to decode into either type.
retErr = errors.Join(rerr, registryErr)
}
}
return
}
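
Because the status error and the decoded registry errors are joined, callers can unwrap whichever representation they need; a short hedged sketch reusing the types referenced in this file:

	err := unexpectedResponseErr(resp)
	var statusErr remoteerrors.ErrUnexpectedStatus
	var registryErrs Errors
	if errors.As(err, &statusErr) {
		_ = statusErr.StatusCode // HTTP status, method, and URL without the raw body
	}
	if errors.As(err, &registryErrs) {
		_ = registryErrs.Len() // typed registry error codes decoded from the body
	}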

View File

@@ -442,7 +442,7 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
chunkSize := int64(r.performances.ConcurrentLayerFetchBuffer)
parallelism := int64(r.performances.MaxConcurrentDownloads)
if chunkSize < minChunkSize {
if chunkSize < minChunkSize || req.body != nil {
parallelism = 1
}
log.G(ctx).WithField("initial_parallelism", r.performances.MaxConcurrentDownloads).
@@ -452,7 +452,9 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
Debug("fetching layer")
req.setMediaType(mediatype)
req.header.Set("Accept-Encoding", "zstd;q=1.0, gzip;q=0.8, deflate;q=0.5")
req.setOffset(offset)
if parallelism > 1 || offset > 0 {
req.setOffset(offset)
}
if err := r.Acquire(ctx, 1); err != nil {
return nil, err
@@ -478,7 +480,11 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
})
remaining, _ := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 0)
if parallelism > 1 && req.body == nil {
if remaining <= chunkSize {
parallelism = 1
}
if parallelism > 1 {
// If we have a content length, we can use multiple requests to fetch
// the content in parallel. This will make download of bigger bodies
// faster, at the cost of parallelism more requests and max

View File

@@ -36,7 +36,6 @@ import (
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
remoteserrors "github.com/containerd/containerd/v2/core/remotes/errors"
)
type dockerPusher struct {
@@ -149,8 +148,8 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
return nil, fmt.Errorf("content %v on remote: %w", desc.Digest, errdefs.ErrAlreadyExists)
}
} else if resp.StatusCode != http.StatusNotFound {
err := remoteserrors.NewUnexpectedStatusErr(resp)
log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response")
err := unexpectedResponseErr(resp)
log.G(ctx).WithError(err).Debug("unexpected response")
resp.Body.Close()
return nil, err
}
@@ -224,8 +223,8 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
})
return nil, fmt.Errorf("content %v on remote: %w", desc.Digest, errdefs.ErrAlreadyExists)
default:
err := remoteserrors.NewUnexpectedStatusErr(resp)
log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response")
err := unexpectedResponseErr(resp)
log.G(ctx).WithError(err).Debug("unexpected response")
return nil, err
}
@@ -299,8 +298,8 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
switch resp.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusNoContent:
default:
err := remoteserrors.NewUnexpectedStatusErr(resp)
log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response")
err := unexpectedResponseErr(resp)
log.G(ctx).WithError(err).Debug("unexpected response")
pushw.setError(err)
return
}
@@ -513,7 +512,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
switch resp.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
default:
return remoteserrors.NewUnexpectedStatusErr(resp)
return unexpectedResponseErr(resp)
}
status, err := pw.tracker.GetStatus(pw.ref)

View File

@@ -19,7 +19,6 @@ package docker
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
@@ -39,7 +38,6 @@ import (
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
remoteerrors "github.com/containerd/containerd/v2/core/remotes/errors"
"github.com/containerd/containerd/v2/core/transfer"
"github.com/containerd/containerd/v2/pkg/reference"
"github.com/containerd/containerd/v2/pkg/tracing"
@@ -332,13 +330,13 @@ func (r *dockerResolver) Resolve(ctx context.Context, ref string) (string, ocisp
}
if resp.StatusCode > 399 {
if firstErrPriority < 3 {
firstErr = remoteerrors.NewUnexpectedStatusErr(resp)
firstErr = unexpectedResponseErr(resp)
firstErrPriority = 3
}
log.G(ctx).Infof("%s after status: %s", nextHostOrFail(i), resp.Status)
continue // try another host
}
return "", ocispec.Descriptor{}, remoteerrors.NewUnexpectedStatusErr(resp)
return "", ocispec.Descriptor{}, unexpectedResponseErr(resp)
}
size := resp.ContentLength
contentType := getManifestMediaType(resp)
@@ -653,11 +651,8 @@ func withErrorCheck(r *request, resp *http.Response) error {
if resp.StatusCode == http.StatusNotFound {
return fmt.Errorf("content at %v not found: %w", r.String(), errdefs.ErrNotFound)
}
var registryErr Errors
if err := json.NewDecoder(resp.Body).Decode(&registryErr); err != nil || registryErr.Len() < 1 {
return fmt.Errorf("unexpected status code %v: %v", r.String(), resp.Status)
}
return fmt.Errorf("unexpected status code %v: %s - Server message: %s", r.String(), resp.Status, registryErr.Error())
return unexpectedResponseErr(resp)
}
return nil
}

View File

@@ -20,16 +20,23 @@ import (
"fmt"
"io"
"net/http"
"github.com/containerd/typeurl/v2"
)
var _ error = ErrUnexpectedStatus{}
func init() {
typeurl.Register(&ErrUnexpectedStatus{}, "github.com/containerd/containerd/v2/core/remotes/errors", "ErrUnexpectedStatus")
}
// ErrUnexpectedStatus is returned if a registry API request returned with unexpected HTTP status
type ErrUnexpectedStatus struct {
Status string
StatusCode int
Body []byte
RequestURL, RequestMethod string
Status string `json:"status"`
StatusCode int `json:"statusCode"`
Body []byte `json:"body,omitempty"`
RequestURL string `json:"requestURL,omitempty"`
RequestMethod string `json:"requestMethod,omitempty"`
}
func (e ErrUnexpectedStatus) Error() string {

View File

@@ -28,15 +28,17 @@ import (
transferapi "github.com/containerd/containerd/api/services/transfer/v1"
transfertypes "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/containerd/v2/core/transfer"
tstreaming "github.com/containerd/containerd/v2/core/transfer/streaming"
"github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/errdefs"
"github.com/containerd/errdefs/pkg/errgrpc"
"github.com/containerd/log"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl/v2"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/containerd/v2/core/transfer"
tstreaming "github.com/containerd/containerd/v2/core/transfer/streaming"
"github.com/containerd/containerd/v2/pkg/oci"
)
type proxyTransferrer struct {
@@ -150,7 +152,7 @@ func (p *proxyTransferrer) Transfer(ctx context.Context, src interface{}, dst in
Options: apiOpts,
}
_, err = p.client.Transfer(ctx, req)
return err
return errgrpc.ToNative(err)
}
func (p *proxyTransferrer) marshalAny(ctx context.Context, i interface{}) (typeurl.Any, error) {
switch m := i.(type) {

View File

@@ -24,7 +24,7 @@ var (
Package = "github.com/containerd/containerd/v2"
// Version holds the complete version number. Filled in at linking time.
Version = "2.1.2+unknown"
Version = "2.1.3+unknown"
// Revision is filled with the VCS (e.g. git) revision being used to build
// the program at linking time.

View File

@@ -12,6 +12,7 @@ import (
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
"github.com/containerd/containerd/v2/core/remotes/docker"
cerrdefs "github.com/containerd/errdefs"
distreference "github.com/distribution/reference"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/pb"
@@ -225,7 +226,7 @@ func (r *Resolver) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, er
// Resolve attempts to resolve the reference into a name and descriptor.
func (r *Resolver) Resolve(ctx context.Context, ref string) (string, ocispecs.Descriptor, error) {
if r.mode == ResolveModePreferLocal && r.is != nil {
if img, err := r.is.Get(ctx, ref); err == nil {
if img, err := getImageByRef(ctx, r.is, ref); err == nil {
return ref, img.Target, nil
}
}
@@ -237,7 +238,7 @@ func (r *Resolver) Resolve(ctx context.Context, ref string) (string, ocispecs.De
}
if r.mode == ResolveModeDefault && r.is != nil {
if img, err := r.is.Get(ctx, ref); err == nil {
if img, err := getImageByRef(ctx, r.is, ref); err == nil {
return ref, img.Target, nil
}
}
@@ -245,6 +246,30 @@ func (r *Resolver) Resolve(ctx context.Context, ref string) (string, ocispecs.De
return "", ocispecs.Descriptor{}, err
}
func getImageByRef(ctx context.Context, is images.Store, ref string) (images.Image, error) {
named, err := distreference.ParseNormalizedNamed(ref)
if err != nil {
return images.Image{}, err
}
name := named.Name()
tag := "latest"
if t, ok := named.(distreference.Tagged); ok {
tag = t.Tag()
}
name = name + ":" + tag
img, err := is.Get(ctx, name)
if err != nil {
return images.Image{}, err
}
if c, ok := named.(distreference.Canonical); ok {
if img.Target.Digest != c.Digest() {
return images.Image{}, errors.WithStack(cerrdefs.ErrNotFound)
}
}
return img, nil
}
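
A small illustration of the reference normalization getImageByRef relies on; the input value is only an example:

	named, _ := distreference.ParseNormalizedNamed("alpine")
	fmt.Println(named.Name()) // docker.io/library/alpine

	tag := "latest"
	if t, ok := named.(distreference.Tagged); ok { // no tag in the input, so the default sticks
		tag = t.Tag()
	}
	fmt.Println(named.Name() + ":" + tag) // docker.io/library/alpine:latest

For a canonical reference such as alpine@sha256:…, the stored image is accepted only if its target digest matches the digest in the reference.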
type ResolveMode int
const (

View File

@@ -787,9 +787,11 @@ func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGrou
// Try to make branches even until either all branches are
// full, or all tasks have been scheduled.
for tasksScheduled != n && len(noRoom) != len(tree.next) {
converging := true
for tasksScheduled != n && len(noRoom) != len(tree.next) && converging {
desiredTasksPerBranch := (tasksInUsableBranches + n - tasksScheduled) / (len(tree.next) - len(noRoom))
remainder := (tasksInUsableBranches + n - tasksScheduled) % (len(tree.next) - len(noRoom))
converging = false
for _, subtree := range tree.next {
if noRoom != nil {
@@ -799,6 +801,7 @@ func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGrou
}
subtreeTasks := subtree.tasks
if subtreeTasks < desiredTasksPerBranch || (subtreeTasks == desiredTasksPerBranch && remainder > 0) {
converging = true
tasksToAssign := desiredTasksPerBranch - subtreeTasks
if remainder > 0 {
tasksToAssign++

View File

@@ -29,7 +29,7 @@ type Cgroup struct {
ScopePrefix string `json:"scope_prefix,omitempty"`
// Resources contains various cgroups settings to apply.
*Resources `json:"Resources,omitempty"`
*Resources
// Systemd tells if systemd should be used to manage cgroups.
Systemd bool `json:"Systemd,omitempty"`

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"strconv"
@@ -413,16 +414,30 @@ func WriteCgroupProc(dir string, pid int) error {
return err
}
// Since the OCI spec is designed for cgroup v1, in some cases
// there is need to convert from the cgroup v1 configuration to cgroup v2
// the formula for cpuShares is y = (1 + ((x - 2) * 9999) / 262142)
// convert from [2-262144] to [1-10000]
// 262144 comes from Linux kernel definition "#define MAX_SHARES (1UL << 18)"
// ConvertCPUSharesToCgroupV2Value converts CPU shares, used by cgroup v1,
// to CPU weight, used by cgroup v2.
//
// Cgroup v1 CPU shares has a range of [2^1...2^18], i.e. [2...262144],
// and the default value is 1024.
//
// Cgroup v2 CPU weight has a range of [10^0...10^4], i.e. [1...10000],
// and the default value is 100.
func ConvertCPUSharesToCgroupV2Value(cpuShares uint64) uint64 {
// The value of 0 means "unset".
if cpuShares == 0 {
return 0
}
return (1 + ((cpuShares-2)*9999)/262142)
if cpuShares <= 2 {
return 1
}
if cpuShares >= 262144 {
return 10000
}
l := math.Log2(float64(cpuShares))
// Quadratic function which fits min, max, and default.
exponent := (l*l+125*l)/612.0 - 7.0/34.0
return uint64(math.Ceil(math.Pow(10, exponent)))
}
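
A quick sanity check of the two mappings at a few points; the arithmetic follows directly from the formulas shown above:

	// Linear mapping at the default: 1 + ((1024-2)*9999)/262142 = 1 + 38 = 39 (integer division).
	// Log-quadratic mapping at the default: l = log2(1024) = 10,
	// exponent = (10*10 + 125*10)/612 - 7/34 = 2.0, so ceil(10^2.0) = 100.
	fmt.Println(ConvertCPUSharesToCgroupV2Value(1024))   // 100 with the log-quadratic curve
	fmt.Println(ConvertCPUSharesToCgroupV2Value(2))      // 1 (clamped to the cgroup v2 minimum)
	fmt.Println(ConvertCPUSharesToCgroupV2Value(262144)) // 10000 (clamped to the maximum)

The log-quadratic curve is chosen so that the cgroup v1 minimum (2), default (1024), and maximum (262144) map to the cgroup v2 minimum (1), default (100), and maximum (10000) respectively.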
// ConvertMemorySwapToCgroupV2Value converts MemorySwap value from OCI spec

vendor/modules.txt (vendored, 8 changes)
View File

@@ -323,7 +323,7 @@ github.com/containerd/containerd/api/types/runc/options
github.com/containerd/containerd/api/types/runtimeoptions/v1
github.com/containerd/containerd/api/types/task
github.com/containerd/containerd/api/types/transfer
# github.com/containerd/containerd/v2 v2.1.2
# github.com/containerd/containerd/v2 v2.1.3
## explicit; go 1.23.0
github.com/containerd/containerd/v2/client
github.com/containerd/containerd/v2/cmd/containerd/server/config
@@ -757,7 +757,7 @@ github.com/mitchellh/hashstructure/v2
# github.com/mitchellh/reflectwalk v1.0.2
## explicit
github.com/mitchellh/reflectwalk
# github.com/moby/buildkit v0.23.0-rc1
# github.com/moby/buildkit v0.23.1
## explicit; go 1.23.0
github.com/moby/buildkit/api/services/control
github.com/moby/buildkit/api/types
@@ -945,7 +945,7 @@ github.com/moby/patternmatcher/ignorefile
# github.com/moby/pubsub v1.0.0
## explicit; go 1.19
github.com/moby/pubsub
# github.com/moby/swarmkit/v2 v2.0.0-20250103191802-8c1959736554
# github.com/moby/swarmkit/v2 v2.0.0
## explicit; go 1.18
github.com/moby/swarmkit/v2/agent
github.com/moby/swarmkit/v2/agent/configs
@@ -1053,7 +1053,7 @@ github.com/morikuni/aec
# github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
## explicit
github.com/munnerz/goautoneg
# github.com/opencontainers/cgroups v0.0.2
# github.com/opencontainers/cgroups v0.0.3
## explicit; go 1.23.0
github.com/opencontainers/cgroups
github.com/opencontainers/cgroups/devices/config