From b37c8a03c013ab50c9e3346a62840250b0672e6f Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Thu, 27 Jun 2024 20:58:05 +0000 Subject: [PATCH 1/6] Fix time comparison in sharedtemp implementation Times cannot be compared with `==` and instead should use the `t.Equal` function. Signed-off-by: Brian Goff --- daemon/logger/loggerutils/sharedtemp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemon/logger/loggerutils/sharedtemp.go b/daemon/logger/loggerutils/sharedtemp.go index c3493caabc..f5c8c1f035 100644 --- a/daemon/logger/loggerutils/sharedtemp.go +++ b/daemon/logger/loggerutils/sharedtemp.go @@ -76,7 +76,7 @@ func (c *sharedTempFileConverter) Do(f *os.File) (*sharedFileReader, error) { // ModTime, which conveniently also handles the case of true // positives where the file has also been modified since it was // first converted. - if os.SameFile(tf.src, stat) && tf.src.ModTime() == stat.ModTime() { + if os.SameFile(tf.src, stat) && tf.src.ModTime().Equal(stat.ModTime()) { return c.openExisting(st, id, tf) } } From 77f2d90e2745be81441a45a7fca94c90bcd5cc7c Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Wed, 5 Jun 2024 22:04:57 +0000 Subject: [PATCH 2/6] Refactor logfile reads This simplifies how we manage log files, especially rotated ones. It also fixes a long-standing issue to lazily open rotated files so we don't needlessly start decompressing files that we don't need. Much of this is just setting things up for commits following this one. It uses ReaderAtSize for managing all files to be tailed and manages cleanups by passing closures. 
Signed-off-by: Brian Goff --- daemon/logger/jsonfilelog/read.go | 2 +- daemon/logger/local/read.go | 2 +- daemon/logger/loggerutils/logfile.go | 425 ++++++++++++------ .../logger/loggerutils/logfile_race_test.go | 2 +- daemon/logger/loggerutils/logfile_test.go | 14 +- pkg/tailfile/tailfile.go | 8 +- 6 files changed, 297 insertions(+), 156 deletions(-) diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index bea8ceedb3..3a51267fe2 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -79,6 +79,6 @@ func decodeFunc(rdr io.Reader) loggerutils.Decoder { } } -func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) { +func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (loggerutils.SizeReaderAt, int, error) { return tailfile.NewTailReader(ctx, r, req) } diff --git a/daemon/logger/local/read.go b/daemon/logger/local/read.go index cb5f9f0cd3..4108b19f19 100644 --- a/daemon/logger/local/read.go +++ b/daemon/logger/local/read.go @@ -22,7 +22,7 @@ func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { return d.logfile.ReadLogs(config) } -func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) { +func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (loggerutils.SizeReaderAt, int, error) { size := r.Size() if req < 0 { return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req)) diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index 61490c8d1a..babbc7279a 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -9,6 +9,7 @@ import ( "io/fs" "math" "os" + "slices" "strconv" "sync" "time" @@ -107,16 +108,11 @@ type SizeReaderAt interface { Size() int64 } -type readAtCloser interface { - io.ReaderAt - io.Closer -} - // GetTailReaderFunc is used 
to truncate a reader to only read as much as is required // in order to get the passed in number of log lines. // It returns the sectioned reader, the number of lines that the section reader // contains, and any error that occurs. -type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error) +type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr SizeReaderAt, nLines int, err error) // NewLogFile creates new LogFile func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) { @@ -393,6 +389,72 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { return watcher } +// tailFiles must be called with w.fsopMu locked for reads. +// w.fsopMu.RUnlock() is called before returning. +func (w *LogFile) tailFiles(config logger.ReadConfig, watcher *logger.LogWatcher, current SizeReaderAt, dec Decoder, fwd *forwarder) bool { + if config.Tail == 0 { + w.fsopMu.RUnlock() + return true + } + + files, err := w.openRotatedFiles(config) + w.fsopMu.RUnlock() + + if err != nil { + watcher.Err <- err + return false + } + + if current.Size() > 0 { + files = append(files, &sizeReaderAtOpener{current}) + } + + return tailFiles(files, watcher, dec, w.getTailReader, config.Tail, fwd) +} + +type sizeReaderAtOpener struct { + SizeReaderAt +} + +func (o *sizeReaderAtOpener) ReaderAt() (sizeReaderAtCloser, error) { + return &sizeReaderAtWithCloser{o, nil}, nil +} + +func (o *sizeReaderAtOpener) Close() {} + +type sizeReaderAtWithCloser struct { + SizeReaderAt + close func() error +} + +func (r *sizeReaderAtWithCloser) ReadAt(p []byte, offset int64) (int, error) { + if r.SizeReaderAt == nil { + return 0, io.EOF + } + return r.SizeReaderAt.ReadAt(p, offset) +} + +func (r *sizeReaderAtWithCloser) Read(p []byte) (int, error) { + if r.SizeReaderAt == nil { + return 0, io.EOF + } + 
return r.SizeReaderAt.Read(p) +} + +func (r *sizeReaderAtWithCloser) Size() int64 { + if r.SizeReaderAt == nil { + return 0 + } + return r.SizeReaderAt.Size() +} + +func (r *sizeReaderAtWithCloser) Close() error { + if r.close != nil { + return r.close() + } + return nil +} + // readLogsLocked is the bulk of the implementation of ReadLogs. // // w.fsopMu must be locked for reading when calling this method. @@ -402,6 +464,7 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa currentFile, err := open(w.f.Name()) if err != nil { + w.fsopMu.RUnlock() watcher.Err <- err return } @@ -410,53 +473,13 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa dec := w.createDecoder(nil) defer dec.Close() - currentChunk := io.NewSectionReader(currentFile, 0, currentPos.size) fwd := newForwarder(config) - if config.Tail != 0 { - // TODO(@cpuguy83): Instead of opening every file, only get the files which - // are needed to tail. - // This is especially costly when compression is enabled. 
- files, err := w.openRotatedFiles(config) - if err != nil { - watcher.Err <- err - return - } + // At this point, w.tailFiles is responsible for unlocking w.fsopmu + ok := w.tailFiles(config, watcher, io.NewSectionReader(currentFile, 0, currentPos.size), dec, fwd) - closeFiles := func() { - for _, f := range files { - f.Close() - } - } - - readers := make([]SizeReaderAt, 0, len(files)+1) - for _, f := range files { - switch ff := f.(type) { - case SizeReaderAt: - readers = append(readers, ff) - case interface{ Stat() (fs.FileInfo, error) }: - stat, err := ff.Stat() - if err != nil { - watcher.Err <- errors.Wrap(err, "error reading size of rotated file") - closeFiles() - return - } - readers = append(readers, io.NewSectionReader(f, 0, stat.Size())) - default: - panic(fmt.Errorf("rotated file value %#v (%[1]T) has neither Size() nor Stat() methods", f)) - } - } - if currentChunk.Size() > 0 { - readers = append(readers, currentChunk) - } - - ok := tailFiles(readers, watcher, dec, w.getTailReader, config.Tail, fwd) - closeFiles() - if !ok { - return - } - } else { - w.fsopMu.RUnlock() + if !ok { + return } if !config.Follow { @@ -471,93 +494,38 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa }).Do(currentFile, currentPos) } -// openRotatedFiles returns a slice of files open for reading, in order from -// oldest to newest, and calls w.fsopMu.RUnlock() before returning. -// -// This method must only be called with w.fsopMu locked for reading. 
-func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []readAtCloser, err error) { - type rotatedFile struct { - f *os.File - compressed bool - } +type fileOpener interface { + ReaderAt() (ra sizeReaderAtCloser, err error) + Close() +} - var q []rotatedFile - defer func() { +// simpleFileOpener just holds a reference to an already open file +type simpleFileOpener struct { + f *os.File + sz int64 + closed bool +} + +func (o *simpleFileOpener) ReaderAt() (sizeReaderAtCloser, error) { + if o.closed { + return nil, errors.New("file is closed") + } + if o.sz == 0 { + stat, err := o.f.Stat() if err != nil { - for _, qq := range q { - qq.f.Close() - } - for _, f := range files { - f.Close() - } + return nil, errors.Wrap(err, "error stating file") } - }() - - q, err = func() (q []rotatedFile, err error) { - defer w.fsopMu.RUnlock() - - q = make([]rotatedFile, 0, w.maxFiles) - for i := w.maxFiles; i > 1; i-- { - var f rotatedFile - f.f, err = open(fmt.Sprintf("%s.%d", w.f.Name(), i-1)) - if err != nil { - if !errors.Is(err, fs.ErrNotExist) { - return nil, errors.Wrap(err, "error opening rotated log file") - } - f.compressed = true - f.f, err = open(fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)) - if err != nil { - if !errors.Is(err, fs.ErrNotExist) { - return nil, errors.Wrap(err, "error opening file for decompression") - } - continue - } - } - q = append(q, f) - } - return q, nil - }() - if err != nil { - return nil, err + o.sz = stat.Size() } - - for len(q) > 0 { - qq := q[0] - q = q[1:] - if qq.compressed { - defer qq.f.Close() - f, err := w.maybeDecompressFile(qq.f, config) - if err != nil { - return nil, err - } - if f != nil { - // The log before `config.Since` does not need to read - files = append(files, f) - } - } else { - files = append(files, qq.f) - } - } - return files, nil + return &sizeReaderAtWithCloser{io.NewSectionReader(o.f, 0, o.sz), nil}, nil } -func (w *LogFile) maybeDecompressFile(cf *os.File, config logger.ReadConfig) (readAtCloser, error) 
{ - rc, err := gzip.NewReader(cf) - if err != nil { - return nil, errors.Wrap(err, "error making gzip reader for compressed log file") - } - defer rc.Close() - - // Extract the last log entry timestramp from the gzip header - extra := &rotateFileMetadata{} - err = json.Unmarshal(rc.Header.Extra, extra) - if err == nil && !extra.LastTime.IsZero() && extra.LastTime.Before(config.Since) { - return nil, nil - } - tmpf, err := w.decompress.Do(cf) - return tmpf, errors.Wrap(err, "error decompressing log file") +func (o *simpleFileOpener) Close() { + _ = o.f.Close() + o.closed = true } +// converter function used by shareTempFileConverter func decompress(dst io.WriteSeeker, src io.ReadSeeker) error { if _, err := src.Seek(0, io.SeekStart); err != nil { return err @@ -573,11 +541,164 @@ func decompress(dst io.WriteSeeker, src io.ReadSeeker) error { return rc.Close() } -func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) { +// compressedFileOpener holds a reference to compressed a log file and will +// lazily open a decompressed version of the file. 
+type compressedFileOpener struct { + closed bool + + f *os.File + + lf *LogFile + ifBefore time.Time +} + +func (cfo *compressedFileOpener) ReaderAt() (_ sizeReaderAtCloser, retErr error) { + if cfo.closed { + return nil, errors.New("compressed file closed") + } + + gzr, err := gzip.NewReader(cfo.f) + if err != nil { + return nil, err + } + defer gzr.Close() + + // Extract the last log entry timestramp from the gzip header + // Use this to determine if we even need to read this file based on inputs + extra := &rotateFileMetadata{} + err = json.Unmarshal(gzr.Header.Extra, extra) + if err == nil && !extra.LastTime.IsZero() && extra.LastTime.Before(cfo.ifBefore) { + return &sizeReaderAtWithCloser{}, nil + } + + return cfo.lf.decompress.Do(cfo.f) +} + +func (cfo *compressedFileOpener) Close() { + cfo.closed = true + cfo.f.Close() +} + +type emptyFileOpener struct{} + +func (emptyFileOpener) ReaderAt() (sizeReaderAtCloser, error) { + return &sizeReaderAtWithCloser{}, nil +} + +func (emptyFileOpener) Close() {} + +// openRotatedFiles returns a slice of files open for reading, in order from +// oldest to newest, and calls w.fsopMu.RUnlock() before returning. +// +// This method must only be called with w.fsopMu locked for reading. 
+func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (_ []fileOpener, retErr error) { + var out []fileOpener + + defer func() { + if retErr != nil { + for _, fo := range out { + fo.Close() + } + } + }() + + for i := w.maxFiles; i > 1; i-- { + fo, err := w.openRotatedFile(i-1, config) + if err != nil { + return nil, err + } + out = append(out, fo) + } + + return out, nil +} + +func (w *LogFile) openRotatedFile(i int, config logger.ReadConfig) (fileOpener, error) { + f, err := open(fmt.Sprintf("%s.%d", w.f.Name(), i)) + if err == nil { + return &simpleFileOpener{ + f: f, + }, nil + } + + if !errors.Is(err, fs.ErrNotExist) { + return nil, errors.Wrap(err, "error opening rotated log file") + } + + f, err = open(fmt.Sprintf("%s.%d.gz", w.f.Name(), i)) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + return nil, errors.Wrap(err, "error opening file for decompression") + } + return &emptyFileOpener{}, nil + } + + return &compressedFileOpener{ + f: f, + lf: w, + ifBefore: config.Since, + }, nil +} + +// This is used to improve type safety around tailing logs +// Some log readers require the log file to be closed, so this makes sure all +// implementers have a closer even if it may be a no-op. +// This is opposed to asserting a type. 
+type sizeReaderAtCloser interface { + SizeReaderAt + io.Closer +} + +func getTailFiles(ctx context.Context, files []fileOpener, nLines int, getTailReader GetTailReaderFunc) (_ []sizeReaderAtCloser, retErr error) { + out := make([]sizeReaderAtCloser, 0, len(files)) + + defer func() { + if retErr != nil { + for _, ra := range out { + if err := ra.Close(); err != nil { + log.G(ctx).WithError(err).Warn("Error closing log reader") + } + } + } + }() + + if nLines <= 0 { + for _, fo := range files { + ra, err := fo.ReaderAt() + if err != nil { + return nil, err + } + out = append(out, ra) + } + return out, nil + } + + for i := len(files) - 1; i >= 0 && nLines > 0; i-- { + if err := ctx.Err(); err != nil { + return nil, errors.Wrap(err, "stopping parsing files to tail due to error") + } + ra, err := files[i].ReaderAt() + if err != nil { + return nil, err + } + + tail, n, err := getTailReader(ctx, ra, nLines) + if err != nil { + return nil, errors.Wrap(err, "error finding file position to start log tailing") + } + nLines -= n + out = append(out, &sizeReaderAtWithCloser{tail, ra.Close}) + } + + slices.Reverse(out) + + return out, nil +} + +func tailFiles(files []fileOpener, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cont = true // TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here. 
go func() { select { @@ -587,27 +708,39 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge } }() - readers := make([]io.Reader, 0, len(files)) + readers, err := getTailFiles(ctx, files, nLines, getTailReader) + if err != nil { + watcher.Err <- err + return false + } - if nLines > 0 { - for i := len(files) - 1; i >= 0 && nLines > 0; i-- { - tail, n, err := getTailReader(ctx, files[i], nLines) - if err != nil { - watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing") - return false + var idx int + defer func() { + // Make sure all are released if there is an early return. + if !cont { + for _, r := range readers[idx:] { + if err := r.Close(); err != nil { + log.G(ctx).WithError(err).Warn("Error closing log reader") + } } - nLines -= n - readers = append([]io.Reader{tail}, readers...) } - } else { - for _, r := range files { - readers = append(readers, r) + }() + + for _, ra := range readers { + dec.Reset(ra) + + ok := fwd.Do(watcher, dec) + if err := ra.Close(); err != nil { + log.G(ctx).WithError(err).Warn("Error closing log reader") + } + idx++ + + if !ok { + return false } } - rdr := io.MultiReader(readers...) 
- dec.Reset(rdr) - return fwd.Do(watcher, dec) + return true } type forwarder struct { diff --git a/daemon/logger/loggerutils/logfile_race_test.go b/daemon/logger/loggerutils/logfile_race_test.go index b27e338b5b..36fbb112cc 100644 --- a/daemon/logger/loggerutils/logfile_race_test.go +++ b/daemon/logger/loggerutils/logfile_race_test.go @@ -27,7 +27,7 @@ func TestConcurrentLogging(t *testing.T) { maxFiles = 3 compress = true ) - getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { + getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (SizeReaderAt, int, error) { return tailfile.NewTailReader(ctx, r, lines) } createDecoder := func(io.Reader) Decoder { diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index e410169fb1..51f6253669 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -52,11 +52,19 @@ func TestTailFiles(t *testing.T) { s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n") s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n") - files := []SizeReaderAt{s1, s2, s3} + makeLZO := func(ls ...SizeReaderAt) []fileOpener { + out := make([]fileOpener, 0, len(ls)) + for _, rdr := range ls { + out = append(out, &sizeReaderAtOpener{rdr}) + } + return out + } + + files := makeLZO(s1, s2, s3) watcher := logger.NewLogWatcher() defer watcher.ConsumerGone() - tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { + tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (SizeReaderAt, int, error) { return tailfile.NewTailReader(ctx, r, lines) } dec := &testDecoder{} @@ -116,7 +124,7 @@ func TestCheckCapacityAndRotate(t *testing.T) { dir := t.TempDir() logPath := filepath.Join(dir, "log") - getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { + getTailReader := func(ctx context.Context, r 
SizeReaderAt, lines int) (SizeReaderAt, int, error) { return tailfile.NewTailReader(ctx, r, lines) } createDecoder := func(io.Reader) Decoder { diff --git a/pkg/tailfile/tailfile.go b/pkg/tailfile/tailfile.go index afc84f00bb..af70b3e8a5 100644 --- a/pkg/tailfile/tailfile.go +++ b/pkg/tailfile/tailfile.go @@ -48,7 +48,7 @@ type SizeReaderAt interface { } // NewTailReader scopes the passed in reader to just the last N lines passed in -func NewTailReader(ctx context.Context, r SizeReaderAt, reqLines int) (io.Reader, int, error) { +func NewTailReader(ctx context.Context, r SizeReaderAt, reqLines int) (*io.SectionReader, int, error) { return NewTailReaderWithDelimiter(ctx, r, reqLines, eol) } @@ -56,7 +56,7 @@ func NewTailReader(ctx context.Context, r SizeReaderAt, reqLines int) (io.Reader // In this case a "line" is defined by the passed in delimiter. // // Delimiter lengths should be generally small, no more than 12 bytes -func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines int, delimiter []byte) (io.Reader, int, error) { +func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines int, delimiter []byte) (*io.SectionReader, int, error) { if reqLines < 1 { return nil, 0, ErrNonPositiveLinesNumber } @@ -71,7 +71,7 @@ func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines in ) if int64(len(delimiter)) >= size { - return bytes.NewReader(nil), 0, nil + return io.NewSectionReader(bytes.NewReader(nil), 0, 0), 0, nil } scanner := newScanner(r, delimiter) @@ -92,7 +92,7 @@ func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines in tailStart = scanner.Start(ctx) if found == 0 { - return bytes.NewReader(nil), 0, nil + return io.NewSectionReader(bytes.NewReader(nil), 0, 0), 0, nil } if found < reqLines && tailStart != 0 { From 1b46faf233242121d45b19e098e313ebc4ba679f Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Fri, 7 Jun 2024 21:55:46 +0000 Subject: [PATCH 3/6] Logfile: skip files that 
are corrupted When there is an error in parsing an individual log file just close the log and move on to the next one instead of erroring our the entire request. I investigated trying to error correct and scan ahead for corrupted log files but found this is too much of a risk of parsing things we shouldn't be and hence why this is just dropping the rest of the file. Signed-off-by: Brian Goff --- daemon/logger/loggerutils/follow.go | 12 +- daemon/logger/loggerutils/logfile.go | 51 ++++++- daemon/logger/loggerutils/logfile_test.go | 170 ++++++++++++++++------ 3 files changed, 175 insertions(+), 58 deletions(-) diff --git a/daemon/logger/loggerutils/follow.go b/daemon/logger/loggerutils/follow.go index 6131bcea7c..f221a8cc70 100644 --- a/daemon/logger/loggerutils/follow.go +++ b/daemon/logger/loggerutils/follow.go @@ -22,8 +22,8 @@ type follow struct { } // Do follows the log file as it is written, starting from f at read. -func (fl *follow) Do(f *os.File, read logPos) { - fl.log = log.G(context.TODO()).WithFields(log.Fields{ +func (fl *follow) Do(ctx context.Context, f *os.File, read logPos) { + fl.log = log.G(ctx).WithFields(log.Fields{ "module": "logger", "file": f.Name(), }) @@ -49,7 +49,7 @@ func (fl *follow) Do(f *os.File, read logPos) { fl.Watcher.Err <- err return } - if !fl.forward(f) { + if !fl.forward(ctx, f) { return } @@ -91,7 +91,7 @@ func (fl *follow) Do(f *os.File, read logPos) { read.size = 0 } - if !fl.forward(io.NewSectionReader(f, read.size, wrote.size-read.size)) { + if !fl.forward(ctx, io.NewSectionReader(f, read.size, wrote.size-read.size)) { return } read = wrote @@ -135,7 +135,7 @@ func (fl *follow) nextPos(current logPos) (next logPos, ok bool) { // forward decodes log messages from r and forwards them to the log watcher. // // The return value, cont, signals whether following should continue. 
-func (fl *follow) forward(r io.Reader) (cont bool) { +func (fl *follow) forward(ctx context.Context, r io.Reader) (cont bool) { fl.Decoder.Reset(r) - return fl.Forwarder.Do(fl.Watcher, fl.Decoder) + return fl.Forwarder.Do(ctx, fl.Watcher, fl.Decoder.Decode) } diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index babbc7279a..84cf3b87b8 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -491,7 +491,7 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa Watcher: watcher, Decoder: dec, Forwarder: fwd, - }).Do(currentFile, currentPos) + }).Do(context.TODO(), currentFile, currentPos) } type fileOpener interface { @@ -684,7 +684,9 @@ func getTailFiles(ctx context.Context, files []fileOpener, nLines int, getTailRe tail, n, err := getTailReader(ctx, ra, nLines) if err != nil { - return nil, errors.Wrap(err, "error finding file position to start log tailing") + ra.Close() + log.G(ctx).WithError(err).Warn("Error scanning log file for tail file request, skipping") + continue } nLines -= n out = append(out, &sizeReaderAtWithCloser{tail, ra.Close}) @@ -720,18 +722,39 @@ func tailFiles(files []fileOpener, watcher *logger.LogWatcher, dec Decoder, getT if !cont { for _, r := range readers[idx:] { if err := r.Close(); err != nil { - log.G(ctx).WithError(err).Warn("Error closing log reader") + log.G(ctx).WithError(err).Debug("Error closing log reader") } } } }() for _, ra := range readers { + select { + case <-watcher.WatchConsumerGone(): + return false + case <-ctx.Done(): + return false + default: + } + dec.Reset(ra) - ok := fwd.Do(watcher, dec) + ok := fwd.Do(ctx, watcher, func() (*logger.Message, error) { + msg, err := dec.Decode() + if err != nil && !errors.Is(err, io.EOF) { + // We have an error decoding the stream, but we don't want to error out + // the whole log reader. 
+ // If we return anything other than EOF then the forwarder will return + // false and we'll exit the loop. + // Instead just log the error here and return an EOF so we can move to + // the next file. + log.G(ctx).WithError(err).Warn("Error decoding log file") + return nil, io.EOF + } + return msg, err + }) if err := ra.Close(); err != nil { - log.G(ctx).WithError(err).Warn("Error closing log reader") + log.G(ctx).WithError(err).Debug("Error closing log reader") } idx++ @@ -755,16 +778,25 @@ func newForwarder(config logger.ReadConfig) *forwarder { // conditions to watcher. Do returns cont=true iff it has read all messages from // dec without encountering a message with a timestamp which is after the // configured until time. -func (fwd *forwarder) Do(watcher *logger.LogWatcher, dec Decoder) (cont bool) { +func (fwd *forwarder) Do(ctx context.Context, watcher *logger.LogWatcher, next func() (*logger.Message, error)) (cont bool) { for { - msg, err := dec.Decode() + select { + case <-watcher.WatchConsumerGone(): + return false + case <-ctx.Done(): + return false + default: + } + + msg, err := next() if err != nil { if errors.Is(err, io.EOF) { return true } - watcher.Err <- err + log.G(ctx).WithError(err).Debug("Error while decoding log entry, not continuing") return false } + if !fwd.since.IsZero() { if msg.Timestamp.Before(fwd.since) { continue @@ -776,9 +808,12 @@ func (fwd *forwarder) Do(watcher *logger.LogWatcher, dec Decoder) (cont bool) { fwd.since = time.Time{} } if !fwd.until.IsZero() && msg.Timestamp.After(fwd.until) { + log.G(ctx).Debug("Log is newer than requested window, skipping remaining logs") return false } select { + case <-ctx.Done(): + return false case <-watcher.WatchConsumerGone(): return false case watcher.Msg <- msg: diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index 51f6253669..4ee5723868 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go 
@@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "encoding/json" "fmt" "io" "os" @@ -16,43 +17,66 @@ import ( "github.com/docker/docker/daemon/logger" "github.com/docker/docker/pkg/tailfile" "gotest.tools/v3/assert" + "gotest.tools/v3/assert/cmp" "gotest.tools/v3/poll" ) type testDecoder struct { - rdr io.Reader scanner *bufio.Scanner resetCount int } -func (d *testDecoder) Decode() (*logger.Message, error) { - if d.scanner == nil { - d.scanner = bufio.NewScanner(d.rdr) - } +func (d *testDecoder) Decode() (retMsg *logger.Message, retErr error) { if !d.scanner.Scan() { - return nil, d.scanner.Err() + err := d.scanner.Err() + if err == nil { + err = io.EOF + } + return nil, err } + // some comment return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil } func (d *testDecoder) Reset(rdr io.Reader) { - d.rdr = rdr d.scanner = bufio.NewScanner(rdr) d.resetCount++ } func (d *testDecoder) Close() { - d.rdr = nil d.scanner = nil } +// tetsJSONStreamDecoder is used as an easy way to test how [SyntaxError]s are +// handled in the log reader. 
+type testJSONStreamDecoder struct { + dec *json.Decoder +} + +func (d *testJSONStreamDecoder) Decode() (*logger.Message, error) { + var m logger.Message + if err := d.dec.Decode(&m); err != nil { + return nil, err + } + + return &m, nil +} + +func (d *testJSONStreamDecoder) Reset(rdr io.Reader) { + d.dec = json.NewDecoder(rdr) +} + +func (d *testJSONStreamDecoder) Close() { + d.dec = nil +} + func TestTailFiles(t *testing.T) { s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n") s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n") s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n") - makeLZO := func(ls ...SizeReaderAt) []fileOpener { + makeOpener := func(ls ...SizeReaderAt) []fileOpener { out := make([]fileOpener, 0, len(ls)) for _, rdr := range ls { out = append(out, &sizeReaderAtOpener{rdr}) @@ -60,7 +84,7 @@ func TestTailFiles(t *testing.T) { return out } - files := makeLZO(s1, s2, s3) + files := makeOpener(s1, s2, s3) watcher := logger.NewLogWatcher() defer watcher.ConsumerGone() @@ -69,18 +93,6 @@ func TestTailFiles(t *testing.T) { } dec := &testDecoder{} - for desc, config := range map[string]logger.ReadConfig{} { - t.Run(desc, func(t *testing.T) { - started := make(chan struct{}) - fwd := newForwarder(config) - go func() { - close(started) - tailFiles(files, watcher, dec, tailReader, config.Tail, fwd) - }() - <-started - }) - } - config := logger.ReadConfig{Tail: 2} fwd := newForwarder(config) started := make(chan struct{}) @@ -90,25 +102,94 @@ func TestTailFiles(t *testing.T) { }() <-started - select { - case <-time.After(60 * time.Second): - t.Fatal("timeout waiting for tail line") - case err := <-watcher.Err: - assert.NilError(t, err) - case msg := <-watcher.Msg: - assert.Assert(t, msg != nil) - assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line)) - } + waitForMsg(t, watcher, "Roads?", 10*time.Second) + waitForMsg(t, watcher, "Where we're going we don't need roads.", 10*time.Second) - select { - 
case <-time.After(60 * time.Second): - t.Fatal("timeout waiting for tail line") - case err := <-watcher.Err: + t.Run("handle corrupted data", func(t *testing.T) { + // Here we'll use the test json decoder to test injecting garbage data + // in the middle of otherwise valid json streams + // The log reader should be able to skip over that data. + + writeMsg := func(buf *bytes.Buffer, s string) { + t.Helper() + + msg := &logger.Message{Line: []byte(s)} + dt, err := json.Marshal(msg) + assert.NilError(t, err) + + _, err = buf.Write(dt) + assert.NilError(t, err) + _, err = buf.WriteString("\n") + assert.NilError(t, err) + } + + msg1 := "Hello" + msg2 := "World!" + msg3 := "And again!" + msg4 := "One more time!" + msg5 := "This is the end!" + + f1 := bytes.NewBuffer(nil) + writeMsg(f1, msg1) + + _, err := f1.WriteString("some randome garbage") + assert.NilError(t, err, "error writing garbage to log stream") + + writeMsg(f1, msg2) // This won't be seen due to garbage written above + + f2 := bytes.NewBuffer(nil) + writeMsg(f2, msg3) + + // Write what looks like the start of a new log message + _, err = f2.WriteString("{\"Line\": ") assert.NilError(t, err) - case msg := <-watcher.Msg: - assert.Assert(t, msg != nil) - assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line)) - } + + writeMsg(f2, msg4) // This won't be seen due to garbage written above + + f3 := bytes.NewBuffer(nil) + writeMsg(f3, msg5) + + // [bytes.Buffer] is not a SizeReaderAt, so we need to convert it here + files := makeOpener(bytes.NewReader(f1.Bytes()), bytes.NewReader(f2.Bytes()), bytes.NewReader(f3.Bytes())) + + // At this point we our log "files" should have 4 log messages in it + // intersperesed with some junk that is invalid json + + // We need a zero size watcher so that we can tell the decoder to give us + // a syntax error + watcher := logger.NewLogWatcher() + + config := logger.ReadConfig{Tail: 4} + fwd := newForwarder(config) + + started := make(chan 
struct{}) + done := make(chan struct{}) + go func() { + close(started) + tailFiles(files, watcher, &testJSONStreamDecoder{}, tailReader, config.Tail, fwd) + close(done) + }() + + waitOrTimeout := func(ch <-chan struct{}) { + t.Helper() + timer := time.NewTimer(10 * time.Second) + defer timer.Stop() + + select { + case <-timer.C: + t.Fatal("timeout waiting for channel") + case <-ch: + } + } + + waitOrTimeout(started) + + // Note that due to how the json decoder works, we won't see anything in f1 + waitForMsg(t, watcher, msg3, 10*time.Second) + waitForMsg(t, watcher, msg5, 10*time.Second) + + waitOrTimeout(done) + }) } type dummyDecoder struct{} @@ -171,9 +252,9 @@ func TestCheckCapacityAndRotate(t *testing.T) { lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}) defer lw.ConsumerGone() - assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!")), ls) + assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!\n")), ls) // make sure the log reader is primed - waitForMsg(t, lw, 30*time.Second) + waitForMsg(t, lw, "", 30*time.Second) assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 1!")), ls) assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 2!")), ls) @@ -183,17 +264,18 @@ func TestCheckCapacityAndRotate(t *testing.T) { }) } -func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) { +func waitForMsg(t *testing.T, lw *logger.LogWatcher, expected string, timeout time.Duration) { t.Helper() timer := time.NewTimer(timeout) defer timer.Stop() select { - case _, ok := <-lw.Msg: - assert.Assert(t, ok, "log producer gone before log message arrived") case err := <-lw.Err: assert.NilError(t, err) + case msg, ok := <-lw.Msg: + assert.Assert(t, ok, "log producer gone before log message arrived") + assert.Check(t, cmp.Equal(string(msg.Line), expected)) case <-timer.C: t.Fatal("timeout waiting for log message") } From dbf6873f4510ada608b2a63441995d9d3c4be051 Mon Sep 17 00:00:00 2001 From: Brian 
Goff Date: Fri, 14 Jun 2024 01:11:34 +0000 Subject: [PATCH 4/6] Logfile: Add tracing spans This plumbs a context down the stack and handles cancellation as needed so that we can have correlated traces from the API. Signed-off-by: Brian Goff --- daemon/attach.go | 2 +- daemon/logger/adapter.go | 8 +- daemon/logger/adapter_test.go | 5 +- daemon/logger/journald/read.go | 4 +- daemon/logger/jsonfilelog/read.go | 6 +- daemon/logger/jsonfilelog/read_test.go | 3 +- daemon/logger/local/read.go | 4 +- daemon/logger/logger.go | 3 +- daemon/logger/loggertest/logreader.go | 39 +++--- .../logger/loggerutils/cache/local_cache.go | 6 +- daemon/logger/loggerutils/follow.go | 6 +- daemon/logger/loggerutils/logfile.go | 131 +++++++++++++++--- daemon/logger/loggerutils/logfile_test.go | 11 +- daemon/logger/ring.go | 10 +- daemon/logs.go | 11 +- 15 files changed, 184 insertions(+), 65 deletions(-) diff --git a/daemon/attach.go b/daemon/attach.go index f233cdca86..4313a7f21d 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -141,7 +141,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach if !ok { return logger.ErrReadLogsNotSupported{} } - logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1}) + logs := cLog.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1}) defer logs.ConsumerGone() LogLoop: diff --git a/daemon/logger/adapter.go b/daemon/logger/adapter.go index 95ed5a859e..bd5cf5f226 100644 --- a/daemon/logger/adapter.go +++ b/daemon/logger/adapter.go @@ -87,7 +87,7 @@ type pluginAdapterWithRead struct { *pluginAdapter } -func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { +func (a *pluginAdapterWithRead) ReadLogs(ctx context.Context, config ReadConfig) *LogWatcher { watcher := NewLogWatcher() go func() { @@ -101,6 +101,10 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { dec := logdriver.NewLogEntryDecoder(stream) for { + if ctx.Err() != nil { + return + } + var buf logdriver.LogEntry if err := 
dec.Decode(&buf); err != nil { if err == io.EOF { @@ -127,6 +131,8 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { // send the message unless the consumer is gone select { case watcher.Msg <- msg: + case <-ctx.Done(): + return case <-watcher.WatchConsumerGone(): return } diff --git a/daemon/logger/adapter_test.go b/daemon/logger/adapter_test.go index 0c8f98c018..5c89f350d8 100644 --- a/daemon/logger/adapter_test.go +++ b/daemon/logger/adapter_test.go @@ -1,6 +1,7 @@ package logger // import "github.com/docker/docker/daemon/logger" import ( + "context" "encoding/binary" "io" "sync" @@ -154,7 +155,7 @@ func TestAdapterReadLogs(t *testing.T) { lr, ok := l.(LogReader) assert.Check(t, ok, "Logger does not implement LogReader") - lw := lr.ReadLogs(ReadConfig{}) + lw := lr.ReadLogs(context.TODO(), ReadConfig{}) for _, x := range testMsg { select { @@ -173,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) { } lw.ConsumerGone() - lw = lr.ReadLogs(ReadConfig{Follow: true}) + lw = lr.ReadLogs(context.TODO(), ReadConfig{Follow: true}) for _, x := range testMsg { select { case msg := <-lw.Msg: diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index cc79a72a51..6da096a639 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -20,6 +20,8 @@ const ( waitInterval = 250 * time.Millisecond ) +var _ logger.LogReader = (*journald)(nil) + // Fields which we know are not user-provided attribute fields. 
var wellKnownFields = map[string]bool{ "MESSAGE": true, @@ -447,7 +449,7 @@ func (r *reader) signalReady() { } } -func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { +func (s *journald) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher { r := &reader{ s: s, logWatcher: logger.NewLogWatcher(), diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index 3a51267fe2..6627074fe2 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -12,10 +12,12 @@ import ( "github.com/docker/docker/pkg/tailfile" ) +var _ logger.LogReader = (*JSONFileLogger)(nil) + // ReadLogs implements the logger's LogReader interface for the logs // created by this driver. -func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - return l.writer.ReadLogs(config) +func (l *JSONFileLogger) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher { + return l.writer.ReadLogs(ctx, config) } func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go index dd56be0ff3..122617fec0 100644 --- a/daemon/logger/jsonfilelog/read_test.go +++ b/daemon/logger/jsonfilelog/read_test.go @@ -3,6 +3,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo import ( "bufio" "bytes" + "context" "fmt" "io" "os" @@ -62,7 +63,7 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) { } }() - lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true}) + lw := jsonlogger.(*JSONFileLogger).ReadLogs(context.TODO(), logger.ReadConfig{Follow: true}) for { select { case _, ok := <-lw.Msg: diff --git a/daemon/logger/local/read.go b/daemon/logger/local/read.go index 4108b19f19..14771a52db 100644 --- a/daemon/logger/local/read.go +++ b/daemon/logger/local/read.go @@ -18,8 +18,8 @@ import ( // logger.defaultBufSize caps 
the size of Line field. const maxMsgLen int = 1e6 // 1MB. -func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - return d.logfile.ReadLogs(config) +func (d *driver) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher { + return d.logfile.ReadLogs(ctx, config) } func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (loggerutils.SizeReaderAt, int, error) { diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index d3e9da1053..868ac8b168 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -8,6 +8,7 @@ package logger // import "github.com/docker/docker/daemon/logger" import ( + "context" "sync" "time" @@ -88,7 +89,7 @@ type ReadConfig struct { // LogReader is the interface for reading log messages for loggers that support reading. type LogReader interface { // ReadLogs reads logs from underlying logging backend. - ReadLogs(ReadConfig) *LogWatcher + ReadLogs(context.Context, ReadConfig) *LogWatcher } // LogWatcher is used when consuming logs read from the LogReader interface. 
diff --git a/daemon/logger/loggertest/logreader.go b/daemon/logger/loggertest/logreader.go index 0573a6add4..d1769221b7 100644 --- a/daemon/logger/loggertest/logreader.go +++ b/daemon/logger/loggertest/logreader.go @@ -1,6 +1,7 @@ package loggertest // import "github.com/docker/docker/daemon/logger/loggertest" import ( + "context" "fmt" "runtime" "strings" @@ -93,63 +94,63 @@ func (tr Reader) testTail(t *testing.T, live bool) { t.Run("Exact", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected, compareLog) }) t.Run("LessThanAvailable", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: 2}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: 2}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[len(mm)-2:], compareLog) }) t.Run("MoreThanAvailable", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: 100}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: 100}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected, compareLog) }) t.Run("All", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: -1}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected, compareLog) }) t.Run("Since", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog) }) t.Run("MoreThanSince", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm), Since: mm[1].Timestamp.Truncate(time.Millisecond)}) + lw := 
lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm), Since: mm[1].Timestamp.Truncate(time.Millisecond)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog) }) t.Run("LessThanSince", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm) - 2, Since: mm[1].Timestamp.Truncate(time.Millisecond)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm) - 2, Since: mm[1].Timestamp.Truncate(time.Millisecond)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[2:], compareLog) }) t.Run("Until", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Until: mm[2].Timestamp.Add(-time.Millisecond)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Until: mm[2].Timestamp.Add(-time.Millisecond)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[:2], compareLog) }) t.Run("SinceAndUntil", func(t *testing.T) { t.Parallel() - lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond), Until: mm[1].Timestamp.Add(time.Millisecond)}) + lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond), Until: mm[1].Timestamp.Add(time.Millisecond)}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), expected[1:2], compareLog) }) @@ -182,7 +183,7 @@ func (tr Reader) testTailEmptyLogs(t *testing.T, live bool) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{}) defer lw.ConsumerGone() assert.DeepEqual(t, readAll(t, lw), ([]*logger.Message)(nil), cmpopts.EquateEmpty()) }) @@ -204,7 +205,7 @@ func (tr Reader) TestFollow(t *testing.T) { ContainerID: fmt.Sprintf("followstart%d", i), ContainerName: fmt.Sprintf("logloglog%d", i), })(t) - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: tail, 
Follow: true}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: tail, Follow: true}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -232,7 +233,7 @@ func (tr Reader) TestFollow(t *testing.T) { mm := makeTestMessages() expected := logMessages(t, l, mm[0:1]) - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -257,7 +258,7 @@ func (tr Reader) TestFollow(t *testing.T) { mm := makeTestMessages() - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[2].Timestamp.Truncate(time.Millisecond)}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Since: mm[2].Timestamp.Truncate(time.Millisecond)}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -282,7 +283,7 @@ func (tr Reader) TestFollow(t *testing.T) { mm := makeTestMessages() - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Until: mm[2].Timestamp.Add(-time.Millisecond)}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Until: mm[2].Timestamp.Add(-time.Millisecond)}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -307,7 +308,7 @@ func (tr Reader) TestFollow(t *testing.T) { mm := makeTestMessages() - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[1].Timestamp.Add(-time.Millisecond), Until: mm[2].Timestamp.Add(-time.Millisecond)}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Since: mm[1].Timestamp.Add(-time.Millisecond), Until: mm[2].Timestamp.Add(-time.Millisecond)}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -334,7 +335,7 @@ func (tr Reader) TestFollow(t *testing.T) { logMessages(t, l, mm[0:2]) 
syncLogger(t, l) - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 0, Follow: true}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: 0, Follow: true}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -361,7 +362,7 @@ func (tr Reader) TestFollow(t *testing.T) { expected := logMessages(t, l, mm[0:2])[1:] syncLogger(t, l) - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 1, Follow: true}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: 1, Follow: true}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -390,7 +391,7 @@ func (tr Reader) TestFollow(t *testing.T) { assert.NilError(t, l.Close()) l = factory(t) - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true}) defer lw.ConsumerGone() doneReading := make(chan struct{}) @@ -430,7 +431,7 @@ func (tr Reader) TestConcurrent(t *testing.T) { } // Follow all logs - lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Follow: true, Tail: -1}) + lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Follow: true, Tail: -1}) defer lw.ConsumerGone() // Log concurrently from two sources and close log diff --git a/daemon/logger/loggerutils/cache/local_cache.go b/daemon/logger/loggerutils/cache/local_cache.go index d5adfd4ffa..3466c1e79f 100644 --- a/daemon/logger/loggerutils/cache/local_cache.go +++ b/daemon/logger/loggerutils/cache/local_cache.go @@ -24,6 +24,8 @@ var builtInCacheLogOpts = map[string]bool{ cacheDisabledKey: true, } +var _ logger.LogReader = (*loggerWithCache)(nil) + // WithLocalCache wraps the passed in logger with a logger caches all writes locally // in addition to writing to the passed in logger. 
func WithLocalCache(l logger.Logger, info logger.Info) (logger.Logger, error) { @@ -85,8 +87,8 @@ func (l *loggerWithCache) Name() string { return l.l.Name() } -func (l *loggerWithCache) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - return l.cache.(logger.LogReader).ReadLogs(config) +func (l *loggerWithCache) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher { + return l.cache.(logger.LogReader).ReadLogs(ctx, config) } func (l *loggerWithCache) Close() error { diff --git a/daemon/logger/loggerutils/follow.go b/daemon/logger/loggerutils/follow.go index f221a8cc70..7a6b8e50f0 100644 --- a/daemon/logger/loggerutils/follow.go +++ b/daemon/logger/loggerutils/follow.go @@ -38,7 +38,7 @@ func (fl *follow) Do(ctx context.Context, f *os.File, read logPos) { }() for { - wrote, ok := fl.nextPos(read) + wrote, ok := fl.nextPos(ctx, read) if !ok { return } @@ -100,9 +100,11 @@ func (fl *follow) Do(ctx context.Context, f *os.File, read logPos) { // nextPos waits until the write position of the LogFile being followed has // advanced from current and returns the new position. 
-func (fl *follow) nextPos(current logPos) (next logPos, ok bool) { +func (fl *follow) nextPos(ctx context.Context, current logPos) (next logPos, ok bool) { var st logReadState select { + case <-ctx.Done(): + return current, false case <-fl.Watcher.WatchConsumerGone(): return current, false case st = <-fl.LogFile.read: diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index 84cf3b87b8..4b7e97e940 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -14,10 +14,13 @@ import ( "sync" "time" + "github.com/containerd/containerd/tracing" "github.com/containerd/log" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/pkg/pools" "github.com/pkg/errors" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // rotateFileMetadata is a metadata of the gzip header of the compressed log file @@ -373,7 +376,12 @@ func (w *LogFile) Close() error { // ReadLogs decodes entries from log files. // // It is the caller's responsibility to call ConsumerGone on the LogWatcher. -func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { +func (w *LogFile) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher { + ctx, span := tracing.StartSpan(ctx, "logger.LogFile.ReadLogs") + defer span.End() + + span.SetAttributes(tracing.Attribute("config", config)) + watcher := logger.NewLogWatcher() // Lock out filesystem operations so that we can capture the read // position and atomically open the corresponding log file, without the @@ -385,43 +393,57 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { st := <-w.read pos := st.pos w.read <- st - go w.readLogsLocked(pos, config, watcher) + go w.readLogsLocked(ctx, pos, config, watcher) return watcher } // tailFiles must be called with w.fsopMu locked for reads. // w.fsopMu.RUnlock() is called before returning. 
-func (w *LogFile) tailFiles(config logger.ReadConfig, watcher *logger.LogWatcher, current SizeReaderAt, dec Decoder, fwd *forwarder) bool { +func (w *LogFile) tailFiles(ctx context.Context, config logger.ReadConfig, watcher *logger.LogWatcher, current SizeReaderAt, dec Decoder, fwd *forwarder) (cont bool) { if config.Tail == 0 { w.fsopMu.RUnlock() return true } - files, err := w.openRotatedFiles(config) + ctx, span := tracing.StartSpan(ctx, "logger.Logfile.TailLogs") + defer func() { + span.SetAttributes(attribute.Bool("continue", cont)) + span.End() + }() + + files, err := w.openRotatedFiles(ctx, config) w.fsopMu.RUnlock() if err != nil { + // TODO: Should we allow this to continue (as in set `cont=true`) and not error out the log stream? + err = errors.Wrap(err, "error opening rotated log files") + span.SetStatus(err) watcher.Err <- err return false } if current.Size() > 0 { - files = append(files, &sizeReaderAtOpener{current}) + files = append(files, &sizeReaderAtOpener{current, "current"}) } - return tailFiles(files, watcher, dec, w.getTailReader, config.Tail, fwd) + return tailFiles(ctx, files, watcher, dec, w.getTailReader, config.Tail, fwd) } type sizeReaderAtOpener struct { SizeReaderAt + ref string } -func (o *sizeReaderAtOpener) ReaderAt() (sizeReaderAtCloser, error) { +func (o *sizeReaderAtOpener) ReaderAt(context.Context) (sizeReaderAtCloser, error) { return &sizeReaderAtWithCloser{o, nil}, nil } func (o *sizeReaderAtOpener) Close() {} +func (o *sizeReaderAtOpener) Ref() string { + return o.ref +} + type sizeReaderAtWithCloser struct { SizeReaderAt close func() error @@ -459,12 +481,16 @@ func (r *sizeReaderAtWithCloser) Close() error { // // w.fsopMu must be locked for reading when calling this method. // w.fsopMu.RUnlock() is called before returning. 
-func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, watcher *logger.LogWatcher) { +func (w *LogFile) readLogsLocked(ctx context.Context, currentPos logPos, config logger.ReadConfig, watcher *logger.LogWatcher) { + ctx, span := tracing.StartSpan(ctx, "logger.Logfile.ReadLogsLocked") + defer span.End() + defer close(watcher.Msg) currentFile, err := open(w.f.Name()) if err != nil { w.fsopMu.RUnlock() + span.SetStatus(err) watcher.Err <- err return } @@ -476,7 +502,7 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa fwd := newForwarder(config) // At this point, w.tailFiles is responsible for unlocking w.fsopmu - ok := w.tailFiles(config, watcher, io.NewSectionReader(currentFile, 0, currentPos.size), dec, fwd) + ok := w.tailFiles(ctx, config, watcher, io.NewSectionReader(currentFile, 0, currentPos.size), dec, fwd) if !ok { return @@ -491,12 +517,13 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa Watcher: watcher, Decoder: dec, Forwarder: fwd, - }).Do(context.TODO(), currentFile, currentPos) + }).Do(ctx, currentFile, currentPos) } type fileOpener interface { - ReaderAt() (ra sizeReaderAtCloser, err error) + ReaderAt(context.Context) (ra sizeReaderAtCloser, err error) Close() + Ref() string } // simpleFileOpener just holds a reference to an already open file @@ -506,10 +533,11 @@ type simpleFileOpener struct { closed bool } -func (o *simpleFileOpener) ReaderAt() (sizeReaderAtCloser, error) { +func (o *simpleFileOpener) ReaderAt(context.Context) (sizeReaderAtCloser, error) { if o.closed { return nil, errors.New("file is closed") } + if o.sz == 0 { stat, err := o.f.Stat() if err != nil { @@ -520,6 +548,10 @@ func (o *simpleFileOpener) ReaderAt() (sizeReaderAtCloser, error) { return &sizeReaderAtWithCloser{io.NewSectionReader(o.f, 0, o.sz), nil}, nil } +func (o *simpleFileOpener) Ref() string { + return o.f.Name() +} + func (o *simpleFileOpener) Close() { _ = o.f.Close() o.closed 
= true @@ -552,7 +584,17 @@ type compressedFileOpener struct { ifBefore time.Time } -func (cfo *compressedFileOpener) ReaderAt() (_ sizeReaderAtCloser, retErr error) { +func (cfo *compressedFileOpener) ReaderAt(ctx context.Context) (_ sizeReaderAtCloser, retErr error) { + _, span := tracing.StartSpan(ctx, "logger.Logfile.Compressed.ReaderAt") + defer func() { + if retErr != nil { + span.SetStatus(retErr) + } + span.End() + }() + + span.SetAttributes(attribute.String("file", cfo.f.Name())) + if cfo.closed { return nil, errors.New("compressed file closed") } @@ -568,9 +610,14 @@ func (cfo *compressedFileOpener) ReaderAt() (_ sizeReaderAtCloser, retErr error) extra := &rotateFileMetadata{} err = json.Unmarshal(gzr.Header.Extra, extra) if err == nil && !extra.LastTime.IsZero() && extra.LastTime.Before(cfo.ifBefore) { + span.SetAttributes(attribute.Bool("skip", true)) return &sizeReaderAtWithCloser{}, nil } + if err == nil { + span.SetAttributes(attribute.Stringer("lastLogTime", extra.LastTime)) + } + span.AddEvent("Start decompress") return cfo.lf.decompress.Do(cfo.f) } @@ -579,19 +626,27 @@ func (cfo *compressedFileOpener) Close() { cfo.f.Close() } +func (cfo *compressedFileOpener) Ref() string { + return cfo.f.Name() +} + type emptyFileOpener struct{} -func (emptyFileOpener) ReaderAt() (sizeReaderAtCloser, error) { +func (emptyFileOpener) ReaderAt(context.Context) (sizeReaderAtCloser, error) { return &sizeReaderAtWithCloser{}, nil } func (emptyFileOpener) Close() {} +func (emptyFileOpener) Ref() string { + return "null" +} + // openRotatedFiles returns a slice of files open for reading, in order from // oldest to newest, and calls w.fsopMu.RUnlock() before returning. // // This method must only be called with w.fsopMu locked for reading. 
-func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (_ []fileOpener, retErr error) { +func (w *LogFile) openRotatedFiles(ctx context.Context, config logger.ReadConfig) (_ []fileOpener, retErr error) { var out []fileOpener defer func() { @@ -603,7 +658,7 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (_ []fileOpener, re }() for i := w.maxFiles; i > 1; i-- { - fo, err := w.openRotatedFile(i-1, config) + fo, err := w.openRotatedFile(ctx, i-1, config) if err != nil { return nil, err } @@ -613,7 +668,7 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (_ []fileOpener, re return out, nil } -func (w *LogFile) openRotatedFile(i int, config logger.ReadConfig) (fileOpener, error) { +func (w *LogFile) openRotatedFile(ctx context.Context, i int, config logger.ReadConfig) (fileOpener, error) { f, err := open(fmt.Sprintf("%s.%d", w.f.Name(), i)) if err == nil { return &simpleFileOpener{ @@ -650,6 +705,15 @@ type sizeReaderAtCloser interface { } func getTailFiles(ctx context.Context, files []fileOpener, nLines int, getTailReader GetTailReaderFunc) (_ []sizeReaderAtCloser, retErr error) { + ctx, span := tracing.StartSpan(ctx, "logger.Logfile.CollectTailFiles") + span.SetAttributes(attribute.Int("requested_lines", nLines)) + + defer func() { + if retErr != nil { + span.SetStatus(retErr) + } + span.End() + }() out := make([]sizeReaderAtCloser, 0, len(files)) defer func() { @@ -664,11 +728,14 @@ func getTailFiles(ctx context.Context, files []fileOpener, nLines int, getTailRe if nLines <= 0 { for _, fo := range files { - ra, err := fo.ReaderAt() + span.AddEvent("Open file", trace.WithAttributes(attribute.String("file", fo.Ref()))) + + ra, err := fo.ReaderAt(ctx) if err != nil { return nil, err } out = append(out, ra) + } return out, nil } @@ -677,11 +744,19 @@ func getTailFiles(ctx context.Context, files []fileOpener, nLines int, getTailRe if err := ctx.Err(); err != nil { return nil, errors.Wrap(err, "stopping parsing files to tail due to 
error") } - ra, err := files[i].ReaderAt() + + fo := files[i] + + fileAttr := attribute.String("file", fo.Ref()) + span.AddEvent("Open file", trace.WithAttributes(fileAttr)) + + ra, err := fo.ReaderAt(ctx) if err != nil { return nil, err } + span.AddEvent("Scan file to tail", trace.WithAttributes(fileAttr, attribute.Int("remaining_lines", nLines))) + tail, n, err := getTailReader(ctx, ra, nLines) if err != nil { ra.Close() @@ -697,11 +772,10 @@ func getTailFiles(ctx context.Context, files []fileOpener, nLines int, getTailRe return out, nil } -func tailFiles(files []fileOpener, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) { - ctx, cancel := context.WithCancel(context.Background()) +func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) { + ctx, cancel := context.WithCancel(ctx) defer cancel() - // TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here. go func() { select { case <-ctx.Done(): @@ -779,11 +853,19 @@ func newForwarder(config logger.ReadConfig) *forwarder { // dec without encountering a message with a timestamp which is after the // configured until time. 
func (fwd *forwarder) Do(ctx context.Context, watcher *logger.LogWatcher, next func() (*logger.Message, error)) (cont bool) { + ctx, span := tracing.StartSpan(ctx, "logger.Logfile.Forward") + defer func() { + span.SetAttributes(attribute.Bool("continue", cont)) + span.End() + }() + for { select { case <-watcher.WatchConsumerGone(): + span.AddEvent("watch consumer gone") return false case <-ctx.Done(): + span.AddEvent(ctx.Err().Error()) return false default: } @@ -791,8 +873,10 @@ func (fwd *forwarder) Do(ctx context.Context, watcher *logger.LogWatcher, next f msg, err := next() if err != nil { if errors.Is(err, io.EOF) { + span.AddEvent("EOF") return true } + span.SetStatus(err) log.G(ctx).WithError(err).Debug("Error while decoding log entry, not continuing") return false } @@ -811,10 +895,13 @@ func (fwd *forwarder) Do(ctx context.Context, watcher *logger.LogWatcher, next f log.G(ctx).Debug("Log is newer than requested window, skipping remaining logs") return false } + select { case <-ctx.Done(): + span.AddEvent(ctx.Err().Error()) return false case <-watcher.WatchConsumerGone(): + span.AddEvent("watch consumer gone") return false case watcher.Msg <- msg: } diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index 4ee5723868..a48eeceac5 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -9,6 +9,7 @@ import ( "io" "os" "path/filepath" + "strconv" "strings" "testing" "text/tabwriter" @@ -78,8 +79,8 @@ func TestTailFiles(t *testing.T) { makeOpener := func(ls ...SizeReaderAt) []fileOpener { out := make([]fileOpener, 0, len(ls)) - for _, rdr := range ls { - out = append(out, &sizeReaderAtOpener{rdr}) + for i, rdr := range ls { + out = append(out, &sizeReaderAtOpener{rdr, strconv.Itoa(i)}) } return out } @@ -98,7 +99,7 @@ func TestTailFiles(t *testing.T) { started := make(chan struct{}) go func() { close(started) - tailFiles(files, watcher, dec, tailReader, config.Tail, 
fwd) + tailFiles(context.TODO(), files, watcher, dec, tailReader, config.Tail, fwd) }() <-started @@ -166,7 +167,7 @@ func TestTailFiles(t *testing.T) { done := make(chan struct{}) go func() { close(started) - tailFiles(files, watcher, &testJSONStreamDecoder{}, tailReader, config.Tail, fwd) + tailFiles(context.TODO(), files, watcher, &testJSONStreamDecoder{}, tailReader, config.Tail, fwd) close(done) }() @@ -249,7 +250,7 @@ func TestCheckCapacityAndRotate(t *testing.T) { t.Run("with log reader", func(t *testing.T) { // Make sure rotate works with an active reader - lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}) + lw := l.ReadLogs(context.TODO(), logger.ReadConfig{Follow: true, Tail: 1000}) defer lw.ConsumerGone() assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!\n")), ls) diff --git a/daemon/logger/ring.go b/daemon/logger/ring.go index 0934537ec1..23751f387e 100644 --- a/daemon/logger/ring.go +++ b/daemon/logger/ring.go @@ -1,6 +1,7 @@ package logger // import "github.com/docker/docker/daemon/logger" import ( + "context" "errors" "sync" "sync/atomic" @@ -20,19 +21,22 @@ type RingLogger struct { wg sync.WaitGroup } -var _ SizedLogger = &RingLogger{} +var ( + _ SizedLogger = (*RingLogger)(nil) + _ LogReader = (*ringWithReader)(nil) +) type ringWithReader struct { *RingLogger } -func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher { +func (r *ringWithReader) ReadLogs(ctx context.Context, cfg ReadConfig) *LogWatcher { reader, ok := r.l.(LogReader) if !ok { // something is wrong if we get here panic("expected log reader") } - return reader.ReadLogs(cfg) + return reader.ReadLogs(ctx, cfg) } func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger { diff --git a/daemon/logs.go b/daemon/logs.go index c48a77c987..e96d6ce77a 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -5,6 +5,7 @@ import ( "strconv" "time" + "github.com/containerd/containerd/tracing" "github.com/containerd/log" 
"github.com/docker/docker/api/types/backend" containertypes "github.com/docker/docker/api/types/container" @@ -24,6 +25,14 @@ import ( // if it returns nil, the config channel will be active and return log // messages until it runs out or the context is canceled. func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *containertypes.LogsOptions) (messages <-chan *backend.LogMessage, isTTY bool, retErr error) { + ctx, span := tracing.StartSpan(ctx, "daemon.ContainerLogs") + defer func() { + if retErr != nil { + span.SetStatus(retErr) + } + span.End() + }() + lg := log.G(ctx).WithFields(log.Fields{ "module": "daemon", "method": "(*Daemon).ContainerLogs", @@ -96,7 +105,7 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c Follow: follow, } - logs := logReader.ReadLogs(readConfig) + logs := logReader.ReadLogs(ctx, readConfig) // past this point, we can't possibly return any errors, so we can just // start a goroutine and return to tell the caller not to expect errors From 9b6ba18fc9c68476ef0b18e7260a6ea4955d38e5 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 22 Jul 2024 18:53:48 +0000 Subject: [PATCH 5/6] logfile: Close reader when caller cancels This allows for an individual decode operation to be cancelled while the log reader is reading data from a log file by closing the underlying file. 
Signed-off-by: Brian Goff --- daemon/logger/loggerutils/logfile.go | 14 +++++++++----- daemon/logger/loggerutils/logfile_test.go | 6 +++--- daemon/logs.go | 4 +--- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index 4b7e97e940..73e3742482 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -605,7 +605,7 @@ func (cfo *compressedFileOpener) ReaderAt(ctx context.Context) (_ sizeReaderAtCl } defer gzr.Close() - // Extract the last log entry timestramp from the gzip header + // Extract the last log entry timestamp from the gzip header // Use this to determine if we even need to read this file based on inputs extra := &rotateFileMetadata{} err = json.Unmarshal(gzr.Header.Extra, extra) @@ -803,6 +803,7 @@ func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatch }() for _, ra := range readers { + ra := ra select { case <-watcher.WatchConsumerGone(): return false @@ -813,6 +814,12 @@ func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatch dec.Reset(ra) + cancel := context.AfterFunc(ctx, func() { + if err := ra.Close(); err != nil { + log.G(ctx).WithError(err).Debug("Error closing log reader") + } + }) + ok := fwd.Do(ctx, watcher, func() (*logger.Message, error) { msg, err := dec.Decode() if err != nil && !errors.Is(err, io.EOF) { @@ -827,11 +834,8 @@ func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatch } return msg, err }) - if err := ra.Close(); err != nil { - log.G(ctx).WithError(err).Debug("Error closing log reader") - } + cancel() idx++ - if !ok { return false } diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index a48eeceac5..4c526e9be7 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -150,14 +150,14 @@ func TestTailFiles(t *testing.T) { f3 := 
bytes.NewBuffer(nil) writeMsg(f3, msg5) - // [bytes.Buffer] is not a SizeReaderAt, so we need to convert it here + // [bytes.Buffer] is not a SizeReaderAt, so we need to convert it here. files := makeOpener(bytes.NewReader(f1.Bytes()), bytes.NewReader(f2.Bytes()), bytes.NewReader(f3.Bytes())) // At this point we our log "files" should have 4 log messages in it - // intersperesed with some junk that is invalid json + // interspersed with some junk that is invalid json. We need a zero size watcher so that we can tell the decoder to give us - // a syntax error + // a syntax error. watcher := logger.NewLogWatcher() config := logger.ReadConfig{Tail: 4} diff --git a/daemon/logs.go b/daemon/logs.go index e96d6ce77a..1c72eee410 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -27,9 +27,7 @@ import ( func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *containertypes.LogsOptions) (messages <-chan *backend.LogMessage, isTTY bool, retErr error) { ctx, span := tracing.StartSpan(ctx, "daemon.ContainerLogs") defer func() { - if retErr != nil { - span.SetStatus(retErr) - } + span.SetStatus(retErr) span.End() }() From 6d941222173a11ed25c3a5daa75c21e03fe28ea4 Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Fri, 5 Jul 2024 18:08:34 -0400 Subject: [PATCH 6/6] logger/journald: plumb contexts into reader Co-authored-by: Cory Snider Signed-off-by: Brian Goff --- daemon/logger/journald/read.go | 50 ++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 6da096a639..1b9fda5995 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -192,11 +192,20 @@ func (r *reader) initialSeekTail() (bool, error) { // wait blocks until the journal has new data to read, the reader's drain // deadline is exceeded, or the log reading consumer is gone.
-func (r *reader) wait() (bool, error) { +func (r *reader) wait(ctx context.Context) (bool, error) { + deadline := r.drainDeadline + if d, ok := ctx.Deadline(); ok && d.Before(deadline) { + deadline = d + } + for { + if err := ctx.Err(); err != nil { + return false, err + } + dur := waitInterval - if !r.drainDeadline.IsZero() { - dur = time.Until(r.drainDeadline) + if !deadline.IsZero() { + dur = time.Until(deadline) if dur < 0 { // Container is gone but we haven't found the end of the // logs before the deadline. Maybe it was dropped by @@ -215,6 +224,8 @@ func (r *reader) wait() (bool, error) { select { case <-r.logWatcher.WatchConsumerGone(): return false, nil + case <-ctx.Done(): + return false, ctx.Err() case <-r.s.closed: // Container is gone; don't wait indefinitely for journal entries that will never arrive. if r.maxOrdinal >= r.s.ordinal.Load() { @@ -230,12 +241,15 @@ func (r *reader) wait() (bool, error) { // nextWait blocks until there is a new journal entry to read, and advances the // journal read pointer to it. -func (r *reader) nextWait() (bool, error) { +func (r *reader) nextWait(ctx context.Context) (bool, error) { for { + if err := ctx.Err(); err != nil { + return false, err + } if ok, err := r.j.Next(); err != nil || ok { return ok, err } - if ok, err := r.wait(); err != nil || !ok { + if ok, err := r.wait(ctx); err != nil || !ok { return false, err } } @@ -249,7 +263,7 @@ func (r *reader) nextWait() (bool, error) { // - the watch consumer is gone, or // - (if until is nonzero) a log entry is read which has a timestamp after // until -func (r *reader) drainJournal() (bool, error) { +func (r *reader) drainJournal(ctx context.Context) (bool, error) { for i := 0; ; i++ { // Read the entry's timestamp. 
timestamp, err := r.j.Realtime() @@ -299,6 +313,8 @@ func (r *reader) drainJournal() (bool, error) { select { case <-r.logWatcher.WatchConsumerGone(): return false, nil + case <-ctx.Done(): + return false, ctx.Err() case r.logWatcher.Msg <- msg: } } @@ -308,21 +324,25 @@ func (r *reader) drainJournal() (bool, error) { if i != 0 && i%1024 == 0 { if _, err := r.j.Process(); err != nil { // log a warning but ignore it for now - log.G(context.TODO()).WithField("container", r.s.vars[fieldContainerIDFull]). + log.G(ctx).WithField("container", r.s.vars[fieldContainerIDFull]). WithField("error", err). Warn("journald: error processing journal") } } + if err := ctx.Err(); err != nil { + return false, err + } + if ok, err := r.j.Next(); err != nil || !ok { return true, err } } } -func (r *reader) readJournal() error { +func (r *reader) readJournal(ctx context.Context) error { caughtUp := r.s.ordinal.Load() - if more, err := r.drainJournal(); err != nil || !more { + if more, err := r.drainJournal(ctx); err != nil || !more { return err } @@ -345,10 +365,10 @@ func (r *reader) readJournal() error { default: } - if more, err := r.nextWait(); err != nil || !more { + if more, err := r.nextWait(ctx); err != nil || !more { return err } - if more, err := r.drainJournal(); err != nil || !more { + if more, err := r.drainJournal(ctx); err != nil || !more { return err } if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp { @@ -357,7 +377,7 @@ func (r *reader) readJournal() error { } } -func (r *reader) readLogs() { +func (r *reader) readLogs(ctx context.Context) { defer close(r.logWatcher.Msg) // Make sure the ready channel is closed in the event of an early @@ -427,7 +447,7 @@ func (r *reader) readLogs() { // which case the position will be unaffected by subsequent logging, or // the read pointer is in the conceptual position corresponding to the // first journal entry to send once it is logged in the future. 
- if more, err := r.nextWait(); err != nil || !more { + if more, err := r.nextWait(ctx); err != nil || !more { if err != nil { r.logWatcher.Err <- err } @@ -435,7 +455,7 @@ func (r *reader) readLogs() { } } - if err := r.readJournal(); err != nil { + if err := r.readJournal(ctx); err != nil { r.logWatcher.Err <- err return } @@ -456,7 +476,7 @@ func (s *journald) ReadLogs(ctx context.Context, config logger.ReadConfig) *logg config: config, ready: make(chan struct{}), } - go r.readLogs() + go r.readLogs(ctx) // Block until the reader is in position to read from the current config // location to prevent race conditions in tests. <-r.ready