mirror of
https://github.com/moby/moby.git
synced 2026-01-11 18:51:37 +00:00
logfile: Close reader when caller cancels
This allows for an individual decode operation to be cancelled while the log reader is reading data from a log file by closing the underlying file. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
@@ -605,7 +605,7 @@ func (cfo *compressedFileOpener) ReaderAt(ctx context.Context) (_ sizeReaderAtCl
|
||||
}
|
||||
defer gzr.Close()
|
||||
|
||||
// Extract the last log entry timestramp from the gzip header
|
||||
// Extract the last log entry timestamp from the gzip header
|
||||
// Use this to determine if we even need to read this file based on inputs
|
||||
extra := &rotateFileMetadata{}
|
||||
err = json.Unmarshal(gzr.Header.Extra, extra)
|
||||
@@ -803,6 +803,7 @@ func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatch
|
||||
}()
|
||||
|
||||
for _, ra := range readers {
|
||||
ra := ra
|
||||
select {
|
||||
case <-watcher.WatchConsumerGone():
|
||||
return false
|
||||
@@ -813,6 +814,12 @@ func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatch
|
||||
|
||||
dec.Reset(ra)
|
||||
|
||||
cancel := context.AfterFunc(ctx, func() {
|
||||
if err := ra.Close(); err != nil {
|
||||
log.G(ctx).WithError(err).Debug("Error closing log reader")
|
||||
}
|
||||
})
|
||||
|
||||
ok := fwd.Do(ctx, watcher, func() (*logger.Message, error) {
|
||||
msg, err := dec.Decode()
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
@@ -827,11 +834,8 @@ func tailFiles(ctx context.Context, files []fileOpener, watcher *logger.LogWatch
|
||||
}
|
||||
return msg, err
|
||||
})
|
||||
if err := ra.Close(); err != nil {
|
||||
log.G(ctx).WithError(err).Debug("Error closing log reader")
|
||||
}
|
||||
cancel()
|
||||
idx++
|
||||
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -150,14 +150,14 @@ func TestTailFiles(t *testing.T) {
|
||||
f3 := bytes.NewBuffer(nil)
|
||||
writeMsg(f3, msg5)
|
||||
|
||||
// [bytes.Buffer] is not a SizeReaderAt, so we need to convert it here
|
||||
// [bytes.Buffer] is not a SizeReaderAt, so we need to convert it here.
|
||||
files := makeOpener(bytes.NewReader(f1.Bytes()), bytes.NewReader(f2.Bytes()), bytes.NewReader(f3.Bytes()))
|
||||
|
||||
// At this point we our log "files" should have 4 log messages in it
|
||||
// intersperesed with some junk that is invalid json
|
||||
// interspersed with some junk that is invalid json.
|
||||
|
||||
// We need a zero size watcher so that we can tell the decoder to give us
|
||||
// a syntax error
|
||||
// a syntax error.
|
||||
watcher := logger.NewLogWatcher()
|
||||
|
||||
config := logger.ReadConfig{Tail: 4}
|
||||
|
||||
@@ -27,9 +27,7 @@ import (
|
||||
func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *containertypes.LogsOptions) (messages <-chan *backend.LogMessage, isTTY bool, retErr error) {
|
||||
ctx, span := tracing.StartSpan(ctx, "daemon.ContainerLogs")
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
span.SetStatus(retErr)
|
||||
}
|
||||
span.SetStatus(retErr)
|
||||
span.End()
|
||||
}()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user