diff --git a/internal/mtail/read_pipe_integration_unix_test.go b/internal/mtail/read_pipe_integration_unix_test.go index 5034dce08..2f34fc89b 100644 --- a/internal/mtail/read_pipe_integration_unix_test.go +++ b/internal/mtail/read_pipe_integration_unix_test.go @@ -17,7 +17,7 @@ import ( "golang.org/x/sys/unix" ) -func TestReadFromPipe(t *testing.T) { +func TestReadFromFifo(t *testing.T) { testutil.SkipIfShort(t) tmpDir := testutil.TestTempDir(t) diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/fifostream.go similarity index 80% rename from internal/tailer/logstream/pipestream.go rename to internal/tailer/logstream/fifostream.go index 599119e3c..ec3e8d9f5 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/fifostream.go @@ -18,7 +18,7 @@ import ( "github.com/google/mtail/internal/waker" ) -type pipeStream struct { +type fifoStream struct { streamBase cancel context.CancelFunc @@ -26,11 +26,11 @@ type pipeStream struct { pathname string // Given name for the underlying named pipe on the filesystem } -// newPipeStream creates a new stream reader for Unix Pipes. +// newFifoStream creates a new stream reader for Unix Fifos. // `pathname` must already be verified as clean. -func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo) (LogStream, error) { +func newFifoStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo) (LogStream, error) { ctx, cancel := context.WithCancel(ctx) - ps := &pipeStream{ + ps := &fifoStream{ cancel: cancel, pathname: pathname, streamBase: streamBase{ @@ -44,40 +44,40 @@ func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, p return ps, nil } -func pipeOpen(pathname string) (*os.File, error) { +func fifoOpen(pathname string) (*os.File, error) { if IsStdinPattern(pathname) { return os.Stdin, nil } - // Open in nonblocking mode because the write end of the pipe may not have started yet; this also gives us the ability to set a read deadline when the context is cancelled. https://github.com/golang/go/issues/24842 + // Open in nonblocking mode because the write end of the fifo may not have started yet; this also gives us the ability to set a read deadline when the context is cancelled. https://github.com/golang/go/issues/24842 fd, err := os.OpenFile(pathname, os.O_RDONLY|syscall.O_NONBLOCK, 0o600) // #nosec G304 -- path already validated by caller if err != nil { - glog.Warningf("pipeOpen(%s): open failed: %v", pathname, err) + glog.Warningf("fifoOpen(%s): open failed: %v", pathname, err) logErrors.Add(pathname, 1) return nil, err } - glog.V(2).Infof("pipeOpen(%s): opened new pipe %v", pathname, fd) + glog.V(2).Infof("fifoOpen(%s): opened new fifo %v", pathname, fd) return fd, nil } -// The read buffer size for pipes. +// The read buffer size for fifos. // -// Before Linux 2.6.11, the capacity of a pipe was the same as the +// Before Linux 2.6.11, the capacity of a fifo was the same as the // system page size (e.g., 4096 bytes on i386). Since Linux 2.6.11, -// the pipe capacity is 16 pages (i.e., 65,536 bytes in a system +// the fifo capacity is 16 pages (i.e., 65,536 bytes in a system // with a page size of 4096 bytes). Since Linux 2.6.35, the default -// pipe capacity is 16 pages, but the capacity can be queried and +// fifo capacity is 16 pages, but the capacity can be queried and // set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations. // See fcntl(2) for more information. // // https://man7.org/linux/man-pages/man7/pipe.7.html -const defaultPipeReadBufferSize = 131072 +const defaultFifoReadBufferSize = 131072 -func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, _ os.FileInfo) error { - fd, err := pipeOpen(ps.pathname) +func (ps *fifoStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, _ os.FileInfo) error { + fd, err := fifoOpen(ps.pathname) if err != nil { return err } - b := make([]byte, defaultPipeReadBufferSize) + b := make([]byte, defaultFifoReadBufferSize) partial := bytes.NewBufferString("") var total int wg.Add(1) @@ -123,8 +123,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } } else if n == 0 && total > 0 { // `pipe(7)` tells us "If all file descriptors referring to the - // write end of a pipe have been closed, then an attempt to - // read(2) from the pipe will see end-of-file (read(2) will + // write end of a fifo have been closed, then an attempt to + // read(2) from the fifo will see end-of-file (read(2) will // return 0)." To avoid shutting down the stream at startup // before any writer has connected to the fifo, condition on // having read any bytes previously. diff --git a/internal/tailer/logstream/pipestream_unix_test.go b/internal/tailer/logstream/fifostream_unix_test.go similarity index 100% rename from internal/tailer/logstream/pipestream_unix_test.go rename to internal/tailer/logstream/fifostream_unix_test.go diff --git a/internal/tailer/logstream/logstream.go b/internal/tailer/logstream/logstream.go index 520278e86..7dbd5314a 100644 --- a/internal/tailer/logstream/logstream.go +++ b/internal/tailer/logstream/logstream.go @@ -92,7 +92,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st logErrors.Add(path, 1) return nil, err } - return newPipeStream(ctx, wg, waker, path, fi) + return newFifoStream(ctx, wg, waker, path, fi) } fi, err := os.Stat(path) if err != nil { @@ -103,7 +103,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st case m.IsRegular(): return newFileStream(ctx, wg, waker, path, fi, oneShot) case m&os.ModeType == os.ModeNamedPipe: - return newPipeStream(ctx, wg, waker, path, fi) + return newFifoStream(ctx, wg, waker, path, fi) // TODO(jaq): in order to listen on an existing socket filepath, we must unlink and recreate it // case m&os.ModeType == os.ModeSocket: // return newSocketStream(ctx, wg, waker, pathname)