fix/sg: fix mangled log output from sg start and sg run (#63405)

Right now `sg run` / `sg start` can horribly mangle multi-line output. A
nicely annotated report from @unknwon:


![image](https://github.com/sourcegraph/sourcegraph/assets/23356519/38acbaf9-89dc-4d4b-9fd7-b601f5654240)

Replacing the "buffered process logger" thing with
https://github.com/bobheadxi/streamline which powers `sourcegraph/run`
etc (fairly reliably if I do say so myself) fixes this for a few cases
where I can reliably repro wonky misordered output 😁

## Test plan

`sg start dotcom` with `sg.config.overwrite.yaml`:

```yaml
commands:
  enterprise-portal:
    env:
      SRC_LOG_LEVEL: debug
      PG_QUERY_LOGGING: true
```

Log scope `pgx.devtracer` is consistently formatted  , even with high
volume of logs


![image](https://github.com/sourcegraph/sourcegraph/assets/23356519/5c46f94f-e388-477a-94d3-151d5a3c7468)

Also don't see anything suspicious happening after running for a while
This commit is contained in:
Robert Lin 2024-06-20 16:07:27 -07:00 committed by GitHub
parent c437c21ba6
commit 78dcd57221
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 38 additions and 130 deletions

View File

@ -34,6 +34,7 @@ go_library(
"@com_github_nxadm_tail//:tail",
"@com_github_rjeczalik_notify//:notify",
"@com_github_sourcegraph_conc//pool",
"@dev_bobheadxi_go_streamline//pipe",
],
)

View File

@ -14,6 +14,7 @@ import (
"github.com/grafana/regexp"
"github.com/sourcegraph/conc/pool"
"go.bobheadxi.dev/streamline/pipe"
"github.com/sourcegraph/sourcegraph/dev/sg/internal/secrets"
"github.com/sourcegraph/sourcegraph/dev/sg/internal/std"
@ -405,14 +406,22 @@ func (sc *startedCmd) getOutputWriter(ctx context.Context, opts *outputOptions,
close(opts.start)
}
writers = append(writers, newBufferedCmdLogger(ctx, sc.opts.name, std.Out.Output, opts.start))
writers = append(writers, newOutputPipe(ctx, sc.opts.name, std.Out.Output, opts.start))
}
if sgConn != nil {
sink := func(data string) {
sgConn.Write([]byte(fmt.Sprintf("%s: %s\n", sc.opts.name, data)))
}
writers = append(writers, process.NewLogger(ctx, sink))
w, stream := pipe.NewStream()
go func() {
err := stream.Stream(func(line string) {
_, _ = sgConn.Write([]byte(fmt.Sprintf("%s: %s\n", sc.opts.name, line)))
})
_ = w.CloseWithError(err)
}()
go func() {
<-ctx.Done()
_ = w.CloseWithError(ctx.Err())
}()
writers = append(writers, w)
}
return io.MultiWriter(writers...)

View File

@ -3,10 +3,13 @@ package run
import (
"context"
"hash/fnv"
"io"
"strconv"
"sync"
"go.bobheadxi.dev/streamline/pipe"
"github.com/sourcegraph/sourcegraph/lib/output"
"github.com/sourcegraph/sourcegraph/lib/process"
)
func nameToColor(s string) output.Style {
@ -31,20 +34,29 @@ var (
lineFormat = "%s%s[%+" + strconv.Itoa(maxNameLength) + "s]%s %s"
)
// newBufferedCmdLogger returns a new process.Logger with a unique color based on the name of the cmd
// newOutputPipe returns a new output with a unique color based on the name of the cmd
// that blocks until the given start signal and writes logs to the given output.Output.
func newBufferedCmdLogger(ctx context.Context, name string, out *output.Output, start <-chan struct{}) *process.Logger {
func newOutputPipe(ctx context.Context, name string, out *output.Output, start <-chan struct{}) io.Writer {
name = compactName(name)
color := nameToColor(name)
sink := func(data string) {
go func() {
<-start
out.Writef(lineFormat, output.StyleBold, color, name, output.StyleReset, data)
}()
}
w, stream := pipe.NewStream()
go func() {
var mux sync.Mutex
<-start
err := stream.Stream(func(line string) {
mux.Lock()
out.Writef(lineFormat, output.StyleBold, color, name, output.StyleReset, line)
mux.Unlock()
})
_ = w.CloseWithError(err)
}()
go func() {
<-ctx.Done()
_ = w.CloseWithError(ctx.Err())
}()
return process.NewLogger(ctx, sink)
return w
}
func compactName(name string) string {

View File

@ -3,10 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "process",
srcs = [
"logger.go",
"pipe.go",
],
srcs = ["pipe.go"],
importpath = "github.com/sourcegraph/sourcegraph/lib/process",
tags = [TAG_PLATFORM_SOURCE],
visibility = ["//visibility:public"],

View File

@ -1,111 +0,0 @@
package process
import (
"bytes"
"context"
"time"
)
// tickDuration is the time to wait before writing the buffer contents
// without having received a newline.
var tickDuration = 2 * time.Millisecond
// Logger is a simplified version of goreman's logger:
// https://github.com/mattn/goreman/blob/master/log.go
type Logger struct {
sink func(string)
// buf is used to keep partial lines buffered before flushing them (either
// on the next newline or after tickDuration)
buf *bytes.Buffer
writes chan []byte
done chan struct{}
}
// NewLogger returns a new Logger instance and spawns a goroutine in the
// background that regularily flushed the logged output to the given sink.
//
// If the passed in ctx is canceled the goroutine will exit.
func NewLogger(ctx context.Context, sink func(string)) *Logger {
l := &Logger{
sink: sink,
writes: make(chan []byte),
done: make(chan struct{}),
buf: &bytes.Buffer{},
}
go l.writeLines(ctx)
return l
}
func (l *Logger) bufLine(line []byte) error {
_, err := l.buf.Write(line)
return err
}
func (l *Logger) flush() {
if l.buf.Len() == 0 {
return
}
l.sink(l.buf.String())
l.buf.Reset()
}
// Write handler of logger.
func (l *Logger) Write(p []byte) (int, error) {
l.writes <- p
<-l.done
return len(p), nil
}
func (l *Logger) writeLines(ctx context.Context) {
tick := time.NewTicker(tickDuration)
for {
select {
case <-ctx.Done():
l.flush()
return
case w, ok := <-l.writes:
if !ok {
l.flush()
return
}
buf := bytes.NewBuffer(w)
for {
line, err := buf.ReadBytes('\n')
if len(line) > 0 {
if line[len(line)-1] == '\n' {
// TODO: We currently add a newline in flush(), see comment there
line = line[0 : len(line)-1]
// But since there *was* a newline, we need to flush,
// but only if there is more than a newline or there
// was already content.
if len(line) != 0 || l.buf.Len() > 0 {
if err := l.bufLine(line); err != nil {
break
}
l.flush()
}
tick.Stop()
} else {
if err := l.bufLine(line); err != nil {
break
}
tick.Reset(tickDuration)
}
}
if err != nil {
break
}
}
l.done <- struct{}{}
case <-tick.C:
l.flush()
tick.Stop()
}
}
}