From 968bea1eeacf150caa03e2ddddc23949bf64e99a Mon Sep 17 00:00:00 2001 From: Erik Seliger Date: Mon, 5 Jul 2021 12:27:40 +0200 Subject: [PATCH] Implement executor queue for batch changes executor (#22555) This commit contains several smaller pieces: - Run executor-queue as part of enterprise - Make health server port configurable Otherwise, it's impossible to run two executors on the same machine (required for dev when running code intel AND batch changes). - Make port for executor queue configurable - Add batch spec execution DB entity - Overpromise implementation of batches queue in executor-queue - Add shared config for reusing env vars across queues These fields will also be required in the batches queue, so we need to make them sharable. Defining them in both places causes a validation error. - Add batch spec executions queue to executor-queue - Add resetter for executor worker --- enterprise/cmd/executor-queue/README.md | 1 + enterprise/cmd/executor-queue/config.go | 6 +- .../executor-queue/internal/config/shared.go | 18 ++ .../internal/queues/batches/config.go | 14 ++ .../internal/queues/batches/queue.go | 25 +++ .../internal/queues/batches/transform.go | 63 +++++++ .../internal/queues/batches/transform_test.go | 53 ++++++ .../internal/queues/codeintel/config.go | 15 +- .../internal/queues/codeintel/transform.go | 8 +- .../queues/codeintel/transform_test.go | 17 +- enterprise/cmd/executor-queue/main.go | 5 +- enterprise/cmd/executor/config.go | 2 + enterprise/cmd/executor/main.go | 5 +- .../internal/batches/background/background.go | 15 +- .../batches/background/execution_resetter.go | 24 +++ .../batches/background/executor_store.go | 48 +++++ .../internal/batches/background/metrics.go | 13 +- .../batches/store/batch_spec_execution.go | 174 ++++++++++++++++++ .../batches/types/batch_spec_execution.go | 38 ++++ internal/database/schema.md | 26 +++ ...95843_batch_spec_executions_table.down.sql | 5 + ...8395843_batch_spec_executions_table.up.sql | 22 +++ sg.config.yaml | 1 + 23 files changed, 561 insertions(+), 37 deletions(-) create mode 100644 enterprise/cmd/executor-queue/internal/config/shared.go create mode 100644 enterprise/cmd/executor-queue/internal/queues/batches/config.go create mode 100644 enterprise/cmd/executor-queue/internal/queues/batches/queue.go create mode 100644 enterprise/cmd/executor-queue/internal/queues/batches/transform.go create mode 100644 enterprise/cmd/executor-queue/internal/queues/batches/transform_test.go create mode 100644 enterprise/internal/batches/background/execution_resetter.go create mode 100644 enterprise/internal/batches/background/executor_store.go create mode 100644 enterprise/internal/batches/store/batch_spec_execution.go create mode 100644 enterprise/internal/batches/types/batch_spec_execution.go create mode 100644 migrations/frontend/1528395843_batch_spec_executions_table.down.sql create mode 100644 migrations/frontend/1528395843_batch_spec_executions_table.up.sql diff --git a/enterprise/cmd/executor-queue/README.md b/enterprise/cmd/executor-queue/README.md index 5c6b658dcb3..97403249665 100644 --- a/enterprise/cmd/executor-queue/README.md +++ b/enterprise/cmd/executor-queue/README.md @@ -5,3 +5,4 @@ The executor-queue service maintains the executor work queues. Executor instance ## Work queues - The `codeintel` queue contains unprocessed lsif_index records +- The `batches` queue contains unprocessed batch_spec_execution records diff --git a/enterprise/cmd/executor-queue/config.go b/enterprise/cmd/executor-queue/config.go index cd8cae30dc1..cad322801eb 100644 --- a/enterprise/cmd/executor-queue/config.go +++ b/enterprise/cmd/executor-queue/config.go @@ -7,11 +7,10 @@ import ( "github.com/sourcegraph/sourcegraph/internal/env" ) -const port = 3191 - type Config struct { env.BaseConfig + Port int MaximumNumTransactions int JobRequeueDelay time.Duration JobCleanupInterval time.Duration @@ -19,6 +18,7 @@ type Config struct { } func (c *Config) Load() { + c.Port = c.GetInt("EXECUTOR_QUEUE_PORT", "3191", "The port to listen on.") c.MaximumNumTransactions = c.GetInt("EXECUTOR_QUEUE_MAXIMUM_NUM_TRANSACTIONS", "10", "Number of jobs that can be processing at one time.") c.JobRequeueDelay = c.GetInterval("EXECUTOR_QUEUE_JOB_REQUEUE_DELAY", "1m", "The requeue delay of jobs assigned to an unreachable executor.") c.JobCleanupInterval = c.GetInterval("EXECUTOR_QUEUE_JOB_CLEANUP_INTERVAL", "10s", "Interval between cleanup runs.") @@ -27,7 +27,7 @@ func (c *Config) Load() { func (c *Config) ServerOptions(queueOptions map[string]apiserver.QueueOptions) apiserver.Options { return apiserver.Options{ - Port: port, + Port: c.Port, QueueOptions: queueOptions, MaximumNumTransactions: c.MaximumNumTransactions, RequeueDelay: c.JobRequeueDelay, diff --git a/enterprise/cmd/executor-queue/internal/config/shared.go b/enterprise/cmd/executor-queue/internal/config/shared.go new file mode 100644 index 00000000000..e636967cf68 --- /dev/null +++ b/enterprise/cmd/executor-queue/internal/config/shared.go @@ -0,0 +1,18 @@ +package config + +import "github.com/sourcegraph/sourcegraph/internal/env" + +// SharedConfig defines common items that are used by multiple queues. +type SharedConfig struct { + env.BaseConfig + + FrontendURL string + FrontendUsername string + FrontendPassword string +} + +func (c *SharedConfig) Load() { + c.FrontendURL = c.Get("EXECUTOR_FRONTEND_URL", "", "The external URL of the sourcegraph instance.") + c.FrontendUsername = c.Get("EXECUTOR_FRONTEND_USERNAME", "", "The username supplied to the frontend.") + c.FrontendPassword = c.Get("EXECUTOR_FRONTEND_PASSWORD", "", "The password supplied to the frontend.") +} diff --git a/enterprise/cmd/executor-queue/internal/queues/batches/config.go b/enterprise/cmd/executor-queue/internal/queues/batches/config.go new file mode 100644 index 00000000000..32d31088173 --- /dev/null +++ b/enterprise/cmd/executor-queue/internal/queues/batches/config.go @@ -0,0 +1,14 @@ +package batches + +import ( + "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/config" + "github.com/sourcegraph/sourcegraph/internal/env" +) + +type Config struct { + env.BaseConfig + + Shared *config.SharedConfig +} + +func (c *Config) Load() {} diff --git a/enterprise/cmd/executor-queue/internal/queues/batches/queue.go b/enterprise/cmd/executor-queue/internal/queues/batches/queue.go new file mode 100644 index 00000000000..3ee6ceaa019 --- /dev/null +++ b/enterprise/cmd/executor-queue/internal/queues/batches/queue.go @@ -0,0 +1,25 @@ +package batches + +import ( + "database/sql" + + apiserver "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/server" + "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/background" + btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types" + apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/database/dbutil" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/workerutil" +) + +func QueueOptions(db dbutil.DB, config *Config, observationContext *observation.Context) apiserver.QueueOptions { + recordTransformer := func(record workerutil.Record) (apiclient.Job, error) { + return transformRecord(record.(*btypes.BatchSpecExecution), config) + } + + return apiserver.QueueOptions{ + Store: background.NewExecutorStore(basestore.NewWithDB(db, sql.TxOptions{}), observationContext), + RecordTransformer: recordTransformer, + } +} diff --git a/enterprise/cmd/executor-queue/internal/queues/batches/transform.go b/enterprise/cmd/executor-queue/internal/queues/batches/transform.go new file mode 100644 index 00000000000..fbfdd7eac8e --- /dev/null +++ b/enterprise/cmd/executor-queue/internal/queues/batches/transform.go @@ -0,0 +1,63 @@ +package batches + +import ( + "fmt" + "net/url" + + btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types" + apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor" +) + +// transformRecord transforms a *btypes.BatchSpecExecution into an apiclient.Job. +func transformRecord(exec *btypes.BatchSpecExecution, config *Config) (apiclient.Job, error) { + srcEndpoint, err := makeURL(config.Shared.FrontendURL, config.Shared.FrontendUsername, config.Shared.FrontendPassword) + if err != nil { + return apiclient.Job{}, err + } + + redactedSrcEndpoint, err := makeURL(config.Shared.FrontendURL, "USERNAME_REMOVED", "PASSWORD_REMOVED") + if err != nil { + return apiclient.Job{}, err + } + + return apiclient.Job{ + ID: int(exec.ID), + VirtualMachineFiles: map[string]string{"spec.yml": exec.BatchSpec}, + CliSteps: []apiclient.CliStep{ + { + Commands: []string{ + "batch", + "preview", + "-f", "spec.yml", + }, + Dir: ".", + Env: []string{ + fmt.Sprintf("SRC_ENDPOINT=%s", srcEndpoint), + }, + }, + }, + RedactedValues: map[string]string{ + // 🚨 SECURITY: Catch leak of upload endpoint. This is necessary in addition + // to the below in case the username or password contains illegal URL characters, + // which are then urlencoded and are not replaceable via byte comparison. + srcEndpoint: redactedSrcEndpoint, + + // 🚨 SECURITY: Catch uses of fragments pulled from URL to construct another target + // (in src-cli). We only pass the constructed URL to src-cli, which we trust not to + // ship the values to a third party, but not to trust to ensure the values are absent + // from the command's stdout or stderr streams. + config.Shared.FrontendUsername: "USERNAME_REMOVED", + config.Shared.FrontendPassword: "PASSWORD_REMOVED", + }, + }, nil +} + +func makeURL(base, username, password string) (string, error) { + u, err := url.Parse(base) + if err != nil { + return "", err + } + + u.User = url.UserPassword(username, password) + return u.String(), nil +} diff --git a/enterprise/cmd/executor-queue/internal/queues/batches/transform_test.go b/enterprise/cmd/executor-queue/internal/queues/batches/transform_test.go new file mode 100644 index 00000000000..fc33c4bb96d --- /dev/null +++ b/enterprise/cmd/executor-queue/internal/queues/batches/transform_test.go @@ -0,0 +1,53 @@ +package batches + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + + "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/config" + btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types" + apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor" +) + +func TestTransformRecord(t *testing.T) { + testBatchSpec := `batchSpec: yeah` + index := &btypes.BatchSpecExecution{ + ID: 42, + BatchSpec: testBatchSpec, + } + config := &Config{ + Shared: &config.SharedConfig{ + FrontendURL: "https://test.io", + FrontendUsername: "test*", + FrontendPassword: "hunter2", + }, + } + + job, err := transformRecord(index, config) + if err != nil { + t.Fatalf("unexpected error transforming record: %s", err) + } + + expected := apiclient.Job{ + ID: 42, + VirtualMachineFiles: map[string]string{"spec.yml": testBatchSpec}, + CliSteps: []apiclient.CliStep{ + { + Commands: []string{ + "batch", "preview", "-f", "spec.yml", + }, + Dir: ".", + Env: []string{"SRC_ENDPOINT=https://test%2A:hunter2@test.io"}, + }, + }, + RedactedValues: map[string]string{ + "https://test%2A:hunter2@test.io": "https://USERNAME_REMOVED:PASSWORD_REMOVED@test.io", + "test*": "USERNAME_REMOVED", + "hunter2": "PASSWORD_REMOVED", + }, + } + if diff := cmp.Diff(expected, job); diff != "" { + t.Errorf("unexpected job (-want +got):\n%s", diff) + } +} diff --git a/enterprise/cmd/executor-queue/internal/queues/codeintel/config.go b/enterprise/cmd/executor-queue/internal/queues/codeintel/config.go index de488de5ded..5c5540faabe 100644 --- a/enterprise/cmd/executor-queue/internal/queues/codeintel/config.go +++ b/enterprise/cmd/executor-queue/internal/queues/codeintel/config.go @@ -1,17 +1,14 @@ package codeintel -import "github.com/sourcegraph/sourcegraph/internal/env" +import ( + "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/config" + "github.com/sourcegraph/sourcegraph/internal/env" +) type Config struct { env.BaseConfig - FrontendURL string - FrontendUsername string - FrontendPassword string + Shared *config.SharedConfig } -func (c *Config) Load() { - c.FrontendURL = c.Get("EXECUTOR_FRONTEND_URL", "", "The external URL of the sourcegraph instance.") - c.FrontendUsername = c.Get("EXECUTOR_FRONTEND_USERNAME", "", "The username supplied to the frontend.") - c.FrontendPassword = c.Get("EXECUTOR_FRONTEND_PASSWORD", "", "The password supplied to the frontend.") -} +func (c *Config) Load() {} diff --git a/enterprise/cmd/executor-queue/internal/queues/codeintel/transform.go b/enterprise/cmd/executor-queue/internal/queues/codeintel/transform.go index f1982218de9..0d74664e76e 100644 --- a/enterprise/cmd/executor-queue/internal/queues/codeintel/transform.go +++ b/enterprise/cmd/executor-queue/internal/queues/codeintel/transform.go @@ -33,12 +33,12 @@ func transformRecord(index store.Index, config *Config) (apiclient.Job, error) { }) } - srcEndpoint, err := makeURL(config.FrontendURL, config.FrontendUsername, config.FrontendPassword) + srcEndpoint, err := makeURL(config.Shared.FrontendURL, config.Shared.FrontendUsername, config.Shared.FrontendPassword) if err != nil { return apiclient.Job{}, err } - redactedSrcEndpoint, err := makeURL(config.FrontendURL, "USERNAME_REMOVED", "PASSWORD_REMOVED") + redactedSrcEndpoint, err := makeURL(config.Shared.FrontendURL, "USERNAME_REMOVED", "PASSWORD_REMOVED") if err != nil { return apiclient.Job{}, err } @@ -86,8 +86,8 @@ func transformRecord(index store.Index, config *Config) (apiclient.Job, error) { // (in src-cli). We only pass the constructed URL to src-cli, which we trust not to // ship the values to a third party, but not to trust to ensure the values are absent // from the command's stdout or stderr streams. - config.FrontendUsername: "USERNAME_REMOVED", - config.FrontendPassword: "PASSWORD_REMOVED", + config.Shared.FrontendUsername: "USERNAME_REMOVED", + config.Shared.FrontendPassword: "PASSWORD_REMOVED", }, }, nil } diff --git a/enterprise/cmd/executor-queue/internal/queues/codeintel/transform_test.go b/enterprise/cmd/executor-queue/internal/queues/codeintel/transform_test.go index 116608fee16..4f725c41bf3 100644 --- a/enterprise/cmd/executor-queue/internal/queues/codeintel/transform_test.go +++ b/enterprise/cmd/executor-queue/internal/queues/codeintel/transform_test.go @@ -5,6 +5,7 @@ import ( "github.com/google/go-cmp/cmp" + "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/config" store "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/stores/dbstore" apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor" ) @@ -27,9 +28,11 @@ func TestTransformRecord(t *testing.T) { Outfile: "", } config := &Config{ - FrontendURL: "https://test.io", - FrontendUsername: "test*", - FrontendPassword: "hunter2", + Shared: &config.SharedConfig{ + FrontendURL: "https://test.io", + FrontendUsername: "test*", + FrontendPassword: "hunter2", + }, } job, err := transformRecord(index, config) @@ -104,9 +107,11 @@ func TestTransformRecordWithoutIndexer(t *testing.T) { Outfile: "other/path/lsif.dump", } config := &Config{ - FrontendURL: "https://test.io", - FrontendUsername: "test*", - FrontendPassword: "hunter2", + Shared: &config.SharedConfig{ + FrontendURL: "https://test.io", + FrontendUsername: "test*", + FrontendPassword: "hunter2", + }, } job, err := transformRecord(index, config) diff --git a/enterprise/cmd/executor-queue/main.go b/enterprise/cmd/executor-queue/main.go index 33ff12827b9..b2715936d2b 100644 --- a/enterprise/cmd/executor-queue/main.go +++ b/enterprise/cmd/executor-queue/main.go @@ -9,6 +9,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" + "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/queues/batches" "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/queues/codeintel" apiserver "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/server" "github.com/sourcegraph/sourcegraph/internal/conf" @@ -30,7 +31,8 @@ type config interface { func main() { serviceConfig := &Config{} codeintelConfig := &codeintel.Config{} - configs := []config{serviceConfig, codeintelConfig} + batchesConfig := &batches.Config{} + configs := []config{serviceConfig, codeintelConfig, batchesConfig} for _, config := range configs { config.Load() @@ -71,6 +73,7 @@ func main() { // Initialize queues queueOptions := map[string]apiserver.QueueOptions{ "codeintel": codeintel.QueueOptions(db, codeintelConfig, observationContext), + "batches": batches.QueueOptions(db, batchesConfig, observationContext), } for queueName, options := range queueOptions { diff --git a/enterprise/cmd/executor/config.go b/enterprise/cmd/executor/config.go index 0eb89b9f680..40802f4222a 100644 --- a/enterprise/cmd/executor/config.go +++ b/enterprise/cmd/executor/config.go @@ -30,6 +30,7 @@ type Config struct { FirecrackerDiskSpace string ImageArchivesPath string DisableHealthServer bool + HealthServerPort int MaximumRuntimePerJob time.Duration } @@ -48,6 +49,7 @@ func (c *Config) Load() { c.FirecrackerDiskSpace = c.Get("EXECUTOR_FIRECRACKER_DISK_SPACE", "20G", "How much disk space to allocate to each virtual machine or container.") c.ImageArchivesPath = c.Get("EXECUTOR_IMAGE_ARCHIVE_PATH", "", "Where to store tar archives of docker images shared by virtual machines.") c.DisableHealthServer = c.GetBool("EXECUTOR_DISABLE_HEALTHSERVER", "false", "Whether or not to disable the health server.") + c.HealthServerPort = c.GetInt("EXECUTOR_HEALTH_SERVER_PORT", "3192", "The port to listen on for the health server.") c.MaximumRuntimePerJob = c.GetInterval("EXECUTOR_MAXIMUM_RUNTIME_PER_JOB", "30m", "The maximum wall time that can be spent on a single job.") } diff --git a/enterprise/cmd/executor/main.go b/enterprise/cmd/executor/main.go index e815621d18e..6e55230ba5f 100644 --- a/enterprise/cmd/executor/main.go +++ b/enterprise/cmd/executor/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "log" "net/http" "time" @@ -21,8 +22,6 @@ import ( "github.com/sourcegraph/sourcegraph/internal/workerutil" ) -const addr = ":3192" - func main() { config := &Config{} config.Load() @@ -53,7 +52,7 @@ func main() { worker.NewWorker(config.APIWorkerOptions(nil), observationContext), } if !config.DisableHealthServer { - routines = append(routines, httpserver.NewFromAddr(addr, &http.Server{ + routines = append(routines, httpserver.NewFromAddr(fmt.Sprintf(":%d", config.HealthServerPort), &http.Server{ ReadTimeout: 75 * time.Second, WriteTimeout: 10 * time.Minute, Handler: httpserver.NewHandler(nil), diff --git a/enterprise/internal/batches/background/background.go b/enterprise/internal/batches/background/background.go index 862fdb3e3b4..a940b7f7b72 100644 --- a/enterprise/internal/batches/background/background.go +++ b/enterprise/internal/batches/background/background.go @@ -3,17 +3,28 @@ package background import ( "context" + "github.com/inconshreveable/log15" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/scheduler" "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/sources" "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store" "github.com/sourcegraph/sourcegraph/internal/gitserver" "github.com/sourcegraph/sourcegraph/internal/goroutine" "github.com/sourcegraph/sourcegraph/internal/httpcli" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/trace" ) func Routines(ctx context.Context, batchesStore *store.Store, cf *httpcli.Factory) []goroutine.BackgroundRoutine { sourcer := sources.NewSourcer(cf) - metrics := newMetrics() + observationContext := &observation.Context{ + Logger: log15.Root(), + Tracer: &trace.Tracer{Tracer: opentracing.GlobalTracer()}, + Registerer: prometheus.DefaultRegisterer, + } + metrics := newMetrics(observationContext) routines := []goroutine.BackgroundRoutine{ newReconcilerWorker(ctx, batchesStore, gitserver.DefaultClient, sourcer, metrics), @@ -25,6 +36,8 @@ func Routines(ctx context.Context, batchesStore *store.Store, cf *httpcli.Factor newBulkOperationWorker(ctx, batchesStore, sourcer, metrics), newBulkOperationWorkerResetter(batchesStore, metrics), + + newBatchSpecExecutionResetter(batchesStore, observationContext, metrics), } return routines } diff --git a/enterprise/internal/batches/background/execution_resetter.go b/enterprise/internal/batches/background/execution_resetter.go new file mode 100644 index 00000000000..416c3eafb1c --- /dev/null +++ b/enterprise/internal/batches/background/execution_resetter.go @@ -0,0 +1,24 @@ +package background + +import ( + "time" + + "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker" +) + +// newBatchSpecExecutionResetter creates a dbworker.Resetter that re-enqueues +// lost batch_spec_execution jobs for processing. +func newBatchSpecExecutionResetter(s *store.Store, observationContext *observation.Context, metrics batchChangesMetrics) *dbworker.Resetter { + workerStore := NewExecutorStore(s, observationContext) + + options := dbworker.ResetterOptions{ + Name: "batch_spec_executor_resetter", + Interval: 1 * time.Minute, + Metrics: metrics.executionResetterMetrics, + } + + resetter := dbworker.NewResetter(workerStore, options) + return resetter +} diff --git a/enterprise/internal/batches/background/executor_store.go b/enterprise/internal/batches/background/executor_store.go new file mode 100644 index 00000000000..ccf8733f986 --- /dev/null +++ b/enterprise/internal/batches/background/executor_store.go @@ -0,0 +1,48 @@ +package background + +import ( + "database/sql" + "time" + + "github.com/keegancsmith/sqlf" + + "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/workerutil" + dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store" +) + +// executorStalledJobMaximumAge is the maximum allowable duration between updating the state of a +// job as "processing" and locking the record during processing. An unlocked row that is +// marked as processing likely indicates that the executor that dequeued the job has died. +// There should be a nearly-zero delay between these states during normal operation. +const executorStalledJobMaximumAge = time.Second * 5 + +// executorMaximumNumResets is the maximum number of times a job can be reset. If a job's failed +// attempts counter reaches this threshold, it will be moved into "errored" rather than +// "queued" on its next reset. +const executorMaximumNumResets = 3 + +var executorWorkerStoreOptions = dbworkerstore.Options{ + Name: "batch_spec_executor_worker_store", + TableName: "batch_spec_executions", + ColumnExpressions: store.BatchSpecExecutionColumns, + Scan: scanFirstExecutionRecord, + OrderByExpression: sqlf.Sprintf("batch_spec_executions.created_at, batch_spec_executions.id"), + StalledMaxAge: executorStalledJobMaximumAge, + MaxNumResets: executorMaximumNumResets, + // Explicitly disable retries. + MaxNumRetries: 0, +} + +// NewExecutorStore creates a dbworker store that wraps the batch_spec_executions +// table. +func NewExecutorStore(s basestore.ShareableStore, observationContext *observation.Context) dbworkerstore.Store { + return dbworkerstore.NewWithMetrics(s.Handle(), executorWorkerStoreOptions, observationContext) +} + +// scanFirstExecutionRecord scans a slice of batch change executions and returns the first. +func scanFirstExecutionRecord(rows *sql.Rows, err error) (workerutil.Record, bool, error) { + return store.ScanFirstBatchSpecExecution(rows, err) +} diff --git a/enterprise/internal/batches/background/metrics.go b/enterprise/internal/batches/background/metrics.go index aa43ad5d0a1..50973c37db3 100644 --- a/enterprise/internal/batches/background/metrics.go +++ b/enterprise/internal/batches/background/metrics.go @@ -3,12 +3,9 @@ package background import ( "fmt" - "github.com/inconshreveable/log15" - "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/sourcegraph/sourcegraph/internal/observation" - "github.com/sourcegraph/sourcegraph/internal/trace" "github.com/sourcegraph/sourcegraph/internal/workerutil" "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker" ) @@ -18,20 +15,16 @@ type batchChangesMetrics struct { bulkProcessorWorkerMetrics workerutil.WorkerMetrics reconcilerWorkerResetterMetrics dbworker.ResetterMetrics bulkProcessorWorkerResetterMetrics dbworker.ResetterMetrics + executionResetterMetrics dbworker.ResetterMetrics } -func newMetrics() batchChangesMetrics { - observationContext := &observation.Context{ - Logger: log15.Root(), - Tracer: &trace.Tracer{Tracer: opentracing.GlobalTracer()}, - Registerer: prometheus.DefaultRegisterer, - } - +func newMetrics(observationContext *observation.Context) batchChangesMetrics { return batchChangesMetrics{ reconcilerWorkerMetrics: workerutil.NewMetrics(observationContext, "batch_changes_reconciler", nil), bulkProcessorWorkerMetrics: workerutil.NewMetrics(observationContext, "batch_changes_bulk_processor", nil), reconcilerWorkerResetterMetrics: makeResetterMetrics(observationContext, "batch_changes_reconciler"), bulkProcessorWorkerResetterMetrics: makeResetterMetrics(observationContext, "batch_changes_bulk_processor"), + executionResetterMetrics: makeResetterMetrics(observationContext, "batch_spec_executor"), } } diff --git a/enterprise/internal/batches/store/batch_spec_execution.go b/enterprise/internal/batches/store/batch_spec_execution.go new file mode 100644 index 00000000000..5686f1a5c29 --- /dev/null +++ b/enterprise/internal/batches/store/batch_spec_execution.go @@ -0,0 +1,174 @@ +package store + +import ( + "context" + "database/sql" + + "github.com/keegancsmith/sqlf" + "github.com/lib/pq" + + btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/database/dbutil" + "github.com/sourcegraph/sourcegraph/internal/workerutil" + dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store" +) + +var BatchSpecExecutionColumns = []*sqlf.Query{ + sqlf.Sprintf("batch_spec_executions.id"), + sqlf.Sprintf("batch_spec_executions.state"), + sqlf.Sprintf("batch_spec_executions.failure_message"), + sqlf.Sprintf("batch_spec_executions.started_at"), + sqlf.Sprintf("batch_spec_executions.finished_at"), + sqlf.Sprintf("batch_spec_executions.process_after"), + sqlf.Sprintf("batch_spec_executions.num_resets"), + sqlf.Sprintf("batch_spec_executions.num_failures"), + sqlf.Sprintf(`batch_spec_executions.execution_logs`), + sqlf.Sprintf("batch_spec_executions.worker_hostname"), + sqlf.Sprintf(`batch_spec_executions.created_at`), + sqlf.Sprintf(`batch_spec_executions.updated_at`), + sqlf.Sprintf(`batch_spec_executions.batch_spec`), + sqlf.Sprintf(`batch_spec_executions.batch_spec_id`), +} + +var batchSpecExecutionInsertColumns = []*sqlf.Query{ + sqlf.Sprintf("batch_spec"), + sqlf.Sprintf("created_at"), + sqlf.Sprintf("updated_at"), +} + +// CreateBatchSpecExecution creates the given BatchSpecExecution. +func (s *Store) CreateBatchSpecExecution(ctx context.Context, b *btypes.BatchSpecExecution) error { + if b.CreatedAt.IsZero() { + b.CreatedAt = s.now() + } + + if b.UpdatedAt.IsZero() { + b.UpdatedAt = b.CreatedAt + } + + q, err := createBatchSpecExecutionQuery(b) + if err != nil { + return err + } + return s.query(ctx, q, func(sc scanner) error { return scanBatchSpecExecution(b, sc) }) +} + +var createBatchSpecExecutionQueryFmtstr = ` +-- source: enterprise/internal/batches/store/batch_spec_executions.go:CreateBatchSpecExecution +INSERT INTO batch_spec_executions (%s) +VALUES (%s, %s, %s) +RETURNING %s` + +func createBatchSpecExecutionQuery(c *btypes.BatchSpecExecution) (*sqlf.Query, error) { + return sqlf.Sprintf( + createBatchSpecExecutionQueryFmtstr, + sqlf.Join(batchSpecExecutionInsertColumns, ", "), + c.BatchSpec, + c.CreatedAt, + c.UpdatedAt, + sqlf.Join(BatchSpecExecutionColumns, ", "), + ), nil +} + +// GetBatchSpecExecutionOpts captures the query options needed for getting a BatchSpecExecution. +type GetBatchSpecExecutionOpts struct { + ID int64 +} + +// GetBatchSpecExecution gets a BatchSpecExecution matching the given options. +func (s *Store) GetBatchSpecExecution(ctx context.Context, opts GetBatchSpecExecutionOpts) (*btypes.BatchSpecExecution, error) { + q := getBatchSpecExecutionQuery(&opts) + + var b btypes.BatchSpecExecution + err := s.query(ctx, q, func(sc scanner) (err error) { + return scanBatchSpecExecution(&b, sc) + }) + if err != nil { + return nil, err + } + + if b.ID == 0 { + return nil, ErrNoResults + } + + return &b, nil +} + +var getBatchSpecExecutionQueryFmtstr = ` +-- source: enterprise/internal/batches/store/batch_spec_executions.go:GetBatchSpecExecution +SELECT %s FROM batch_spec_executions +WHERE %s +LIMIT 1 +` + +func getBatchSpecExecutionQuery(opts *GetBatchSpecExecutionOpts) *sqlf.Query { + preds := []*sqlf.Query{ + sqlf.Sprintf("id = %s", opts.ID), + } + + return sqlf.Sprintf( + getBatchSpecExecutionQueryFmtstr, + sqlf.Join(BatchSpecExecutionColumns, ", "), + sqlf.Join(preds, "\n AND "), + ) +} + +func scanBatchSpecExecution(b *btypes.BatchSpecExecution, sc scanner) error { + var executionLogs []dbworkerstore.ExecutionLogEntry + + if err := sc.Scan( + &b.ID, + &b.State, + &b.FailureMessage, + &b.StartedAt, + &b.FinishedAt, + &b.ProcessAfter, + &b.NumResets, + &b.NumFailures, + pq.Array(&executionLogs), + &b.WorkerHostname, + &b.CreatedAt, + &b.UpdatedAt, + &b.BatchSpec, + &dbutil.NullInt64{N: &b.BatchSpecID}, + ); err != nil { + return err + } + + for _, entry := range executionLogs { + b.ExecutionLogs = append(b.ExecutionLogs, workerutil.ExecutionLogEntry(entry)) + } + + return nil +} + +// scanBatchSpecExecutions scans a slice of batch spec executions from the rows. +func scanBatchSpecExecutions(rows *sql.Rows, queryErr error) (_ []*btypes.BatchSpecExecution, err error) { + if queryErr != nil { + return nil, queryErr + } + defer func() { err = basestore.CloseRows(rows, err) }() + + var execs []*btypes.BatchSpecExecution + for rows.Next() { + exec := &btypes.BatchSpecExecution{} + if err := scanBatchSpecExecution(exec, rows); err != nil { + return nil, err + } + + execs = append(execs, exec) + } + + return execs, nil +} + +// ScanFirstBatchSpecExecution scans a slice of batch spec executions from the +// rows and returns the first. +func ScanFirstBatchSpecExecution(rows *sql.Rows, err error) (*btypes.BatchSpecExecution, bool, error) { + execs, err := scanBatchSpecExecutions(rows, err) + if err != nil || len(execs) == 0 { + return &btypes.BatchSpecExecution{}, false, err + } + return execs[0], true, nil +} diff --git a/enterprise/internal/batches/types/batch_spec_execution.go b/enterprise/internal/batches/types/batch_spec_execution.go new file mode 100644 index 00000000000..da7f063f0b4 --- /dev/null +++ b/enterprise/internal/batches/types/batch_spec_execution.go @@ -0,0 +1,38 @@ +package types + +import ( + "time" + + "github.com/sourcegraph/sourcegraph/internal/workerutil" +) + +type BatchSpecExecutionState string + +const ( + BatchSpecExecutionStateQueued BatchSpecExecutionState = "queued" + BatchSpecExecutionStateErrored BatchSpecExecutionState = "errored" + BatchSpecExecutionStateFailed BatchSpecExecutionState = "failed" + BatchSpecExecutionStateCompleted BatchSpecExecutionState = "completed" + BatchSpecExecutionStateProcessing BatchSpecExecutionState = "processing" +) + +type BatchSpecExecution struct { + ID int64 + State BatchSpecExecutionState + FailureMessage *string + StartedAt *time.Time + FinishedAt *time.Time + ProcessAfter *time.Time + NumResets int64 + NumFailures int64 + ExecutionLogs []workerutil.ExecutionLogEntry + WorkerHostname string + CreatedAt time.Time + UpdatedAt time.Time + BatchSpec string + BatchSpecID int64 +} + +func (i BatchSpecExecution) RecordID() int { + return int(i.ID) +} diff --git a/internal/database/schema.md b/internal/database/schema.md index 8ba20496d8e..141a521a88c 100755 --- a/internal/database/schema.md +++ b/internal/database/schema.md @@ -76,6 +76,31 @@ Indexes: ``` +# Table "public.batch_spec_executions" +``` + Column | Type | Collation | Nullable | Default +-----------------+--------------------------+-----------+----------+--------------------------------------------------- + id | bigint | | not null | nextval('batch_spec_executions_id_seq'::regclass) + state | text | | | 'queued'::text + failure_message | text | | | + started_at | timestamp with time zone | | | + finished_at | timestamp with time zone | | | + process_after | timestamp with time zone | | | + num_resets | integer | | not null | 0 + num_failures | integer | | not null | 0 + execution_logs | json[] | | | + worker_hostname | text | | not null | ''::text + created_at | timestamp with time zone | | not null | now() + updated_at | timestamp with time zone | | not null | now() + batch_spec | text | | not null | + batch_spec_id | integer | | | +Indexes: + "batch_spec_executions_pkey" PRIMARY KEY, btree (id) +Foreign-key constraints: + "batch_spec_executions_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) + +``` + # Table "public.batch_specs" ``` Column | Type | Collation | Nullable | Default @@ -98,6 +123,7 @@ Foreign-key constraints: "batch_specs_user_id_fkey" FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL DEFERRABLE Referenced by: TABLE "batch_changes" CONSTRAINT "batch_changes_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) DEFERRABLE + TABLE "batch_spec_executions" CONSTRAINT "batch_spec_executions_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) TABLE "changeset_specs" CONSTRAINT "changeset_specs_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) DEFERRABLE ``` diff --git a/migrations/frontend/1528395843_batch_spec_executions_table.down.sql b/migrations/frontend/1528395843_batch_spec_executions_table.down.sql new file mode 100644 index 00000000000..23fa74022a0 --- /dev/null +++ b/migrations/frontend/1528395843_batch_spec_executions_table.down.sql @@ -0,0 +1,5 @@ +BEGIN; + +DROP TABLE IF EXISTS batch_spec_executions; + +COMMIT; diff --git a/migrations/frontend/1528395843_batch_spec_executions_table.up.sql b/migrations/frontend/1528395843_batch_spec_executions_table.up.sql new file mode 100644 index 00000000000..8d230bbed42 --- /dev/null +++ b/migrations/frontend/1528395843_batch_spec_executions_table.up.sql @@ -0,0 +1,22 @@ +BEGIN; + +CREATE TABLE IF NOT EXISTS batch_spec_executions ( + id BIGSERIAL PRIMARY KEY, + state TEXT DEFAULT 'queued', + failure_message TEXT, + started_at TIMESTAMP WITH TIME ZONE, + finished_at TIMESTAMP WITH TIME ZONE, + process_after TIMESTAMP WITH TIME ZONE, + num_resets INTEGER NOT NULL DEFAULT 0, + num_failures INTEGER NOT NULL DEFAULT 0, + execution_logs JSON[], + worker_hostname TEXT NOT NULL DEFAULT '', + + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + + batch_spec TEXT NOT NULL, + batch_spec_id integer REFERENCES batch_specs(id) +); + +COMMIT; diff --git a/sg.config.yaml b/sg.config.yaml index 398175bc550..441b0e83cb0 100644 --- a/sg.config.yaml +++ b/sg.config.yaml @@ -513,6 +513,7 @@ commandsets: - zoekt-indexserver-1 - zoekt-webserver-0 - zoekt-webserver-1 + - executor-queue enterprise-codeintel: - enterprise-frontend