mirror of
https://github.com/moby/moby.git
synced 2026-01-11 18:51:37 +00:00
Most of the code in the filters package relates to the unmarshaling, validation and application of filters from client requests. None of this is necessary or particularly useful for Go SDK users. Move the full-fat filters package into daemon/internal and switch all the daemon code to import that package so we are free to iterate upon the code without worrying about source-code interface compatibility. Signed-off-by: Cory Snider <csnider@mirantis.com>
287 lines
11 KiB
Go
287 lines
11 KiB
Go
package daemon
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/containerd/log"
|
|
gogotypes "github.com/gogo/protobuf/types"
|
|
"github.com/moby/moby/api/types/events"
|
|
"github.com/moby/moby/v2/daemon/container"
|
|
daemonevents "github.com/moby/moby/v2/daemon/events"
|
|
"github.com/moby/moby/v2/daemon/internal/filters"
|
|
"github.com/moby/moby/v2/daemon/libnetwork"
|
|
swarmapi "github.com/moby/swarmkit/v2/api"
|
|
)
|
|
|
|
// LogContainerEvent generates an event related to a container with only the default attributes.
|
|
func (daemon *Daemon) LogContainerEvent(container *container.Container, action events.Action) {
|
|
daemon.LogContainerEventWithAttributes(container, action, map[string]string{})
|
|
}
|
|
|
|
// LogContainerEventWithAttributes generates an event related to a container with specific given attributes.
|
|
func (daemon *Daemon) LogContainerEventWithAttributes(container *container.Container, action events.Action, attributes map[string]string) {
|
|
copyAttributes(attributes, container.Config.Labels)
|
|
if container.Config.Image != "" {
|
|
attributes["image"] = container.Config.Image
|
|
}
|
|
attributes["name"] = strings.TrimPrefix(container.Name, "/")
|
|
daemon.EventsService.Log(action, events.ContainerEventType, events.Actor{
|
|
ID: container.ID,
|
|
Attributes: attributes,
|
|
})
|
|
}
|
|
|
|
// LogPluginEvent generates an event related to a plugin with only the default attributes.
|
|
func (daemon *Daemon) LogPluginEvent(pluginID, refName string, action events.Action) {
|
|
daemon.EventsService.Log(action, events.PluginEventType, events.Actor{
|
|
ID: pluginID,
|
|
Attributes: map[string]string{"name": refName},
|
|
})
|
|
}
|
|
|
|
// LogVolumeEvent generates an event related to a volume.
|
|
func (daemon *Daemon) LogVolumeEvent(volumeID string, action events.Action, attributes map[string]string) {
|
|
daemon.EventsService.Log(action, events.VolumeEventType, events.Actor{
|
|
ID: volumeID,
|
|
Attributes: attributes,
|
|
})
|
|
}
|
|
|
|
// LogNetworkEvent generates an event related to a network with only the default attributes.
|
|
func (daemon *Daemon) LogNetworkEvent(nw *libnetwork.Network, action events.Action) {
|
|
daemon.LogNetworkEventWithAttributes(nw, action, map[string]string{})
|
|
}
|
|
|
|
// LogNetworkEventWithAttributes generates an event related to a network with specific given attributes.
|
|
func (daemon *Daemon) LogNetworkEventWithAttributes(nw *libnetwork.Network, action events.Action, attributes map[string]string) {
|
|
attributes["name"] = nw.Name()
|
|
attributes["type"] = nw.Type()
|
|
daemon.EventsService.Log(action, events.NetworkEventType, events.Actor{
|
|
ID: nw.ID(),
|
|
Attributes: attributes,
|
|
})
|
|
}
|
|
|
|
// LogDaemonEventWithAttributes generates an event related to the daemon itself with specific given attributes.
|
|
func (daemon *Daemon) LogDaemonEventWithAttributes(action events.Action, attributes map[string]string) {
|
|
if daemon.EventsService != nil {
|
|
if name := hostName(context.TODO()); name != "" {
|
|
attributes["name"] = name
|
|
}
|
|
daemon.EventsService.Log(action, events.DaemonEventType, events.Actor{
|
|
ID: daemon.id,
|
|
Attributes: attributes,
|
|
})
|
|
}
|
|
}
|
|
|
|
// SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events.
|
|
func (daemon *Daemon) SubscribeToEvents(since, until time.Time, filter filters.Args) ([]events.Message, chan any) {
|
|
return daemon.EventsService.SubscribeTopic(since, until, daemonevents.NewFilter(filter))
|
|
}
|
|
|
|
// UnsubscribeFromEvents stops the event subscription for a client by closing the
|
|
// channel where the daemon sends events to.
|
|
func (daemon *Daemon) UnsubscribeFromEvents(listener chan any) {
|
|
daemon.EventsService.Evict(listener)
|
|
}
|
|
|
|
// copyAttributes guarantees that labels are not mutated by event triggers.
|
|
func copyAttributes(attributes, labels map[string]string) {
|
|
if labels == nil {
|
|
return
|
|
}
|
|
for k, v := range labels {
|
|
attributes[k] = v
|
|
}
|
|
}
|
|
|
|
// ProcessClusterNotifications gets changes from store and add them to event list
|
|
func (daemon *Daemon) ProcessClusterNotifications(ctx context.Context, watchStream chan *swarmapi.WatchMessage) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case message, ok := <-watchStream:
|
|
if !ok {
|
|
log.G(ctx).Debug("cluster event channel has stopped")
|
|
return
|
|
}
|
|
daemon.generateClusterEvent(message)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (daemon *Daemon) generateClusterEvent(msg *swarmapi.WatchMessage) {
|
|
for _, event := range msg.Events {
|
|
if event.Object == nil {
|
|
log.G(context.TODO()).Errorf("event without object: %v", event)
|
|
continue
|
|
}
|
|
switch v := event.Object.GetObject().(type) {
|
|
case *swarmapi.Object_Node:
|
|
daemon.logNodeEvent(event.Action, v.Node, event.OldObject.GetNode())
|
|
case *swarmapi.Object_Service:
|
|
daemon.logServiceEvent(event.Action, v.Service, event.OldObject.GetService())
|
|
case *swarmapi.Object_Network:
|
|
daemon.logNetworkEvent(event.Action, v.Network)
|
|
case *swarmapi.Object_Secret:
|
|
daemon.logSecretEvent(event.Action, v.Secret)
|
|
case *swarmapi.Object_Config:
|
|
daemon.logConfigEvent(event.Action, v.Config)
|
|
default:
|
|
log.G(context.TODO()).Warnf("unrecognized event: %v", event)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (daemon *Daemon) logNetworkEvent(action swarmapi.WatchActionKind, net *swarmapi.Network) {
|
|
daemon.logClusterEvent(action, net.ID, events.NetworkEventType, eventTimestamp(net.Meta, action), map[string]string{
|
|
"name": net.Spec.Annotations.Name,
|
|
})
|
|
}
|
|
|
|
func (daemon *Daemon) logSecretEvent(action swarmapi.WatchActionKind, secret *swarmapi.Secret) {
|
|
daemon.logClusterEvent(action, secret.ID, events.SecretEventType, eventTimestamp(secret.Meta, action), map[string]string{
|
|
"name": secret.Spec.Annotations.Name,
|
|
})
|
|
}
|
|
|
|
func (daemon *Daemon) logConfigEvent(action swarmapi.WatchActionKind, config *swarmapi.Config) {
|
|
daemon.logClusterEvent(action, config.ID, events.ConfigEventType, eventTimestamp(config.Meta, action), map[string]string{
|
|
"name": config.Spec.Annotations.Name,
|
|
})
|
|
}
|
|
|
|
func (daemon *Daemon) logNodeEvent(action swarmapi.WatchActionKind, node *swarmapi.Node, oldNode *swarmapi.Node) {
|
|
name := node.Spec.Annotations.Name
|
|
if name == "" && node.Description != nil {
|
|
name = node.Description.Hostname
|
|
}
|
|
attributes := map[string]string{
|
|
"name": name,
|
|
}
|
|
eventTime := eventTimestamp(node.Meta, action)
|
|
// In an update event, display the changes in attributes
|
|
if action == swarmapi.WatchActionKindUpdate && oldNode != nil {
|
|
if node.Spec.Availability != oldNode.Spec.Availability {
|
|
attributes["availability.old"] = strings.ToLower(oldNode.Spec.Availability.String())
|
|
attributes["availability.new"] = strings.ToLower(node.Spec.Availability.String())
|
|
}
|
|
if node.Role != oldNode.Role {
|
|
attributes["role.old"] = strings.ToLower(oldNode.Role.String())
|
|
attributes["role.new"] = strings.ToLower(node.Role.String())
|
|
}
|
|
if node.Status.State != oldNode.Status.State {
|
|
attributes["state.old"] = strings.ToLower(oldNode.Status.State.String())
|
|
attributes["state.new"] = strings.ToLower(node.Status.State.String())
|
|
}
|
|
// This handles change within manager role
|
|
if node.ManagerStatus != nil && oldNode.ManagerStatus != nil {
|
|
// leader change
|
|
if node.ManagerStatus.Leader != oldNode.ManagerStatus.Leader {
|
|
if node.ManagerStatus.Leader {
|
|
attributes["leader.old"] = "false"
|
|
attributes["leader.new"] = "true"
|
|
} else {
|
|
attributes["leader.old"] = "true"
|
|
attributes["leader.new"] = "false"
|
|
}
|
|
}
|
|
if node.ManagerStatus.Reachability != oldNode.ManagerStatus.Reachability {
|
|
attributes["reachability.old"] = strings.ToLower(oldNode.ManagerStatus.Reachability.String())
|
|
attributes["reachability.new"] = strings.ToLower(node.ManagerStatus.Reachability.String())
|
|
}
|
|
}
|
|
}
|
|
|
|
daemon.logClusterEvent(action, node.ID, events.NodeEventType, eventTime, attributes)
|
|
}
|
|
|
|
func (daemon *Daemon) logServiceEvent(action swarmapi.WatchActionKind, service *swarmapi.Service, oldService *swarmapi.Service) {
|
|
attributes := map[string]string{
|
|
"name": service.Spec.Annotations.Name,
|
|
}
|
|
eventTime := eventTimestamp(service.Meta, action)
|
|
|
|
if action == swarmapi.WatchActionKindUpdate && oldService != nil {
|
|
// check image
|
|
if x, ok := service.Spec.Task.GetRuntime().(*swarmapi.TaskSpec_Container); ok {
|
|
containerSpec := x.Container
|
|
if y, ok := oldService.Spec.Task.GetRuntime().(*swarmapi.TaskSpec_Container); ok {
|
|
oldContainerSpec := y.Container
|
|
if containerSpec.Image != oldContainerSpec.Image {
|
|
attributes["image.old"] = oldContainerSpec.Image
|
|
attributes["image.new"] = containerSpec.Image
|
|
}
|
|
} else {
|
|
// This should not happen.
|
|
log.G(context.TODO()).Errorf("service %s runtime changed from %T to %T", service.Spec.Annotations.Name, oldService.Spec.Task.GetRuntime(), service.Spec.Task.GetRuntime())
|
|
}
|
|
}
|
|
// check replicated count change
|
|
if x, ok := service.Spec.GetMode().(*swarmapi.ServiceSpec_Replicated); ok {
|
|
replicas := x.Replicated.Replicas
|
|
if y, ok := oldService.Spec.GetMode().(*swarmapi.ServiceSpec_Replicated); ok {
|
|
oldReplicas := y.Replicated.Replicas
|
|
if replicas != oldReplicas {
|
|
attributes["replicas.old"] = strconv.FormatUint(oldReplicas, 10)
|
|
attributes["replicas.new"] = strconv.FormatUint(replicas, 10)
|
|
}
|
|
} else {
|
|
// This should not happen.
|
|
log.G(context.TODO()).Errorf("service %s mode changed from %T to %T", service.Spec.Annotations.Name, oldService.Spec.GetMode(), service.Spec.GetMode())
|
|
}
|
|
}
|
|
if service.UpdateStatus != nil {
|
|
if oldService.UpdateStatus == nil {
|
|
attributes["updatestate.new"] = strings.ToLower(service.UpdateStatus.State.String())
|
|
} else if service.UpdateStatus.State != oldService.UpdateStatus.State {
|
|
attributes["updatestate.old"] = strings.ToLower(oldService.UpdateStatus.State.String())
|
|
attributes["updatestate.new"] = strings.ToLower(service.UpdateStatus.State.String())
|
|
}
|
|
}
|
|
}
|
|
daemon.logClusterEvent(action, service.ID, events.ServiceEventType, eventTime, attributes)
|
|
}
|
|
|
|
var clusterEventAction = map[swarmapi.WatchActionKind]events.Action{
|
|
swarmapi.WatchActionKindCreate: events.ActionCreate,
|
|
swarmapi.WatchActionKindUpdate: events.ActionUpdate,
|
|
swarmapi.WatchActionKindRemove: events.ActionRemove,
|
|
}
|
|
|
|
func (daemon *Daemon) logClusterEvent(action swarmapi.WatchActionKind, id string, eventType events.Type, eventTime time.Time, attributes map[string]string) {
|
|
daemon.EventsService.PublishMessage(events.Message{
|
|
Action: clusterEventAction[action],
|
|
Type: eventType,
|
|
Actor: events.Actor{
|
|
ID: id,
|
|
Attributes: attributes,
|
|
},
|
|
Scope: "swarm",
|
|
Time: eventTime.UTC().Unix(),
|
|
TimeNano: eventTime.UTC().UnixNano(),
|
|
})
|
|
}
|
|
|
|
func eventTimestamp(meta swarmapi.Meta, action swarmapi.WatchActionKind) time.Time {
|
|
var eventTime time.Time
|
|
switch action {
|
|
case swarmapi.WatchActionKindCreate:
|
|
eventTime, _ = gogotypes.TimestampFromProto(meta.CreatedAt)
|
|
case swarmapi.WatchActionKindUpdate:
|
|
eventTime, _ = gogotypes.TimestampFromProto(meta.UpdatedAt)
|
|
case swarmapi.WatchActionKindRemove:
|
|
// There is no timestamp from store message for remove operations.
|
|
// Use current time.
|
|
eventTime = time.Now()
|
|
default:
|
|
// TODO(thaJeztah): make switch exhaustive: anything to do for swarmapi.WatchActionKindUnknown or "other" ?
|
|
}
|
|
return eventTime
|
|
}
|