Stream ExecutionLogEntries from executor (#22884)

Instead of writing all log entries to the database once the job has finished, we now write them continuously as they are produced.

Co-authored-by: Thorsten Ball <mrnugget@gmail.com>
Authored by Erik Seliger on 2021-07-22 17:09:36 +02:00; committed by GitHub.
parent c255ca770c
commit e807a2c0ff
4 changed files with 76 additions and 32 deletions
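
In outline, the change swaps a slice that is drained when the job ends for a buffered channel drained by a background goroutine. A minimal, self-contained sketch of that pattern (simplified from the diff below; names like `streamingLogger` are illustrative, not from the commit):

package main

import (
	"context"
	"fmt"
)

// entry stands in for workerutil.ExecutionLogEntry.
type entry struct{ Key string }

// streamingLogger buffers entries in a channel; a background goroutine
// persists each one as it arrives instead of batching them at job end.
type streamingLogger struct {
	entries chan entry
	done    chan struct{}
}

func newStreamingLogger(write func(context.Context, entry) error) *streamingLogger {
	l := &streamingLogger{
		entries: make(chan entry, 50), // bounded buffer, as in logEntryBufsize below
		done:    make(chan struct{}),
	}
	go func() {
		defer close(l.done)
		for e := range l.entries {
			// A fresh context: a canceled or timed-out job should still
			// get its remaining log entries persisted.
			_ = write(context.Background(), e)
		}
	}()
	return l
}

// Log enqueues an entry; it blocks only when the buffer is full.
func (l *streamingLogger) Log(e entry) { l.entries <- e }

// Flush closes the channel and waits for the writer goroutine to drain it.
func (l *streamingLogger) Flush() {
	close(l.entries)
	<-l.done
}

func main() {
	l := newStreamingLogger(func(_ context.Context, e entry) error {
		fmt.Println("persisted:", e.Key)
		return nil
	})
	l.Log(entry{Key: "setup.firecracker.start"})
	l.Log(entry{Key: "step.docker.0"})
	l.Flush()
}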


@@ -22,11 +22,6 @@ type commandRunner interface {
 const firecrackerContainerDir = "/work"
 
-var commonFirecrackerFlags = []string{
-	"--runtime", "docker",
-	"--network-plugin", "docker-bridge",
-}
-
 // formatFirecrackerCommand constructs the command to run on the host via a Firecracker
 // virtual machine in order to invoke the given spec. If the spec specifies an image, then
 // the command will be run inside of a container inside of the VM. Otherwise, the command
@@ -119,7 +114,8 @@ func setupFirecracker(ctx context.Context, runner commandRunner, logger *Logger,
 		Key: "setup.firecracker.start",
 		Command: flatten(
 			"ignite", "run",
-			commonFirecrackerFlags,
+			"--runtime", "docker",
+			"--network-plugin", "docker-bridge",
 			firecrackerResourceFlags(options.ResourceOptions),
 			firecrackerCopyfileFlags(repoDir, imageKeys, options),
 			"--ssh",
@@ -215,6 +211,7 @@ func firecrackerCopyfileFlags(dir string, imageKeys []string, options Options) [
 	return intersperse("--copy-files", copyfiles)
 }
 
+// NOTE: The options.FirecrackerOptions.ImageArchivesPath needs to exist on the host.
 func tarfilePathOnHost(key string, options Options) string {
 	return filepath.Join(options.FirecrackerOptions.ImageArchivesPath, fmt.Sprintf("%s.tar", key))
 }


@@ -1,41 +1,85 @@
 package command
 
 import (
+	"context"
 	"strings"
 
+	"github.com/inconshreveable/log15"
+
+	"github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
 	"github.com/sourcegraph/sourcegraph/internal/workerutil"
 )
 
+type executionLogEntryStore interface {
+	AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry) error
+}
+
 // Logger tracks command invocations and stores the command's output and
 // error stream values.
 type Logger struct {
+	store   executionLogEntryStore
+	done    chan struct{}
+	entries chan workerutil.ExecutionLogEntry
+
+	job    executor.Job
+	record workerutil.Record
+
 	replacer *strings.Replacer
-	entries  []workerutil.ExecutionLogEntry
 }
 
-// NewLogger creates a new logger instance with the given replacement map.
-// When the log messages are serialized, any occurrence of sensitive values
-// are replace with a non-sensitive value.
-func NewLogger(replacements map[string]string) *Logger {
+// logEntryBufsize is the maximum number of log entries that are logged by the
+// task execution but not yet written to the database.
+const logEntryBufsize = 50
+
+// NewLogger creates a new logger instance with the given store, job, record,
+// and replacement map. When the log messages are serialized, any occurrence
+// of sensitive values is replaced with a non-sensitive value.
+//
+// Each log message is written to the store in a goroutine. The Flush method
+// must be called to ensure all entries are written.
+func NewLogger(store executionLogEntryStore, job executor.Job, record workerutil.Record, replacements map[string]string) *Logger {
 	oldnew := make([]string, 0, len(replacements)*2)
 	for k, v := range replacements {
 		oldnew = append(oldnew, k, v)
 	}
 
-	return &Logger{
+	l := &Logger{
+		store:    store,
+		job:      job,
+		record:   record,
+		done:     make(chan struct{}),
+		entries:  make(chan workerutil.ExecutionLogEntry, logEntryBufsize),
 		replacer: strings.NewReplacer(oldnew...),
 	}
+
+	go l.writeEntries()
+
+	return l
 }
 
+// Flush waits until all entries have been written to the store.
+func (l *Logger) Flush() {
+	close(l.entries)
+	<-l.done
+}
+
 // Log redacts secrets from the given log entry and stores it.
 func (l *Logger) Log(entry workerutil.ExecutionLogEntry) {
-	l.entries = append(l.entries, redact(entry, l.replacer))
+	redactedEntry := redact(entry, l.replacer)
+	l.entries <- redactedEntry
 }
 
-// Entries returns a copy of the stored log entries.
-func (l *Logger) Entries() (entries []workerutil.ExecutionLogEntry) {
-	entries = append(entries, l.entries...)
-	return entries
+func (l *Logger) writeEntries() {
+	defer func() { close(l.done) }()
+
+	for entry := range l.entries {
+		log15.Info("Writing log entry", "jobID", l.job.ID, "repositoryName", l.job.RepositoryName, "commit", l.job.Commit)
+
+		// Perform this outside of the task execution context. If there is a timeout or
+		// cancellation error we don't want to skip uploading these logs as users will
+		// often want to see how far something progressed prior to a timeout.
+		if err := l.store.AddExecutionLogEntry(context.Background(), l.record.RecordID(), entry); err != nil {
+			log15.Warn("Failed to upload executor log entry for job", "id", l.record.RecordID(), "repositoryName", l.job.RepositoryName, "commit", l.job.Commit, "error", err)
+		}
+	}
 }
 
 func redact(entry workerutil.ExecutionLogEntry, replacer *strings.Replacer) workerutil.ExecutionLogEntry {
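
To make the new contract concrete: any type with a matching AddExecutionLogEntry method satisfies the unexported executionLogEntryStore interface. A hypothetical in-memory store (not part of this commit; useful only as an illustration or test double):

package command_test

import (
	"context"
	"sync"

	"github.com/sourcegraph/sourcegraph/internal/workerutil"
)

// memStore is a hypothetical in-memory implementation of the
// executionLogEntryStore interface declared in the diff above.
type memStore struct {
	mu      sync.Mutex
	entries map[int][]workerutil.ExecutionLogEntry
}

// AddExecutionLogEntry appends the entry under the given record ID.
func (s *memStore) AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.entries == nil {
		s.entries = map[int][]workerutil.ExecutionLogEntry{}
	}
	s.entries[id] = append(s.entries[id], entry)
	return nil
}

The mutex matters: AddExecutionLogEntry is called from each logger's writer goroutine, so a store shared between jobs must be safe for concurrent use.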


@@ -67,20 +67,8 @@ func (h *handler) Handle(ctx context.Context, record workerutil.Record) (err err
 	// interpolate into the command. No command that we run on the host leaks environment
 	// variables, and the user-specified commands (which could leak their environment) are
 	// run in a clean VM.
-	logger := command.NewLogger(union(h.options.RedactedValues, job.RedactedValues))
-	defer func() {
-		log15.Info("Writing log entries", "jobID", job.ID, "repositoryName", job.RepositoryName, "commit", job.Commit)
-
-		for _, entry := range logger.Entries() {
-			// Perform this outside of the task execution context. If there is a timeout or
-			// cancellation error we don't want to skip uploading these logs as users will
-			// often want to see how far something progressed prior to a timeout.
-			if err := h.store.AddExecutionLogEntry(context.Background(), record.RecordID(), entry); err != nil {
-				log15.Warn("Failed to upload executor log entry for job", "id", record.RecordID(), "repositoryName", job.RepositoryName, "commit", job.Commit, "error", err)
-			}
-		}
-	}()
+	logger := command.NewLogger(h.store, job, record, union(h.options.RedactedValues, job.RedactedValues))
+	defer logger.Flush()
 
 	// Create a working directory for this job which will be removed once the job completes.
 	// If a repository is supplied as part of the job configuration, it will be cloned into


@@ -415,7 +415,8 @@ commands:
     cmd: |
       env TMPDIR="$HOME/.sourcegraph/executor-temp" .bin/executor
     install: |
-      go build -o .bin/executor github.com/sourcegraph/sourcegraph/enterprise/cmd/executor
+      go build -o .bin/executor github.com/sourcegraph/sourcegraph/enterprise/cmd/executor &&
+      mkdir -p $EXECUTOR_IMAGE_ARCHIVE_PATH
     checkBinary: .bin/executor
     env:
       EXECUTOR_QUEUE_NAME: TEMPLATE
@@ -443,6 +444,20 @@ commands:
       EXECUTOR_HEALTH_SERVER_PORT: "3193"
       SRC_PROF_HTTP: ":6093"
 
+  # If you want to use this, either start it with `sg run batches-executor-firecracker` or
+  # modify the `commandsets.batches` in your local `sg.config.overwrite.yaml`.
+  batches-executor-firecracker:
+    <<: *executor_template
+    cmd: |
+      env TMPDIR="$HOME/.sourcegraph/batches-executor-temp" \
+      sudo --preserve-env=TMPDIR,EXECUTOR_QUEUE_NAME,EXECUTOR_HEALTH_SERVER_PORT,SRC_PROF_HTTP,EXECUTOR_FRONTEND_URL,EXECUTOR_FRONTEND_USERNAME,EXECUTOR_FRONTEND_PASSWORD,EXECUTOR_QUEUE_URL,EXECUTOR_USE_FIRECRACKER,EXECUTOR_IMAGE_ARCHIVE_PATH \
+      .bin/executor
+    env:
+      EXECUTOR_USE_FIRECRACKER: true
+      EXECUTOR_QUEUE_NAME: batches
+      EXECUTOR_HEALTH_SERVER_PORT: "3193"
+      SRC_PROF_HTTP: ":6093"
+
   minio:
     cmd: |
       docker inspect $CONTAINER >/dev/null 2>&1 && docker rm -f $CONTAINER