Skip to content

Commit

Permalink
refactor: Rename pipestream to fifostream.
Browse files Browse the repository at this point in the history
This keeps confusing me, but we're reading unix named pipes aka fifos, not pipes.
  • Loading branch information
jaqx0r committed Jul 12, 2024
1 parent 2e17520 commit 47e973f
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 21 deletions.
2 changes: 1 addition & 1 deletion internal/mtail/read_pipe_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"golang.org/x/sys/unix"
)

func TestReadFromPipe(t *testing.T) {
func TestReadFromFifo(t *testing.T) {
testutil.SkipIfShort(t)
tmpDir := testutil.TestTempDir(t)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ import (
"github.com/google/mtail/internal/waker"
)

type pipeStream struct {
type fifoStream struct {
streamBase

cancel context.CancelFunc

pathname string // Given name for the underlying named pipe on the filesystem
}

// newPipeStream creates a new stream reader for Unix Pipes.
// newFifoStream creates a new stream reader for Unix Fifos.
// `pathname` must already be verified as clean.
func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo) (LogStream, error) {
func newFifoStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo) (LogStream, error) {
ctx, cancel := context.WithCancel(ctx)
ps := &pipeStream{
ps := &fifoStream{
cancel: cancel,
pathname: pathname,
streamBase: streamBase{
Expand All @@ -44,40 +44,40 @@ func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, p
return ps, nil
}

func pipeOpen(pathname string) (*os.File, error) {
func fifoOpen(pathname string) (*os.File, error) {
if IsStdinPattern(pathname) {
return os.Stdin, nil
}
// Open in nonblocking mode because the write end of the pipe may not have started yet; this also gives us the ability to set a read deadline when the context is cancelled. https://github.com/golang/go/issues/24842
// Open in nonblocking mode because the write end of the fifo may not have started yet; this also gives us the ability to set a read deadline when the context is cancelled. https://github.com/golang/go/issues/24842
fd, err := os.OpenFile(pathname, os.O_RDONLY|syscall.O_NONBLOCK, 0o600) // #nosec G304 -- path already validated by caller
if err != nil {
glog.Warningf("pipeOpen(%s): open failed: %v", pathname, err)
glog.Warningf("fifoOpen(%s): open failed: %v", pathname, err)
logErrors.Add(pathname, 1)
return nil, err
}
glog.V(2).Infof("pipeOpen(%s): opened new pipe %v", pathname, fd)
glog.V(2).Infof("fifoOpen(%s): opened new fifo %v", pathname, fd)
return fd, nil
}

// The read buffer size for pipes.
// The read buffer size for fifos.
//
// Before Linux 2.6.11, the capacity of a pipe was the same as the
// Before Linux 2.6.11, the capacity of a fifo was the same as the
// system page size (e.g., 4096 bytes on i386). Since Linux 2.6.11,
// the pipe capacity is 16 pages (i.e., 65,536 bytes in a system
// the fifo capacity is 16 pages (i.e., 65,536 bytes in a system
// with a page size of 4096 bytes). Since Linux 2.6.35, the default
// pipe capacity is 16 pages, but the capacity can be queried and
// fifo capacity is 16 pages, but the capacity can be queried and
// set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations.
// See fcntl(2) for more information.
//
// https://man7.org/linux/man-pages/man7/pipe.7.html
const defaultPipeReadBufferSize = 131072
const defaultFifoReadBufferSize = 131072

func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, _ os.FileInfo) error {
fd, err := pipeOpen(ps.pathname)
func (ps *fifoStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, _ os.FileInfo) error {
fd, err := fifoOpen(ps.pathname)
if err != nil {
return err
}
b := make([]byte, defaultPipeReadBufferSize)
b := make([]byte, defaultFifoReadBufferSize)
partial := bytes.NewBufferString("")
var total int
wg.Add(1)
Expand Down Expand Up @@ -123,8 +123,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
} else if n == 0 && total > 0 {
// `pipe(7)` tells us "If all file descriptors referring to the
// write end of a pipe have been closed, then an attempt to
// read(2) from the pipe will see end-of-file (read(2) will
// write end of a fifo have been closed, then an attempt to
// read(2) from the fifo will see end-of-file (read(2) will
// return 0)." To avoid shutting down the stream at startup
// before any writer has connected to the fifo, condition on
// having read any bytes previously.
Expand Down
4 changes: 2 additions & 2 deletions internal/tailer/logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st
logErrors.Add(path, 1)
return nil, err
}
return newPipeStream(ctx, wg, waker, path, fi)
return newFifoStream(ctx, wg, waker, path, fi)
}
fi, err := os.Stat(path)
if err != nil {
Expand All @@ -103,7 +103,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st
case m.IsRegular():
return newFileStream(ctx, wg, waker, path, fi, oneShot)
case m&os.ModeType == os.ModeNamedPipe:
return newPipeStream(ctx, wg, waker, path, fi)
return newFifoStream(ctx, wg, waker, path, fi)
// TODO(jaq): in order to listen on an existing socket filepath, we must unlink and recreate it
// case m&os.ModeType == os.ModeSocket:
// return newSocketStream(ctx, wg, waker, pathname)
Expand Down

0 comments on commit 47e973f

Please sign in to comment.