Restore buffered process.PipeOutput and add tests (#31478)

This restores the buffered PipeOutput function that I accidentally
removed in #31081 because I didn't notice it is used in src-cli.

This adds it back and also adds tests for both functions so we're sure
that they actually do what they say they do.
This commit is contained in:
Thorsten Ball 2022-02-18 15:37:04 +01:00 committed by GitHub
parent 7edfd1b031
commit b74907c472
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 227 additions and 18 deletions

View File

@ -1,24 +1,70 @@
package process
import (
"bufio"
"context"
"fmt"
"io"
"io/fs"
"os/exec"
"golang.org/x/sync/errgroup"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
const (
	// initialBufSize is the starting size, in bytes, of the buffer that
	// PipeOutput gives to its scanner for reading lines.
	initialBufSize = 4 << 10 // 4k

	// maxTokenSize is the largest token, in bytes, that PipeOutput allows its
	// scanner to read.
	maxTokenSize = 100 << 20 // 100mb
)
type (
	// pipe is the signature of the function that moves the data read from r
	// into w and reports any error encountered while doing so.
	pipe func(w io.Writer, r io.Reader) error

	// cmdPiper is the part of a command that the piping functions need:
	// access to readers for its stdout and stderr streams.
	cmdPiper interface {
		StdoutPipe() (io.ReadCloser, error)
		StderrPipe() (io.ReadCloser, error)
	}
)
// PipeOutput reads stdout/stderr output of the given command into the two
// io.Writers.
//
// It returns a sync.WaitGroup. The caller *must* call the Wait() method of the
// WaitGroup after waiting for the *exec.Cmd to finish.
// It returns a errgroup.Group. The caller *must* call the Wait() method of the
// errgroup.Group after waiting for the *exec.Cmd to finish.
//
// See this issue for more details: https://github.com/golang/go/issues/21922
func PipeOutputUnbuffered(ctx context.Context, c *exec.Cmd, stdoutWriter, stderrWriter io.Writer) (*errgroup.Group, error) {
func PipeOutput(ctx context.Context, c cmdPiper, stdoutWriter, stderrWriter io.Writer) (*errgroup.Group, error) {
	// pipe forwards r to w one line at a time: nothing is written to w until
	// the scanner has seen a complete line (or the reader is exhausted).
	pipe := func(w io.Writer, r io.Reader) error {
		scanner := bufio.NewScanner(r)
		// Start with a small buffer and let the scanner grow it up to
		// maxTokenSize for very long lines.
		scanner.Buffer(make([]byte, initialBufSize), maxTokenSize)

		for scanner.Scan() {
			// The scanner strips the line terminator, so Fprintln adds a
			// newline back. A failing writer is reported instead of being
			// silently ignored.
			if _, err := fmt.Fprintln(w, scanner.Text()); err != nil {
				return err
			}
		}

		// As in PipeOutputUnbuffered, ErrClosed is not treated as a failure
		// because we get that if a process crashes.
		if err := scanner.Err(); err != nil && !errors.Is(err, fs.ErrClosed) {
			return err
		}
		return nil
	}

	return pipeProcessOutput(ctx, c, stdoutWriter, stderrWriter, pipe)
}
// PipeOutputUnbuffered behaves like PipeOutput but does not wait for complete
// lines: each stream is forwarded to its writer with io.Copy.
func PipeOutputUnbuffered(ctx context.Context, c cmdPiper, stdoutWriter, stderrWriter io.Writer) (*errgroup.Group, error) {
	copyStream := func(dst io.Writer, src io.Reader) error {
		_, copyErr := io.Copy(dst, src)
		switch {
		case copyErr == nil:
			return nil
		case errors.Is(copyErr, fs.ErrClosed):
			// ErrClosed is what we see when the process crashes, so it is
			// not reported as a failure.
			return nil
		default:
			return copyErr
		}
	}

	return pipeProcessOutput(ctx, c, stdoutWriter, stderrWriter, copyStream)
}
func pipeProcessOutput(ctx context.Context, c cmdPiper, stdoutWriter, stderrWriter io.Writer, fn pipe) (*errgroup.Group, error) {
stdoutPipe, err := c.StdoutPipe()
if err != nil {
return nil, err
@ -41,20 +87,8 @@ func PipeOutputUnbuffered(ctx context.Context, c *exec.Cmd, stdoutWriter, stderr
eg := &errgroup.Group{}
readIntoBuf := func(w io.Writer, r io.Reader) error {
_, err := io.Copy(w, r)
// We can ignore ErrClosed because we get that if a process crashes
if err != nil && !errors.Is(err, fs.ErrClosed) {
return err
}
return nil
}
eg.Go(func() error { return fn(stdoutWriter, stdoutPipe) })
eg.Go(func() error { return fn(stderrWriter, stderrPipe) })
eg.Go(func() error {
return readIntoBuf(stdoutWriter, stdoutPipe)
})
eg.Go(func() error {
return readIntoBuf(stderrWriter, stderrPipe)
})
return eg, nil
}

175
lib/process/pipe_test.go Normal file
View File

@ -0,0 +1,175 @@
package process
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"
)
// TestPipeOutput checks that PipeOutput only forwards output once a full line
// has been received, and that stdout and stderr are handled independently of
// each other. Both streams are piped into the same mockBuf, so the expected
// byte counts are the running total across stdout and stderr.
func TestPipeOutput(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := newDummyCmd()
out := newMockBuf()
eg, err := PipeOutput(ctx, d, out, out)
if err != nil {
t.Fatalf("PipeOutput returned err: %s", err)
}
// Write a single byte to stdout
write(t, d.stdout, "a")
// No newline yet, so nothing should have been forwarded
expectNoWrite(t, out)
wantBytesWritten(t, out, 0)
// Write the newline that completes the stdout line
write(t, d.stdout, "\n")
waitForWrite(t, out)
// "a\n" has been forwarded
wantBytesWritten(t, out, 2)
// Write a single byte to stderr
write(t, d.stderr, "b")
// No newline, so the buffer length stays the same
expectNoWrite(t, out)
wantBytesWritten(t, out, 2)
// Write the newline that completes the stderr line
write(t, d.stderr, "\n")
waitForWrite(t, out)
wantBytesWritten(t, out, 4)
// Write a byte to stdout without a newline
write(t, d.stdout, "c")
expectNoWrite(t, out)
wantBytesWritten(t, out, 4)
// Now write a complete line to stderr
write(t, d.stderr, "d\n")
waitForWrite(t, out)
// Only the stderr line was forwarded; the pending stdout byte should
// still *not* be written
wantBytesWritten(t, out, 6)
// To get the pending stdout byte forwarded we need to write a newline to stdout
write(t, d.stdout, "\n")
waitForWrite(t, out)
wantBytesWritten(t, out, 8)
// Close both write ends so the readers finish and eg.Wait can return
d.stdout.Close()
d.stderr.Close()
if err := eg.Wait(); err != nil {
t.Fatalf("errgroup has err: %s", err)
}
}
// TestPipeOutputUnbuffered checks that PipeOutputUnbuffered forwards data as
// soon as it is written, without waiting for a newline. Both streams are
// piped into the same mockBuf, so the expected byte counts are the running
// total across stdout and stderr.
func TestPipeOutputUnbuffered(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	d := newDummyCmd()
	out := newMockBuf()

	eg, err := PipeOutputUnbuffered(ctx, d, out, out)
	if err != nil {
		// Name the function under test so a failure points at the right place.
		t.Fatalf("PipeOutputUnbuffered returned err: %s", err)
	}

	// Write byte to stdout
	write(t, d.stdout, "a")
	// It's unbuffered, so we want it to be written immediately
	waitForWrite(t, out)
	wantBytesWritten(t, out, 1)

	// Write byte to stderr
	write(t, d.stderr, "b")
	// Both should be written immediately
	waitForWrite(t, out)
	wantBytesWritten(t, out, 2)

	// Multi-byte writes without a newline are forwarded as well
	write(t, d.stdout, "cdefg")
	waitForWrite(t, out)
	write(t, d.stderr, "hijkl")
	waitForWrite(t, out)
	wantBytesWritten(t, out, 12)

	// Close both write ends so the readers finish and eg.Wait can return
	d.stdout.Close()
	d.stderr.Close()

	if err := eg.Wait(); err != nil {
		t.Fatalf("errgroup has err: %s", err)
	}
}
// dummyCmd stands in for a command in tests. Its stdout and stderr streams
// are in-memory pipes: the test writes to stdout/stderr and the code under
// test reads from the ends returned by StdoutPipe/StderrPipe.
type dummyCmd struct {
	// Write ends, used by the test to simulate process output.
	stdout io.WriteCloser
	stderr io.WriteCloser

	// Read ends, handed out through StdoutPipe and StderrPipe.
	stdoutRead io.ReadCloser
	stderrRead io.ReadCloser
}

// newDummyCmd returns a dummyCmd with a fresh io.Pipe for each stream.
func newDummyCmd() *dummyCmd {
	cmd := new(dummyCmd)
	cmd.stdoutRead, cmd.stdout = io.Pipe()
	cmd.stderrRead, cmd.stderr = io.Pipe()
	return cmd
}

// StdoutPipe returns the read end of the stdout pipe. It never fails.
func (d dummyCmd) StdoutPipe() (io.ReadCloser, error) {
	return d.stdoutRead, nil
}

// StderrPipe returns the read end of the stderr pipe. It never fails.
func (d dummyCmd) StderrPipe() (io.ReadCloser, error) {
	return d.stderrRead, nil
}
// mockBuf is a writer for tests that stores everything written to it and
// announces every Write call on the writes channel.
type mockBuf struct {
	// buf is a named field rather than an embedded bytes.Buffer: embedding
	// would promote the buffer's other methods onto mockBuf, and io.Copy
	// would then pick a buffering fast path instead of calling Write.
	buf *bytes.Buffer
	// writes receives the byte count of each Write call.
	writes chan int
}

// newMockBuf returns an empty mockBuf with an unbuffered writes channel.
func newMockBuf() *mockBuf {
	m := new(mockBuf)
	m.buf = &bytes.Buffer{}
	m.writes = make(chan int)
	return m
}

// Len reports how many bytes are currently stored in the buffer.
func (b *mockBuf) Len() int {
	return b.buf.Len()
}

// Write appends d to the buffer and then, from a separate goroutine so that
// the writer is never blocked, sends the number of bytes written on the
// writes channel.
func (b *mockBuf) Write(d []byte) (n int, err error) {
	written, writeErr := b.buf.Write(d)
	go func(count int) {
		b.writes <- count
	}(written)
	return written, writeErr
}
// write writes s to w and fails the test immediately if the write returns an
// error. The failure message includes the data and the underlying error so a
// broken pipe or closed writer can be diagnosed from the test output.
func write(t *testing.T, w io.Writer, s string) {
	t.Helper()
	if _, err := fmt.Fprint(w, s); err != nil {
		t.Fatalf("writing %q failed: %s", s, err)
	}
}
// wantBytesWritten fails the test unless out currently holds exactly want
// bytes.
func wantBytesWritten(t *testing.T, out *mockBuf, want int) {
	t.Helper()

	have := out.Len()
	if have == want {
		return
	}
	t.Fatalf("wrong number of bytes written. want=%d, have=%d", want, have)
}
// expectNoWrite fails the test if a write notification is already pending on
// out.writes. The check is non-blocking: it only looks at what is available
// at the moment of the call and does not wait for later writes.
func expectNoWrite(t *testing.T, out *mockBuf) {
	t.Helper()

	select {
	case n := <-out.writes:
		// Fatalf with %d so the byte count is actually formatted into the
		// message.
		t.Fatalf("%d bytes unexpectedly written", n)
	default:
	}
}
// waitForWrite blocks until out announces a write and fails the test if none
// arrives within five seconds.
func waitForWrite(t *testing.T, out *mockBuf) {
	t.Helper()

	deadline := time.After(5 * time.Second)
	select {
	case <-deadline:
		t.Fatalf("timeout reached. no write received")
	case <-out.writes:
		// A write was announced; nothing else to check here.
	}
}