From 6bdf41e098d3e32211374a36a8df6dd7f1964a4a Mon Sep 17 00:00:00 2001 From: Erik Seliger Date: Fri, 9 Feb 2024 12:53:25 +0100 Subject: [PATCH] Revert recent sg changes that cause high load and instability in dev (#60362) * Revert "Made all file watches in `sg start` recursive (#60285)" This reverts commit 4b8a4030ef6765411f10620de6a9b3214c97d662. * Revert "SG start bazel enhancements (#59718)" This reverts commit 00185f9ceda8674c6b265c662c28619b16e81838. --- .bazel_fix_commands.json | 18 +- deps.bzl | 4 +- dev/bazel_configure_accept_changes.sh | 19 - dev/sg/internal/run/BUILD.bazel | 8 +- dev/sg/internal/run/bazel_build.go | 64 ++ dev/sg/internal/run/bazel_command.go | 209 ++++--- dev/sg/internal/run/command.go | 461 +++----------- dev/sg/internal/run/ibazel.go | 255 +------- dev/sg/internal/run/installer.go | 338 ---------- dev/sg/internal/run/logger.go | 10 +- dev/sg/internal/run/prefix_suffix_saver.go | 12 +- dev/sg/internal/run/run.go | 686 ++++++++++++++++----- dev/sg/internal/run/run_bazel.go | 73 +++ dev/sg/internal/run/sgconfig_command.go | 91 --- dev/sg/sg_run.go | 32 +- dev/sg/sg_start.go | 130 ++-- dev/sg/sg_start_test.go | 1 - dev/sg/sg_tests.go | 29 +- go.mod | 2 - go.sum | 5 +- sg.config.yaml | 13 +- 21 files changed, 1024 insertions(+), 1436 deletions(-) delete mode 100755 dev/bazel_configure_accept_changes.sh create mode 100644 dev/sg/internal/run/bazel_build.go delete mode 100644 dev/sg/internal/run/installer.go create mode 100644 dev/sg/internal/run/run_bazel.go delete mode 100644 dev/sg/internal/run/sgconfig_command.go diff --git a/.bazel_fix_commands.json b/.bazel_fix_commands.json index b128df67c1f..fe51488c706 100644 --- a/.bazel_fix_commands.json +++ b/.bazel_fix_commands.json @@ -1,17 +1 @@ -[ - { - "regex": "^Check that imports in Go sources match importpath attributes in deps.$", - "command": "./dev/bazel_configure_accept_changes.sh", - "args": [] - }, - { - "regex": "missing input file", - "command": "./dev/bazel_configure_accept_changes.sh", - "args": [] - }, - { - "regex": ": undefined:", - "command": "./dev/bazel_configure_accept_changes.sh", - "args": [] - } -] +[] diff --git a/deps.bzl b/deps.bzl index 8ba704a3a19..2da6c96a6d9 100644 --- a/deps.bzl +++ b/deps.bzl @@ -4201,8 +4201,8 @@ def go_dependencies(): name = "com_github_nxadm_tail", build_file_proto_mode = "disable_global", importpath = "github.com/nxadm/tail", - sum = "h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=", - version = "v1.4.11", + sum = "h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=", + version = "v1.4.8", ) go_repository( name = "com_github_nytimes_gziphandler", diff --git a/dev/bazel_configure_accept_changes.sh b/dev/bazel_configure_accept_changes.sh deleted file mode 100755 index b78f2d29032..00000000000 --- a/dev/bazel_configure_accept_changes.sh +++ /dev/null @@ -1,19 +0,0 @@ -#! /bin/bash - -# Run bazel configure and if the error code is 110, exit with error code 0 -# This is because 110 means that configuration files were successfully -# Can be used by processes which want to run configuration as an auto-fix -# and expect a 0 exit code -bazel configure -exit_code=$? - -if [ $exit_code -eq 0 ]; then - echo "No configuration changes made" - exit 0 -elif [ $exit_code -eq 110 ]; then - echo "Bazel configuration completed" - exit 0 -else - echo "Unknown error" - exit $exit_code -fi diff --git a/dev/sg/internal/run/BUILD.bazel b/dev/sg/internal/run/BUILD.bazel index b726eca1a97..08aa42ff93d 100644 --- a/dev/sg/internal/run/BUILD.bazel +++ b/dev/sg/internal/run/BUILD.bazel @@ -1,19 +1,19 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") load("//dev:go_defs.bzl", "go_test") +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "run", srcs = [ + "bazel_build.go", "bazel_command.go", "command.go", "helpers.go", "ibazel.go", - "installer.go", "logger.go", "pid.go", "prefix_suffix_saver.go", "run.go", - "sgconfig_command.go", + "run_bazel.go", ], importpath = "github.com/sourcegraph/sourcegraph/dev/sg/internal/run", visibility = ["//dev/sg:__subpackages__"], @@ -28,9 +28,9 @@ go_library( "//lib/output", "//lib/process", "@com_github_grafana_regexp//:regexp", - "@com_github_nxadm_tail//:tail", "@com_github_rjeczalik_notify//:notify", "@com_github_sourcegraph_conc//pool", + "@org_golang_x_sync//semaphore", ], ) diff --git a/dev/sg/internal/run/bazel_build.go b/dev/sg/internal/run/bazel_build.go new file mode 100644 index 00000000000..a258d8f59a0 --- /dev/null +++ b/dev/sg/internal/run/bazel_build.go @@ -0,0 +1,64 @@ +package run + +import ( + "context" + "fmt" + "io" + "os/exec" + + "github.com/sourcegraph/sourcegraph/dev/sg/internal/std" + "github.com/sourcegraph/sourcegraph/dev/sg/root" + "github.com/sourcegraph/sourcegraph/lib/output" + "github.com/sourcegraph/sourcegraph/lib/process" +) + +// BazelBuild peforms a bazel build command with all the given targets and blocks until an +// error is returned or the build is completed. +func BazelBuild(ctx context.Context, cmds ...BazelCommand) error { + if len(cmds) == 0 { + // no Bazel commands so we return + return nil + } + std.Out.WriteLine(output.Styled(output.StylePending, fmt.Sprintf("Detected %d bazel targets, running bazel build before anything else", len(cmds)))) + + repoRoot, err := root.RepositoryRoot() + if err != nil { + return err + } + + targets := make([]string, 0, len(cmds)) + for _, cmd := range cmds { + targets = append(targets, cmd.Target) + } + + var cancel func() + ctx, cancel = context.WithCancel(ctx) + + args := append([]string{"build"}, targets...) + cmd := exec.CommandContext(ctx, "bazel", args...) + + sc := &startedCmd{ + stdoutBuf: &prefixSuffixSaver{N: 32 << 10}, + stderrBuf: &prefixSuffixSaver{N: 32 << 10}, + } + + sc.cancel = cancel + sc.Cmd = cmd + sc.Cmd.Dir = repoRoot + + var stdoutWriter, stderrWriter io.Writer + logger := newCmdLogger(ctx, "bazel", std.Out.Output) + stdoutWriter = io.MultiWriter(logger, sc.stdoutBuf) + stderrWriter = io.MultiWriter(logger, sc.stderrBuf) + eg, err := process.PipeOutputUnbuffered(ctx, sc.Cmd, stdoutWriter, stderrWriter) + if err != nil { + return err + } + sc.outEg = eg + + // Bazel out directory should exist here before returning + if err := sc.Start(); err != nil { + return err + } + return sc.Wait() +} diff --git a/dev/sg/internal/run/bazel_command.go b/dev/sg/internal/run/bazel_command.go index def1737266a..c3d07b744f9 100644 --- a/dev/sg/internal/run/bazel_command.go +++ b/dev/sg/internal/run/bazel_command.go @@ -3,123 +3,150 @@ package run import ( "context" "fmt" + "io" "os/exec" - "strings" "github.com/rjeczalik/notify" - "github.com/sourcegraph/sourcegraph/dev/sg/internal/secrets" + "github.com/sourcegraph/sourcegraph/dev/sg/internal/std" + "github.com/sourcegraph/sourcegraph/lib/errors" + "github.com/sourcegraph/sourcegraph/lib/output" + "github.com/sourcegraph/sourcegraph/lib/process" ) // A BazelCommand is a command definition for sg run/start that uses // bazel under the hood. It will handle restarting itself autonomously, // as long as iBazel is running and watch that specific target. type BazelCommand struct { - Name string - Description string `yaml:"description"` - Target string `yaml:"target"` - Args string `yaml:"args"` - PreCmd string `yaml:"precmd"` - Env map[string]string `yaml:"env"` - IgnoreStdout bool `yaml:"ignoreStdout"` - IgnoreStderr bool `yaml:"ignoreStderr"` - ContinueWatchOnExit bool `yaml:"continueWatchOnExit"` - // Preamble is a short and visible message, displayed when the command is launched. - Preamble string `yaml:"preamble"` + Name string + Description string `yaml:"description"` + Target string `yaml:"target"` + Args string `yaml:"args"` + PreCmd string `yaml:"precmd"` + Env map[string]string `yaml:"env"` + IgnoreStdout bool `yaml:"ignoreStdout"` + IgnoreStderr bool `yaml:"ignoreStderr"` ExternalSecrets map[string]secrets.ExternalSecret `yaml:"external_secrets"` - - // RunTarget specifies a target that should be run via `bazel run $RunTarget` instead of directly executing the binary. - RunTarget string `yaml:"runTarget"` } -func (bc BazelCommand) GetName() string { - return bc.Name +func (bc *BazelCommand) BinLocation() (string, error) { + return binLocation(bc.Target) } -func (bc BazelCommand) GetContinueWatchOnExit() bool { - return bc.ContinueWatchOnExit -} - -func (bc BazelCommand) GetEnv() map[string]string { - return bc.Env -} - -func (bc BazelCommand) GetIgnoreStdout() bool { - return bc.IgnoreStdout -} - -func (bc BazelCommand) GetIgnoreStderr() bool { - return bc.IgnoreStderr -} - -func (bc BazelCommand) GetPreamble() string { - return bc.Preamble -} - -func (bc BazelCommand) GetBinaryLocation() (string, error) { - baseOutput, err := outputPath() - if err != nil { - return "", err - } - // Trim "bazel-out" because the next bazel query will include it. - outputPath := strings.TrimSuffix(strings.TrimSpace(string(baseOutput)), "bazel-out") - - // Get the binary from the specific target. - cmd := exec.Command("bazel", "cquery", bc.Target, "--output=files") - baseOutput, err = cmd.Output() - if err != nil { - return "", err - } - binPath := strings.TrimSpace(string(baseOutput)) - - return fmt.Sprintf("%s%s", outputPath, binPath), nil -} - -func (bc BazelCommand) GetExternalSecrets() map[string]secrets.ExternalSecret { - return bc.ExternalSecrets -} - -func (bc BazelCommand) watchPaths() ([]string, error) { - // If no target is defined, there is nothing to be built and watched - if bc.Target == "" { - return nil, nil - } +func (bc *BazelCommand) watch(ctx context.Context) (<-chan struct{}, error) { // Grab the location of the binary in bazel-out. - binLocation, err := bc.GetBinaryLocation() + binLocation, err := bc.BinLocation() if err != nil { return nil, err } - return []string{binLocation}, nil -} - -func (bc BazelCommand) StartWatch(ctx context.Context) (<-chan struct{}, error) { - if watchPaths, err := bc.watchPaths(); err != nil { + // Set up the watcher. + restart := make(chan struct{}) + events := make(chan notify.EventInfo, 1) + if err := notify.Watch(binLocation, events, notify.All); err != nil { return nil, err - } else { - // skip remove events as we don't care about files being removed, we only - // want to know when the binary has been rebuilt - return WatchPaths(ctx, watchPaths, notify.Remove) } + + // Start watching for a freshly compiled version of the binary. + go func() { + defer close(events) + defer notify.Stop(events) + + for { + select { + case <-ctx.Done(): + return + case e := <-events: + if e.Event() != notify.Remove { + restart <- struct{}{} + } + } + + } + }() + + return restart, nil } -func (bc BazelCommand) GetExecCmd(ctx context.Context) (*exec.Cmd, error) { - var cmd string - var err error - if bc.RunTarget != "" { - cmd = "bazel run " + bc.RunTarget - } else { - if cmd, err = bc.GetBinaryLocation(); err != nil { - return nil, err +func (bc *BazelCommand) Start(ctx context.Context, dir string, parentEnv map[string]string) error { + std.Out.WriteLine(output.Styledf(output.StylePending, "Running %s...", bc.Name)) + + // Run the binary for the first time. + cancel, err := bc.start(ctx, dir, parentEnv) + if err != nil { + return errors.Wrapf(err, "failed to start Bazel command %q", bc.Name) + } + + // Restart when the binary change. + wantRestart, err := bc.watch(ctx) + if err != nil { + return err + } + + // Wait forever until we're asked to stop or that restarting returns an error. + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-wantRestart: + std.Out.WriteLine(output.Styledf(output.StylePending, "Restarting %s...", bc.Name)) + cancel() + cancel, err = bc.start(ctx, dir, parentEnv) + if err != nil { + return err + } } } - - return exec.CommandContext(ctx, "bash", "-c", fmt.Sprintf("%s\n%s", bc.PreCmd, cmd)), nil } -func outputPath() ([]byte, error) { - // Get the output directory from Bazel, which varies depending on which OS - // we're running against. - cmd := exec.Command("bazel", "info", "output_path") - return cmd.Output() +func (bc *BazelCommand) start(ctx context.Context, dir string, parentEnv map[string]string) (func(), error) { + binLocation, err := bc.BinLocation() + if err != nil { + return nil, err + } + + sc := &startedCmd{ + stdoutBuf: &prefixSuffixSaver{N: 32 << 10}, + stderrBuf: &prefixSuffixSaver{N: 32 << 10}, + } + + commandCtx, cancel := context.WithCancel(ctx) + sc.cancel = cancel + sc.Cmd = exec.CommandContext(commandCtx, "bash", "-c", fmt.Sprintf("%s\n%s", bc.PreCmd, binLocation)) + sc.Cmd.Dir = dir + + secretsEnv, err := getSecrets(ctx, bc.Name, bc.ExternalSecrets) + if err != nil { + std.Out.WriteLine(output.Styledf(output.StyleWarning, "[%s] %s %s", + bc.Name, output.EmojiFailure, err.Error())) + } + + sc.Cmd.Env = makeEnv(parentEnv, secretsEnv, bc.Env) + + var stdoutWriter, stderrWriter io.Writer + logger := newCmdLogger(commandCtx, bc.Name, std.Out.Output) + if bc.IgnoreStdout { + std.Out.WriteLine(output.Styledf(output.StyleSuggestion, "Ignoring stdout of %s", bc.Name)) + stdoutWriter = sc.stdoutBuf + } else { + stdoutWriter = io.MultiWriter(logger, sc.stdoutBuf) + } + if bc.IgnoreStderr { + std.Out.WriteLine(output.Styledf(output.StyleSuggestion, "Ignoring stderr of %s", bc.Name)) + stderrWriter = sc.stderrBuf + } else { + stderrWriter = io.MultiWriter(logger, sc.stderrBuf) + } + + eg, err := process.PipeOutputUnbuffered(ctx, sc.Cmd, stdoutWriter, stderrWriter) + if err != nil { + return nil, err + } + sc.outEg = eg + + if err := sc.Start(); err != nil { + return nil, err + } + + return cancel, nil } diff --git a/dev/sg/internal/run/command.go b/dev/sg/internal/run/command.go index c3fd4d8469f..6926631aab9 100644 --- a/dev/sg/internal/run/command.go +++ b/dev/sg/internal/run/command.go @@ -1,22 +1,16 @@ package run import ( - "bytes" "context" "fmt" "io" "net" "os/exec" - "path/filepath" - "syscall" - "github.com/grafana/regexp" "github.com/sourcegraph/conc/pool" "github.com/sourcegraph/sourcegraph/dev/sg/internal/secrets" "github.com/sourcegraph/sourcegraph/dev/sg/internal/std" - "github.com/sourcegraph/sourcegraph/dev/sg/interrupt" - "github.com/sourcegraph/sourcegraph/dev/sg/root" "github.com/sourcegraph/sourcegraph/lib/errors" "github.com/sourcegraph/sourcegraph/lib/output" "github.com/sourcegraph/sourcegraph/lib/process" @@ -44,118 +38,6 @@ type Command struct { // field in `Merge` (below). } -func (cmd Command) GetName() string { - return cmd.Name -} - -func (cmd Command) GetContinueWatchOnExit() bool { - return cmd.ContinueWatchOnExit -} - -func (cmd Command) GetBinaryLocation() (string, error) { - if cmd.CheckBinary != "" { - repoRoot, err := root.RepositoryRoot() - if err != nil { - return "", err - } - return filepath.Join(repoRoot, cmd.CheckBinary), nil - } - return "", noBinaryError{name: cmd.Name} -} - -func (cmd Command) GetExternalSecrets() map[string]secrets.ExternalSecret { - return cmd.ExternalSecrets -} - -func (cmd Command) GetIgnoreStdout() bool { - return cmd.IgnoreStdout -} - -func (cmd Command) GetIgnoreStderr() bool { - return cmd.IgnoreStderr -} - -func (cmd Command) GetPreamble() string { - return cmd.Preamble -} - -func (cmd Command) GetEnv() map[string]string { - return cmd.Env -} - -func (cmd Command) GetExecCmd(ctx context.Context) (*exec.Cmd, error) { - return exec.CommandContext(ctx, "bash", "-c", cmd.Cmd), nil -} - -func (cmd Command) RunInstall(ctx context.Context, parentEnv map[string]string) error { - if cmd.requiresInstall() { - if cmd.hasBashInstaller() { - return cmd.bashInstall(ctx, parentEnv) - } else { - return cmd.functionInstall(ctx, parentEnv) - } - } - - return nil -} - -// Standard commands ignore installer -func (cmd Command) SetInstallerOutput(chan<- output.FancyLine) {} - -func (cmd Command) Count() int { - return 1 -} - -func (cmd Command) requiresInstall() bool { - return cmd.Install != "" || cmd.InstallFunc != "" -} - -func (cmd Command) hasBashInstaller() bool { - return cmd.Install != "" || cmd.InstallFunc == "" -} - -func (cmd Command) bashInstall(ctx context.Context, parentEnv map[string]string) error { - output, err := BashInRoot(ctx, cmd.Install, makeEnv(parentEnv, cmd.Env)) - if err != nil { - return installErr{cmdName: cmd.Name, output: output, originalErr: err} - } - return nil -} - -func (cmd Command) functionInstall(ctx context.Context, parentEnv map[string]string) error { - fn, ok := installFuncs[cmd.InstallFunc] - if !ok { - return installErr{cmdName: cmd.Name, originalErr: errors.Newf("no install func with name %q found", cmd.InstallFunc)} - } - if err := fn(ctx, makeEnvMap(parentEnv, cmd.Env)); err != nil { - return installErr{cmdName: cmd.Name, originalErr: err} - } - - return nil -} - -func (cmd Command) getWatchPaths() ([]string, error) { - root, err := root.RepositoryRoot() - if err != nil { - return nil, err - } - - fullPaths := make([]string, len(cmd.Watch)) - for i, path := range cmd.Watch { - fullPaths[i] = filepath.Join(root, path) - } - - return fullPaths, nil -} - -func (cmd Command) StartWatch(ctx context.Context) (<-chan struct{}, error) { - if watchPaths, err := cmd.getWatchPaths(); err != nil { - return nil, err - } else { - return WatchPaths(ctx, watchPaths) - } -} - func (c Command) Merge(other Command) Command { merged := c @@ -223,12 +105,38 @@ func equal(a, b []string) bool { return true } -var sgConn net.Conn +type startedCmd struct { + *exec.Cmd -func OpenUnixSocket() error { - var err error - sgConn, err = net.Dial("unix", "/tmp/sg.sock") - return err + cancel func() + + stdoutBuf *prefixSuffixSaver + stderrBuf *prefixSuffixSaver + + outEg *pool.ErrorPool +} + +func (sc *startedCmd) Wait() error { + if err := sc.outEg.Wait(); err != nil { + return err + } + return sc.Cmd.Wait() +} + +func (sc *startedCmd) CapturedStdout() string { + if sc.stdoutBuf == nil { + return "" + } + + return string(sc.stdoutBuf.Bytes()) +} + +func (sc *startedCmd) CapturedStderr() string { + if sc.stderrBuf == nil { + return "" + } + + return string(sc.stderrBuf.Bytes()) } func getSecrets(ctx context.Context, name string, extSecrets map[string]secrets.ExternalSecret) (map[string]string, error) { @@ -254,255 +162,84 @@ func getSecrets(ctx context.Context, name string, extSecrets map[string]secrets. return secretsEnv, errs } -type startedCmd struct { - *exec.Cmd - opts commandOptions - cancel func() - - outEg *pool.ErrorPool - result chan error -} - -type commandOptions struct { - name string - exec *exec.Cmd - dir string - env []string - stdout outputOptions - stderr outputOptions -} - -type outputOptions struct { - // When true, output will be ignored and not written to any writers - ignore bool - - // when enabled, output will not be streamed to the writers until - // after the process is begun, only captured for later retrieval - buffer bool - - // Buffer that captures the output for error logging - captured io.ReadWriter - - // Additional writers to write output to - additionalWriters []io.Writer - - // Channel that is used to signal that output should start streaming - // when buffer is true - start chan struct{} -} - -func startSgCmd(ctx context.Context, cmd SGConfigCommand, dir string, parentEnv map[string]string) (*startedCmd, error) { - exec, err := cmd.GetExecCmd(ctx) - if err != nil { - return nil, err - } - - secretsEnv, err := getSecrets(ctx, cmd.GetName(), cmd.GetExternalSecrets()) - if err != nil { - std.Out.WriteLine(output.Styledf(output.StyleWarning, "[%s] %s %s", - cmd.GetName(), output.EmojiFailure, err.Error())) - } - - opts := commandOptions{ - name: cmd.GetName(), - exec: exec, - env: makeEnv(parentEnv, secretsEnv, cmd.GetEnv()), - dir: dir, - stdout: outputOptions{ignore: cmd.GetIgnoreStdout()}, - stderr: outputOptions{ignore: cmd.GetIgnoreStderr()}, - } - - if cmd.GetPreamble() != "" { - std.Out.WriteLine(output.Styledf(output.StyleOrange, "[%s] %s %s", cmd.GetName(), output.EmojiInfo, cmd.GetPreamble())) - } - - return startCmd(ctx, opts) -} - -func startCmd(ctx context.Context, opts commandOptions) (*startedCmd, error) { - sc := &startedCmd{ - opts: opts, - } - - ctx, cancel := context.WithCancel(ctx) - sc.cancel = func() { - // The default cancel function will use a SIGKILL (9) which does - // not allow processes to cleanup. If they have spawned child processes - // those child processes will be orphaned and continue running. - // SIGINT will instead gracefully shut down the process and child processes. - if sc.Cmd.Process != nil { - // We created a process group above which we kill here. - pgid, err := syscall.Getpgid(sc.Cmd.Process.Pid) - if err != nil { - // Ignore Errno 3 (No such process) as this means the process has already exited - if !errors.Is(err, syscall.Errno(0x3)) { - panic(errors.Wrapf(err, "failed to get process group ID for %s (PID %d)", sc.opts.name, sc.Cmd.Process.Pid)) - } - // note the minus sign; this signals that we want to kill the whole process group - } else if err := syscall.Kill(-pgid, syscall.SIGINT); err != nil { - panic(errors.Wrapf(err, "failed kill process group ID %d for cmd %s ", pgid, sc.opts.name)) - } - } - - cancel() - } - // Register an interrput handler - interrupt.Register(sc.cancel) - - sc.Cmd = opts.exec - sc.Cmd.Dir = opts.dir - sc.Cmd.Env = opts.env - - // This sets up a process group which we kill later. - // This allows us to ensure that any child processes are killed as well when this exits - // This will only work on POSIX systems - sc.Cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - if err := sc.connectOutput(ctx); err != nil { - sc.cancel() - return nil, err - } - - if err := sc.Start(); err != nil { - sc.cancel() - return nil, err - } - return sc, nil -} - -func (sc *startedCmd) getOutputWriter(ctx context.Context, opts *outputOptions, outputName string) io.Writer { - writers := opts.additionalWriters - if writers == nil { - writers = []io.Writer{} - } - if opts.captured == nil { - opts.captured = &prefixSuffixSaver{N: 32 << 10} - } - writers = append(writers, opts.captured) - - if opts.ignore { - std.Out.WriteLine(output.Styledf(output.StyleSuggestion, "Ignoring %s of %s", outputName, sc.opts.name)) - } else { - // Create a channel to signal when output should start. If buffering is disabled, close - // the channel so output starts immediately. - opts.start = make(chan struct{}) - if !opts.buffer { - close(opts.start) - } - - writers = append(writers, newBufferedCmdLogger(ctx, sc.opts.name, std.Out.Output, opts.start)) - } - - if sgConn != nil { - sink := func(data string) { - sgConn.Write([]byte(fmt.Sprintf("%s: %s\n", sc.opts.name, data))) - } - writers = append(writers, process.NewLogger(ctx, sink)) - } - - return io.MultiWriter(writers...) -} - -func (sc *startedCmd) connectOutput(ctx context.Context) error { - stdoutWriter := sc.getOutputWriter(ctx, &sc.opts.stdout, "stdout") - stderrWriter := sc.getOutputWriter(ctx, &sc.opts.stderr, "stderr") - - eg, err := process.PipeOutputUnbuffered(ctx, sc.Cmd, stdoutWriter, stderrWriter) - if err != nil { - return err - } - sc.outEg = eg - - return nil -} - -func (sc *startedCmd) Exit() <-chan error { - if sc.result == nil { - sc.result = make(chan error) - go func() { - sc.result <- sc.Wait() - }() - } - return sc.result -} - -func (sc *startedCmd) Wait() error { - err := sc.wait() - var e *exec.ExitError - if errors.As(err, &e) { - err = runErr{ - cmdName: sc.opts.name, - exitCode: e.ExitCode(), - stderr: sc.CapturedStderr(), - stdout: sc.CapturedStdout(), - } - } +var sgConn net.Conn +func OpenUnixSocket() error { + var err error + sgConn, err = net.Dial("unix", "/tmp/sg.sock") return err } -func (sc *startedCmd) wait() error { - if err := sc.outEg.Wait(); err != nil { - return err - } - return sc.Cmd.Wait() -} -func (sc *startedCmd) CapturedStdout() string { - return captured(sc.opts.stdout) -} - -func (sc *startedCmd) CapturedStderr() string { - return captured(sc.opts.stderr) -} - -func captured(opts outputOptions) string { - if opts.captured == nil { - return "" +func startCmd(ctx context.Context, dir string, cmd Command, parentEnv map[string]string) (*startedCmd, error) { + sc := &startedCmd{ + stdoutBuf: &prefixSuffixSaver{N: 32 << 10}, + stderrBuf: &prefixSuffixSaver{N: 32 << 10}, } - if output, err := io.ReadAll(opts.captured); err == nil { - return string(output) - } + commandCtx, cancel := context.WithCancel(ctx) + sc.cancel = cancel - return "" -} + sc.Cmd = exec.CommandContext(commandCtx, "bash", "-c", cmd.Cmd) + sc.Cmd.Dir = dir -// Begins writing output to StdOut and StdErr if it was previously buffered -func (sc *startedCmd) StartOutput() { - sc.startOutput(sc.opts.stdout) - sc.startOutput(sc.opts.stderr) -} - -func (sc *startedCmd) startOutput(opts outputOptions) { - if opts.buffer && opts.start != nil { - close(opts.start) - } -} - -// patternMatcher is writer which looks for a regular expression in the -// written bytes and calls a callback if a match is found -// by default it only looks for the matched pattern once -type patternMatcher struct { - regex *regexp.Regexp - callback func() - buffer bytes.Buffer - multi bool - disabled bool -} - -func (writer *patternMatcher) Write(p []byte) (int, error) { - if writer.disabled { - return len(p), nil - } - n, err := writer.buffer.Write(p) + secretsEnv, err := getSecrets(ctx, cmd.Name, cmd.ExternalSecrets) if err != nil { - return n, err + std.Out.WriteLine(output.Styledf(output.StyleWarning, "[%s] %s %s", + cmd.Name, output.EmojiFailure, err.Error())) } - if writer.regex.MatchReader(&writer.buffer) { - writer.callback() - if !writer.multi { - writer.disabled = true + + sc.Cmd.Env = makeEnv(parentEnv, secretsEnv, cmd.Env) + + var stdoutWriter, stderrWriter io.Writer + logger := newCmdLogger(commandCtx, cmd.Name, std.Out.Output) + + // TODO(JH) sgtail experiment going on, this is a bit ugly, that will do it + // for the demo day. + if sgConn != nil { + sink := func(data string) { + sgConn.Write([]byte(fmt.Sprintf("%s: %s\n", cmd.Name, data))) + } + sgConnLog := process.NewLogger(ctx, sink) + + if cmd.IgnoreStdout { + std.Out.WriteLine(output.Styledf(output.StyleSuggestion, "Ignoring stdout of %s", cmd.Name)) + stdoutWriter = sc.stdoutBuf + } else { + stdoutWriter = io.MultiWriter(logger, sc.stdoutBuf, sgConnLog) + } + if cmd.IgnoreStderr { + std.Out.WriteLine(output.Styledf(output.StyleSuggestion, "Ignoring stderr of %s", cmd.Name)) + stderrWriter = sc.stderrBuf + } else { + stderrWriter = io.MultiWriter(logger, sc.stderrBuf, sgConnLog) + } + } else { + if cmd.IgnoreStdout { + std.Out.WriteLine(output.Styledf(output.StyleSuggestion, "Ignoring stdout of %s", cmd.Name)) + stdoutWriter = sc.stdoutBuf + } else { + stdoutWriter = io.MultiWriter(logger, sc.stdoutBuf) + } + if cmd.IgnoreStderr { + std.Out.WriteLine(output.Styledf(output.StyleSuggestion, "Ignoring stderr of %s", cmd.Name)) + stderrWriter = sc.stderrBuf + } else { + stderrWriter = io.MultiWriter(logger, sc.stderrBuf) } } - return n, err + + if cmd.Preamble != "" { + std.Out.WriteLine(output.Styledf(output.StyleOrange, "[%s] %s %s", cmd.Name, output.EmojiInfo, cmd.Preamble)) + } + eg, err := process.PipeOutputUnbuffered(ctx, sc.Cmd, stdoutWriter, stderrWriter) + if err != nil { + return nil, err + } + sc.outEg = eg + + if err := sc.Start(); err != nil { + return sc, err + } + + return sc, nil } diff --git a/dev/sg/internal/run/ibazel.go b/dev/sg/internal/run/ibazel.go index e75a4b58c42..ee4293eccf9 100644 --- a/dev/sg/internal/run/ibazel.go +++ b/dev/sg/internal/run/ibazel.go @@ -2,253 +2,56 @@ package run import ( "context" - "encoding/json" - "fmt" "io" - "os" "os/exec" - "path" - "slices" - "strings" - "github.com/grafana/regexp" - "github.com/nxadm/tail" - - "github.com/sourcegraph/sourcegraph/lib/errors" - "github.com/sourcegraph/sourcegraph/lib/output" + "github.com/sourcegraph/sourcegraph/dev/sg/internal/std" + "github.com/sourcegraph/sourcegraph/lib/process" ) -func ibazelLogPath(tempDir string) string { - return path.Join(tempDir, "ibazel.log") -} - -func profileEventsPath(tempDir string) string { - return path.Join(tempDir, "profile.json") -} - -var watchErrorRegex = regexp.MustCompile(`Bazel query failed: exit status 7`) - type IBazel struct { + pwd string targets []string - events *iBazelEventHandler - tempDir string - logFile *os.File - dir string - proc *startedCmd - logs chan<- output.FancyLine + cancel func() } -// returns a runner to interact with ibazel. -func NewIBazel(cmds []BazelCommand, dir string) (*IBazel, error) { - tempDir, err := os.MkdirTemp("", "ibazel") - if err != nil { - return nil, err - } - - logFile, err := os.Create(ibazelLogPath(tempDir)) - if err != nil { - return nil, err - } - - targets := make([]string, 0, len(cmds)) - for _, cmd := range cmds { - if cmd.Target != "" && !slices.Contains(targets, cmd.Target) { - targets = append(targets, cmd.Target) - } - } - +// newIBazel returns a runner to interact with ibazel. +func newIBazel(pwd string, targets ...string) *IBazel { return &IBazel{ + pwd: pwd, targets: targets, - events: newIBazelEventHandler(profileEventsPath(tempDir)), - tempDir: tempDir, - logFile: logFile, - dir: dir, - }, nil + } } -func (ibazel *IBazel) GetName() string { - return fmt.Sprintf("bazel targets (%s)", strings.Join(ibazel.targets, ", ")) -} +func (ib *IBazel) Start(ctx context.Context, dir string) error { + args := append([]string{"build"}, ib.targets...) + ctx, ib.cancel = context.WithCancel(ctx) + cmd := exec.CommandContext(ctx, "ibazel", args...) -func (ibazel *IBazel) RunInstall(ctx context.Context, env map[string]string) error { - if len(ibazel.targets) == 0 { - // no Bazel commands so we return - return nil + sc := &startedCmd{ + stdoutBuf: &prefixSuffixSaver{N: 32 << 10}, + stderrBuf: &prefixSuffixSaver{N: 32 << 10}, } - err := ibazel.build(ctx) + sc.cancel = ib.cancel + sc.Cmd = cmd + sc.Cmd.Dir = dir + + var stdoutWriter, stderrWriter io.Writer + logger := newCmdLogger(ctx, "iBazel", std.Out.Output) + stdoutWriter = io.MultiWriter(logger, sc.stdoutBuf) + stderrWriter = io.MultiWriter(logger, sc.stderrBuf) + eg, err := process.PipeOutputUnbuffered(ctx, sc.Cmd, stdoutWriter, stderrWriter) if err != nil { return err } + sc.outEg = eg - go ibazel.events.watch(ctx) - - // block until initial ibazel build is completed - return ibazel.WaitForInitialBuild(ctx) + // Bazel out directory should exist here before returning + return sc.Start() } -func (ib *IBazel) SetInstallerOutput(logs chan<- output.FancyLine) { - logs <- output.Styledf(output.StyleGrey, "iBazel output can be found at %s", ibazelLogPath(ib.tempDir)) - logs <- output.Styledf(output.StyleGrey, "iBazel log events can be found at %s", profileEventsPath(ib.tempDir)) - ib.logs = logs -} - -func (ib *IBazel) Count() int { - return len(ib.targets) -} - -func (ib *IBazel) GetExecCmd(ctx context.Context) *exec.Cmd { - // Writes iBazel events out to a log file. These are much easier to parse - // than trying to understand the output directly - profilePath := "--profile_dev=" + profileEventsPath(ib.tempDir) - // This enables iBazel to try to apply the fixes from .bazel_fix_commands.json automatically - enableAutoFix := "--run_output_interactive=false" - args := append([]string{profilePath, enableAutoFix, "build"}, ib.targets...) - return exec.CommandContext(ctx, "ibazel", args...) -} - -func (ib *IBazel) WaitForInitialBuild(ctx context.Context) error { - defer ib.events.close() - for event := range ib.events.events { - if event.Type == buildDone { - return nil - } - if event.Type == buildFailed { - return errors.Newf("initial ibazel build failed") - } - } +func (ib *IBazel) Stop() error { + ib.cancel() return nil } - -func (ib *IBazel) getCommandOptions(ctx context.Context) commandOptions { - return commandOptions{ - name: "iBazel", - exec: ib.GetExecCmd(ctx), - dir: ib.dir, - // Don't output iBazel logs (which are all on stderr) until - // initial build is complete as it will break the progress bar - stderr: outputOptions{ - buffer: true, - additionalWriters: []io.Writer{ - ib.logFile, - &patternMatcher{regex: watchErrorRegex, callback: ib.logWatchError}, - }}, - } -} - -// Build starts an ibazel process to build the targets provided in the constructor -// It runs perpetually, watching for file changes -func (ib *IBazel) build(ctx context.Context) (err error) { - ib.proc, err = startCmd(ctx, ib.getCommandOptions(ctx)) - return err -} - -func (ib *IBazel) StartOutput() { - ib.proc.StartOutput() -} - -func (ib *IBazel) Close() { - ib.logFile.Close() - os.RemoveAll(ib.tempDir) - ib.proc.cancel() -} - -func (ib *IBazel) logWatchError() { - buildQuery := `buildfiles(deps(set(%s)))` - queries := make([]string, len(ib.targets)) - for i, target := range ib.targets { - queries[i] = fmt.Sprintf(buildQuery, target) - } - - queryString := strings.Join(queries, " union ") - - msg := `WARNING: iBazel failed to watch for changes, and will be unable to reload upon file changes. -This is likely because bazel query for one of the targets failed. Try running: - -bazel query "%s" - -to determine which target is crashing the analysis. - -` - ib.logs <- output.Styledf(output.StyleWarning, msg, queryString) -} - -type iBazelEventHandler struct { - events chan iBazelEvent - stop chan struct{} - filename string -} - -func newIBazelEventHandler(filename string) *iBazelEventHandler { - return &iBazelEventHandler{ - events: make(chan iBazelEvent), - stop: make(chan struct{}), - filename: filename, - } -} - -// Watch opens the provided profile.json and reads it as it is continuously written by iBazel -// Each time it sees a iBazel event log, it parses it and puts it on the events channel -// This is a blocking function -func (h *iBazelEventHandler) watch(ctx context.Context) { - _, cancel := context.WithCancelCause(ctx) - tail, err := tail.TailFile(h.filename, tail.Config{Follow: true, Logger: tail.DiscardingLogger}) - if err != nil { - cancel(err) - } - defer tail.Cleanup() - - for { - select { - case line := <-tail.Lines: - var event iBazelEvent - if err := json.Unmarshal([]byte(line.Text), &event); err != nil { - cancel(errors.Newf("failed to unmarshal event json: %s", err)) - } - h.events <- event - case <-ctx.Done(): - cancel(ctx.Err()) - return - case <-h.stop: - return - } - - } -} - -func (h *iBazelEventHandler) close() { - h.stop <- struct{}{} -} - -// Schema information at https://github.com/bazelbuild/bazel-watcher?tab=readme-ov-file#profiler-events -type iBazelEvent struct { - // common - Type string `json:"type"` - Iteration string `json:"iteration"` - Time int64 `json:"time"` - Targets []string `json:"targets,omitempty"` - Elapsed int64 `json:"elapsed,omitempty"` - - // start event - IBazelVersion string `json:"iBazelVersion,omitempty"` - BazelVersion string `json:"bazelVersion,omitempty"` - MaxHeapSize string `json:"maxHeapSize,omitempty"` - CommittedHeapSize string `json:"committedHeapSize,omitempty"` - - // change event - Change string `json:"change,omitempty"` - - // build & reload event - Changes []string `json:"changes,omitempty"` - - // browser event - RemoteType string `json:"remoteType,omitempty"` - RemoteTime int64 `json:"remoteTime,omitempty"` - RemoteElapsed int64 `json:"remoteElapsed,omitempty"` - RemoteData string `json:"remoteData,omitempty"` -} - -const ( - buildDone = "BUILD_DONE" - buildFailed = "BUILD_FAILED" -) diff --git a/dev/sg/internal/run/installer.go b/dev/sg/internal/run/installer.go deleted file mode 100644 index d28a61b1132..00000000000 --- a/dev/sg/internal/run/installer.go +++ /dev/null @@ -1,338 +0,0 @@ -package run - -import ( - "context" - "fmt" - "os" - "path/filepath" - "runtime" - "strings" - "time" - - "github.com/sourcegraph/sourcegraph/dev/sg/internal/analytics" - "github.com/sourcegraph/sourcegraph/dev/sg/internal/std" - "github.com/sourcegraph/sourcegraph/dev/sg/interrupt" - "github.com/sourcegraph/sourcegraph/dev/sg/root" - "github.com/sourcegraph/sourcegraph/internal/download" - "github.com/sourcegraph/sourcegraph/lib/errors" - "github.com/sourcegraph/sourcegraph/lib/output" -) - -type Installer interface { - RunInstall(ctx context.Context, env map[string]string) error - - // Gives a channel which the installer can use to send log messages - SetInstallerOutput(chan<- output.FancyLine) - - GetName() string - - // Number of programs this target is installing - Count() int -} - -type InstallManager struct { - // Constructor commands - *std.Output - cmds map[string]Installer - env map[string]string - verbose bool - - // State vars - installed chan string - failures chan failedRun - logs chan output.FancyLine - done int - total int - waitingMessageIndex int - progress output.Progress - ticker *time.Ticker - tickInterval time.Duration - stats *installAnalytics -} - -func Install(ctx context.Context, env map[string]string, verbose bool, cmds ...Installer) error { - installer := newInstallManager(cmds, std.Out, env, verbose) - - installer.start(ctx) - - installer.install(ctx, cmds) - - return installer.wait(ctx) -} - -func newInstallManager(cmds []Installer, out *std.Output, env map[string]string, verbose bool) *InstallManager { - total := 0 - cmdsMap := make(map[string]Installer, len(cmds)) - for _, cmd := range cmds { - total += cmd.Count() - cmdsMap[cmd.GetName()] = cmd - } - return &InstallManager{ - Output: out, - cmds: cmdsMap, - verbose: verbose, - env: env, - - installed: make(chan string, total), - failures: make(chan failedRun, total), - logs: make(chan output.FancyLine, 10), - done: 0, - total: total, - } -} - -// starts all progress bars and counters but does not start installation -func (installer *InstallManager) start(ctx context.Context) { - installer.Write("") - installer.WriteLine(output.Linef(output.EmojiLightbulb, output.StyleBold, "Installing %d commands...", installer.total)) - installer.Write("") - - installer.progress = installer.Progress([]output.ProgressBar{ - {Label: fmt.Sprintf("Installing %d commands", installer.total), Max: float64(installer.total)}, - }, nil) - - // Every uninterrupted 15 seconds we will print out a waiting message - installer.startTicker(15 * time.Second) - - installer.startAnalytics(ctx, installer.cmds) -} - -// Starts the installation process in a non-blocking process -func (installer *InstallManager) install(ctx context.Context, cmds []Installer) { - for _, cmd := range cmds { - go func(ctx context.Context, cmd Installer) { - // Set the log channel for the installer - cmd.SetInstallerOutput(installer.logs) - - if err := cmd.RunInstall(ctx, installer.env); err != nil { - // if failed, put on the failure queue and exit - installer.failures <- failedRun{cmdName: cmd.GetName(), err: err} - } - - installer.installed <- cmd.GetName() - }(ctx, cmd) - } -} - -// Blocks until all installations have successfully completed -// or until a failure occurs -func (installer *InstallManager) wait(ctx context.Context) error { - for { - select { - case cmdName := <-installer.installed: - installer.handleInstalled(cmdName) - - // Everything installed! - if installer.isDone() { - installer.complete() - return nil - } - - case failure := <-installer.failures: - installer.handleFailure(failure.cmdName, failure.err) - return failure - - case log := <-installer.logs: - installer.progress.WriteLine(log) - - case <-ctx.Done(): - // Context was canceled, exit early - return ctx.Err() - - case <-installer.tick(): - installer.handleWaiting() - } - } -} -func (installer *InstallManager) startTicker(interval time.Duration) { - installer.ticker = time.NewTicker(interval) - installer.tickInterval = interval -} - -func (installer *InstallManager) startAnalytics(ctx context.Context, cmds map[string]Installer) { - installer.stats = startInstallAnalytics(ctx, cmds) -} - -func (installer *InstallManager) handleInstalled(name string) { - installer.stats.handleInstalled(name) - installer.ticker.Reset(installer.tickInterval) - - installer.done += installer.cmds[name].Count() - delete(installer.cmds, name) - - installer.progress.WriteLine(output.Styledf(output.StyleSuccess, "%s installed", name)) - installer.progress.SetValue(0, float64(installer.done)) - installer.progress.SetLabelAndRecalc(0, fmt.Sprintf("%d/%d commands installed", int(installer.done), int(installer.total))) -} - -func (installer *InstallManager) complete() { - installer.progress.Complete() - - installer.Write("") - if installer.verbose { - installer.WriteLine(output.Linef(output.EmojiSuccess, output.StyleSuccess, "Everything installed! Took %s. Booting up the system!", installer.stats.duration())) - } else { - installer.WriteLine(output.Linef(output.EmojiSuccess, output.StyleSuccess, "Everything installed! Booting up the system!")) - } - installer.Write("") - - // If there are any pendings logs, print them out - for { - select { - case log := <-installer.logs: - installer.WriteLine(log) - default: - return - } - } -} - -func (installer *InstallManager) handleFailure(name string, err error) { - installer.progress.Destroy() - installer.stats.handleFailure(name, err) - printCmdError(installer.Output.Output, name, err) -} - -func (installer *InstallManager) handleWaiting() { - names := []string{} - for name := range installer.cmds { - names = append(names, name) - } - - msg := waitingMessages[installer.waitingMessageIndex] - emoji := output.EmojiHourglass - if installer.waitingMessageIndex > 3 { - emoji = output.EmojiShrug - } - - installer.progress.WriteLine(output.Linef(emoji, output.StyleBold, msg, strings.Join(names, ", "))) - installer.waitingMessageIndex = (installer.waitingMessageIndex + 1) % len(waitingMessages) -} - -func (installer *InstallManager) tick() <-chan time.Time { - return installer.ticker.C -} - -func (installer *InstallManager) isDone() bool { - return len(installer.cmds) == 0 -} - -type installAnalytics struct { - Start time.Time - Spans map[string]*analytics.Span -} - -func startInstallAnalytics(ctx context.Context, cmds map[string]Installer) *installAnalytics { - installer := &installAnalytics{ - Start: time.Now(), - Spans: make(map[string]*analytics.Span, len(cmds)), - } - - for cmd := range cmds { - _, installer.Spans[cmd] = analytics.StartSpan(ctx, fmt.Sprintf("install %s", cmd), "install_command") - } - - interrupt.Register(installer.handleInterrupt) - - return installer -} - -func (a *installAnalytics) handleInterrupt() { - for _, span := range a.Spans { - if span.IsRecording() { - span.Cancelled() - span.End() - } - } -} - -func (a *installAnalytics) handleInstalled(name string) { - a.Spans[name].Succeeded() - a.Spans[name].End() -} - -func (a *installAnalytics) handleFailure(name string, err error) { - a.Spans[name].RecordError("failed", err) - a.Spans[name].End() -} - -func (a *installAnalytics) duration() time.Duration { - return time.Since(a.Start) -} - -type installFunc func(context.Context, map[string]string) error - -var installFuncs = map[string]installFunc{ - "installCaddy": func(ctx context.Context, env map[string]string) error { - version := env["CADDY_VERSION"] - if version == "" { - return errors.New("could not find CADDY_VERSION in env") - } - - root, err := root.RepositoryRoot() - if err != nil { - return err - } - - var os string - switch runtime.GOOS { - case "linux": - os = "linux" - case "darwin": - os = "mac" - } - - archiveName := fmt.Sprintf("caddy_%s_%s_%s", version, os, runtime.GOARCH) - url := fmt.Sprintf("https://github.com/caddyserver/caddy/releases/download/v%s/%s.tar.gz", version, archiveName) - - target := filepath.Join(root, fmt.Sprintf(".bin/caddy_%s", version)) - - return download.ArchivedExecutable(ctx, url, target, "caddy") - }, - "installJaeger": func(ctx context.Context, env map[string]string) error { - version := env["JAEGER_VERSION"] - - // Make sure the data folder exists. - disk := env["JAEGER_DISK"] - if err := os.MkdirAll(disk, 0755); err != nil { - return err - } - - if version == "" { - return errors.New("could not find JAEGER_VERSION in env") - } - - root, err := root.RepositoryRoot() - if err != nil { - return err - } - - archiveName := fmt.Sprintf("jaeger-%s-%s-%s", version, runtime.GOOS, runtime.GOARCH) - url := fmt.Sprintf("https://github.com/jaegertracing/jaeger/releases/download/v%s/%s.tar.gz", version, archiveName) - - target := filepath.Join(root, fmt.Sprintf(".bin/jaeger-all-in-one-%s", version)) - - return download.ArchivedExecutable(ctx, url, target, fmt.Sprintf("%s/jaeger-all-in-one", archiveName)) - }, -} - -// As per tradition, if you edit this file you must add a new waiting message -var waitingMessages = []string{ - "Still waiting for %s to finish installing...", - "Yup, still waiting for %s to finish installing...", - "Here's the bad news: still waiting for %s to finish installing. The good news is that we finally have a chance to talk, no?", - "Still waiting for %s to finish installing...", - "Hey, %s, there's people waiting for you, pal", - "Sooooo, how are ya? Yeah, waiting. I hear you. Wish %s would hurry up.", - "I mean, what is %s even doing?", - "I now expect %s to mean 'producing a miracle' with 'installing'", - "Still waiting for %s to finish installing...", - "Before this I think the longest I ever had to wait was at Disneyland in '99, but %s is now #1", - "Still waiting for %s to finish installing...", - "At this point it could be anything - does your computer still have power? Come on, %s", - "Might as well check Slack. %s is taking its time...", - "In German there's a saying: ein guter Käse braucht seine Zeit - a good cheese needs its time. Maybe %s is cheese?", - "If %ss turns out to be cheese I'm gonna lose it. Hey, hurry up, will ya", - "Still waiting for %s to finish installing...", - "You're probably wondering why I've called %s here today...", -} diff --git a/dev/sg/internal/run/logger.go b/dev/sg/internal/run/logger.go index 3452dad6849..609e57af45c 100644 --- a/dev/sg/internal/run/logger.go +++ b/dev/sg/internal/run/logger.go @@ -31,17 +31,13 @@ var ( lineFormat = "%s%s[%+" + strconv.Itoa(maxNameLength) + "s]%s %s" ) -// newBufferedCmdLogger returns a new process.Logger with a unique color based on the name of the cmd -// that blocks until the given start signal and writes logs to the given output.Output. -func newBufferedCmdLogger(ctx context.Context, name string, out *output.Output, start <-chan struct{}) *process.Logger { +// newCmdLogger returns a new process.Logger with a unique color based on the name of the cmd. +func newCmdLogger(ctx context.Context, name string, out *output.Output) *process.Logger { name = compactName(name) color := nameToColor(name) sink := func(data string) { - go func() { - <-start - out.Writef(lineFormat, output.StyleBold, color, name, output.StyleReset, data) - }() + out.Writef(lineFormat, output.StyleBold, color, name, output.StyleReset, data) } return process.NewLogger(ctx, sink) diff --git a/dev/sg/internal/run/prefix_suffix_saver.go b/dev/sg/internal/run/prefix_suffix_saver.go index 316be5aca3a..ed02f3ed17e 100644 --- a/dev/sg/internal/run/prefix_suffix_saver.go +++ b/dev/sg/internal/run/prefix_suffix_saver.go @@ -59,16 +59,12 @@ func (w *prefixSuffixSaver) fill(dst *[]byte, p []byte) (pRemain []byte) { return p } -func (w *prefixSuffixSaver) Read(p []byte) (n int, err error) { - return w.Bytes().Read(p) -} - -func (w *prefixSuffixSaver) Bytes() *bytes.Buffer { +func (w *prefixSuffixSaver) Bytes() []byte { if w.suffix == nil { - return bytes.NewBuffer(w.prefix) + return w.prefix } if w.skipped == 0 { - return bytes.NewBuffer(append(w.prefix, w.suffix...)) + return append(w.prefix, w.suffix...) } var buf bytes.Buffer buf.Grow(len(w.prefix) + len(w.suffix) + 50) @@ -78,5 +74,5 @@ func (w *prefixSuffixSaver) Bytes() *bytes.Buffer { buf.WriteString(" bytes ...\n") buf.Write(w.suffix[w.suffixOff:]) buf.Write(w.suffix[:w.suffixOff]) - return &buf + return buf.Bytes() } diff --git a/dev/sg/internal/run/run.go b/dev/sg/internal/run/run.go index b14bac649ac..7a2ecdd445c 100644 --- a/dev/sg/internal/run/run.go +++ b/dev/sg/internal/run/run.go @@ -8,30 +8,43 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "strings" + "sync" + "time" - "github.com/sourcegraph/conc/pool" + "github.com/grafana/regexp" + "github.com/rjeczalik/notify" + "golang.org/x/sync/semaphore" + "github.com/sourcegraph/sourcegraph/dev/sg/internal/analytics" "github.com/sourcegraph/sourcegraph/dev/sg/internal/std" + "github.com/sourcegraph/sourcegraph/dev/sg/interrupt" "github.com/sourcegraph/sourcegraph/dev/sg/root" + "github.com/sourcegraph/sourcegraph/internal/download" "github.com/sourcegraph/sourcegraph/lib/errors" "github.com/sourcegraph/sourcegraph/lib/output" ) -type cmdRunner struct { - *std.Output - cmds []SGConfigCommand - repositoryRoot string - parentEnv map[string]string - verbose bool -} +const MAX_CONCURRENT_BUILD_PROCS = 4 -func Commands(ctx context.Context, parentEnv map[string]string, verbose bool, cmds ...SGConfigCommand) (err error) { +func Commands(ctx context.Context, parentEnv map[string]string, verbose bool, cmds ...Command) error { if len(cmds) == 0 { // Exit early if there are no commands to run. return nil } - std.Out.WriteLine(output.Styled(output.StylePending, fmt.Sprintf("Starting %d cmds", len(cmds)))) + + chs := make([]<-chan struct{}, 0, len(cmds)) + monitor := &changeMonitor{} + for _, cmd := range cmds { + chs = append(chs, monitor.register(cmd)) + } + + pathChanges, err := watch() + if err != nil { + return err + } + go monitor.run(pathChanges) repoRoot, err := root.RepositoryRoot() if err != nil { @@ -46,140 +59,356 @@ func Commands(ctx context.Context, parentEnv map[string]string, verbose bool, cm return err } + wg := sync.WaitGroup{} + installSemaphore := semaphore.NewWeighted(MAX_CONCURRENT_BUILD_PROCS) + failures := make(chan failedRun, len(cmds)) + installed := make(chan string, len(cmds)) + okayToStart := make(chan struct{}) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + runner := &cmdRunner{ + verbose: verbose, + installSemaphore: installSemaphore, + failures: failures, + installed: installed, + okayToStart: okayToStart, + repositoryRoot: repoRoot, + parentEnv: parentEnv, + } + + cmdNames := make(map[string]struct{}, len(cmds)) + + for i, cmd := range cmds { + cmdNames[cmd.Name] = struct{}{} + + wg.Add(1) + + go func(cmd Command, ch <-chan struct{}) { + defer wg.Done() + var err error + for first := true; cmd.ContinueWatchOnExit || first; first = false { + if err = runner.runAndWatch(ctx, cmd, ch); err != nil { + if errors.Is(err, ctx.Err()) { // if error caused by context, terminate + return + } + if cmd.ContinueWatchOnExit { + printCmdError(std.Out.Output, cmd.Name, err) + time.Sleep(time.Second * 10) // backoff + } else { + failures <- failedRun{cmdName: cmd.Name, err: err} + } + } + } + if err != nil { + cancel() + } + }(cmd, chs[i]) + } + + err = runner.waitForInstallation(ctx, cmdNames) + if err != nil { + return err + } + if err := writePid(); err != nil { return err } - runner := cmdRunner{ - std.Out, - cmds, - repoRoot, - parentEnv, - verbose, - } + wg.Wait() - return runner.run(ctx) + select { + case <-ctx.Done(): + printCmdError(std.Out.Output, "other", ctx.Err()) + return ctx.Err() + case failure := <-failures: + printCmdError(std.Out.Output, failure.cmdName, failure.err) + return failure + default: + return nil + } } -func (runner *cmdRunner) run(ctx context.Context) error { - p := pool.New().WithContext(ctx).WithCancelOnError().WithFirstError() - // Start each command concurrently - for _, cmd := range runner.cmds { - cmd := cmd - p.Go(func(ctx context.Context) error { - std.Out.WriteLine(output.Styledf(output.StylePending, "Running %s...", cmd.GetName())) +type cmdRunner struct { + verbose bool - // Start watching the commands dependencies - wantRestart, err := cmd.StartWatch(ctx) - if err != nil { - runner.printError(cmd, err) - return err + installSemaphore *semaphore.Weighted + failures chan failedRun + installed chan string + okayToStart chan struct{} + + repositoryRoot string + parentEnv map[string]string +} + +func (c *cmdRunner) runAndWatch(ctx context.Context, cmd Command, reload <-chan struct{}) error { + printDebug := func(f string, args ...any) { + if !c.verbose { + return + } + message := fmt.Sprintf(f, args...) + std.Out.WriteLine(output.Styledf(output.StylePending, "%s[DEBUG] %s: %s %s", output.StyleBold, cmd.Name, output.StyleReset, message)) + } + + startedOnce := false + + var ( + md5hash string + md5changed bool + ) + + var wg sync.WaitGroup + var cancelFuncs []context.CancelFunc + + errs := make(chan error, 1) + defer func() { + wg.Wait() + close(errs) + }() + + for { + // Build it + if cmd.Install != "" || cmd.InstallFunc != "" { + install := func() (string, error) { + if err := c.installSemaphore.Acquire(ctx, 1); err != nil { + return "", errors.Wrap(err, "lockfiles semaphore") + } + defer c.installSemaphore.Release(1) + + if startedOnce { + std.Out.WriteLine(output.Styledf(output.StylePending, "Installing %s...", cmd.Name)) + } + if cmd.Install != "" && cmd.InstallFunc == "" { + return BashInRoot(ctx, cmd.Install, makeEnv(c.parentEnv, cmd.Env)) + } else if cmd.Install == "" && cmd.InstallFunc != "" { + fn, ok := installFuncs[cmd.InstallFunc] + if !ok { + return "", errors.Newf("no install func with name %q found", cmd.InstallFunc) + } + return "", fn(ctx, makeEnvMap(c.parentEnv, cmd.Env)) + } + + return "", nil } - // start up the binary - proc, err := runner.start(ctx, cmd) + cmdOut, err := install() if err != nil { - runner.printError(cmd, err) - return errors.Wrapf(err, "failed to start command %q", cmd.GetName()) - } - defer proc.cancel() - - // Wait forever until we're asked to stop or that restarting returns an error. - for { - select { - // Handle context cancelled - case <-ctx.Done(): - return ctx.Err() - - // Handle process exit - case err := <-proc.Exit(): - // If the process failed, we exit immediately - if err != nil { - return err - } - - runner.WriteLine(output.Styledf(output.StyleSuccess, "%s%s exited without error%s", output.StyleBold, cmd.GetName(), output.StyleReset)) - - // If we shouldn't restart when the process exits, return - if !cmd.GetContinueWatchOnExit() { - return nil - } - - // handle file watcher triggered - case <-wantRestart: - // If the command has an installer, re-run the install and determine if we should restart - runner.WriteLine(output.Styledf(output.StylePending, "Change detected. Reloading %s...", cmd.GetName())) - shouldRestart, err := runner.reinstall(ctx, cmd) - if err != nil { - runner.printError(cmd, err) - return err - } - - if shouldRestart { - runner.WriteLine(output.Styledf(output.StylePending, "Restarting %s...", cmd.GetName())) - proc.cancel() - proc, err = runner.start(ctx, cmd) - if err != nil { - return err - } - defer proc.cancel() - } else { - runner.WriteLine(output.Styledf(output.StylePending, "Binary for %s did not change. Not restarting.", cmd.GetName())) - } + if !startedOnce { + return installErr{cmdName: cmd.Name, output: cmdOut, originalErr: err} + } else { + printCmdError(std.Out.Output, cmd.Name, reinstallErr{cmdName: cmd.Name, output: cmdOut}) + // Now we wait for a reload signal before we start to build it again + <-reload + continue } } - }) - } - return p.Wait() -} + // clear this signal before starting + select { + case <-reload: + default: + } -func (runner *cmdRunner) printError(cmd SGConfigCommand, err error) { - printCmdError(runner.Output.Output, cmd.GetName(), err) -} + if startedOnce { + std.Out.WriteLine(output.Styledf(output.StyleSuccess, "%sSuccessfully installed %s%s", output.StyleBold, cmd.Name, output.StyleReset)) + } -func (runner *cmdRunner) debug(msg string, args ...any) { //nolint currently unused but a handy tool for debugginlg - if runner.verbose { - message := fmt.Sprintf(msg, args...) - runner.WriteLine(output.Styledf(output.StylePending, "%s[DEBUG]: %s %s", output.StyleBold, output.StyleReset, message)) + if cmd.CheckBinary != "" { + newHash, err := md5HashFile(filepath.Join(c.repositoryRoot, cmd.CheckBinary)) + if err != nil { + return installErr{cmdName: cmd.Name, output: cmdOut, originalErr: err} + } + + md5changed = md5hash != newHash + md5hash = newHash + } + + } + + if !startedOnce { + c.installed <- cmd.Name + <-c.okayToStart + } + + if cmd.CheckBinary == "" || md5changed { + for _, cancel := range cancelFuncs { + printDebug("Canceling previous process and waiting for it to exit...") + cancel() // Stop command + <-errs // Wait for exit + printDebug("Previous command exited") + } + cancelFuncs = nil + + // Run it + std.Out.WriteLine(output.Styledf(output.StylePending, "Running %s...", cmd.Name)) + + sc, err := startCmd(ctx, c.repositoryRoot, cmd, c.parentEnv) + if err != nil { + return err + } + defer sc.cancel() + + cancelFuncs = append(cancelFuncs, sc.cancel) + + wg.Add(1) + go func() { + defer wg.Done() + + err := sc.Wait() + + var e *exec.ExitError + if errors.As(err, &e) { + err = runErr{ + cmdName: cmd.Name, + exitCode: e.ExitCode(), + stderr: sc.CapturedStderr(), + stdout: sc.CapturedStdout(), + } + } + if err == nil && cmd.ContinueWatchOnExit { + std.Out.WriteLine(output.Styledf(output.StyleSuccess, "Command %s completed", cmd.Name)) + <-reload // on success, wait for next reload before restarting + errs <- nil + } else { + errs <- err + } + }() + + // TODO: We should probably only set this after N seconds (or when + // we're sure that the command has booted up -- maybe healthchecks?) + startedOnce = true + } else { + std.Out.WriteLine(output.Styled(output.StylePending, "Binary did not change. Not restarting.")) + } + + select { + case <-reload: + std.Out.WriteLine(output.Styledf(output.StylePending, "Change detected. Reloading %s...", cmd.Name)) + continue // Reinstall + + case err := <-errs: + // Exited on its own or errored + if err == nil { + std.Out.WriteLine(output.Styledf(output.StyleSuccess, "%s%s exited without error%s", output.StyleBold, cmd.Name, output.StyleReset)) + } + return err + } } } -func (runner *cmdRunner) start(ctx context.Context, cmd SGConfigCommand) (*startedCmd, error) { - return startSgCmd(ctx, cmd, runner.repositoryRoot, runner.parentEnv) -} - -func (runner *cmdRunner) reinstall(ctx context.Context, cmd SGConfigCommand) (bool, error) { - if installer, ok := cmd.(Installer); !ok { - // If there is no installer, then we always restart - return true, nil - } else { - bin, err := cmd.GetBinaryLocation() - if err != nil { - // If the command doesn't have a CheckBinary, we just ignore it - if errors.Is(err, noBinaryError{}) { - return false, nil - } else { - return false, err +func (c *cmdRunner) waitForInstallation(ctx context.Context, cmdNames map[string]struct{}) error { + installationStart := time.Now() + installationSpans := make(map[string]*analytics.Span, len(cmdNames)) + for name := range cmdNames { + _, installationSpans[name] = analytics.StartSpan(ctx, fmt.Sprintf("install %s", name), "install_command") + } + interrupt.Register(func() { + for _, span := range installationSpans { + if span.IsRecording() { + span.Cancelled() + span.End() } } + }) - oldHash, err := md5HashFile(bin) - if err != nil { - return false, err - } + std.Out.Write("") + std.Out.WriteLine(output.Linef(output.EmojiLightbulb, output.StyleBold, "Installing %d commands...", len(cmdNames))) + std.Out.Write("") - if err := installer.RunInstall(ctx, runner.parentEnv); err != nil { - runner.printError(cmd, err) - return false, err - } - newHash, err := md5HashFile(bin) - if err != nil { - return false, err - } - - return oldHash != newHash, nil + waitingMessages := []string{ + "Still waiting for %s to finish installing...", + "Yup, still waiting for %s to finish installing...", + "Here's the bad news: still waiting for %s to finish installing. The good news is that we finally have a chance to talk, no?", + "Still waiting for %s to finish installing...", + "Hey, %s, there's people waiting for you, pal", + "Sooooo, how are ya? Yeah, waiting. I hear you. Wish %s would hurry up.", + "I mean, what is %s even doing?", + "I now expect %s to mean 'producing a miracle' with 'installing'", + "Still waiting for %s to finish installing...", + "Before this I think the longest I ever had to wait was at Disneyland in '99, but %s is now #1", + "Still waiting for %s to finish installing...", + "At this point it could be anything - does your computer still have power? Come on, %s", + "Might as well check Slack. %s is taking its time...", + "In German there's a saying: ein guter Käse braucht seine Zeit - a good cheese needs its time. Maybe %s is cheese?", + "If %ss turns out to be cheese I'm gonna lose it. Hey, hurry up, will ya", + "Still waiting for %s to finish installing...", } + messageCount := 0 + + const tickInterval = 15 * time.Second + ticker := time.NewTicker(tickInterval) + + done := 0.0 + total := float64(len(cmdNames)) + progress := std.Out.Progress([]output.ProgressBar{ + {Label: fmt.Sprintf("Installing %d commands", len(cmdNames)), Max: total}, + }, nil) + + for { + select { + case cmdName := <-c.installed: + ticker.Reset(tickInterval) + + delete(cmdNames, cmdName) + done += 1.0 + installationSpans[cmdName].Succeeded() + installationSpans[cmdName].End() + + progress.WriteLine(output.Styledf(output.StyleSuccess, "%s installed", cmdName)) + + progress.SetValue(0, done) + progress.SetLabelAndRecalc(0, fmt.Sprintf("%d/%d commands installed", int(done), int(total))) + + // Everything installed! + if len(cmdNames) == 0 { + progress.Complete() + + duration := time.Since(installationStart) + + std.Out.Write("") + if c.verbose { + std.Out.WriteLine(output.Linef(output.EmojiSuccess, output.StyleSuccess, "Everything installed! Took %s. Booting up the system!", duration)) + } else { + std.Out.WriteLine(output.Linef(output.EmojiSuccess, output.StyleSuccess, "Everything installed! Booting up the system!")) + } + std.Out.Write("") + + close(c.okayToStart) + return nil + } + + case failure := <-c.failures: + progress.Destroy() + installationSpans[failure.cmdName].RecordError("failed", failure.err) + installationSpans[failure.cmdName].End() + + // Something went wrong with an installation, no need to wait for the others + printCmdError(std.Out.Output, failure.cmdName, failure.err) + return failure + + case <-ticker.C: + names := []string{} + for name := range cmdNames { + names = append(names, name) + } + + idx := messageCount + if idx > len(waitingMessages)-1 { + idx = len(waitingMessages) - 1 + } + msg := waitingMessages[idx] + + emoji := output.EmojiHourglass + if idx > 3 { + emoji = output.EmojiShrug + } + + progress.WriteLine(output.Linef(emoji, output.StyleBold, msg, strings.Join(names, ", "))) + messageCount += 1 + } + } + } // failedRun is returned by run when a command failed to run and run exits @@ -204,6 +433,17 @@ func (e installErr) Error() string { return fmt.Sprintf("install of %s failed: %s", e.cmdName, e.output) } +// reinstallErr is used internally by runWatch to print a message when a +// command failed to reinstall. +type reinstallErr struct { + cmdName string + output string +} + +func (e reinstallErr) Error() string { + return fmt.Sprintf("reinstalling %s failed: %s", e.cmdName, e.output) +} + // runErr is used internally by runWatch to print a message when a // command failed to reinstall. type runErr struct { @@ -218,12 +458,8 @@ func (e runErr) Error() string { } func printCmdError(out *output.Output, cmdName string, err error) { - // Don't log context canceled errors because they are not the root issue - if errors.Is(err, context.Canceled) { - return - } - var message, cmdOut string + switch e := errors.Cause(err).(type) { case installErr: message = "Failed to build " + cmdName @@ -237,6 +473,9 @@ func printCmdError(out *output.Output, cmdName string, err error) { } } cmdOut = e.output + case reinstallErr: + message = "Failed to rebuild " + cmdName + cmdOut = e.output case runErr: message = "Failed to run " + cmdName cmdOut = fmt.Sprintf("Exit code: %d\n\n", e.exitCode) @@ -252,19 +491,7 @@ func printCmdError(out *output.Output, cmdName string, err error) { } default: - var exc *exec.ExitError - // recurse if it is an exit error - if errors.As(err, &exc) { - printCmdError(out, cmdName, runErr{ - cmdName: cmdName, - exitCode: exc.ExitCode(), - stderr: string(exc.Stderr), - }) - return - } else { - message = fmt.Sprintf("Failed to run %s: %#v", cmdName, err) - } - + message = fmt.Sprintf("Failed to run %s: %s", cmdName, err) } separator := strings.Repeat("-", 80) @@ -287,6 +514,62 @@ func printCmdError(out *output.Output, cmdName string, err error) { } } +type installFunc func(context.Context, map[string]string) error + +var installFuncs = map[string]installFunc{ + "installCaddy": func(ctx context.Context, env map[string]string) error { + version := env["CADDY_VERSION"] + if version == "" { + return errors.New("could not find CADDY_VERSION in env") + } + + root, err := root.RepositoryRoot() + if err != nil { + return err + } + + var os string + switch runtime.GOOS { + case "linux": + os = "linux" + case "darwin": + os = "mac" + } + + archiveName := fmt.Sprintf("caddy_%s_%s_%s", version, os, runtime.GOARCH) + url := fmt.Sprintf("https://github.com/caddyserver/caddy/releases/download/v%s/%s.tar.gz", version, archiveName) + + target := filepath.Join(root, fmt.Sprintf(".bin/caddy_%s", version)) + + return download.ArchivedExecutable(ctx, url, target, "caddy") + }, + "installJaeger": func(ctx context.Context, env map[string]string) error { + version := env["JAEGER_VERSION"] + + // Make sure the data folder exists. + disk := env["JAEGER_DISK"] + if err := os.MkdirAll(disk, 0755); err != nil { + return err + } + + if version == "" { + return errors.New("could not find JAEGER_VERSION in env") + } + + root, err := root.RepositoryRoot() + if err != nil { + return err + } + + archiveName := fmt.Sprintf("jaeger-%s-%s-%s", version, runtime.GOOS, runtime.GOARCH) + url := fmt.Sprintf("https://github.com/jaegertracing/jaeger/releases/download/v%s/%s.tar.gz", version, archiveName) + + target := filepath.Join(root, fmt.Sprintf(".bin/jaeger-all-in-one-%s", version)) + + return download.ArchivedExecutable(ctx, url, target, fmt.Sprintf("%s/jaeger-all-in-one", archiveName)) + }, +} + // makeEnv merges environments starting from the left, meaning the first environment will be overriden by the second one, skipping // any key that has been explicitly defined in the current environment of this process. This enables users to manually overrides // environment variables explictly, i.e FOO=1 sg start will have FOO=1 set even if a command or commandset sets FOO. @@ -358,16 +641,131 @@ func md5HashFile(filename string) (string, error) { return string(h.Sum(nil)), nil } -func Test(ctx context.Context, cmd SGConfigCommand, parentEnv map[string]string) error { +// +// + +type changeMonitor struct { + subscriptions []subscription +} + +type subscription struct { + cmd Command + ch chan struct{} +} + +func (m *changeMonitor) run(paths <-chan string) { + for path := range paths { + for _, sub := range m.subscriptions { + m.notify(sub, path) + } + } +} + +func (m *changeMonitor) notify(sub subscription, path string) { + found := false + for _, prefix := range sub.cmd.Watch { + if strings.HasPrefix(path, prefix) { + found = true + } + } + if !found { + return + } + + select { + case sub.ch <- struct{}{}: + default: + } +} + +func (m *changeMonitor) register(cmd Command) <-chan struct{} { + ch := make(chan struct{}) + m.subscriptions = append(m.subscriptions, subscription{cmd, ch}) + return ch +} + +// +// + +var watchIgnorePatterns = []*regexp.Regexp{ + regexp.MustCompile(`_test\.go$`), + regexp.MustCompile(`^.bin/`), + regexp.MustCompile(`^.git/`), + regexp.MustCompile(`^dev/`), + regexp.MustCompile(`^node_modules/`), +} + +func watch() (<-chan string, error) { + repoRoot, err := root.RepositoryRoot() + if err != nil { + return nil, err + } + + paths := make(chan string) + events := make(chan notify.EventInfo, 1) + + if err := notify.Watch(repoRoot+"/...", events, notify.All); err != nil { + return nil, err + } + + go func() { + defer close(events) + defer notify.Stop(events) + + outer: + for event := range events { + path := strings.TrimPrefix(strings.TrimPrefix(event.Path(), repoRoot), "/") + + for _, pattern := range watchIgnorePatterns { + if pattern.MatchString(path) { + continue outer + } + } + + paths <- path + } + }() + + return paths, nil +} + +func Test(ctx context.Context, cmd Command, args []string, parentEnv map[string]string) error { repoRoot, err := root.RepositoryRoot() if err != nil { return err } - std.Out.WriteLine(output.Styledf(output.StylePending, "Starting testsuite %q.", cmd.GetName())) - proc, err := startSgCmd(ctx, cmd, repoRoot, parentEnv) - if err != nil { - printCmdError(std.Out.Output, cmd.GetName(), err) + std.Out.WriteLine(output.Styledf(output.StylePending, "Starting testsuite %q.", cmd.Name)) + if len(args) != 0 { + std.Out.WriteLine(output.Styledf(output.StylePending, "\tAdditional arguments: %s", args)) } - return proc.Wait() + commandCtx, cancel := context.WithCancel(ctx) + defer cancel() + + cmdArgs := []string{cmd.Cmd} + if len(args) != 0 { + cmdArgs = append(cmdArgs, args...) + } else { + cmdArgs = append(cmdArgs, cmd.DefaultArgs) + } + + secretsEnv, err := getSecrets(ctx, cmd.Name, cmd.ExternalSecrets) + if err != nil { + std.Out.WriteLine(output.Styledf(output.StyleWarning, "[%s] %s %s", + cmd.Name, output.EmojiFailure, err.Error())) + } + + if cmd.Preamble != "" { + std.Out.WriteLine(output.Styledf(output.StyleOrange, "[%s] %s %s", cmd.Name, output.EmojiInfo, cmd.Preamble)) + } + + c := exec.CommandContext(commandCtx, "bash", "-c", strings.Join(cmdArgs, " ")) + c.Dir = repoRoot + c.Env = makeEnv(parentEnv, secretsEnv, cmd.Env) + c.Stdout = os.Stdout + c.Stderr = os.Stderr + + std.Out.WriteLine(output.Styledf(output.StylePending, "Running %s in %q...", c, repoRoot)) + + return c.Run() } diff --git a/dev/sg/internal/run/run_bazel.go b/dev/sg/internal/run/run_bazel.go new file mode 100644 index 00000000000..76d226ba458 --- /dev/null +++ b/dev/sg/internal/run/run_bazel.go @@ -0,0 +1,73 @@ +package run + +import ( + "context" + "fmt" + "os/exec" + "strings" + + "github.com/sourcegraph/conc/pool" + + "github.com/sourcegraph/sourcegraph/dev/sg/root" +) + +func outputPath() ([]byte, error) { + // Get the output directory from Bazel, which varies depending on which OS + // we're running against. + cmd := exec.Command("bazel", "info", "output_path") + return cmd.Output() +} + +// binLocation returns the path on disk where Bazel is putting the binary +// associated with a given target. +func binLocation(target string) (string, error) { + baseOutput, err := outputPath() + if err != nil { + return "", err + } + // Trim "bazel-out" because the next bazel query will include it. + outputPath := strings.TrimSuffix(strings.TrimSpace(string(baseOutput)), "bazel-out") + + // Get the binary from the specific target. + cmd := exec.Command("bazel", "cquery", target, "--output=files") + baseOutput, err = cmd.Output() + if err != nil { + return "", err + } + binPath := strings.TrimSpace(string(baseOutput)) + + return fmt.Sprintf("%s%s", outputPath, binPath), nil +} + +func BazelCommands(ctx context.Context, parentEnv map[string]string, verbose bool, cmds ...BazelCommand) error { + if len(cmds) == 0 { + // no Bazel commands so we return + return nil + } + + repoRoot, err := root.RepositoryRoot() + if err != nil { + return err + } + + var targets []string + for _, cmd := range cmds { + targets = append(targets, cmd.Target) + } + + ibazel := newIBazel(repoRoot, targets...) + + p := pool.New().WithContext(ctx).WithCancelOnError() + p.Go(func(ctx context.Context) error { + return ibazel.Start(ctx, repoRoot) + }) + + for _, bc := range cmds { + bc := bc + p.Go(func(ctx context.Context) error { + return bc.Start(ctx, repoRoot, parentEnv) + }) + } + + return p.Wait() +} diff --git a/dev/sg/internal/run/sgconfig_command.go b/dev/sg/internal/run/sgconfig_command.go deleted file mode 100644 index ea44e473b62..00000000000 --- a/dev/sg/internal/run/sgconfig_command.go +++ /dev/null @@ -1,91 +0,0 @@ -package run - -import ( - "context" - "fmt" - "os/exec" - - "github.com/rjeczalik/notify" - - "github.com/sourcegraph/sourcegraph/dev/sg/internal/secrets" -) - -type SGConfigCommand interface { - // Getters for common fields - GetName() string - GetContinueWatchOnExit() bool - GetIgnoreStdout() bool - GetIgnoreStderr() bool - GetPreamble() string - GetEnv() map[string]string - GetBinaryLocation() (string, error) - GetExternalSecrets() map[string]secrets.ExternalSecret - GetExecCmd(context.Context) (*exec.Cmd, error) - - // Start a file watcher on the relevant filesystem sub-tree for this command - StartWatch(context.Context) (<-chan struct{}, error) -} - -func WatchPaths(ctx context.Context, paths []string, skipEvents ...notify.Event) (<-chan struct{}, error) { - // Set up the watchers. - restart := make(chan struct{}) - events := make(chan notify.EventInfo, 1) - skip := make(map[notify.Event]struct{}, len(skipEvents)) - for _, event := range skipEvents { - skip[event] = struct{}{} - } - - // Do nothing if no watch paths are configured - if len(paths) == 0 { - return restart, nil - } - - for _, path := range paths { - if err := notify.Watch(path+"/...", events, notify.All); err != nil { - return nil, err - } - } - - // Start watching for changes to the source tree - go func() { - defer close(events) - defer notify.Stop(events) - - for { - select { - case <-ctx.Done(): - return - case evt := <-events: - if _, shouldSkip := skip[evt.Event()]; !shouldSkip { - restart <- struct{}{} - } - } - - } - }() - - return restart, nil -} - -type noBinaryError struct { - name string - err error -} - -func (e noBinaryError) Error() string { - return fmt.Sprintf("no-binary-error: %s has no binary", e.name) -} - -func (e noBinaryError) Unwrap() error { - return e.err -} - -func (e noBinaryError) Wrap(err error) error { - e.err = err - return e -} - -func (e noBinaryError) Is(target error) bool { - _, ok := target.(noBinaryError) - return ok -} diff --git a/dev/sg/sg_run.go b/dev/sg/sg_run.go index a6d52df6c5a..a30fe02916e 100644 --- a/dev/sg/sg_run.go +++ b/dev/sg/sg_run.go @@ -92,36 +92,48 @@ func runExec(ctx *cli.Context) error { return flag.ErrHelp } - cmds := make([]run.SGConfigCommand, 0, len(args)) + var cmds []run.Command + var bcmds []run.BazelCommand for _, arg := range args { - if bazelCmd, ok := config.BazelCommands[arg]; ok && !legacy { - cmds = append(cmds, bazelCmd) - } else if cmd, ok := config.Commands[arg]; ok { - cmds = append(cmds, cmd) + if bazelCmd, okB := config.BazelCommands[arg]; okB && !legacy { + bcmds = append(bcmds, bazelCmd) } else { - std.Out.WriteLine(output.Styledf(output.StyleWarning, "ERROR: command %q not found :(", arg)) - return flag.ErrHelp + cmd, okC := config.Commands[arg] + if !okC && !okB { + std.Out.WriteLine(output.Styledf(output.StyleWarning, "ERROR: command %q not found :(", arg)) + return flag.ErrHelp + } + cmds = append(cmds, cmd) } } if ctx.Bool("describe") { + // TODO Bazel commands for _, cmd := range cmds { out, err := yaml.Marshal(cmd) if err != nil { return err } - if err = std.Out.WriteMarkdown(fmt.Sprintf("# %s\n\n```yaml\n%s\n```\n\n", cmd.GetName(), string(out))); err != nil { - return err - } + std.Out.WriteMarkdown(fmt.Sprintf("# %s\n\n```yaml\n%s\n```\n\n", cmd.Name, string(out))) } return nil } + if !legacy { + // First we build everything once, to ensure all binaries are present. + if err := run.BazelBuild(ctx.Context, bcmds...); err != nil { + return err + } + } + p := pool.New().WithContext(ctx.Context).WithCancelOnError() p.Go(func(ctx context.Context) error { return run.Commands(ctx, config.Env, verbose, cmds...) }) + p.Go(func(ctx context.Context) error { + return run.BazelCommands(ctx, config.Env, verbose, bcmds...) + }) return p.Wait() } diff --git a/dev/sg/sg_start.go b/dev/sg/sg_start.go index 960e473621e..f9faeeecd4a 100644 --- a/dev/sg/sg_start.go +++ b/dev/sg/sg_start.go @@ -8,8 +8,10 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" + "github.com/sourcegraph/conc/pool" sgrun "github.com/sourcegraph/run" "github.com/urfave/cli/v2" "gopkg.in/yaml.v3" @@ -176,6 +178,8 @@ func constructStartCmdLongHelp() string { return out.String() } +var sgOnce sync.Once + func startExec(ctx *cli.Context) error { config, err := getConfig() if err != nil { @@ -301,21 +305,52 @@ func startCommandSet(ctx context.Context, set *sgconf.Commandset, conf *sgconf.C return err } - repoRoot, err := root.RepositoryRoot() - if err != nil { - return err + exceptList := exceptServices.Value() + exceptSet := make(map[string]interface{}, len(exceptList)) + for _, svc := range exceptList { + exceptSet[svc] = struct{}{} } - cmds, err := getCommands(set.Commands, set, conf.Commands) - if err != nil { - return err + onlyList := onlyServices.Value() + onlySet := make(map[string]interface{}, len(onlyList)) + for _, svc := range onlyList { + onlySet[svc] = struct{}{} } - bcmds, err := getCommands(set.BazelCommands, set, conf.BazelCommands) - if err != nil { - return err + cmds := make([]run.Command, 0, len(set.Commands)) + for _, name := range set.Commands { + cmd, ok := conf.Commands[name] + if !ok { + return errors.Errorf("command %q not found in commandset %q", name, set.Name) + } + + if _, excluded := exceptSet[name]; excluded { + std.Out.WriteLine(output.Styledf(output.StylePending, "Skipping command %s since it's in --except.", cmd.Name)) + continue + } + + // No --only specified, just add command + if len(onlySet) == 0 { + cmds = append(cmds, cmd) + } else { + if _, inSet := onlySet[name]; inSet { + cmds = append(cmds, cmd) + } else { + std.Out.WriteLine(output.Styledf(output.StylePending, "Skipping command %s since it's not included in --only.", cmd.Name)) + } + } + } + bcmds := make([]run.BazelCommand, 0, len(set.BazelCommands)) + for _, name := range set.BazelCommands { + bcmd, ok := conf.BazelCommands[name] + if !ok { + return errors.Errorf("command %q not found in commandset %q", name, set.Name) + } + + bcmds = append(bcmds, bcmd) + } if len(cmds) == 0 && len(bcmds) == 0 { std.Out.WriteLine(output.Styled(output.StyleWarning, "WARNING: no commands to run")) return nil @@ -331,77 +366,20 @@ func startCommandSet(ctx context.Context, set *sgconf.Commandset, conf *sgconf.C env[k] = v } - installers := make([]run.Installer, 0, len(cmds)+1) - for _, cmd := range cmds { - installers = append(installers, cmd) - } - - var ibazel *run.IBazel - if len(bcmds) > 0 { - ibazel, err = run.NewIBazel(bcmds, repoRoot) - if err != nil { - return err - } - defer ibazel.Close() - installers = append(installers, ibazel) - } - if err := run.Install(ctx, env, verbose, installers...); err != nil { + // First we build everything once, to ensure all binaries are present. + if err := run.BazelBuild(ctx, bcmds...); err != nil { return err } - if ibazel != nil { - ibazel.StartOutput() - } + p := pool.New().WithContext(ctx).WithCancelOnError() + p.Go(func(ctx context.Context) error { + return run.Commands(ctx, env, verbose, cmds...) + }) + p.Go(func(ctx context.Context) error { + return run.BazelCommands(ctx, env, verbose, bcmds...) + }) - configCmds := make([]run.SGConfigCommand, 0, len(bcmds)+len(cmds)) - for _, cmd := range bcmds { - configCmds = append(configCmds, cmd) - } - - for _, cmd := range cmds { - configCmds = append(configCmds, cmd) - } - return run.Commands(ctx, env, verbose, configCmds...) -} - -func getCommands[T run.SGConfigCommand](commands []string, set *sgconf.Commandset, conf map[string]T) ([]T, error) { - exceptList := exceptServices.Value() - exceptSet := make(map[string]interface{}, len(exceptList)) - for _, svc := range exceptList { - exceptSet[svc] = struct{}{} - } - - onlyList := onlyServices.Value() - onlySet := make(map[string]interface{}, len(onlyList)) - for _, svc := range onlyList { - onlySet[svc] = struct{}{} - } - - cmds := make([]T, 0, len(commands)) - for _, name := range commands { - cmd, ok := conf[name] - if !ok { - return nil, errors.Errorf("command %q not found in commandset %q", name, set.Name) - } - - if _, excluded := exceptSet[name]; excluded { - std.Out.WriteLine(output.Styledf(output.StylePending, "Skipping command %s since it's in --except.", cmd.GetName())) - continue - } - - // No --only specified, just add command - if len(onlySet) == 0 { - cmds = append(cmds, cmd) - } else { - if _, inSet := onlySet[name]; inSet { - cmds = append(cmds, cmd) - } else { - std.Out.WriteLine(output.Styledf(output.StylePending, "Skipping command %s since it's not included in --only.", cmd.GetName())) - } - } - - } - return cmds, nil + return p.Wait() } // logLevelOverrides builds a map of commands -> log level that should be overridden in the environment. diff --git a/dev/sg/sg_start_test.go b/dev/sg/sg_start_test.go index 1f952057f9d..97c400eaab4 100644 --- a/dev/sg/sg_start_test.go +++ b/dev/sg/sg_start_test.go @@ -46,7 +46,6 @@ func TestStartCommandSet(t *testing.T) { "", "✅ Everything installed! Booting up the system!", "", - "Starting 1 cmds", "Running test-cmd-1...", "[ test-cmd-1] horsegraph booted up. mount your horse.", "[ test-cmd-1] quitting. not horsing around anymore.", diff --git a/dev/sg/sg_tests.go b/dev/sg/sg_tests.go index 06a95c7eb68..1f1946de5b6 100644 --- a/dev/sg/sg_tests.go +++ b/dev/sg/sg_tests.go @@ -1,10 +1,8 @@ package main import ( - "context" "flag" "fmt" - "os/exec" "sort" "strings" @@ -73,7 +71,7 @@ func testExec(ctx *cli.Context) error { return flag.ErrHelp } - return run.Test(ctx.Context, newSGTestCommand(cmd, args[1:]), config.Env) + return run.Test(ctx.Context, cmd, args[1:], config.Env) } func constructTestCmdLongHelp() string { @@ -104,28 +102,3 @@ func constructTestCmdLongHelp() string { return out.String() } - -type sgTestCommand struct { - run.Command - args []string -} - -// Ovrrides the GetExecCmd method with a custom implementation to construct the command -// using CLI-passed arguments -func (test sgTestCommand) GetExecCmd(ctx context.Context) (*exec.Cmd, error) { - cmdArgs := []string{test.Command.Cmd} - if len(test.args) != 0 { - cmdArgs = append(cmdArgs, test.args...) - } else { - cmdArgs = append(cmdArgs, test.Command.DefaultArgs) - } - - return exec.CommandContext(ctx, "bash", "-c", strings.Join(cmdArgs, " ")), nil -} - -func newSGTestCommand(cmd run.Command, args []string) sgTestCommand { - return sgTestCommand{ - Command: cmd, - args: args, - } -} diff --git a/go.mod b/go.mod index 10eb5a74d0a..83ac9ac72ab 100644 --- a/go.mod +++ b/go.mod @@ -267,7 +267,6 @@ require ( github.com/invopop/jsonschema v0.12.0 github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/mroth/weightedrand/v2 v2.0.1 - github.com/nxadm/tail v1.4.11 github.com/oschwald/maxminddb-golang v1.12.0 github.com/pkoukk/tiktoken-go v0.1.6 github.com/prometheus/statsd_exporter v0.22.7 @@ -393,7 +392,6 @@ require ( go.uber.org/goleak v1.3.0 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect - gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gotest.tools/v3 v3.5.1 // indirect ) diff --git a/go.sum b/go.sum index b5e9cddb611..06e8532c1fc 100644 --- a/go.sum +++ b/go.sum @@ -530,7 +530,6 @@ github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUork github.com/fsnotify/fsnotify v1.4.3-0.20170329110642-4da3e2cfbabc/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fullstorydev/grpcui v1.3.1 h1:lVXozTNkJJouBL+wpmvxMnltiwYp8mgyd0TRs93i6Rw= @@ -1394,9 +1393,8 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d h1:VhgPp6v9qf9Agr/56bj7Y/xa04UccTW04VP0Qed4vnQ= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= -github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= -github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= @@ -2235,7 +2233,6 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220906165534-d0df966e6959/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/sg.config.yaml b/sg.config.yaml index 383a63988e1..b97a3b431f2 100644 --- a/sg.config.yaml +++ b/sg.config.yaml @@ -602,6 +602,8 @@ commands: env: SYNTACTIC_CODE_INTEL_WORKER_ADDR: 127.0.0.1:6076 + + executor-template: &executor_template # TMPDIR is set here so it's not set in the `install` process, which would trip up `go build`. cmd: | @@ -1008,9 +1010,7 @@ commands: bazelCommands: blobstore: - target: //cmd/blobstore - docsite: - runTarget: //doc:serve + target: //cmd/blobstore:blobstore searcher: target: //cmd/searcher syntax-highlighter: @@ -1117,7 +1117,6 @@ commandsets: - ibazel bazelCommands: - blobstore - - docsite - frontend - worker - repo-updater @@ -1125,15 +1124,16 @@ commandsets: - gitserver-1 - searcher - symbols - # - syntax-highlighter + - syntax-highlighter commands: - web + # TODO https://github.com/sourcegraph/devx-support/issues/537 + # - docsite - zoekt-index-0 - zoekt-index-1 - zoekt-web-0 - zoekt-web-1 - caddy - # If you modify this command set, please consider also updating the dotcom runset. enterprise: &enterprise_set requiresDevPrivate: true @@ -1255,6 +1255,7 @@ commandsets: - syntactic-code-intel-worker-0 - syntactic-code-intel-worker-1 + codeintel: requiresDevPrivate: true checks: