mirror of
https://github.com/moby/moby.git
synced 2026-01-11 18:51:37 +00:00
260 lines
6.7 KiB
Go
260 lines
6.7 KiB
Go
package datastore
|
|
|
|
import (
|
|
"errors"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
|
|
store "github.com/moby/moby/v2/daemon/libnetwork/internal/kvstore"
|
|
"github.com/moby/moby/v2/daemon/libnetwork/internal/kvstore/boltdb"
|
|
"github.com/moby/moby/v2/daemon/libnetwork/types"
|
|
)
|
|
|
|
// ErrKeyModified is raised for an atomic update when the update is working on a stale state
|
|
var (
|
|
ErrKeyModified = store.ErrKeyModified
|
|
ErrKeyNotFound = store.ErrKeyNotFound
|
|
)
|
|
|
|
type Store struct {
|
|
mu sync.Mutex
|
|
store store.Store
|
|
cache *cache
|
|
}
|
|
|
|
// KVObject is Key/Value interface used by objects to be part of the Store.
|
|
type KVObject interface {
|
|
// Key method lets an object provide the Key to be used in KV Store
|
|
Key() []string
|
|
// KeyPrefix method lets an object return immediate parent key that can be used for tree walk
|
|
KeyPrefix() []string
|
|
// Value method lets an object marshal its content to be stored in the KV store
|
|
Value() []byte
|
|
// SetValue is used by the datastore to set the object's value when loaded from the data store.
|
|
SetValue([]byte) error
|
|
// Index method returns the latest DB Index as seen by the object
|
|
Index() uint64
|
|
// SetIndex method allows the datastore to store the latest DB Index into the object
|
|
SetIndex(uint64)
|
|
// Exists returns true if the object exists in the datastore, false if it hasn't been stored yet.
|
|
// When SetIndex() is called, the object has been stored.
|
|
Exists() bool
|
|
// Skip provides a way for a KV Object to avoid persisting it in the KV Store
|
|
Skip() bool
|
|
// New returns a new object which is created based on the
|
|
// source object
|
|
New() KVObject
|
|
// CopyTo deep copies the contents of the implementing object
|
|
// to the passed destination object
|
|
CopyTo(KVObject) error
|
|
}
|
|
|
|
const (
|
|
// NetworkKeyPrefix is the prefix for network key in the kv store
|
|
NetworkKeyPrefix = "network"
|
|
// EndpointKeyPrefix is the prefix for endpoint key in the kv store
|
|
EndpointKeyPrefix = "endpoint"
|
|
)
|
|
|
|
var (
|
|
defaultRootChain = []string{"docker", "network", "v1.0"}
|
|
rootChain = defaultRootChain
|
|
)
|
|
|
|
const DefaultBucket = "libnetwork"
|
|
|
|
// Key provides convenient method to create a Key
|
|
func Key(key ...string) string {
|
|
var b strings.Builder
|
|
for _, parts := range [][]string{rootChain, key} {
|
|
for _, part := range parts {
|
|
b.WriteString(part)
|
|
b.WriteString("/")
|
|
}
|
|
}
|
|
return b.String()
|
|
}
|
|
|
|
// New creates a new Store instance.
|
|
func New(dir, bucket string) (*Store, error) {
|
|
if dir == "" {
|
|
return nil, errors.New("empty dir")
|
|
}
|
|
if bucket == "" {
|
|
return nil, errors.New("empty bucket")
|
|
}
|
|
|
|
s, err := boltdb.New(path.Join(dir, "local-kv.db"), bucket)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Store{store: s, cache: newCache(s)}, nil
|
|
}
|
|
|
|
// Close closes the data store.
|
|
func (ds *Store) Close() {
|
|
ds.store.Close()
|
|
}
|
|
|
|
// PutObjectAtomic provides an atomic add and update operation for a Record.
|
|
func (ds *Store) PutObjectAtomic(kvObject KVObject) error {
|
|
ds.mu.Lock()
|
|
defer ds.mu.Unlock()
|
|
|
|
if kvObject == nil {
|
|
return types.InvalidParameterErrorf("invalid KV Object: nil")
|
|
}
|
|
|
|
kvObjValue := kvObject.Value()
|
|
|
|
if kvObjValue == nil {
|
|
return types.InvalidParameterErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
|
|
}
|
|
|
|
if !kvObject.Skip() {
|
|
var previous *store.KVPair
|
|
if kvObject.Exists() {
|
|
previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
|
|
}
|
|
|
|
pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous)
|
|
if err != nil {
|
|
if errors.Is(err, store.ErrKeyExists) {
|
|
return ErrKeyModified
|
|
}
|
|
return err
|
|
}
|
|
|
|
kvObject.SetIndex(pair.LastIndex)
|
|
}
|
|
|
|
// If persistent store is skipped, sequencing needs to
|
|
// happen in cache.
|
|
return ds.cache.add(kvObject, kvObject.Skip())
|
|
}
|
|
|
|
// GetObject gets data from the store and unmarshals to the specified object.
|
|
func (ds *Store) GetObject(o KVObject) error {
|
|
ds.mu.Lock()
|
|
defer ds.mu.Unlock()
|
|
|
|
return ds.cache.get(o)
|
|
}
|
|
|
|
func (ds *Store) ensureParent(parent string) error {
|
|
exists, err := ds.store.Exists(parent)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if exists {
|
|
return nil
|
|
}
|
|
return ds.store.Put(parent, []byte{})
|
|
}
|
|
|
|
// List returns of a list of KVObjects belonging to the parent key. The caller
|
|
// must pass a KVObject of the same type as the objects that need to be listed.
|
|
func (ds *Store) List(kvObject KVObject) ([]KVObject, error) {
|
|
ds.mu.Lock()
|
|
defer ds.mu.Unlock()
|
|
|
|
return ds.cache.list(kvObject)
|
|
}
|
|
|
|
func (ds *Store) iterateKVPairsFromStore(key string, ctor KVObject, callback func(string, KVObject)) error {
|
|
// Make sure the parent key exists
|
|
if err := ds.ensureParent(key); err != nil {
|
|
return err
|
|
}
|
|
|
|
kvList, err := ds.store.List(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, kvPair := range kvList {
|
|
if len(kvPair.Value) == 0 {
|
|
continue
|
|
}
|
|
|
|
dstO := ctor.New()
|
|
if err := dstO.SetValue(kvPair.Value); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Make sure the object has a correct view of the DB index in
|
|
// case we need to modify it and update the DB.
|
|
dstO.SetIndex(kvPair.LastIndex)
|
|
callback(kvPair.Key, dstO)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Map returns a Map of KVObjects.
|
|
func (ds *Store) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
|
|
ds.mu.Lock()
|
|
defer ds.mu.Unlock()
|
|
|
|
results := map[string]KVObject{}
|
|
err := ds.iterateKVPairsFromStore(key, kvObject, func(key string, val KVObject) {
|
|
// Trim the leading & trailing "/" to make it consistent across all stores
|
|
results[strings.Trim(key, "/")] = val
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
// DeleteObject deletes a kvObject from the on-disk DB and the in-memory cache.
|
|
// Unlike DeleteObjectAtomic, it doesn't check the optimistic lock of the
|
|
// passed kvObject.
|
|
func (ds *Store) DeleteObject(kvObject KVObject) error {
|
|
ds.mu.Lock()
|
|
defer ds.mu.Unlock()
|
|
|
|
if kvObject == nil {
|
|
return types.InvalidParameterErrorf("invalid KV Object: nil")
|
|
}
|
|
|
|
if !kvObject.Skip() {
|
|
if err := ds.store.Delete(Key(kvObject.Key()...)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// cleanup the cache only if AtomicDelete went through successfully
|
|
// If persistent store is skipped, sequencing needs to
|
|
// happen in cache.
|
|
return ds.cache.del(kvObject, false)
|
|
}
|
|
|
|
// DeleteObjectAtomic performs atomic delete on a record.
|
|
func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error {
|
|
ds.mu.Lock()
|
|
defer ds.mu.Unlock()
|
|
|
|
if kvObject == nil {
|
|
return types.InvalidParameterErrorf("invalid KV Object: nil")
|
|
}
|
|
|
|
previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
|
|
|
|
if !kvObject.Skip() {
|
|
if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
|
|
if errors.Is(err, store.ErrKeyExists) {
|
|
return ErrKeyModified
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
// cleanup the cache only if AtomicDelete went through successfully
|
|
// If persistent store is skipped, sequencing needs to
|
|
// happen in cache.
|
|
return ds.cache.del(kvObject, kvObject.Skip())
|
|
}
|