mirror of
https://github.com/moby/moby.git
synced 2026-01-11 10:41:43 +00:00
Adds a per-stats OSType field to allow handling the platform-specific fields. Before this change, the client had to get the OSType field from the server's API response header and copy it to each record. Older daemon versions don't have this field, so the client still needs to handle fallbacks. Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
124 lines
3.1 KiB
Go
124 lines
3.1 KiB
Go
package stats
|
|
|
|
import (
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
containertypes "github.com/moby/moby/api/types/container"
|
|
"github.com/moby/moby/v2/daemon/container"
|
|
"github.com/moby/pubsub"
|
|
)
|
|
|
|
// Collector manages and provides container resource stats
|
|
type Collector struct {
|
|
m sync.Mutex
|
|
cond *sync.Cond
|
|
supervisor supervisor
|
|
interval time.Duration
|
|
publishers map[*container.Container]*pubsub.Publisher
|
|
}
|
|
|
|
// NewCollector creates a stats collector that will poll the supervisor with the specified interval
|
|
func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
|
|
s := &Collector{
|
|
interval: interval,
|
|
supervisor: supervisor,
|
|
publishers: make(map[*container.Container]*pubsub.Publisher),
|
|
}
|
|
s.cond = sync.NewCond(&s.m)
|
|
return s
|
|
}
|
|
|
|
type supervisor interface {
|
|
// GetContainerStats collects all the stats related to a container
|
|
GetContainerStats(container *container.Container) (*containertypes.StatsResponse, error)
|
|
}
|
|
|
|
// Collect registers the container with the collector and adds it to
|
|
// the event loop for collection on the specified interval returning
|
|
// a channel for the subscriber to receive on.
|
|
func (s *Collector) Collect(c *container.Container) chan any {
|
|
s.cond.L.Lock()
|
|
defer s.cond.L.Unlock()
|
|
|
|
publisher, exists := s.publishers[c]
|
|
if !exists {
|
|
publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
|
|
s.publishers[c] = publisher
|
|
}
|
|
|
|
s.cond.Broadcast()
|
|
return publisher.Subscribe()
|
|
}
|
|
|
|
// StopCollection closes the channels for all subscribers and removes
|
|
// the container from metrics collection.
|
|
func (s *Collector) StopCollection(c *container.Container) {
|
|
s.m.Lock()
|
|
if publisher, exists := s.publishers[c]; exists {
|
|
publisher.Close()
|
|
delete(s.publishers, c)
|
|
}
|
|
s.m.Unlock()
|
|
}
|
|
|
|
// Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
|
|
func (s *Collector) Unsubscribe(c *container.Container, ch chan any) {
|
|
s.m.Lock()
|
|
publisher := s.publishers[c]
|
|
if publisher != nil {
|
|
publisher.Evict(ch)
|
|
if publisher.Len() == 0 {
|
|
delete(s.publishers, c)
|
|
}
|
|
}
|
|
s.m.Unlock()
|
|
}
|
|
|
|
// Run starts the collectors and will indefinitely collect stats from the supervisor
|
|
func (s *Collector) Run() {
|
|
type publishersPair struct {
|
|
container *container.Container
|
|
publisher *pubsub.Publisher
|
|
}
|
|
// we cannot determine the capacity here.
|
|
// it will grow enough in first iteration
|
|
var pairs []publishersPair
|
|
|
|
for {
|
|
s.cond.L.Lock()
|
|
for len(s.publishers) == 0 {
|
|
s.cond.Wait()
|
|
}
|
|
|
|
// it does not make sense in the first iteration,
|
|
// but saves allocations in further iterations
|
|
pairs = pairs[:0]
|
|
|
|
for ctr, publisher := range s.publishers {
|
|
// copy pointers here to release the lock ASAP
|
|
pairs = append(pairs, publishersPair{
|
|
container: ctr,
|
|
publisher: publisher,
|
|
})
|
|
}
|
|
|
|
s.cond.L.Unlock()
|
|
|
|
for _, pair := range pairs {
|
|
stats, err := s.supervisor.GetContainerStats(pair.container)
|
|
if err != nil {
|
|
stats = &containertypes.StatsResponse{
|
|
ID: pair.container.ID,
|
|
Name: pair.container.Name,
|
|
OSType: runtime.GOOS,
|
|
}
|
|
}
|
|
pair.publisher.Publish(*stats)
|
|
}
|
|
|
|
time.Sleep(s.interval)
|
|
}
|
|
}
|