From 9b6ba18fc9c68476ef0b18e7260a6ea4955d38e5 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 22 Jul 2024 18:53:48 +0000 Subject: [PATCH] logfile: Close reader when caller cancels This allows for an individual decode operation to be cancelled while the log reader is reading data from a log file by closing the underlying file. Signed-off-by: Brian Goff --- daemon/logger/loggerutils/logfile.go | 14 +++++++++----- daemon/logger/loggerutils/logfile_test.go | 6 +++--- daemon/logs.go | 4 +--- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index 4b7e97e940..73e3742482 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -605,7 +605,7 @@ func (cfo *compressedFileOpener) ReaderAt(ctx context.Context) (_ sizeReaderAtCl } defer gzr.Close() - // Extract the last log entry timestramp from the gzip header + // Extract the last log entry timestamp from the gzip header // Use this to determine if we even need to read this file based on inputs extra := &rotateFileMetadata{} err = json.Unmarshal(gzr.Header.Extra, extra) @@ -803,6 +803,7 @@ func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatch }() for _, ra := range readers { + ra := ra select { case <-watcher.WatchConsumerGone(): return false @@ -813,6 +814,12 @@ func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatch dec.Reset(ra) + cancel := context.AfterFunc(ctx, func() { + if err := ra.Close(); err != nil { + log.G(ctx).WithError(err).Debug("Error closing log reader") + } + }) + ok := fwd.Do(ctx, watcher, func() (*logger.Message, error) { msg, err := dec.Decode() if err != nil && !errors.Is(err, io.EOF) { @@ -827,11 +834,8 @@ func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatch } return msg, err }) - if err := ra.Close(); err != nil { - log.G(ctx).WithError(err).Debug("Error closing log reader") - } + cancel() idx++ - if !ok { return false } diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index a48eeceac5..4c526e9be7 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -150,14 +150,14 @@ func TestTailFiles(t *testing.T) { f3 := bytes.NewBuffer(nil) writeMsg(f3, msg5) - // [bytes.Buffer] is not a SizeReaderAt, so we need to convert it here + // [bytes.Buffer] is not a SizeReaderAt, so we need to convert it here. files := makeOpener(bytes.NewReader(f1.Bytes()), bytes.NewReader(f2.Bytes()), bytes.NewReader(f3.Bytes())) // At this point we our log "files" should have 4 log messages in it - // intersperesed with some junk that is invalid json + // interspersed with some junk that is invalid json. // We need a zero size watcher so that we can tell the decoder to give us - // a syntax error + // a syntax error. watcher := logger.NewLogWatcher() config := logger.ReadConfig{Tail: 4} diff --git a/daemon/logs.go b/daemon/logs.go index e96d6ce77a..1c72eee410 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -27,9 +27,7 @@ import ( func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *containertypes.LogsOptions) (messages <-chan *backend.LogMessage, isTTY bool, retErr error) { ctx, span := tracing.StartSpan(ctx, "daemon.ContainerLogs") defer func() { - if retErr != nil { - span.SetStatus(retErr) - } + span.SetStatus(retErr) span.End() }()