mirror of
https://github.com/moby/moby.git
synced 2026-01-11 18:51:37 +00:00
When handling updates to existing entries, it is often necessary to know what the previous value was. NetworkDB knows the previous and new values when it broadcasts an update event for an entry. Include both values in the update event so the watchers do not have to do their own parallel bookkeeping. Unify the event types under WatchEvent as representing the operation kind in the type system has been inconvenient, not useful. The operation is now implied by the nilness of the Value and Prev event fields. Signed-off-by: Cory Snider <csnider@mirantis.com>
331 lines
11 KiB
Go
331 lines
11 KiB
Go
package networkdb
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/docker/go-events"
|
|
"github.com/hashicorp/memberlist"
|
|
"github.com/hashicorp/serf/serf"
|
|
"gotest.tools/v3/assert"
|
|
is "gotest.tools/v3/assert/cmp"
|
|
)
|
|
|
|
func TestWatch_out_of_order(t *testing.T) {
|
|
nDB := newNetworkDB(DefaultConfig())
|
|
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
|
|
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
|
|
assert.Assert(t, nDB.JoinNetwork("network1"))
|
|
|
|
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
|
|
Name: "node1",
|
|
Addr: net.IPv4(1, 2, 3, 4),
|
|
})
|
|
|
|
d := &delegate{nDB}
|
|
|
|
msgs := messageBuffer{t: t}
|
|
appendTableEvent := tableEventHelper(&msgs, "node1", "network1", "table1")
|
|
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
|
|
Type: NetworkEventTypeJoin,
|
|
LTime: 1,
|
|
NodeName: "node1",
|
|
NetworkID: "network1",
|
|
})
|
|
appendTableEvent(1, TableEventTypeCreate, "tombstone1", []byte("a"))
|
|
appendTableEvent(2, TableEventTypeDelete, "tombstone1", []byte("b"))
|
|
appendTableEvent(3, TableEventTypeCreate, "key1", []byte("value1"))
|
|
d.NotifyMsg(msgs.Compound())
|
|
msgs.Reset()
|
|
|
|
nDB.CreateEntry("table1", "network1", "local1", []byte("should not see me in watch events"))
|
|
watch, cancel := nDB.Watch("table1", "network1")
|
|
defer cancel()
|
|
|
|
got := drainChannel(watch.C)
|
|
assert.Check(t, is.DeepEqual(got, []events.Event{
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key1", Value: []byte("value1")},
|
|
}))
|
|
|
|
// Receive events from node1, with events not received or received out of order
|
|
// Create, (hidden update), delete
|
|
appendTableEvent(4, TableEventTypeCreate, "key2", []byte("a"))
|
|
appendTableEvent(6, TableEventTypeDelete, "key2", []byte("b"))
|
|
// (Hidden recreate), delete
|
|
appendTableEvent(8, TableEventTypeDelete, "key2", []byte("c"))
|
|
// (Hidden recreate), update
|
|
appendTableEvent(10, TableEventTypeUpdate, "key2", []byte("d"))
|
|
|
|
// Update, create
|
|
appendTableEvent(11, TableEventTypeUpdate, "key3", []byte("b"))
|
|
appendTableEvent(10, TableEventTypeCreate, "key3", []byte("a"))
|
|
|
|
// (Hidden create), update, update
|
|
appendTableEvent(13, TableEventTypeUpdate, "key4", []byte("b"))
|
|
appendTableEvent(14, TableEventTypeUpdate, "key4", []byte("c"))
|
|
|
|
// Delete, create
|
|
appendTableEvent(16, TableEventTypeDelete, "key5", []byte("a"))
|
|
appendTableEvent(15, TableEventTypeCreate, "key5", []byte("a"))
|
|
// (Hidden recreate), delete
|
|
appendTableEvent(18, TableEventTypeDelete, "key5", []byte("b"))
|
|
|
|
d.NotifyMsg(msgs.Compound())
|
|
msgs.Reset()
|
|
|
|
got = drainChannel(watch.C)
|
|
assert.Check(t, is.DeepEqual(got, []events.Event{
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("a")},
|
|
// Delete value should match last observed value,
|
|
// irrespective of the content of the delete event over the wire.
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key2", Prev: []byte("a")},
|
|
// Updates to previously-deleted keys should be observed as creates.
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("d")},
|
|
|
|
// Out-of-order update events should be observed as creates.
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key3", Value: []byte("b")},
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key4", Value: []byte("b")},
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key4", Prev: []byte("b"), Value: []byte("c")},
|
|
|
|
// key5 should not appear in the events.
|
|
}))
|
|
}
|
|
|
|
func TestWatch_filters(t *testing.T) {
|
|
nDB := newNetworkDB(DefaultConfig())
|
|
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
|
|
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
|
|
assert.Assert(t, nDB.JoinNetwork("network1"))
|
|
assert.Assert(t, nDB.JoinNetwork("network2"))
|
|
|
|
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
|
|
Name: "node1",
|
|
Addr: net.IPv4(1, 2, 3, 4),
|
|
})
|
|
|
|
var ltime serf.LamportClock
|
|
msgs := messageBuffer{t: t}
|
|
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
|
|
Type: NetworkEventTypeJoin,
|
|
LTime: ltime.Increment(),
|
|
NodeName: "node1",
|
|
NetworkID: "network1",
|
|
})
|
|
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
|
|
Type: NetworkEventTypeJoin,
|
|
LTime: ltime.Increment(),
|
|
NodeName: "node1",
|
|
NetworkID: "network2",
|
|
})
|
|
for _, nid := range []string{"network1", "network2"} {
|
|
for _, tname := range []string{"table1", "table2"} {
|
|
msgs.Append(MessageTypeTableEvent, &TableEvent{
|
|
Type: TableEventTypeCreate,
|
|
LTime: ltime.Increment(),
|
|
NodeName: "node1",
|
|
NetworkID: nid,
|
|
TableName: tname,
|
|
Key: nid + "." + tname + ".dead",
|
|
Value: []byte("deaddead"),
|
|
})
|
|
msgs.Append(MessageTypeTableEvent, &TableEvent{
|
|
Type: TableEventTypeDelete,
|
|
LTime: ltime.Increment(),
|
|
NodeName: "node1",
|
|
NetworkID: nid,
|
|
TableName: tname,
|
|
Key: nid + "." + tname + ".dead",
|
|
Value: []byte("deaddead"),
|
|
})
|
|
msgs.Append(MessageTypeTableEvent, &TableEvent{
|
|
Type: TableEventTypeCreate,
|
|
LTime: ltime.Increment(),
|
|
NodeName: "node1",
|
|
NetworkID: nid,
|
|
TableName: tname,
|
|
Key: nid + "." + tname + ".update",
|
|
Value: []byte("initial"),
|
|
})
|
|
msgs.Append(MessageTypeTableEvent, &TableEvent{
|
|
Type: TableEventTypeCreate,
|
|
LTime: ltime.Increment(),
|
|
NodeName: "node1",
|
|
NetworkID: nid,
|
|
TableName: tname,
|
|
Key: nid + "." + tname,
|
|
Value: []byte("a"),
|
|
})
|
|
msgs.Append(MessageTypeTableEvent, &TableEvent{
|
|
Type: TableEventTypeUpdate,
|
|
LTime: ltime.Increment(),
|
|
NodeName: "node1",
|
|
NetworkID: nid,
|
|
TableName: tname,
|
|
Key: nid + "." + tname + ".update",
|
|
Value: []byte("updated"),
|
|
})
|
|
}
|
|
}
|
|
(&delegate{nDB}).NotifyMsg(msgs.Compound())
|
|
|
|
watchAll, cancel := nDB.Watch("", "")
|
|
defer cancel()
|
|
watchNetwork1Tables, cancel := nDB.Watch("", "network1")
|
|
defer cancel()
|
|
watchTable1AllNetworks, cancel := nDB.Watch("table1", "")
|
|
defer cancel()
|
|
watchTable1Network1, cancel := nDB.Watch("table1", "network1")
|
|
defer cancel()
|
|
|
|
var gotAll, gotNetwork1Tables, gotTable1AllNetworks, gotTable1Network1 []events.Event
|
|
L:
|
|
for {
|
|
select {
|
|
case ev := <-watchAll.C:
|
|
gotAll = append(gotAll, ev)
|
|
case ev := <-watchNetwork1Tables.C:
|
|
gotNetwork1Tables = append(gotNetwork1Tables, ev)
|
|
case ev := <-watchTable1AllNetworks.C:
|
|
gotTable1AllNetworks = append(gotTable1AllNetworks, ev)
|
|
case ev := <-watchTable1Network1.C:
|
|
gotTable1Network1 = append(gotTable1Network1, ev)
|
|
case <-time.After(time.Second):
|
|
break L
|
|
}
|
|
}
|
|
|
|
assert.Check(t, is.DeepEqual(gotAll, []events.Event{
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")},
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")},
|
|
WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")},
|
|
WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")},
|
|
WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")},
|
|
WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")},
|
|
WatchEvent{Table: "table2", NetworkID: "network2", Key: "network2.table2", Value: []byte("a")},
|
|
WatchEvent{Table: "table2", NetworkID: "network2", Key: "network2.table2.update", Value: []byte("updated")},
|
|
}))
|
|
assert.Check(t, is.DeepEqual(gotNetwork1Tables, []events.Event{
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")},
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")},
|
|
WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")},
|
|
WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")},
|
|
}))
|
|
assert.Check(t, is.DeepEqual(gotTable1AllNetworks, []events.Event{
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")},
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")},
|
|
WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")},
|
|
WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")},
|
|
}))
|
|
assert.Check(t, is.DeepEqual(gotTable1Network1, []events.Event{
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")},
|
|
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")},
|
|
}))
|
|
}
|
|
|
|
func TestLeaveRejoinOutOfOrder(t *testing.T) {
|
|
// Regression test for https://github.com/moby/moby/issues/47728
|
|
|
|
nDB := newNetworkDB(DefaultConfig())
|
|
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
|
|
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
|
|
assert.Assert(t, nDB.JoinNetwork("network1"))
|
|
|
|
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
|
|
Name: "node1",
|
|
Addr: net.IPv4(1, 2, 3, 4),
|
|
})
|
|
|
|
d := &delegate{nDB}
|
|
|
|
msgs := messageBuffer{t: t}
|
|
appendTableEvent := tableEventHelper(&msgs, "node1", "network1", "table1")
|
|
|
|
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
|
|
Type: NetworkEventTypeJoin,
|
|
LTime: 1,
|
|
NodeName: "node1",
|
|
NetworkID: "network1",
|
|
})
|
|
// Simulate node1 leaving, rejoining, and creating an entry,
|
|
// but the table events are broadcast before the network events.
|
|
appendTableEvent(1, TableEventTypeCreate, "key1", []byte("a"))
|
|
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
|
|
Type: NetworkEventTypeLeave,
|
|
LTime: 2,
|
|
NodeName: "node1",
|
|
NetworkID: "network1",
|
|
})
|
|
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
|
|
Type: NetworkEventTypeJoin,
|
|
LTime: 3,
|
|
NodeName: "node1",
|
|
NetworkID: "network1",
|
|
})
|
|
// Simulate a bulk sync or receiving a rebroadcasted copy of the table
|
|
// event from another node.
|
|
appendTableEvent(1, TableEventTypeCreate, "key1", []byte("a"))
|
|
|
|
d.NotifyMsg(msgs.Compound())
|
|
|
|
got := make(map[string]string)
|
|
nDB.WalkTable("table1", func(nw, key string, value []byte, deleted bool) bool {
|
|
got[nw+"/"+key] = fmt.Sprintf("%s (deleted=%t)", value, deleted)
|
|
return false
|
|
})
|
|
want := map[string]string{
|
|
"network1/key1": "a (deleted=false)",
|
|
}
|
|
assert.Check(t, is.DeepEqual(got, want))
|
|
}
|
|
|
|
func drainChannel(ch <-chan events.Event) []events.Event {
|
|
var events []events.Event
|
|
for {
|
|
select {
|
|
case ev := <-ch:
|
|
events = append(events, ev)
|
|
case <-time.After(time.Second):
|
|
return events
|
|
}
|
|
}
|
|
}
|
|
|
|
type messageBuffer struct {
|
|
t *testing.T
|
|
msgs [][]byte
|
|
}
|
|
|
|
func (mb *messageBuffer) Append(typ MessageType, msg any) {
|
|
mb.t.Helper()
|
|
buf, err := encodeMessage(typ, msg)
|
|
if err != nil {
|
|
mb.t.Fatalf("failed to encode message: %v", err)
|
|
}
|
|
mb.msgs = append(mb.msgs, buf)
|
|
}
|
|
|
|
func (mb *messageBuffer) Compound() []byte {
|
|
return makeCompoundMessage(mb.msgs)
|
|
}
|
|
|
|
func (mb *messageBuffer) Reset() {
|
|
mb.msgs = nil
|
|
}
|
|
|
|
func tableEventHelper(mb *messageBuffer, nodeName, networkID, tableName string) func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) {
|
|
return func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) {
|
|
mb.t.Helper()
|
|
mb.Append(MessageTypeTableEvent, &TableEvent{
|
|
Type: typ,
|
|
LTime: ltime,
|
|
NodeName: nodeName,
|
|
NetworkID: networkID,
|
|
TableName: tableName,
|
|
Key: key,
|
|
Value: value,
|
|
})
|
|
}
|
|
}
|