daemon/metrics: Move out to internal/metrics

Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
This commit is contained in:
Paweł Gronowski
2024-12-23 15:48:07 +01:00
parent 048fece105
commit 51c2689427
16 changed files with 329 additions and 337 deletions

View File

@@ -5,6 +5,7 @@ import (
"errors"
"time"
"github.com/docker/docker/internal/metrics"
"github.com/docker/docker/pkg/archive"
)
@@ -25,6 +26,6 @@ func (daemon *Daemon) ContainerChanges(ctx context.Context, name string) ([]arch
if err != nil {
return nil, err
}
containerActions.WithValues("changes").UpdateSince(start)
metrics.ContainerActions.WithValues("changes").UpdateSince(start)
return c, nil
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/builder/dockerfile"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/internal/metrics"
"github.com/pkg/errors"
)
@@ -186,6 +187,6 @@ func (daemon *Daemon) CreateImageFromContainer(ctx context.Context, name string,
"imageID": id.String(),
"imageRef": imageRef,
})
containerActions.WithValues("commit").UpdateSince(start)
metrics.ContainerActions.WithValues("commit").UpdateSince(start)
return id.String(), nil
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/docker/docker/daemon/config"
"github.com/docker/docker/daemon/network"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/internal/metrics"
"github.com/docker/docker/internal/multierror"
"github.com/docker/docker/internal/sliceutil"
"github.com/docker/docker/libnetwork"
@@ -260,7 +261,7 @@ func (daemon *Daemon) updateNetwork(cfg *config.Config, ctr *container.Container
return fmt.Errorf("Update network failed: Failure in refresh sandbox %s: %v", sid, err)
}
networkActions.WithValues("update").UpdateSince(start)
metrics.NetworkActions.WithValues("update").UpdateSince(start)
return nil
}
@@ -441,7 +442,7 @@ func (daemon *Daemon) allocateNetwork(ctx context.Context, cfg *config.Config, c
if _, err := ctr.WriteHostConfig(); err != nil {
return err
}
networkActions.WithValues("allocate").UpdateSince(start)
metrics.NetworkActions.WithValues("allocate").UpdateSince(start)
return nil
}
@@ -769,7 +770,7 @@ func (daemon *Daemon) connectToNetwork(ctx context.Context, cfg *config.Config,
ctr.NetworkSettings.Ports = getPortMapInfo(sb)
daemon.LogNetworkEventWithAttributes(n, events.ActionConnect, map[string]string{"container": ctr.ID})
networkActions.WithValues("connect").UpdateSince(start)
metrics.NetworkActions.WithValues("connect").UpdateSince(start)
return nil
}
@@ -969,7 +970,7 @@ func (daemon *Daemon) releaseNetwork(ctx context.Context, ctr *container.Contain
for _, nw := range networks {
daemon.tryDetachContainerFromClusterNetwork(nw, ctr)
}
networkActions.WithValues("release").UpdateSince(start)
metrics.NetworkActions.WithValues("release").UpdateSince(start)
}
func errRemovalContainer(containerID string) error {

View File

@@ -17,6 +17,7 @@ import (
"github.com/docker/docker/daemon/images"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/internal/metrics"
"github.com/docker/docker/internal/multierror"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/runconfig"
@@ -112,7 +113,7 @@ func (daemon *Daemon) containerCreate(ctx context.Context, daemonCfg *configStor
if err != nil {
return containertypes.CreateResponse{Warnings: warnings}, err
}
containerActions.WithValues("create").UpdateSince(start)
metrics.ContainerActions.WithValues("create").UpdateSince(start)
if warnings == nil {
warnings = make([]string, 0) // Create an empty slice to avoid https://github.com/moby/moby/issues/38222
@@ -227,7 +228,7 @@ func (daemon *Daemon) create(ctx context.Context, daemonCfg *config.Config, opts
if err := daemon.register(ctx, ctr); err != nil {
return nil, err
}
stateCtr.set(ctr.ID, "stopped")
metrics.StateCtr.Set(ctr.ID, "stopped")
daemon.LogContainerEvent(ctr, events.ActionCreate)
return ctr, nil
}

View File

@@ -54,6 +54,7 @@ import (
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/internal/metrics"
"github.com/docker/docker/layer"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/libnetwork"
@@ -94,36 +95,35 @@ type configStore struct {
// Daemon holds information about the Docker daemon.
type Daemon struct {
id string
repository string
containers container.Store
containersReplica *container.ViewDB
execCommands *container.ExecStore
imageService ImageService
configStore atomic.Pointer[configStore]
configReload sync.Mutex
statsCollector *stats.Collector
defaultLogConfig containertypes.LogConfig
registryService *registry.Service
EventsService *events.Events
netController *libnetwork.Controller
volumes *volumesservice.VolumesService
root string
sysInfoOnce sync.Once
sysInfo *sysinfo.SysInfo
shutdown bool
idMapping idtools.IdentityMapping
PluginStore *plugin.Store // TODO: remove
pluginManager *plugin.Manager
linkIndex *linkIndex
containerdClient *containerd.Client
containerd libcontainerdtypes.Client
defaultIsolation containertypes.Isolation // Default isolation mode on Windows
clusterProvider cluster.Provider
cluster Cluster
genericResources []swarm.GenericResource
metricsPluginListener net.Listener
ReferenceStore refstore.Store
id string
repository string
containers container.Store
containersReplica *container.ViewDB
execCommands *container.ExecStore
imageService ImageService
configStore atomic.Pointer[configStore]
configReload sync.Mutex
statsCollector *stats.Collector
defaultLogConfig containertypes.LogConfig
registryService *registry.Service
EventsService *events.Events
netController *libnetwork.Controller
volumes *volumesservice.VolumesService
root string
sysInfoOnce sync.Once
sysInfo *sysinfo.SysInfo
shutdown bool
idMapping idtools.IdentityMapping
PluginStore *plugin.Store // TODO: remove
pluginManager *plugin.Manager
linkIndex *linkIndex
containerdClient *containerd.Client
containerd libcontainerdtypes.Client
defaultIsolation containertypes.Isolation // Default isolation mode on Windows
clusterProvider cluster.Provider
cluster Cluster
genericResources []swarm.GenericResource
ReferenceStore refstore.Store
machineMemory uint64
@@ -903,11 +903,9 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
d.registryService = registryService
dlogger.RegisterPluginGetter(d.PluginStore)
metricsSockPath, err := d.listenMetricsSock(&cfgStore.Config)
if err != nil {
if err := metrics.RegisterPlugin(d.PluginStore, filepath.Join(cfgStore.ExecRoot, "metrics.sock")); err != nil {
return nil, err
}
registerMetricsPluginCallback(d.PluginStore, metricsSockPath)
backoffConfig := backoff.DefaultConfig
backoffConfig.MaxDelay = 3 * time.Second
@@ -1194,7 +1192,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
log.G(ctx).Warn(w)
}
engineInfo.WithValues(
metrics.EngineInfo.WithValues(
dockerversion.Version,
dockerversion.GitCommit,
info.Architecture,
@@ -1205,8 +1203,8 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
info.OSVersion,
info.ID,
).Set(1)
engineCpus.Set(float64(info.NCPU))
engineMemory.Set(float64(info.MemTotal))
metrics.EngineCPUs.Set(float64(info.NCPU))
metrics.EngineMemory.Set(float64(info.MemTotal))
log.G(ctx).WithFields(log.Fields{
"version": dockerversion.Version,
@@ -1285,7 +1283,7 @@ func (daemon *Daemon) Shutdown(ctx context.Context) error {
// check if there are any running containers, if none we should do some cleanup
if ls, err := daemon.Containers(ctx, &containertypes.ListOptions{}); len(ls) != 0 || err != nil {
// metrics plugins still need some cleanup
daemon.cleanupMetricsPlugins()
metrics.CleanupPlugin(daemon.PluginStore)
return err
}
}
@@ -1328,7 +1326,7 @@ func (daemon *Daemon) Shutdown(ctx context.Context) error {
daemon.DaemonLeavesCluster()
}
daemon.cleanupMetricsPlugins()
metrics.CleanupPlugin(daemon.PluginStore)
// Shutdown plugins after containers and layerstore. Don't change the order.
daemon.pluginShutdown()

View File

@@ -18,6 +18,7 @@ import (
"github.com/docker/docker/daemon/config"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/internal/containerfs"
"github.com/docker/docker/internal/metrics"
"github.com/opencontainers/selinux/go-selinux"
"github.com/pkg/errors"
)
@@ -54,7 +55,7 @@ func (daemon *Daemon) containerRm(cfg *config.Config, name string, opts *backend
}
err = daemon.cleanupContainer(ctr, *opts)
containerActions.WithValues("delete").UpdateSince(start)
metrics.ContainerActions.WithValues("delete").UpdateSince(start)
return err
}
@@ -183,7 +184,7 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, config ba
daemon.releaseName(name)
}
container.SetRemoved()
stateCtr.del(container.ID)
metrics.StateCtr.Delete(container.ID)
daemon.LogContainerEvent(container, events.ActionDestroy)
return nil

View File

@@ -15,6 +15,7 @@ import (
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/strslice"
"github.com/docker/docker/container"
"github.com/docker/docker/internal/metrics"
)
const (
@@ -123,7 +124,7 @@ func (p *cmdProbe) run(ctx context.Context, d *Daemon, cntr *container.Container
return nil, err
}
case <-execConfig.Started:
healthCheckStartDuration.UpdateSince(startTime)
metrics.HealthCheckStartDuration.UpdateSince(startTime)
}
if !tm.Stop() {
@@ -290,10 +291,10 @@ func monitor(d *Daemon, c *container.Container, stop chan struct{}, probe probe)
ctx, cancelProbe := context.WithCancel(context.Background())
results := make(chan *containertypes.HealthcheckResult, 1)
go func() {
healthChecksCounter.Inc()
metrics.HealthChecksCounter.Inc()
result, err := probe.run(ctx, d, c)
if err != nil {
healthChecksFailedCounter.Inc()
metrics.HealthChecksFailedCounter.Inc()
log.G(ctx).Warnf("Health check for container %s error: %v", c.ID, err)
results <- &containertypes.HealthcheckResult{
ExitCode: -1,

View File

@@ -22,6 +22,7 @@ import (
"github.com/docker/docker/daemon/internal/filedescriptors"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/internal/metrics"
"github.com/docker/docker/internal/platform"
"github.com/docker/docker/pkg/meminfo"
"github.com/docker/docker/pkg/parsers/kernel"
@@ -45,7 +46,7 @@ func doWithTrace[T any](ctx context.Context, name string, f func() T) T {
// multiple things and is often used for debugging.
// The only case valid early return is when the caller doesn't want the result anymore (ie context cancelled).
func (daemon *Daemon) SystemInfo(ctx context.Context) (*system.Info, error) {
defer gometrics.StartTimer(hostInfoFunctions.WithValues("system_info"))()
defer gometrics.StartTimer(metrics.HostInfoFunctions.WithValues("system_info"))()
sysInfo := daemon.RawSysInfo()
cfg := daemon.config()
@@ -104,7 +105,7 @@ func (daemon *Daemon) SystemInfo(ctx context.Context) (*system.Info, error) {
// multiple things and is often used for debugging.
// The only case valid early return is when the caller doesn't want the result anymore (ie context cancelled).
func (daemon *Daemon) SystemVersion(ctx context.Context) (types.Version, error) {
defer gometrics.StartTimer(hostInfoFunctions.WithValues("system_version"))()
defer gometrics.StartTimer(metrics.HostInfoFunctions.WithValues("system_version"))()
kernelVer := kernelVersion(ctx)
cfg := daemon.config()
@@ -208,7 +209,7 @@ func (daemon *Daemon) fillSecurityOptions(v *system.Info, sysInfo *sysinfo.SysIn
}
func (daemon *Daemon) fillContainerStates(v *system.Info) {
cRunning, cPaused, cStopped := stateCtr.get()
cRunning, cPaused, cStopped := metrics.StateCtr.Get()
v.Containers = cRunning + cPaused + cStopped
v.ContainersPaused = cPaused
v.ContainersRunning = cRunning
@@ -324,7 +325,7 @@ func operatingSystem(ctx context.Context) (operatingSystem string) {
ctx, span := tracing.StartSpan(ctx, "operatingSystem")
defer span.End()
defer gometrics.StartTimer(hostInfoFunctions.WithValues("operating_system"))()
defer gometrics.StartTimer(metrics.HostInfoFunctions.WithValues("operating_system"))()
if s, err := operatingsystem.GetOperatingSystem(); err != nil {
log.G(ctx).WithError(err).Warn("Could not get operating system name")
@@ -345,7 +346,7 @@ func osVersion(ctx context.Context) (version string) {
ctx, span := tracing.StartSpan(ctx, "osVersion")
defer span.End()
defer gometrics.StartTimer(hostInfoFunctions.WithValues("os_version"))()
defer gometrics.StartTimer(metrics.HostInfoFunctions.WithValues("os_version"))()
version, err := operatingsystem.GetOperatingSystemVersion()
if err != nil {

View File

@@ -1,191 +0,0 @@
package daemon // import "github.com/docker/docker/daemon"
import (
"context"
"sync"
"github.com/containerd/log"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/docker/pkg/plugins"
gometrics "github.com/docker/go-metrics"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
// metricsPluginType is the plugin capability name used to look up metrics
// plugins in the plugin store and as the prefix of the plugin RPC methods.
const metricsPluginType = "MetricsCollector"
// Daemon metrics. Every metric is created in metricsNS, which init registers
// with go-metrics.
var (
	// metricsNS is the go-metrics namespace ("engine", "daemon") that owns all metrics below.
	metricsNS = gometrics.NewNamespace("engine", "daemon", nil)
	// containerActions times container operations, labelled by action.
	containerActions = metricsNS.NewLabeledTimer("container_actions", "The number of seconds it takes to process each container action", "action")
	// networkActions times network operations, labelled by action.
	networkActions = metricsNS.NewLabeledTimer("network_actions", "The number of seconds it takes to process each network action", "action")
	// hostInfoFunctions times the host-information helpers, labelled by function.
	hostInfoFunctions = metricsNS.NewLabeledTimer("host_info_functions", "The number of seconds it takes to call functions gathering info about the host", "function")
	// engineInfo is a labelled gauge describing the engine and the OS it runs on.
	engineInfo = metricsNS.NewLabeledGauge("engine", "The information related to the engine and the OS it is running on", gometrics.Unit("info"),
		"version",
		"commit",
		"architecture",
		"graphdriver",
		"kernel",
		"os",
		"os_type",
		"os_version",
		"daemon_id", // ID is a randomly generated unique identifier (e.g. UUID4)
	)
	// engineCpus reports the number of CPUs of the host system.
	engineCpus = metricsNS.NewGauge("engine_cpus", "The number of cpus that the host system of the engine has", gometrics.Unit("cpus"))
	// engineMemory reports the number of bytes of memory of the host system.
	engineMemory = metricsNS.NewGauge("engine_memory", "The number of bytes of memory that the host system of the engine has", gometrics.Bytes)
	// healthChecksCounter counts all health checks.
	healthChecksCounter = metricsNS.NewCounter("health_checks", "The total number of health checks")
	// healthChecksFailedCounter counts failed health checks.
	healthChecksFailedCounter = metricsNS.NewCounter("health_checks_failed", "The total number of failed health checks")
	// healthCheckStartDuration times the preparation phase of a health check.
	healthCheckStartDuration = metricsNS.NewTimer("health_check_start_duration", "The number of seconds it takes to prepare to run health checks")
	// stateCtr tracks the state label of each container and exports per-state counts.
	stateCtr = newStateCounter(metricsNS, metricsNS.NewDesc("container_states", "The count of containers in various states", gometrics.Unit("containers"), "state"))
)
// init records a zero-valued observation on containerActions for each of the
// listed actions and then registers metricsNS with go-metrics.
func init() {
	actions := [...]string{"start", "changes", "commit", "create", "delete"}
	for i := range actions {
		containerActions.WithValues(actions[i]).Update(0)
	}
	gometrics.Register(metricsNS)
}
// stateCounter remembers the last state label set for each container ID and
// reports how many containers carry each label.
type stateCounter struct {
	mu sync.RWMutex // guards states
	states map[string]string // container ID -> state label passed to set
	desc *prometheus.Desc // descriptor sent by Describe and used by Collect
}
// newStateCounter creates an empty stateCounter that reports under desc, adds
// it to ns, and returns it.
func newStateCounter(ns *gometrics.Namespace, desc *prometheus.Desc) *stateCounter {
	counter := new(stateCounter)
	counter.desc = desc
	counter.states = map[string]string{}
	ns.Add(counter)
	return counter
}
// get returns how many tracked containers are currently labelled "running",
// "paused" and "stopped". Containers with any other label are not counted.
func (ctr *stateCounter) get() (running int, paused int, stopped int) {
	ctr.mu.RLock()
	defer ctr.mu.RUnlock()

	for _, label := range ctr.states {
		switch label {
		case "running":
			running++
		case "paused":
			paused++
		case "stopped":
			stopped++
		}
	}
	return running, paused, stopped
}
// set records label as the current state of the container identified by id,
// replacing any label stored for it before.
func (ctr *stateCounter) set(id, label string) {
	ctr.mu.Lock()
	defer ctr.mu.Unlock()
	ctr.states[id] = label
}
// del forgets the state recorded for the container identified by id. It is a
// no-op when no state is stored for that id.
func (ctr *stateCounter) del(id string) {
	ctr.mu.Lock()
	defer ctr.mu.Unlock()
	delete(ctr.states, id)
}
// Describe sends the counter's single descriptor on ch.
func (ctr *stateCounter) Describe(ch chan<- *prometheus.Desc) {
	ch <- ctr.desc
}
// Collect sends three gauge samples on ch, one per state label, in the order
// "running", "paused", "stopped", using the counts returned by get.
func (ctr *stateCounter) Collect(ch chan<- prometheus.Metric) {
	running, paused, stopped := ctr.get()
	for _, sample := range [...]struct {
		label string
		count int
	}{
		{"running", running},
		{"paused", paused},
		{"stopped", stopped},
	} {
		ch <- prometheus.MustNewConstMetric(ctr.desc, prometheus.GaugeValue, float64(sample.count), sample.label)
	}
}
// cleanupMetricsPlugins asks every managed MetricsCollector plugin to stop
// collecting, one goroutine per plugin, waits for all of them to finish, and
// then closes the daemon's metrics listener if one is set. Failures are
// logged and do not stop the other plugins from being processed.
func (daemon *Daemon) cleanupMetricsPlugins() {
	managed := daemon.PluginStore.GetAllManagedPluginsByCap(metricsPluginType)

	var wg sync.WaitGroup
	for _, entry := range managed {
		p := entry
		wg.Add(1)
		go func() {
			defer wg.Done()

			adapter, err := makePluginAdapter(p)
			if err != nil {
				log.G(context.TODO()).WithError(err).WithField("plugin", p.Name()).Error("Error creating metrics plugin adapter")
				return
			}
			if stopErr := adapter.StopMetrics(); stopErr != nil {
				log.G(context.TODO()).WithError(stopErr).WithField("plugin", p.Name()).Error("Error stopping plugin metrics collection")
			}
		}()
	}
	wg.Wait()

	if l := daemon.metricsPluginListener; l != nil {
		l.Close()
	}
}
// metricsPlugin is the set of calls the daemon makes on a metrics plugin.
type metricsPlugin interface {
	// StartMetrics tells the plugin to start metrics collection.
	StartMetrics() error
	// StopMetrics tells the plugin to stop metrics collection.
	StopMetrics() error
}
// makePluginAdapter wraps p in a metricsPlugin.
//
// If p exposes a v1 plugin client, that client is used directly. Otherwise p
// must expose its address and use the HTTP v1 protocol scheme, and a new
// client is created for that address with the plugin's timeout. Any other
// plugin kind or protocol results in an error.
func makePluginAdapter(p plugingetter.CompatPlugin) (metricsPlugin, error) {
	if v1, isV1 := p.(plugingetter.PluginWithV1Client); isV1 {
		return &metricsPluginAdapter{c: v1.Client(), name: p.Name()}, nil
	}

	withAddr, hasAddr := p.(plugingetter.PluginAddr)
	if !hasAddr {
		return nil, errdefs.System(errors.Errorf("got unknown plugin type %T", p))
	}
	if withAddr.Protocol() != plugins.ProtocolSchemeHTTPV1 {
		return nil, errors.Errorf("plugin protocol not supported: %s", withAddr.Protocol())
	}

	addr := withAddr.Addr()
	endpoint := addr.Network() + "://" + addr.String()
	client, err := plugins.NewClientWithTimeout(endpoint, nil, withAddr.Timeout())
	if err != nil {
		return nil, errors.Wrap(err, "error creating metrics plugin client")
	}
	return &metricsPluginAdapter{c: client, name: p.Name()}, nil
}
// metricsPluginAdapter implements metricsPlugin on top of a plugin client.
type metricsPluginAdapter struct {
	c *plugins.Client // client used for the StartMetrics/StopMetrics calls
	name string // plugin name, as returned by the plugin's Name method
}
// StartMetrics calls the plugin's StartMetrics method. It returns an error if
// the call fails or if the decoded response has a non-empty Err field.
func (a *metricsPluginAdapter) StartMetrics() error {
	var res struct {
		Err string
	}
	err := a.c.Call(metricsPluginType+".StartMetrics", nil, &res)
	switch {
	case err != nil:
		return errors.Wrap(err, "could not start metrics plugin")
	case res.Err != "":
		return errors.New(res.Err)
	default:
		return nil
	}
}
// StopMetrics calls the plugin's StopMetrics method and wraps any call error.
// The response body is not decoded.
func (a *metricsPluginAdapter) StopMetrics() error {
	err := a.c.Call(metricsPluginType+".StopMetrics", nil, nil)
	if err == nil {
		return nil
	}
	return errors.Wrap(err, "error stopping metrics collector")
}

View File

@@ -1,71 +0,0 @@
//go:build !windows
package daemon // import "github.com/docker/docker/daemon"
import (
"context"
"net"
"net/http"
"path/filepath"
"strings"
"time"
"github.com/containerd/log"
"github.com/docker/docker/daemon/config"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/docker/pkg/plugins"
"github.com/docker/docker/plugin"
gometrics "github.com/docker/go-metrics"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
// listenMetricsSock creates the unix socket "metrics.sock" under cfg.ExecRoot
// (unlinking any existing file at that path first), serves the go-metrics
// handler on "/metrics" from a background goroutine, stores the listener on
// the daemon, and returns the socket path.
func (daemon *Daemon) listenMetricsSock(cfg *config.Config) (string, error) {
	sockPath := filepath.Join(cfg.ExecRoot, "metrics.sock")
	unix.Unlink(sockPath)

	sock, err := net.Listen("unix", sockPath)
	if err != nil {
		return "", errors.Wrap(err, "error setting up metrics plugin listener")
	}

	handler := http.NewServeMux()
	handler.Handle("/metrics", gometrics.Handler())

	go func() {
		log.G(context.TODO()).Debugf("metrics API listening on %s", sock.Addr())
		srv := &http.Server{
			Handler:           handler,
			ReadHeaderTimeout: 5 * time.Minute, // "G112: Potential Slowloris Attack (gosec)"; not a real concern for our use, so setting a long timeout.
		}
		serveErr := srv.Serve(sock)
		// A closed listener is how the server is shut down, so it is not reported.
		if serveErr == nil || strings.Contains(serveErr.Error(), "use of closed network connection") {
			return
		}
		log.G(context.TODO()).WithError(serveErr).Error("error serving metrics API")
	}()

	daemon.metricsPluginListener = sock
	return sockPath, nil
}
// registerMetricsPluginCallback installs two hooks for MetricsCollector
// plugins on store:
//
//   - a runtime option that bind-mounts sockPath read-only at
//     /run/docker/metrics.sock in the plugin's spec, and
//   - a handler that looks the plugin up by name and calls StartMetrics on it.
//
// Errors inside the handler are logged; the handler never panics because of a
// failed adapter creation.
func registerMetricsPluginCallback(store *plugin.Store, sockPath string) {
	store.RegisterRuntimeOpt(metricsPluginType, func(s *specs.Spec) {
		f := plugin.WithSpecMounts([]specs.Mount{
			{Type: "bind", Source: sockPath, Destination: "/run/docker/metrics.sock", Options: []string{"bind", "ro"}},
		})
		f(s)
	})
	store.Handle(metricsPluginType, func(name string, client *plugins.Client) {
		// Use lookup since nothing in the system can really reference it, no need
		// to protect against removal
		p, err := store.Get(name, metricsPluginType, plugingetter.Lookup)
		if err != nil {
			return
		}

		adapter, err := makePluginAdapter(p)
		if err != nil {
			log.G(context.TODO()).WithError(err).WithField("plugin", p.Name()).Error("Error creating plugin adapter")
			// adapter is nil here; calling StartMetrics on it would panic.
			return
		}
		if err := adapter.StartMetrics(); err != nil {
			log.G(context.TODO()).WithError(err).WithField("plugin", p.Name()).Error("Error starting metrics collector plugin")
		}
	})
}

View File

@@ -1,15 +0,0 @@
//go:build windows
package daemon // import "github.com/docker/docker/daemon"
import (
"github.com/docker/docker/daemon/config"
"github.com/docker/docker/pkg/plugingetter"
)
// registerMetricsPluginCallback does nothing in the Windows build.
func registerMetricsPluginCallback(getter plugingetter.PluginGetter, sockPath string) {
}
// listenMetricsSock creates no listener in the Windows build; it returns an
// empty socket path and a nil error.
func (daemon *Daemon) listenMetricsSock(*config.Config) (string, error) {
	return "", nil
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/config"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/internal/metrics"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/restartmanager"
"github.com/pkg/errors"
@@ -19,11 +20,11 @@ import (
func (daemon *Daemon) setStateCounter(c *container.Container) {
switch c.StateString() {
case "paused":
stateCtr.set(c.ID, "paused")
metrics.StateCtr.Set(c.ID, "paused")
case "running":
stateCtr.set(c.ID, "running")
metrics.StateCtr.Set(c.ID, "running")
default:
stateCtr.set(c.ID, "stopped")
metrics.StateCtr.Set(c.ID, "stopped")
}
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/docker/docker/container"
mobyc8dstore "github.com/docker/docker/daemon/containerd"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/internal/metrics"
"github.com/docker/docker/libcontainerd"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
@@ -253,7 +254,7 @@ func (daemon *Daemon) containerStart(ctx context.Context, daemonCfg *configStore
}
daemon.LogContainerEvent(container, events.ActionStart)
containerActions.WithValues("start").UpdateSince(start)
metrics.ContainerActions.WithValues("start").UpdateSince(start)
return nil
}

119
internal/metrics/metrics.go Normal file
View File

@@ -0,0 +1,119 @@
package metrics
import (
"sync"
gometrics "github.com/docker/go-metrics"
"github.com/prometheus/client_golang/prometheus"
)
// Engine metrics. Every metric is created in metricsNS, which init registers
// with go-metrics.
var (
	// metricsNS is the go-metrics namespace ("engine", "daemon") that owns all metrics below.
	metricsNS = gometrics.NewNamespace("engine", "daemon", nil)
	// ContainerActions tracks the time taken to process container operations, labelled by action.
	ContainerActions = metricsNS.NewLabeledTimer("container_actions", "The number of seconds it takes to process each container action", "action")
	// NetworkActions tracks the time taken to process network operations, labelled by action.
	NetworkActions = metricsNS.NewLabeledTimer("network_actions", "The number of seconds it takes to process each network action", "action")
	// HostInfoFunctions tracks the time taken to gather host information, labelled by function.
	HostInfoFunctions = metricsNS.NewLabeledTimer("host_info_functions", "The number of seconds it takes to call functions gathering info about the host", "function")
	// EngineInfo provides information about the engine and its environment.
	EngineInfo = metricsNS.NewLabeledGauge("engine", "The information related to the engine and the OS it is running on", gometrics.Unit("info"),
		"version",
		"commit",
		"architecture",
		"graphdriver",
		"kernel",
		"os",
		"os_type",
		"os_version",
		"daemon_id", // ID is a randomly generated unique identifier (e.g. UUID4)
	)
	// EngineCPUs tracks the number of CPUs of the engine's host system.
	EngineCPUs = metricsNS.NewGauge("engine_cpus", "The number of cpus that the host system of the engine has", gometrics.Unit("cpus"))
	// EngineMemory tracks the number of bytes of memory of the engine's host system.
	EngineMemory = metricsNS.NewGauge("engine_memory", "The number of bytes of memory that the host system of the engine has", gometrics.Bytes)
	// HealthChecksCounter tracks the total number of health checks.
	HealthChecksCounter = metricsNS.NewCounter("health_checks", "The total number of health checks")
	// HealthChecksFailedCounter tracks the number of failed health checks.
	HealthChecksFailedCounter = metricsNS.NewCounter("health_checks_failed", "The total number of failed health checks")
	// HealthCheckStartDuration tracks the time taken to prepare health checks.
	HealthCheckStartDuration = metricsNS.NewTimer("health_check_start_duration", "The number of seconds it takes to prepare to run health checks")
	// StateCtr tracks the state label of each container and exports per-state counts.
	StateCtr = newStateCounter(metricsNS, metricsNS.NewDesc("container_states", "The count of containers in various states", gometrics.Unit("containers"), "state"))
)
// init records a zero-valued observation on ContainerActions for each of the
// listed actions and then registers metricsNS with go-metrics.
func init() {
	actions := [...]string{"start", "changes", "commit", "create", "delete"}
	for i := range actions {
		ContainerActions.WithValues(actions[i]).Update(0)
	}
	gometrics.Register(metricsNS)
}
// StateCounter remembers the last state label set for each container ID and
// reports how many containers carry each label.
type StateCounter struct {
	mu sync.RWMutex // guards states
	states map[string]string // container ID -> state label passed to Set
	desc *prometheus.Desc // descriptor sent by Describe and used by Collect
}
// newStateCounter creates an empty StateCounter that reports under desc, adds
// it to ns, and returns it.
func newStateCounter(ns *gometrics.Namespace, desc *prometheus.Desc) *StateCounter {
	counter := new(StateCounter)
	counter.desc = desc
	counter.states = map[string]string{}
	ns.Add(counter)
	return counter
}
// Get returns how many tracked containers are currently labelled "running",
// "paused" and "stopped". Containers with any other label are not counted.
func (ctr *StateCounter) Get() (running int, paused int, stopped int) {
	ctr.mu.RLock()
	defer ctr.mu.RUnlock()

	for _, label := range ctr.states {
		switch label {
		case "running":
			running++
		case "paused":
			paused++
		case "stopped":
			stopped++
		}
	}
	return running, paused, stopped
}
// Set records label as the current state of the container identified by id,
// replacing any label stored for it before.
func (ctr *StateCounter) Set(id, label string) {
	ctr.mu.Lock()
	defer ctr.mu.Unlock()
	ctr.states[id] = label
}
// Delete forgets the state recorded for the container identified by id. It is
// a no-op when no state is stored for that id.
func (ctr *StateCounter) Delete(id string) {
	ctr.mu.Lock()
	defer ctr.mu.Unlock()
	delete(ctr.states, id)
}
// Describe implements prometheus.Collector by sending the counter's single
// descriptor on ch.
func (ctr *StateCounter) Describe(ch chan<- *prometheus.Desc) {
	ch <- ctr.desc
}
// Collect implements prometheus.Collector. It sends three gauge samples on
// ch, one per state label, in the order "running", "paused", "stopped", using
// the counts returned by Get.
func (ctr *StateCounter) Collect(ch chan<- prometheus.Metric) {
	running, paused, stopped := ctr.Get()
	for _, sample := range [...]struct {
		label string
		count int
	}{
		{"running", running},
		{"paused", paused},
		{"stopped", stopped},
	} {
		ch <- prometheus.MustNewConstMetric(ctr.desc, prometheus.GaugeValue, float64(sample.count), sample.label)
	}
}

View File

@@ -0,0 +1,132 @@
//go:build !windows
package metrics
import (
"context"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/containerd/log"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/docker/pkg/plugins"
"github.com/docker/docker/plugin"
gometrics "github.com/docker/go-metrics"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
// pluginType is the plugin capability name under which metrics plugins are
// registered with, and looked up from, the plugin store.
const pluginType = "MetricsCollector"
// Plugin represents a metrics collector plugin.
type Plugin interface {
	// StartMetrics tells the plugin to start metrics collection.
	StartMetrics() error
	// StopMetrics tells the plugin to stop metrics collection.
	StopMetrics() error
}
// metricsPluginAdapter implements Plugin on top of a plugin client.
type metricsPluginAdapter struct {
	client *plugins.Client // client used for the StartMetrics/StopMetrics calls
}
// StartMetrics calls the plugin's MetricsCollector.StartMetrics endpoint.
//
// It returns an error when the call itself fails, or when the call succeeds
// but the plugin's response carries a non-empty Err field.
func (a *metricsPluginAdapter) StartMetrics() error {
	// Decode the response so that a failure reported by the plugin in the
	// response body is not mistaken for success.
	var res struct {
		Err string
	}
	if err := a.client.Call("/MetricsCollector.StartMetrics", nil, &res); err != nil {
		return errors.Wrap(err, "could not start metrics plugin")
	}
	if res.Err != "" {
		return errors.New(res.Err)
	}
	return nil
}
// StopMetrics calls the plugin's MetricsCollector.StopMetrics endpoint and
// returns the call error unchanged. The response body is not decoded.
func (a *metricsPluginAdapter) StopMetrics() error {
	err := a.client.Call("/MetricsCollector.StopMetrics", nil, nil)
	return err
}
// makePluginAdapter wraps the client exposed by p in a Plugin.
//
// It returns an error when p has no client, so that callers fail with a
// message instead of dereferencing a nil client on the first call.
func makePluginAdapter(p plugingetter.CompatPlugin) (Plugin, error) {
	client := p.Client()
	if client == nil {
		// NOTE(review): presumably this happens for plugins that do not use
		// the v1 HTTP plugin protocol — confirm against the plugin store.
		return nil, errors.Errorf("plugin %s does not provide a plugin client", p.Name())
	}
	return &metricsPluginAdapter{client: client}, nil
}
// RegisterPlugin starts the metrics server listener on the unix socket at
// path and registers the metrics plugin callbacks with the plugin store.
//
// Two hooks are installed for MetricsCollector plugins:
//
//   - a runtime option that bind-mounts path read-only at
//     /run/docker/metrics.sock in the plugin's spec, and
//   - a handler that looks the plugin up by name and calls StartMetrics on it.
//
// It returns an error only when the listener cannot be set up. Errors inside
// the handler are logged.
func RegisterPlugin(store *plugin.Store, path string) error {
	if err := listen(path); err != nil {
		return err
	}

	store.RegisterRuntimeOpt(pluginType, func(s *specs.Spec) {
		f := plugin.WithSpecMounts([]specs.Mount{
			{Type: "bind", Source: path, Destination: "/run/docker/metrics.sock", Options: []string{"bind", "ro"}},
		})
		f(s)
	})
	store.Handle(pluginType, func(name string, client *plugins.Client) {
		// Use lookup since nothing in the system can really reference it, no need
		// to protect against removal
		p, err := store.Get(name, pluginType, plugingetter.Lookup)
		if err != nil {
			return
		}

		adapter, err := makePluginAdapter(p)
		if err != nil {
			log.G(context.TODO()).WithError(err).WithField("plugin", p.Name()).Error("Error creating plugin adapter")
			// adapter is nil here; calling StartMetrics on it would panic.
			return
		}
		if err := adapter.StartMetrics(); err != nil {
			log.G(context.TODO()).WithError(err).WithField("plugin", p.Name()).Error("Error starting metrics collector plugin")
		}
	})

	return nil
}
// CleanupPlugin asks every managed MetricsCollector plugin known to store to
// stop collecting, one goroutine per plugin, waits for all of them to finish,
// and then closes the package's metrics listener if one was created.
// Failures are logged and do not stop the other plugins from being processed.
func CleanupPlugin(store plugingetter.PluginGetter) {
	managed := store.GetAllManagedPluginsByCap(pluginType)

	var wg sync.WaitGroup
	for _, entry := range managed {
		p := entry
		wg.Add(1)
		go func() {
			defer wg.Done()

			adapter, err := makePluginAdapter(p)
			if err != nil {
				log.G(context.TODO()).WithError(err).WithField("plugin", p.Name()).Error("Error creating metrics plugin adapter")
				return
			}
			if stopErr := adapter.StopMetrics(); stopErr != nil {
				log.G(context.TODO()).WithError(stopErr).WithField("plugin", p.Name()).Error("Error stopping plugin metrics collection")
			}
		}()
	}
	wg.Wait()

	if listener == nil {
		return
	}
	_ = listener.Close()
}
// listener is the metrics socket listener; it is assigned by listen and
// closed by CleanupPlugin. It is nil until listen has succeeded.
var listener net.Listener
// listen creates a unix socket at path (removing any existing file there
// first), serves the go-metrics handler on "/metrics" from a background
// goroutine, and stores the listener in the package-level listener variable.
// It returns an error if the socket cannot be created.
func listen(path string) error {
	_ = os.Remove(path)

	sock, err := net.Listen("unix", path)
	if err != nil {
		return errors.Wrap(err, "error setting up metrics plugin listener")
	}

	handler := http.NewServeMux()
	handler.Handle("/metrics", gometrics.Handler())

	go func() {
		log.G(context.TODO()).Debugf("metrics API listening on %s", sock.Addr())
		srv := &http.Server{
			Handler:           handler,
			ReadHeaderTimeout: 5 * time.Minute, // "G112: Potential Slowloris Attack (gosec)"; not a real concern for our use, so setting a long timeout.
		}
		serveErr := srv.Serve(sock)
		// A closed listener is how the server is shut down, so it is not reported.
		if serveErr == nil || strings.Contains(serveErr.Error(), "use of closed network connection") {
			return
		}
		log.G(context.TODO()).WithError(serveErr).Error("error serving metrics API")
	}()

	listener = sock
	return nil
}

View File

@@ -0,0 +1,11 @@
//go:build windows
package metrics
import (
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/docker/plugin"
)
// RegisterPlugin does nothing in the Windows build and always returns nil.
func RegisterPlugin(*plugin.Store, string) error { return nil }
// CleanupPlugin does nothing in the Windows build.
func CleanupPlugin(plugingetter.PluginGetter) {}