diff --git a/daemon/attach.go b/daemon/attach.go index f233cdca86..4313a7f21d 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -141,7 +141,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach if !ok { return logger.ErrReadLogsNotSupported{} } - logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1}) + logs := cLog.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1}) defer logs.ConsumerGone() LogLoop: diff --git a/daemon/logger/adapter.go b/daemon/logger/adapter.go index 95ed5a859e..bd5cf5f226 100644 --- a/daemon/logger/adapter.go +++ b/daemon/logger/adapter.go @@ -87,7 +87,7 @@ type pluginAdapterWithRead struct { *pluginAdapter } -func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { +func (a *pluginAdapterWithRead) ReadLogs(ctx context.Context, config ReadConfig) *LogWatcher { watcher := NewLogWatcher() go func() { @@ -101,6 +101,10 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { dec := logdriver.NewLogEntryDecoder(stream) for { + if ctx.Err() != nil { + return + } + var buf logdriver.LogEntry if err := dec.Decode(&buf); err != nil { if err == io.EOF { @@ -127,6 +131,8 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { // send the message unless the consumer is gone select { case watcher.Msg <- msg: + case <-ctx.Done(): + return case <-watcher.WatchConsumerGone(): return } diff --git a/daemon/logger/adapter_test.go b/daemon/logger/adapter_test.go index 0c8f98c018..5c89f350d8 100644 --- a/daemon/logger/adapter_test.go +++ b/daemon/logger/adapter_test.go @@ -1,6 +1,7 @@ package logger // import "github.com/docker/docker/daemon/logger" import ( + "context" "encoding/binary" "io" "sync" @@ -154,7 +155,7 @@ func TestAdapterReadLogs(t *testing.T) { lr, ok := l.(LogReader) assert.Check(t, ok, "Logger does not implement LogReader") - lw := lr.ReadLogs(ReadConfig{}) + lw := lr.ReadLogs(context.TODO(), ReadConfig{}) for _, x := range testMsg { select { @@ -173,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) { } lw.ConsumerGone() - lw = lr.ReadLogs(ReadConfig{Follow: true}) + lw = lr.ReadLogs(context.TODO(), ReadConfig{Follow: true}) for _, x := range testMsg { select { case msg := <-lw.Msg: diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index cc79a72a51..1b9fda5995 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -20,6 +20,8 @@ const ( waitInterval = 250 * time.Millisecond ) +var _ logger.LogReader = (*journald)(nil) + // Fields which we know are not user-provided attribute fields. var wellKnownFields = map[string]bool{ "MESSAGE": true, @@ -190,11 +192,20 @@ func (r *reader) initialSeekTail() (bool, error) { // wait blocks until the journal has new data to read, the reader's drain // deadline is exceeded, or the log reading consumer is gone. -func (r *reader) wait() (bool, error) { +func (r *reader) wait(ctx context.Context) (bool, error) { + deadline := r.drainDeadline + if d, ok := ctx.Deadline(); ok && d.Before(deadline) { + deadline = d + } + for { + if err := ctx.Err(); err != nil { + return false, err + } + dur := waitInterval - if !r.drainDeadline.IsZero() { - dur = time.Until(r.drainDeadline) + if !deadline.IsZero() { + dur = time.Until(deadline) if dur < 0 { // Container is gone but we haven't found the end of the // logs before the deadline. 
Maybe it was dropped by @@ -213,6 +224,8 @@ func (r *reader) wait() (bool, error) { select { case <-r.logWatcher.WatchConsumerGone(): return false, nil + case <-ctx.Done(): + return false, ctx.Err() case <-r.s.closed: // Container is gone; don't wait indefinitely for journal entries that will never arrive. if r.maxOrdinal >= r.s.ordinal.Load() { @@ -228,12 +241,15 @@ func (r *reader) wait() (bool, error) { // nextWait blocks until there is a new journal entry to read, and advances the // journal read pointer to it. -func (r *reader) nextWait() (bool, error) { +func (r *reader) nextWait(ctx context.Context) (bool, error) { for { + if err := ctx.Err(); err != nil { + return false, err + } if ok, err := r.j.Next(); err != nil || ok { return ok, err } - if ok, err := r.wait(); err != nil || !ok { + if ok, err := r.wait(ctx); err != nil || !ok { return false, err } } @@ -247,7 +263,7 @@ func (r *reader) nextWait() (bool, error) { // - the watch consumer is gone, or // - (if until is nonzero) a log entry is read which has a timestamp after // until -func (r *reader) drainJournal() (bool, error) { +func (r *reader) drainJournal(ctx context.Context) (bool, error) { for i := 0; ; i++ { // Read the entry's timestamp. timestamp, err := r.j.Realtime() @@ -297,6 +313,8 @@ func (r *reader) drainJournal() (bool, error) { select { case <-r.logWatcher.WatchConsumerGone(): return false, nil + case <-ctx.Done(): + return false, ctx.Err() case r.logWatcher.Msg <- msg: } } @@ -306,21 +324,25 @@ func (r *reader) drainJournal() (bool, error) { if i != 0 && i%1024 == 0 { if _, err := r.j.Process(); err != nil { // log a warning but ignore it for now - log.G(context.TODO()).WithField("container", r.s.vars[fieldContainerIDFull]). + log.G(ctx).WithField("container", r.s.vars[fieldContainerIDFull]). WithField("error", err). Warn("journald: error processing journal") } } + if err := ctx.Err(); err != nil { + return false, err + } + if ok, err := r.j.Next(); err != nil || !ok { return true, err } } } -func (r *reader) readJournal() error { +func (r *reader) readJournal(ctx context.Context) error { caughtUp := r.s.ordinal.Load() - if more, err := r.drainJournal(); err != nil || !more { + if more, err := r.drainJournal(ctx); err != nil || !more { return err } @@ -343,10 +365,10 @@ func (r *reader) readJournal() error { default: } - if more, err := r.nextWait(); err != nil || !more { + if more, err := r.nextWait(ctx); err != nil || !more { return err } - if more, err := r.drainJournal(); err != nil || !more { + if more, err := r.drainJournal(ctx); err != nil || !more { return err } if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp { @@ -355,7 +377,7 @@ func (r *reader) readJournal() error { } } -func (r *reader) readLogs() { +func (r *reader) readLogs(ctx context.Context) { defer close(r.logWatcher.Msg) // Make sure the ready channel is closed in the event of an early @@ -425,7 +447,7 @@ func (r *reader) readLogs() { // which case the position will be unaffected by subsequent logging, or // the read pointer is in the conceptual position corresponding to the // first journal entry to send once it is logged in the future. 
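Review note: the context plumbing in this file follows one repeated shape — check ctx.Err() at the top of every loop iteration, and give every blocking select a ctx.Done() arm. A self-contained sketch of that shape (illustrative only; pollUntilDone and its main are hypothetical, not code from this patch):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    const waitInterval = 250 * time.Millisecond

    // pollUntilDone alternates between polling and an interruptible sleep,
    // returning promptly once ctx is canceled or its deadline passes.
    func pollUntilDone(ctx context.Context, next func() (bool, error)) (bool, error) {
        for {
            if err := ctx.Err(); err != nil {
                return false, err // canceled or deadline exceeded
            }
            if ok, err := next(); err != nil || ok {
                return ok, err
            }
            select {
            case <-ctx.Done():
                return false, ctx.Err()
            case <-time.After(waitInterval):
                // poll again
            }
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        _, err := pollUntilDone(ctx, func() (bool, error) { return false, nil })
        fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
    }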
- if more, err := r.nextWait(); err != nil || !more { + if more, err := r.nextWait(ctx); err != nil || !more { if err != nil { r.logWatcher.Err <- err } @@ -433,7 +455,7 @@ func (r *reader) readLogs() { } } - if err := r.readJournal(); err != nil { + if err := r.readJournal(ctx); err != nil { r.logWatcher.Err <- err return } @@ -447,14 +469,14 @@ func (r *reader) signalReady() { } } -func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { +func (s *journald) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher { r := &reader{ s: s, logWatcher: logger.NewLogWatcher(), config: config, ready: make(chan struct{}), } - go r.readLogs() + go r.readLogs(ctx) // Block until the reader is in position to read from the current config // location to prevent race conditions in tests. <-r.ready diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index bea8ceedb3..6627074fe2 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -12,10 +12,12 @@ import ( "github.com/docker/docker/pkg/tailfile" ) +var _ logger.LogReader = (*JSONFileLogger)(nil) + // ReadLogs implements the logger's LogReader interface for the logs // created by this driver. -func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - return l.writer.ReadLogs(config) +func (l *JSONFileLogger) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher { + return l.writer.ReadLogs(ctx, config) } func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { @@ -79,6 +81,6 @@ func decodeFunc(rdr io.Reader) loggerutils.Decoder { } } -func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) { +func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (loggerutils.SizeReaderAt, int, error) { return tailfile.NewTailReader(ctx, r, req) } diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go index dd56be0ff3..122617fec0 100644 --- a/daemon/logger/jsonfilelog/read_test.go +++ b/daemon/logger/jsonfilelog/read_test.go @@ -3,6 +3,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo import ( "bufio" "bytes" + "context" "fmt" "io" "os" @@ -62,7 +63,7 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) { } }() - lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true}) + lw := jsonlogger.(*JSONFileLogger).ReadLogs(context.TODO(), logger.ReadConfig{Follow: true}) for { select { case _, ok := <-lw.Msg: diff --git a/daemon/logger/local/read.go b/daemon/logger/local/read.go index cb5f9f0cd3..14771a52db 100644 --- a/daemon/logger/local/read.go +++ b/daemon/logger/local/read.go @@ -18,11 +18,11 @@ import ( // logger.defaultBufSize caps the size of Line field. const maxMsgLen int = 1e6 // 1MB. 
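Review note: the `var _ logger.LogReader = (*journald)(nil)` and `var _ logger.LogReader = (*JSONFileLogger)(nil)` declarations added above are the standard Go compile-time interface check: zero runtime cost, and a missed method becomes a build error. That matters here because the ReadLogs signature change would otherwise silently drop a driver out of the LogReader interface and only fail at the runtime type-assertion sites. A runnable toy illustration:

    package main

    import "io"

    type file struct{}

    func (file) Read([]byte) (int, error) { return 0, io.EOF }

    // If file ever stops satisfying io.Reader (say, a method signature
    // changes), this declaration fails to compile.
    var _ io.Reader = file{}

    func main() {}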
-func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - return d.logfile.ReadLogs(config) +func (d *driver) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher { + return d.logfile.ReadLogs(ctx, config) } -func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) { +func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (loggerutils.SizeReaderAt, int, error) { size := r.Size() if req < 0 { return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req)) diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index d3e9da1053..868ac8b168 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -8,6 +8,7 @@ package logger // import "github.com/docker/docker/daemon/logger" import ( + "context" "sync" "time" @@ -88,7 +89,7 @@ type ReadConfig struct { // LogReader is the interface for reading log messages for loggers that support reading. type LogReader interface { // ReadLogs reads logs from underlying logging backend. - ReadLogs(ReadConfig) *LogWatcher + ReadLogs(context.Context, ReadConfig) *LogWatcher } // LogWatcher is used when consuming logs read from the LogReader interface. diff --git a/daemon/logger/loggertest/logreader.go b/daemon/logger/loggertest/logreader.go index 0573a6add4..d1769221b7 100644 --- a/daemon/logger/loggertest/logreader.go +++ b/daemon/logger/loggertest/logreader.go @@ -1,6 +1,7 @@ package loggertest // import "github.com/docker/docker/daemon/logger/loggertest" import ( + "context" "fmt" "runtime" "strings" @@ -93,63 +94,63 @@ func (tr Reader) testTail(t *testing.T, live bool) { t.Run("Exact", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected, compareLog) }) t.Run("LessThanAvailable", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: 2}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: 2}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[len(mm)-2:], compareLog) }) t.Run("MoreThanAvailable", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: 100}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: 100}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected, compareLog) }) t.Run("All", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: -1}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected, compareLog) }) t.Run("Since", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog) }) t.Run("MoreThanSince", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm), Since: mm[1].Timestamp.Truncate(time.Millisecond)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm), Since: mm[1].Timestamp.Truncate(time.Millisecond)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog) }) t.Run("LessThanSince", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: 
len(mm) - 2, Since: mm[1].Timestamp.Truncate(time.Millisecond)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm) - 2, Since: mm[1].Timestamp.Truncate(time.Millisecond)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[2:], compareLog) }) t.Run("Until", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Until: mm[2].Timestamp.Add(-time.Millisecond)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Until: mm[2].Timestamp.Add(-time.Millisecond)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[:2], compareLog) }) t.Run("SinceAndUntil", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond), Until: mm[1].Timestamp.Add(time.Millisecond)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond), Until: mm[1].Timestamp.Add(time.Millisecond)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[1:2], compareLog) }) @@ -182,7 +183,7 @@ func (tr Reader) testTailEmptyLogs(t *testing.T, live bool) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), ([]*logger.Message)(nil), cmpopts.EquateEmpty()) }) @@ -204,7 +205,7 @@ func (tr Reader) TestFollow(t *testing.T) { ContainerID: fmt.Sprintf("followstart%d", i), ContainerName: fmt.Sprintf("logloglog%d", i), })(t) - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: tail, Follow: true}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: tail, Follow: true}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -232,7 +233,7 @@ func (tr Reader) TestFollow(t *testing.T) { mm := makeTestMessages() expected := logMessages(t, l, mm[0:1]) - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -257,7 +258,7 @@ func (tr Reader) TestFollow(t *testing.T) { mm := makeTestMessages() - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[2].Timestamp.Truncate(time.Millisecond)}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Since: mm[2].Timestamp.Truncate(time.Millisecond)}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -282,7 +283,7 @@ func (tr Reader) TestFollow(t *testing.T) { mm := makeTestMessages() - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Until: mm[2].Timestamp.Add(-time.Millisecond)}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Until: mm[2].Timestamp.Add(-time.Millisecond)}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -307,7 +308,7 @@ func (tr Reader) TestFollow(t *testing.T) { mm := makeTestMessages() - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[1].Timestamp.Add(-time.Millisecond), Until: mm[2].Timestamp.Add(-time.Millisecond)}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Since: mm[1].Timestamp.Add(-time.Millisecond), Until: mm[2].Timestamp.Add(-time.Millisecond)}) defer 
lw.ConsumerGone() doneReading := make(chan struct{}) @@ -334,7 +335,7 @@ func (tr Reader) TestFollow(t *testing.T) { logMessages(t, l, mm[0:2]) syncLogger(t, l) - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 0, Follow: true}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: 0, Follow: true}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -361,7 +362,7 @@ func (tr Reader) TestFollow(t *testing.T) { expected := logMessages(t, l, mm[0:2])[1:] syncLogger(t, l) - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 1, Follow: true}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: 1, Follow: true}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -390,7 +391,7 @@ func (tr Reader) TestFollow(t *testing.T) { assert.NilError(t, l.Close()) l = factory(t) - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -430,7 +431,7 @@ func (tr Reader) TestConcurrent(t *testing.T) { } // Follow all logs - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Follow: true, Tail: -1}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Follow: true, Tail: -1}) defer lw.ConsumerGone() // Log concurrently from two sources and close log diff --git a/daemon/logger/loggerutils/cache/local_cache.go b/daemon/logger/loggerutils/cache/local_cache.go index d5adfd4ffa..3466c1e79f 100644 --- a/daemon/logger/loggerutils/cache/local_cache.go +++ b/daemon/logger/loggerutils/cache/local_cache.go @@ -24,6 +24,8 @@ var builtInCacheLogOpts = map[string]bool{ cacheDisabledKey: true, } +var _ logger.LogReader = (*loggerWithCache)(nil) + // WithLocalCache wraps the passed in logger with a logger caches all writes locally // in addition to writing to the passed in logger. func WithLocalCache(l logger.Logger, info logger.Info) (logger.Logger, error) { @@ -85,8 +87,8 @@ func (l *loggerWithCache) Name() string { return l.l.Name() } -func (l *loggerWithCache) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - return l.cache.(logger.LogReader).ReadLogs(config) +func (l *loggerWithCache) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher { + return l.cache.(logger.LogReader).ReadLogs(ctx, config) } func (l *loggerWithCache) Close() error { diff --git a/daemon/logger/loggerutils/follow.go b/daemon/logger/loggerutils/follow.go index 6131bcea7c..7a6b8e50f0 100644 --- a/daemon/logger/loggerutils/follow.go +++ b/daemon/logger/loggerutils/follow.go @@ -22,8 +22,8 @@ type follow struct { } // Do follows the log file as it is written, starting from f at read. 
-func (fl *follow) Do(f *os.File, read logPos) { - fl.log = log.G(context.TODO()).WithFields(log.Fields{ +func (fl *follow) Do(ctx context.Context, f *os.File, read logPos) { + fl.log = log.G(ctx).WithFields(log.Fields{ "module": "logger", "file": f.Name(), }) @@ -38,7 +38,7 @@ func (fl *follow) Do(f *os.File, read logPos) { }() for { - wrote, ok := fl.nextPos(read) + wrote, ok := fl.nextPos(ctx, read) if !ok { return } @@ -49,7 +49,7 @@ func (fl *follow) Do(f *os.File, read logPos) { fl.Watcher.Err <- err return } - if !fl.forward(f) { + if !fl.forward(ctx, f) { return } @@ -91,7 +91,7 @@ func (fl *follow) Do(f *os.File, read logPos) { read.size = 0 } - if !fl.forward(io.NewSectionReader(f, read.size, wrote.size-read.size)) { + if !fl.forward(ctx, io.NewSectionReader(f, read.size, wrote.size-read.size)) { return } read = wrote @@ -100,9 +100,11 @@ func (fl *follow) Do(f *os.File, read logPos) { // nextPos waits until the write position of the LogFile being followed has // advanced from current and returns the new position. -func (fl *follow) nextPos(current logPos) (next logPos, ok bool) { +func (fl *follow) nextPos(ctx context.Context, current logPos) (next logPos, ok bool) { var st logReadState select { + case <-ctx.Done(): + return current, false case <-fl.Watcher.WatchConsumerGone(): return current, false case st = <-fl.LogFile.read: @@ -135,7 +137,7 @@ func (fl *follow) nextPos(current logPos) (next logPos, ok bool) { // forward decodes log messages from r and forwards them to the log watcher. // // The return value, cont, signals whether following should continue. -func (fl *follow) forward(r io.Reader) (cont bool) { +func (fl *follow) forward(ctx context.Context, r io.Reader) (cont bool) { fl.Decoder.Reset(r) - return fl.Forwarder.Do(fl.Watcher, fl.Decoder) + return fl.Forwarder.Do(ctx, fl.Watcher, fl.Decoder.Decode) } diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index 61490c8d1a..73e3742482 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -9,14 +9,18 @@ import ( "io/fs" "math" "os" + "slices" "strconv" "sync" "time" + "github.com/containerd/containerd/tracing" "github.com/containerd/log" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/pkg/pools" "github.com/pkg/errors" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // rotateFileMetadata is a metadata of the gzip header of the compressed log file @@ -107,16 +111,11 @@ type SizeReaderAt interface { Size() int64 } -type readAtCloser interface { - io.ReaderAt - io.Closer -} - // GetTailReaderFunc is used to truncate a reader to only read as much as is required // in order to get the passed in number of log lines. // It returns the sectioned reader, the number of lines that the section reader // contains, and any error that occurs. -type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error) +type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr SizeReaderAt, nLines int, err error) // NewLogFile creates new LogFile func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) { @@ -377,7 +376,12 @@ func (w *LogFile) Close() error { // ReadLogs decodes entries from log files. // // It is the caller's responsibility to call ConsumerGone on the LogWatcher. 
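Review note: cancellation can now arrive through the context as well, but as the comment above says, ConsumerGone remains the caller's responsibility. A sketch of a typical consumer under the new signature (illustrative; lr is any logger.LogReader, and the usual daemon/logger import is assumed):

    func consume(ctx context.Context, lr logger.LogReader) error {
        lw := lr.ReadLogs(ctx, logger.ReadConfig{Tail: -1, Follow: true})
        defer lw.ConsumerGone() // required even when ctx is canceled

        for {
            select {
            case msg, ok := <-lw.Msg:
                if !ok {
                    return nil // the producer closed the stream
                }
                _ = msg // process the message
            case err := <-lw.Err:
                return err
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }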
-func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { +func (w *LogFile) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher { + ctx, span := tracing.StartSpan(ctx, "logger.LogFile.ReadLogs") + defer span.End() + + span.SetAttributes(tracing.Attribute("config", config)) + watcher := logger.NewLogWatcher() // Lock out filesystem operations so that we can capture the read // position and atomically open the corresponding log file, without the @@ -389,19 +393,104 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { st := <-w.read pos := st.pos w.read <- st - go w.readLogsLocked(pos, config, watcher) + go w.readLogsLocked(ctx, pos, config, watcher) return watcher } +// tailFiles must be called with w.fsopMu locked for reads. +// w.fsopMu.RUnlock() is called before returning. +func (w *LogFile) tailFiles(ctx context.Context, config logger.ReadConfig, watcher *logger.LogWatcher, current SizeReaderAt, dec Decoder, fwd *forwarder) (cont bool) { + if config.Tail == 0 { + w.fsopMu.RUnlock() + return true + } + + ctx, span := tracing.StartSpan(ctx, "logger.Logfile.TailLogs") + defer func() { + span.SetAttributes(attribute.Bool("continue", cont)) + span.End() + }() + + files, err := w.openRotatedFiles(ctx, config) + w.fsopMu.RUnlock() + + if err != nil { + // TODO: Should we allow this to continue (as in set `cont=true`) and not error out the log stream? + err = errors.Wrap(err, "error opening rotated log files") + span.SetStatus(err) + watcher.Err <- err + return false + } + + if current.Size() > 0 { + files = append(files, &sizeReaderAtOpener{current, "current"}) + } + + return tailFiles(ctx, files, watcher, dec, w.getTailReader, config.Tail, fwd) +} + +type sizeReaderAtOpener struct { + SizeReaderAt + ref string +} + +func (o *sizeReaderAtOpener) ReaderAt(context.Context) (sizeReaderAtCloser, error) { + return &sizeReaderAtWithCloser{o, nil}, nil +} + +func (o *sizeReaderAtOpener) Close() {} + +func (o *sizeReaderAtOpener) Ref() string { + return o.ref +} + +type sizeReaderAtWithCloser struct { + SizeReaderAt + close func() error +} + +func (r *sizeReaderAtWithCloser) ReadAt(p []byte, offset int64) (int, error) { + if r.SizeReaderAt == nil { + return 0, io.EOF + } + return r.SizeReaderAt.ReadAt(p, offset) +} + +func (r *sizeReaderAtWithCloser) Read(p []byte) (int, error) { + if r.SizeReaderAt == nil { + return 0, io.EOF + } + return r.SizeReaderAt.Read(p) +} + +func (r *sizeReaderAtWithCloser) Size() int64 { + if r.SizeReaderAt == nil { + return 0 + } + return r.SizeReaderAt.Size() +} + +func (r *sizeReaderAtWithCloser) Close() error { + if r.close != nil { + return r.close() + } + return nil +} + // readLogsLocked is the bulk of the implementation of ReadLogs. // // w.fsopMu must be locked for reading when calling this method. // w.fsopMu.RUnlock() is called before returning. 
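Review note: the lock hand-off documented above is the subtle invariant of this refactor: ReadLogs acquires fsopMu for reading, readLogsLocked inherits the obligation, and tailFiles now performs the single RUnlock once it has captured a consistent view (the early-exit path in readLogsLocked releases it itself). The discipline in miniature (hypothetical shape, not patch code):

    package main

    import (
        "os"
        "sync"
    )

    var mu sync.RWMutex

    func readLogs() {
        mu.RLock()
        go readLocked() // readLocked must call mu.RUnlock exactly once on every path
    }

    func readLocked() {
        f, err := os.Open("current.log")
        if err != nil {
            mu.RUnlock() // the early-exit path still releases exactly once
            return
        }
        defer f.Close()
        tailLocked(f)
    }

    // tailLocked releases the lock as soon as it has a consistent snapshot
    // of the rotated files, before any slow decompression or decoding.
    func tailLocked(f *os.File) {
        // ... capture the set of rotated files ...
        mu.RUnlock()
        // ... decode and forward without holding the lock ...
    }

    func main() { readLogs() }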
-func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, watcher *logger.LogWatcher) {
+func (w *LogFile) readLogsLocked(ctx context.Context, currentPos logPos, config logger.ReadConfig, watcher *logger.LogWatcher) {
+	ctx, span := tracing.StartSpan(ctx, "logger.Logfile.ReadLogsLocked")
+	defer span.End()
+
 	defer close(watcher.Msg)
 	currentFile, err := open(w.f.Name())
 	if err != nil {
+		w.fsopMu.RUnlock()
+		span.SetStatus(err)
 		watcher.Err <- err
 		return
 	}
@@ -410,53 +499,13 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa
 	dec := w.createDecoder(nil)
 	defer dec.Close()
 
-	currentChunk := io.NewSectionReader(currentFile, 0, currentPos.size)
 	fwd := newForwarder(config)
-	if config.Tail != 0 {
-		// TODO(@cpuguy83): Instead of opening every file, only get the files which
-		// are needed to tail.
-		// This is especially costly when compression is enabled.
-		files, err := w.openRotatedFiles(config)
-		if err != nil {
-			watcher.Err <- err
-			return
-		}
+	// At this point, w.tailFiles is responsible for unlocking w.fsopMu
+	ok := w.tailFiles(ctx, config, watcher, io.NewSectionReader(currentFile, 0, currentPos.size), dec, fwd)
 
-		closeFiles := func() {
-			for _, f := range files {
-				f.Close()
-			}
-		}
-
-		readers := make([]SizeReaderAt, 0, len(files)+1)
-		for _, f := range files {
-			switch ff := f.(type) {
-			case SizeReaderAt:
-				readers = append(readers, ff)
-			case interface{ Stat() (fs.FileInfo, error) }:
-				stat, err := ff.Stat()
-				if err != nil {
-					watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
-					closeFiles()
-					return
-				}
-				readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
-			default:
-				panic(fmt.Errorf("rotated file value %#v (%[1]T) has neither Size() nor Stat() methods", f))
-			}
-		}
-		if currentChunk.Size() > 0 {
-			readers = append(readers, currentChunk)
-		}
-
-		ok := tailFiles(readers, watcher, dec, w.getTailReader, config.Tail, fwd)
-		closeFiles()
-		if !ok {
-			return
-		}
-	} else {
-		w.fsopMu.RUnlock()
+	if !ok {
+		return
 	}
 
 	if !config.Follow {
@@ -468,96 +517,47 @@
 		Watcher:   watcher,
 		Decoder:   dec,
 		Forwarder: fwd,
-	}).Do(currentFile, currentPos)
+	}).Do(ctx, currentFile, currentPos)
 }
 
-// openRotatedFiles returns a slice of files open for reading, in order from
-// oldest to newest, and calls w.fsopMu.RUnlock() before returning.
-//
-// This method must only be called with w.fsopMu locked for reading.
-func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []readAtCloser, err error) {
-	type rotatedFile struct {
-		f          *os.File
-		compressed bool
+type fileOpener interface {
+	ReaderAt(context.Context) (ra sizeReaderAtCloser, err error)
+	Close()
+	Ref() string
+}
+
+// simpleFileOpener just holds a reference to an already open file.
+type simpleFileOpener struct {
+	f      *os.File
+	sz     int64
+	closed bool
+}
+
+func (o *simpleFileOpener) ReaderAt(context.Context) (sizeReaderAtCloser, error) {
+	if o.closed {
+		return nil, errors.New("file is closed")
 	}
-	var q []rotatedFile
-	defer func() {
+	if o.sz == 0 {
+		stat, err := o.f.Stat()
 		if err != nil {
-			for _, qq := range q {
-				qq.f.Close()
-			}
-			for _, f := range files {
-				f.Close()
-			}
+			return nil, errors.Wrap(err, "error statting file")
 		}
-	}()
-
-	q, err = func() (q []rotatedFile, err error) {
-		defer w.fsopMu.RUnlock()
-
-		q = make([]rotatedFile, 0, w.maxFiles)
-		for i := w.maxFiles; i > 1; i-- {
-			var f rotatedFile
-			f.f, err = open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
-			if err != nil {
-				if !errors.Is(err, fs.ErrNotExist) {
-					return nil, errors.Wrap(err, "error opening rotated log file")
-				}
-				f.compressed = true
-				f.f, err = open(fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1))
-				if err != nil {
-					if !errors.Is(err, fs.ErrNotExist) {
-						return nil, errors.Wrap(err, "error opening file for decompression")
-					}
-					continue
-				}
-			}
-			q = append(q, f)
-		}
-		return q, nil
-	}()
-	if err != nil {
-		return nil, err
+		o.sz = stat.Size()
 	}
-
-	for len(q) > 0 {
-		qq := q[0]
-		q = q[1:]
-		if qq.compressed {
-			defer qq.f.Close()
-			f, err := w.maybeDecompressFile(qq.f, config)
-			if err != nil {
-				return nil, err
-			}
-			if f != nil {
-				// The log before `config.Since` does not need to read
-				files = append(files, f)
-			}
-		} else {
-			files = append(files, qq.f)
-		}
-	}
-	return files, nil
+	return &sizeReaderAtWithCloser{io.NewSectionReader(o.f, 0, o.sz), nil}, nil
 }
 
-func (w *LogFile) maybeDecompressFile(cf *os.File, config logger.ReadConfig) (readAtCloser, error) {
-	rc, err := gzip.NewReader(cf)
-	if err != nil {
-		return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
-	}
-	defer rc.Close()
-
-	// Extract the last log entry timestramp from the gzip header
-	extra := &rotateFileMetadata{}
-	err = json.Unmarshal(rc.Header.Extra, extra)
-	if err == nil && !extra.LastTime.IsZero() && extra.LastTime.Before(config.Since) {
-		return nil, nil
-	}
-	tmpf, err := w.decompress.Do(cf)
-	return tmpf, errors.Wrap(err, "error decompressing log file")
+func (o *simpleFileOpener) Ref() string {
+	return o.f.Name()
 }
 
+func (o *simpleFileOpener) Close() {
+	_ = o.f.Close()
+	o.closed = true
+}
+
+// converter function used by shareTempFileConverter
 func decompress(dst io.WriteSeeker, src io.ReadSeeker) error {
 	if _, err := src.Seek(0, io.SeekStart); err != nil {
 		return err
@@ -573,12 +573,209 @@ func decompress(dst io.WriteSeeker, src io.ReadSeeker) error {
 	return rc.Close()
 }
 
-func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) {
-	ctx, cancel := context.WithCancel(context.Background())
+// compressedFileOpener holds a reference to a compressed log file and will
+// lazily open a decompressed version of the file.
+type compressedFileOpener struct { + closed bool + + f *os.File + + lf *LogFile + ifBefore time.Time +} + +func (cfo *compressedFileOpener) ReaderAt(ctx context.Context) (_ sizeReaderAtCloser, retErr error) { + _, span := tracing.StartSpan(ctx, "logger.Logfile.Compressed.ReaderAt") + defer func() { + if retErr != nil { + span.SetStatus(retErr) + } + span.End() + }() + + span.SetAttributes(attribute.String("file", cfo.f.Name())) + + if cfo.closed { + return nil, errors.New("compressed file closed") + } + + gzr, err := gzip.NewReader(cfo.f) + if err != nil { + return nil, err + } + defer gzr.Close() + + // Extract the last log entry timestamp from the gzip header + // Use this to determine if we even need to read this file based on inputs + extra := &rotateFileMetadata{} + err = json.Unmarshal(gzr.Header.Extra, extra) + if err == nil && !extra.LastTime.IsZero() && extra.LastTime.Before(cfo.ifBefore) { + span.SetAttributes(attribute.Bool("skip", true)) + return &sizeReaderAtWithCloser{}, nil + } + if err == nil { + span.SetAttributes(attribute.Stringer("lastLogTime", extra.LastTime)) + } + + span.AddEvent("Start decompress") + return cfo.lf.decompress.Do(cfo.f) +} + +func (cfo *compressedFileOpener) Close() { + cfo.closed = true + cfo.f.Close() +} + +func (cfo *compressedFileOpener) Ref() string { + return cfo.f.Name() +} + +type emptyFileOpener struct{} + +func (emptyFileOpener) ReaderAt(context.Context) (sizeReaderAtCloser, error) { + return &sizeReaderAtWithCloser{}, nil +} + +func (emptyFileOpener) Close() {} + +func (emptyFileOpener) Ref() string { + return "null" +} + +// openRotatedFiles returns a slice of files open for reading, in order from +// oldest to newest, and calls w.fsopMu.RUnlock() before returning. +// +// This method must only be called with w.fsopMu locked for reading. +func (w *LogFile) openRotatedFiles(ctx context.Context, config logger.ReadConfig) (_ []fileOpener, retErr error) { + var out []fileOpener + + defer func() { + if retErr != nil { + for _, fo := range out { + fo.Close() + } + } + }() + + for i := w.maxFiles; i > 1; i-- { + fo, err := w.openRotatedFile(ctx, i-1, config) + if err != nil { + return nil, err + } + out = append(out, fo) + } + + return out, nil +} + +func (w *LogFile) openRotatedFile(ctx context.Context, i int, config logger.ReadConfig) (fileOpener, error) { + f, err := open(fmt.Sprintf("%s.%d", w.f.Name(), i)) + if err == nil { + return &simpleFileOpener{ + f: f, + }, nil + } + + if !errors.Is(err, fs.ErrNotExist) { + return nil, errors.Wrap(err, "error opening rotated log file") + } + + f, err = open(fmt.Sprintf("%s.%d.gz", w.f.Name(), i)) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + return nil, errors.Wrap(err, "error opening file for decompression") + } + return &emptyFileOpener{}, nil + } + + return &compressedFileOpener{ + f: f, + lf: w, + ifBefore: config.Since, + }, nil +} + +// This is used to improve type safety around tailing logs +// Some log readers require the log file to be closed, so this makes sure all +// implementers have a closer even if it may be a no-op. +// This is opposed to asserting a type. 
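Review note: compressedFileOpener's header check above is what lets a reader skip an entire rotated file without decompressing it — the rotation path stores rotateFileMetadata in the gzip "extra" header field, and ReaderAt compares the recorded LastTime against the requested Since. A self-contained sketch of the same round trip (toy meta type, not patch code):

    package main

    import (
        "bytes"
        "compress/gzip"
        "encoding/json"
        "fmt"
        "time"
    )

    type meta struct{ LastTime time.Time }

    func main() {
        // Writer side: stash metadata in the gzip header before writing.
        var buf bytes.Buffer
        zw := gzip.NewWriter(&buf)
        zw.Header.Extra, _ = json.Marshal(meta{LastTime: time.Now().Add(-time.Hour)})
        zw.Write([]byte("old log data\n"))
        zw.Close()

        // Reader side: the header is parsed before any payload is read.
        zr, _ := gzip.NewReader(&buf)
        defer zr.Close()

        var m meta
        since := time.Now()
        if json.Unmarshal(zr.Header.Extra, &m) == nil && !m.LastTime.IsZero() && m.LastTime.Before(since) {
            fmt.Println("skip: file ends before the requested window")
        }
    }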
+type sizeReaderAtCloser interface { + SizeReaderAt + io.Closer +} + +func getTailFiles(ctx context.Context, files []fileOpener, nLines int, getTailReader GetTailReaderFunc) (_ []sizeReaderAtCloser, retErr error) { + ctx, span := tracing.StartSpan(ctx, "logger.Logfile.CollectTailFiles") + span.SetAttributes(attribute.Int("requested_lines", nLines)) + + defer func() { + if retErr != nil { + span.SetStatus(retErr) + } + span.End() + }() + out := make([]sizeReaderAtCloser, 0, len(files)) + + defer func() { + if retErr != nil { + for _, ra := range out { + if err := ra.Close(); err != nil { + log.G(ctx).WithError(err).Warn("Error closing log reader") + } + } + } + }() + + if nLines <= 0 { + for _, fo := range files { + span.AddEvent("Open file", trace.WithAttributes(attribute.String("file", fo.Ref()))) + + ra, err := fo.ReaderAt(ctx) + if err != nil { + return nil, err + } + out = append(out, ra) + + } + return out, nil + } + + for i := len(files) - 1; i >= 0 && nLines > 0; i-- { + if err := ctx.Err(); err != nil { + return nil, errors.Wrap(err, "stopping parsing files to tail due to error") + } + + fo := files[i] + + fileAttr := attribute.String("file", fo.Ref()) + span.AddEvent("Open file", trace.WithAttributes(fileAttr)) + + ra, err := fo.ReaderAt(ctx) + if err != nil { + return nil, err + } + + span.AddEvent("Scan file to tail", trace.WithAttributes(fileAttr, attribute.Int("remaining_lines", nLines))) + + tail, n, err := getTailReader(ctx, ra, nLines) + if err != nil { + ra.Close() + log.G(ctx).WithError(err).Warn("Error scanning log file for tail file request, skipping") + continue + } + nLines -= n + out = append(out, &sizeReaderAtWithCloser{tail, ra.Close}) + } + + slices.Reverse(out) + + return out, nil +} + +func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) { + ctx, cancel := context.WithCancel(ctx) defer cancel() - cont = true - // TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here. go func() { select { case <-ctx.Done(): @@ -587,27 +784,64 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge } }() - readers := make([]io.Reader, 0, len(files)) + readers, err := getTailFiles(ctx, files, nLines, getTailReader) + if err != nil { + watcher.Err <- err + return false + } - if nLines > 0 { - for i := len(files) - 1; i >= 0 && nLines > 0; i-- { - tail, n, err := getTailReader(ctx, files[i], nLines) - if err != nil { - watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing") - return false + var idx int + defer func() { + // Make sure all are released if there is an early return. + if !cont { + for _, r := range readers[idx:] { + if err := r.Close(); err != nil { + log.G(ctx).WithError(err).Debug("Error closing log reader") + } } - nLines -= n - readers = append([]io.Reader{tail}, readers...) 
} - } else { - for _, r := range files { - readers = append(readers, r) + }() + + for _, ra := range readers { + ra := ra + select { + case <-watcher.WatchConsumerGone(): + return false + case <-ctx.Done(): + return false + default: + } + + dec.Reset(ra) + + cancel := context.AfterFunc(ctx, func() { + if err := ra.Close(); err != nil { + log.G(ctx).WithError(err).Debug("Error closing log reader") + } + }) + + ok := fwd.Do(ctx, watcher, func() (*logger.Message, error) { + msg, err := dec.Decode() + if err != nil && !errors.Is(err, io.EOF) { + // We have an error decoding the stream, but we don't want to error out + // the whole log reader. + // If we return anything other than EOF then the forwarder will return + // false and we'll exit the loop. + // Instead just log the error here and return an EOF so we can move to + // the next file. + log.G(ctx).WithError(err).Warn("Error decoding log file") + return nil, io.EOF + } + return msg, err + }) + cancel() + idx++ + if !ok { + return false } } - rdr := io.MultiReader(readers...) - dec.Reset(rdr) - return fwd.Do(watcher, dec) + return true } type forwarder struct { @@ -622,16 +856,35 @@ func newForwarder(config logger.ReadConfig) *forwarder { // conditions to watcher. Do returns cont=true iff it has read all messages from // dec without encountering a message with a timestamp which is after the // configured until time. -func (fwd *forwarder) Do(watcher *logger.LogWatcher, dec Decoder) (cont bool) { +func (fwd *forwarder) Do(ctx context.Context, watcher *logger.LogWatcher, next func() (*logger.Message, error)) (cont bool) { + ctx, span := tracing.StartSpan(ctx, "logger.Logfile.Forward") + defer func() { + span.SetAttributes(attribute.Bool("continue", cont)) + span.End() + }() + for { - msg, err := dec.Decode() + select { + case <-watcher.WatchConsumerGone(): + span.AddEvent("watch consumer gone") + return false + case <-ctx.Done(): + span.AddEvent(ctx.Err().Error()) + return false + default: + } + + msg, err := next() if err != nil { if errors.Is(err, io.EOF) { + span.AddEvent("EOF") return true } - watcher.Err <- err + span.SetStatus(err) + log.G(ctx).WithError(err).Debug("Error while decoding log entry, not continuing") return false } + if !fwd.since.IsZero() { if msg.Timestamp.Before(fwd.since) { continue @@ -643,10 +896,16 @@ func (fwd *forwarder) Do(watcher *logger.LogWatcher, dec Decoder) (cont bool) { fwd.since = time.Time{} } if !fwd.until.IsZero() && msg.Timestamp.After(fwd.until) { + log.G(ctx).Debug("Log is newer than requested window, skipping remaining logs") return false } + select { + case <-ctx.Done(): + span.AddEvent(ctx.Err().Error()) + return false case <-watcher.WatchConsumerGone(): + span.AddEvent("watch consumer gone") return false case watcher.Msg <- msg: } diff --git a/daemon/logger/loggerutils/logfile_race_test.go b/daemon/logger/loggerutils/logfile_race_test.go index b27e338b5b..36fbb112cc 100644 --- a/daemon/logger/loggerutils/logfile_race_test.go +++ b/daemon/logger/loggerutils/logfile_race_test.go @@ -27,7 +27,7 @@ func TestConcurrentLogging(t *testing.T) { maxFiles = 3 compress = true ) - getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { + getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (SizeReaderAt, int, error) { return tailfile.NewTailReader(ctx, r, lines) } createDecoder := func(io.Reader) Decoder { diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index e410169fb1..4c526e9be7 100644 --- 
a/daemon/logger/loggerutils/logfile_test.go
+++ b/daemon/logger/loggerutils/logfile_test.go
@@ -4,10 +4,12 @@ import (
 	"bufio"
 	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"testing"
 	"text/tabwriter"
@@ -16,91 +18,179 @@ import (
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/pkg/tailfile"
 	"gotest.tools/v3/assert"
+	"gotest.tools/v3/assert/cmp"
 	"gotest.tools/v3/poll"
 )
 
 type testDecoder struct {
-	rdr        io.Reader
 	scanner    *bufio.Scanner
 	resetCount int
 }
 
-func (d *testDecoder) Decode() (*logger.Message, error) {
-	if d.scanner == nil {
-		d.scanner = bufio.NewScanner(d.rdr)
-	}
+func (d *testDecoder) Decode() (retMsg *logger.Message, retErr error) {
 	if !d.scanner.Scan() {
-		return nil, d.scanner.Err()
+		err := d.scanner.Err()
+		if err == nil {
+			err = io.EOF
+		}
+		return nil, err
 	}
 	return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
 }
 
 func (d *testDecoder) Reset(rdr io.Reader) {
-	d.rdr = rdr
 	d.scanner = bufio.NewScanner(rdr)
 	d.resetCount++
 }
 
 func (d *testDecoder) Close() {
-	d.rdr = nil
 	d.scanner = nil
 }
 
+// testJSONStreamDecoder is used as an easy way to test how [json.SyntaxError]s
+// are handled in the log reader.
+type testJSONStreamDecoder struct {
+	dec *json.Decoder
+}
+
+func (d *testJSONStreamDecoder) Decode() (*logger.Message, error) {
+	var m logger.Message
+	if err := d.dec.Decode(&m); err != nil {
+		return nil, err
+	}
+
+	return &m, nil
+}
+
+func (d *testJSONStreamDecoder) Reset(rdr io.Reader) {
+	d.dec = json.NewDecoder(rdr)
+}
+
+func (d *testJSONStreamDecoder) Close() {
+	d.dec = nil
+}
+
 func TestTailFiles(t *testing.T) {
 	s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
 	s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
 	s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
 
-	files := []SizeReaderAt{s1, s2, s3}
+	makeOpener := func(ls ...SizeReaderAt) []fileOpener {
+		out := make([]fileOpener, 0, len(ls))
+		for i, rdr := range ls {
+			out = append(out, &sizeReaderAtOpener{rdr, strconv.Itoa(i)})
+		}
+		return out
+	}
+
+	files := makeOpener(s1, s2, s3)
 	watcher := logger.NewLogWatcher()
 	defer watcher.ConsumerGone()
 
-	tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
+	tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (SizeReaderAt, int, error) {
 		return tailfile.NewTailReader(ctx, r, lines)
 	}
 	dec := &testDecoder{}
 
-	for desc, config := range map[string]logger.ReadConfig{} {
-		t.Run(desc, func(t *testing.T) {
-			started := make(chan struct{})
-			fwd := newForwarder(config)
-			go func() {
-				close(started)
-				tailFiles(files, watcher, dec, tailReader, config.Tail, fwd)
-			}()
-			<-started
-		})
-	}
-
 	config := logger.ReadConfig{Tail: 2}
 	fwd := newForwarder(config)
 
 	started := make(chan struct{})
 	go func() {
 		close(started)
-		tailFiles(files, watcher, dec, tailReader, config.Tail, fwd)
+		tailFiles(context.TODO(), files, watcher, dec, tailReader, config.Tail, fwd)
 	}()
 	<-started
 
-	select {
-	case <-time.After(60 * time.Second):
-		t.Fatal("timeout waiting for tail line")
-	case err := <-watcher.Err:
-		assert.NilError(t, err)
-	case msg := <-watcher.Msg:
-		assert.Assert(t, msg != nil)
-		assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
-	}
+	waitForMsg(t, watcher, "Roads?", 10*time.Second)
+	waitForMsg(t, watcher, "Where we're going we don't need roads.", 10*time.Second)
 
-	select {
-	case <-time.After(60 * time.Second):
-		t.Fatal("timeout waiting for tail line")
-	case err := <-watcher.Err:
+	t.Run("handle corrupted data", func(t *testing.T) {
+		// Here we use the test JSON decoder to inject garbage data in the
+		// middle of otherwise valid JSON streams.
+		// The log reader should be able to skip over that data.
+
+		writeMsg := func(buf *bytes.Buffer, s string) {
+			t.Helper()
+
+			msg := &logger.Message{Line: []byte(s)}
+			dt, err := json.Marshal(msg)
+			assert.NilError(t, err)
+
+			_, err = buf.Write(dt)
+			assert.NilError(t, err)
+			_, err = buf.WriteString("\n")
+			assert.NilError(t, err)
+		}
+
+		msg1 := "Hello"
+		msg2 := "World!"
+		msg3 := "And again!"
+		msg4 := "One more time!"
+		msg5 := "This is the end!"
+
+		f1 := bytes.NewBuffer(nil)
+		writeMsg(f1, msg1)
+
+		_, err := f1.WriteString("some random garbage")
+		assert.NilError(t, err, "error writing garbage to log stream")
+
+		writeMsg(f1, msg2) // This won't be seen due to garbage written above
+
+		f2 := bytes.NewBuffer(nil)
+		writeMsg(f2, msg3)
+
+		// Write what looks like the start of a new log message
+		_, err = f2.WriteString("{\"Line\": ")
 		assert.NilError(t, err)
-	case msg := <-watcher.Msg:
-		assert.Assert(t, msg != nil)
-		assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
-	}
+
+		writeMsg(f2, msg4) // This won't be seen due to the partial message written above
+
+		f3 := bytes.NewBuffer(nil)
+		writeMsg(f3, msg5)
+
+		// [bytes.Buffer] is not a SizeReaderAt, so we need to convert it here.
+		files := makeOpener(bytes.NewReader(f1.Bytes()), bytes.NewReader(f2.Bytes()), bytes.NewReader(f3.Bytes()))
+
+		// At this point our log "files" should have 4 log messages in them,
+		// interspersed with junk that is invalid JSON.
+
+		// Use a fresh watcher so this subtest reads only its own messages.
+		watcher := logger.NewLogWatcher()
+
+		config := logger.ReadConfig{Tail: 4}
+		fwd := newForwarder(config)
+
+		started := make(chan struct{})
+		done := make(chan struct{})
+		go func() {
+			close(started)
+			tailFiles(context.TODO(), files, watcher, &testJSONStreamDecoder{}, tailReader, config.Tail, fwd)
+			close(done)
+		}()
+
+		waitOrTimeout := func(ch <-chan struct{}) {
+			t.Helper()
+			timer := time.NewTimer(10 * time.Second)
+			defer timer.Stop()
+
+			select {
+			case <-timer.C:
+				t.Fatal("timeout waiting for channel")
+			case <-ch:
+			}
+		}
+
+		waitOrTimeout(started)
+
+		// Note that due to how the JSON decoder works, we won't see anything in f1
+		waitForMsg(t, watcher, msg3, 10*time.Second)
+		waitForMsg(t, watcher, msg5, 10*time.Second)
+
+		waitOrTimeout(done)
+	})
 }
 
 type dummyDecoder struct{}
@@ -116,7 +206,7 @@ func TestCheckCapacityAndRotate(t *testing.T) {
 	dir := t.TempDir()
 	logPath := filepath.Join(dir, "log")
 
-	getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
+	getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (SizeReaderAt, int, error) {
 		return tailfile.NewTailReader(ctx, r, lines)
 	}
 	createDecoder := func(io.Reader) Decoder {
@@ -160,12 +250,12 @@ func TestCheckCapacityAndRotate(t *testing.T) {
 
 	t.Run("with log reader", func(t *testing.T) {
 		// Make sure rotate works with an active reader
-		lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000})
+		lw := l.ReadLogs(context.TODO(), logger.ReadConfig{Follow: true, Tail: 1000})
 		defer lw.ConsumerGone()
 
-		assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!")), ls)
+		assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!\n")), ls)
 
 		// make sure the log reader is primed
-		waitForMsg(t, lw,
30*time.Second) + waitForMsg(t, lw, "", 30*time.Second) assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 1!")), ls) assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 2!")), ls) @@ -175,17 +265,18 @@ func TestCheckCapacityAndRotate(t *testing.T) { }) } -func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) { +func waitForMsg(t *testing.T, lw *logger.LogWatcher, expected string, timeout time.Duration) { t.Helper() timer := time.NewTimer(timeout) defer timer.Stop() select { - case _, ok := <-lw.Msg: - assert.Assert(t, ok, "log producer gone before log message arrived") case err := <-lw.Err: assert.NilError(t, err) + case msg, ok := <-lw.Msg: + assert.Assert(t, ok, "log producer gone before log message arrived") + assert.Check(t, cmp.Equal(string(msg.Line), expected)) case <-timer.C: t.Fatal("timeout waiting for log message") } diff --git a/daemon/logger/loggerutils/sharedtemp.go b/daemon/logger/loggerutils/sharedtemp.go index c3493caabc..f5c8c1f035 100644 --- a/daemon/logger/loggerutils/sharedtemp.go +++ b/daemon/logger/loggerutils/sharedtemp.go @@ -76,7 +76,7 @@ func (c *sharedTempFileConverter) Do(f *os.File) (*sharedFileReader, error) { // ModTime, which conveniently also handles the case of true // positives where the file has also been modified since it was // first converted. - if os.SameFile(tf.src, stat) && tf.src.ModTime() == stat.ModTime() { + if os.SameFile(tf.src, stat) && tf.src.ModTime().Equal(stat.ModTime()) { return c.openExisting(st, id, tf) } } diff --git a/daemon/logger/ring.go b/daemon/logger/ring.go index 0934537ec1..23751f387e 100644 --- a/daemon/logger/ring.go +++ b/daemon/logger/ring.go @@ -1,6 +1,7 @@ package logger // import "github.com/docker/docker/daemon/logger" import ( + "context" "errors" "sync" "sync/atomic" @@ -20,19 +21,22 @@ type RingLogger struct { wg sync.WaitGroup } -var _ SizedLogger = &RingLogger{} +var ( + _ SizedLogger = (*RingLogger)(nil) + _ LogReader = (*ringWithReader)(nil) +) type ringWithReader struct { *RingLogger } -func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher { +func (r *ringWithReader) ReadLogs(ctx context.Context, cfg ReadConfig) *LogWatcher { reader, ok := r.l.(LogReader) if !ok { // something is wrong if we get here panic("expected log reader") } - return reader.ReadLogs(cfg) + return reader.ReadLogs(ctx, cfg) } func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger { diff --git a/daemon/logs.go b/daemon/logs.go index c48a77c987..1c72eee410 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -5,6 +5,7 @@ import ( "strconv" "time" + "github.com/containerd/containerd/tracing" "github.com/containerd/log" "github.com/docker/docker/api/types/backend" containertypes "github.com/docker/docker/api/types/container" @@ -24,6 +25,12 @@ import ( // if it returns nil, the config channel will be active and return log // messages until it runs out or the context is canceled. 
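Review note: the sharedtemp.go change above (== to ModTime().Equal) is the correct way to compare time.Time values: == compares the struct representation, including the monotonic clock reading and the location, so two values denoting the same instant can compare unequal. A runnable demonstration:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        a := time.Now()
        b := a.Round(0) // same instant, monotonic clock reading stripped

        fmt.Println(a == b)     // false: representations differ
        fmt.Println(a.Equal(b)) // true: same instant
    }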
func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *containertypes.LogsOptions) (messages <-chan *backend.LogMessage, isTTY bool, retErr error) { + ctx, span := tracing.StartSpan(ctx, "daemon.ContainerLogs") + defer func() { + span.SetStatus(retErr) + span.End() + }() + lg := log.G(ctx).WithFields(log.Fields{ "module": "daemon", "method": "(*Daemon).ContainerLogs", @@ -96,7 +103,7 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c Follow: follow, } - logs := logReader.ReadLogs(readConfig) + logs := logReader.ReadLogs(ctx, readConfig) // past this point, we can't possibly return any errors, so we can just // start a goroutine and return to tell the caller not to expect errors diff --git a/pkg/tailfile/tailfile.go b/pkg/tailfile/tailfile.go index afc84f00bb..af70b3e8a5 100644 --- a/pkg/tailfile/tailfile.go +++ b/pkg/tailfile/tailfile.go @@ -48,7 +48,7 @@ type SizeReaderAt interface { } // NewTailReader scopes the passed in reader to just the last N lines passed in -func NewTailReader(ctx context.Context, r SizeReaderAt, reqLines int) (io.Reader, int, error) { +func NewTailReader(ctx context.Context, r SizeReaderAt, reqLines int) (*io.SectionReader, int, error) { return NewTailReaderWithDelimiter(ctx, r, reqLines, eol) } @@ -56,7 +56,7 @@ func NewTailReader(ctx context.Context, r SizeReaderAt, reqLines int) (io.Reader // In this case a "line" is defined by the passed in delimiter. // // Delimiter lengths should be generally small, no more than 12 bytes -func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines int, delimiter []byte) (io.Reader, int, error) { +func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines int, delimiter []byte) (*io.SectionReader, int, error) { if reqLines < 1 { return nil, 0, ErrNonPositiveLinesNumber } @@ -71,7 +71,7 @@ func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines in ) if int64(len(delimiter)) >= size { - return bytes.NewReader(nil), 0, nil + return io.NewSectionReader(bytes.NewReader(nil), 0, 0), 0, nil } scanner := newScanner(r, delimiter) @@ -92,7 +92,7 @@ func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines in tailStart = scanner.Start(ctx) if found == 0 { - return bytes.NewReader(nil), 0, nil + return io.NewSectionReader(bytes.NewReader(nil), 0, 0), 0, nil } if found < reqLines && tailStart != 0 {