diff --git a/dev/sg/internal/run/logger.go b/dev/sg/internal/run/logger.go index e647eeee307..8e1c6f2b820 100644 --- a/dev/sg/internal/run/logger.go +++ b/dev/sg/internal/run/logger.go @@ -5,7 +5,6 @@ import ( "hash/fnv" "io" "strconv" - "sync" "go.bobheadxi.dev/streamline/pipe" @@ -42,12 +41,9 @@ func newOutputPipe(ctx context.Context, name string, out *output.Output, start < w, stream := pipe.NewStream() go func() { - var mux sync.Mutex <-start err := stream.Stream(func(line string) { - mux.Lock() out.Writef(lineFormat, output.StyleBold, color, name, output.StyleReset, line) - mux.Unlock() }) _ = w.CloseWithError(err) }() diff --git a/dev/sg/sg_start_test.go b/dev/sg/sg_start_test.go index 856bb3ac9ce..527d2b1241c 100644 --- a/dev/sg/sg_start_test.go +++ b/dev/sg/sg_start_test.go @@ -5,6 +5,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/google/go-cmp/cmp" @@ -121,8 +122,14 @@ func useOutputBuffer(t *testing.T) *outputtest.Buffer { func expectOutput(t *testing.T, buf *outputtest.Buffer, want []string) { t.Helper() + // TODO: DINF-104, find out why we need this sleep. Basically, when running in tests, + // for some reason, even though cmds.start() returned, it hasn't finished writing + // the outputs in rare cases (6 out out 100 runs on my machine). + time.Sleep(300 * time.Millisecond) have := buf.Lines() + // TODO: See DINF-104, without this, we can see the cmd output printed out after + // the exit message. For now, and to prevent flakes, we'll keep the sort. sort.Strings(want) sort.Strings(have) if !cmp.Equal(want, have) { diff --git a/lib/output/outputtest/buffer.go b/lib/output/outputtest/buffer.go index e305cf1f872..8f04d118035 100644 --- a/lib/output/outputtest/buffer.go +++ b/lib/output/outputtest/buffer.go @@ -2,6 +2,7 @@ package outputtest import ( "strconv" + "sync" ) // Buffer is used to test code that uses the `output` library to produce @@ -14,6 +15,7 @@ import ( // NOTE: Buffer is *not* complete and probably can't parse everything that // output produces. It should be extended as needed. type Buffer struct { + sync.Mutex lines [][]byte line int @@ -21,8 +23,14 @@ type Buffer struct { } func (t *Buffer) Write(b []byte) (int, error) { + t.Lock() + defer t.Unlock() + cur := 0 + // Debug helper: + // fmt.Printf("b: %q\n", string(b)) + for cur < len(b) { switch b[cur] { case '\n': @@ -120,6 +128,9 @@ func (t *Buffer) writeToCurrentLine(b byte) { } func (t *Buffer) Lines() []string { + t.Lock() + defer t.Unlock() + var lines []string for _, l := range t.lines { lines = append(lines, string(l))