Implement executor queue for batch changes executor (#22555)

This commit contains several smaller pieces:

- Run executor-queue as part of enterprise

- Make health server port configurable

Otherwise it's impossible to run two executors on the same machine, which is required in dev when running code intel and batch changes side by side (see the first sketch after this list).

- Make port for executor queue configurable

- Add batch spec execution DB entity

- Add initial implementation of batches queue in executor-queue

- Add shared config for reusing env vars across queues

These fields will also be required by the batches queue, so they need to be shareable; defining them in both places causes a validation error (see the second sketch after this list).

- Add batch spec executions queue to executor-queue

- Add resetter for executor worker
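
Since the health port change is what enables side-by-side executors, here is a minimal standalone sketch of the pattern, using the env var name and default from this commit; the `/healthz` route is hypothetical (the real handler comes from `httpserver.NewHandler`):

```go
package main

import (
	"fmt"
	"net/http"
	"os"
)

func main() {
	// EXECUTOR_HEALTH_SERVER_PORT is the knob added by this commit; 3192 is
	// its default. A second executor started with, say, 3193 avoids the
	// address-already-in-use error that a hardcoded port would cause.
	port := os.Getenv("EXECUTOR_HEALTH_SERVER_PORT")
	if port == "" {
		port = "3192"
	}
	// Hypothetical health route, for illustration only.
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	fmt.Printf("health server listening on :%s\n", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```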
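
The shared-config point is easiest to see with a toy model of duplicate env var declarations. The registry below is a stand-in for `env.BaseConfig`'s bookkeeping, and the validation failure is inferred from the commit message rather than from the real `env` package:

```go
package main

import "fmt"

// registry stands in for env.BaseConfig's record of declared variables.
var registry = map[string]bool{}

func declare(name string) error {
	if registry[name] {
		// The validation error the commit message describes: two queue
		// configs both declaring EXECUTOR_FRONTEND_URL and friends.
		return fmt.Errorf("%s: declared twice", name)
	}
	registry[name] = true
	return nil
}

type SharedConfig struct{ FrontendURL string }

// Both queue configs point at one SharedConfig instead of declaring the
// frontend variables themselves.
type CodeintelConfig struct{ Shared *SharedConfig }
type BatchesConfig struct{ Shared *SharedConfig }

func main() {
	if err := declare("EXECUTOR_FRONTEND_URL"); err != nil { // declared once
		panic(err)
	}
	shared := &SharedConfig{FrontendURL: "https://sourcegraph.example.com"}

	_ = CodeintelConfig{Shared: shared}
	_ = BatchesConfig{Shared: shared}

	fmt.Println("loaded once:", shared.FrontendURL)
	fmt.Println("second declaration:", declare("EXECUTOR_FRONTEND_URL"))
}
```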
Erik Seliger 2021-07-05 12:27:40 +02:00 committed by GitHub
parent bce37c82aa
commit 968bea1eea
23 changed files with 561 additions and 37 deletions

View File

@@ -5,3 +5,4 @@ The executor-queue service maintains the executor work queues. Executor instance
## Work queues
- The `codeintel` queue contains unprocessed lsif_index records
- The `batches` queue contains unprocessed batch_spec_execution records
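
The two queue names here map one-to-one onto the `queueOptions` registration in main.go further down in this commit. A standalone sketch with stub types (the real `QueueOptions` pair a dbworker store with a record transformer):

```go
package main

import "fmt"

// QueueOptions is a stub; in the real service it carries a dbworker store
// and a RecordTransformer that turns a queue record into an apiclient.Job.
type QueueOptions struct{ Contains string }

func main() {
	queueOptions := map[string]QueueOptions{
		"codeintel": {Contains: "unprocessed lsif_index records"},
		"batches":   {Contains: "unprocessed batch_spec_execution records"},
	}
	for name, opts := range queueOptions {
		fmt.Printf("queue %q: %s\n", name, opts.Contains)
	}
}
```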

View File

@@ -7,11 +7,10 @@ import (
"github.com/sourcegraph/sourcegraph/internal/env"
)
const port = 3191
type Config struct {
env.BaseConfig
Port int
MaximumNumTransactions int
JobRequeueDelay time.Duration
JobCleanupInterval time.Duration
@@ -19,6 +18,7 @@ type Config struct {
}
func (c *Config) Load() {
c.Port = c.GetInt("EXECUTOR_QUEUE_PORT", "3191", "The port to listen on.")
c.MaximumNumTransactions = c.GetInt("EXECUTOR_QUEUE_MAXIMUM_NUM_TRANSACTIONS", "10", "Number of jobs that can be processing at one time.")
c.JobRequeueDelay = c.GetInterval("EXECUTOR_QUEUE_JOB_REQUEUE_DELAY", "1m", "The requeue delay of jobs assigned to an unreachable executor.")
c.JobCleanupInterval = c.GetInterval("EXECUTOR_QUEUE_JOB_CLEANUP_INTERVAL", "10s", "Interval between cleanup runs.")
@@ -27,7 +27,7 @@ func (c *Config) Load() {
func (c *Config) ServerOptions(queueOptions map[string]apiserver.QueueOptions) apiserver.Options {
return apiserver.Options{
Port: port,
Port: c.Port,
QueueOptions: queueOptions,
MaximumNumTransactions: c.MaximumNumTransactions,
RequeueDelay: c.JobRequeueDelay,

View File

@@ -0,0 +1,18 @@
package config
import "github.com/sourcegraph/sourcegraph/internal/env"
// SharedConfig defines common items that are used by multiple queues.
type SharedConfig struct {
env.BaseConfig
FrontendURL string
FrontendUsername string
FrontendPassword string
}
func (c *SharedConfig) Load() {
c.FrontendURL = c.Get("EXECUTOR_FRONTEND_URL", "", "The external URL of the sourcegraph instance.")
c.FrontendUsername = c.Get("EXECUTOR_FRONTEND_USERNAME", "", "The username supplied to the frontend.")
c.FrontendPassword = c.Get("EXECUTOR_FRONTEND_PASSWORD", "", "The password supplied to the frontend.")
}

View File

@@ -0,0 +1,14 @@
package batches
import (
"github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/config"
"github.com/sourcegraph/sourcegraph/internal/env"
)
type Config struct {
env.BaseConfig
Shared *config.SharedConfig
}
func (c *Config) Load() {}

View File

@@ -0,0 +1,25 @@
package batches
import (
"database/sql"
apiserver "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/server"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/background"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
)
func QueueOptions(db dbutil.DB, config *Config, observationContext *observation.Context) apiserver.QueueOptions {
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) {
return transformRecord(record.(*btypes.BatchSpecExecution), config)
}
return apiserver.QueueOptions{
Store: background.NewExecutorStore(basestore.NewWithDB(db, sql.TxOptions{}), observationContext),
RecordTransformer: recordTransformer,
}
}

View File

@@ -0,0 +1,63 @@
package batches
import (
"fmt"
"net/url"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
)
// transformRecord transforms a *btypes.BatchSpecExecution into an apiclient.Job.
func transformRecord(exec *btypes.BatchSpecExecution, config *Config) (apiclient.Job, error) {
srcEndpoint, err := makeURL(config.Shared.FrontendURL, config.Shared.FrontendUsername, config.Shared.FrontendPassword)
if err != nil {
return apiclient.Job{}, err
}
redactedSrcEndpoint, err := makeURL(config.Shared.FrontendURL, "USERNAME_REMOVED", "PASSWORD_REMOVED")
if err != nil {
return apiclient.Job{}, err
}
return apiclient.Job{
ID: int(exec.ID),
VirtualMachineFiles: map[string]string{"spec.yml": exec.BatchSpec},
CliSteps: []apiclient.CliStep{
{
Commands: []string{
"batch",
"preview",
"-f", "spec.yml",
},
Dir: ".",
Env: []string{
fmt.Sprintf("SRC_ENDPOINT=%s", srcEndpoint),
},
},
},
RedactedValues: map[string]string{
// 🚨 SECURITY: Catch leak of upload endpoint. This is necessary in addition
// to the below in case the username or password contains illegal URL characters,
// which are then urlencoded and are not replaceable via byte comparison.
srcEndpoint: redactedSrcEndpoint,
// 🚨 SECURITY: Catch uses of fragments pulled from URL to construct another target
// (in src-cli). We only pass the constructed URL to src-cli, which we trust not to
// ship the values to a third party, but not to trust to ensure the values are absent
// from the command's stdout or stderr streams.
config.Shared.FrontendUsername: "USERNAME_REMOVED",
config.Shared.FrontendPassword: "PASSWORD_REMOVED",
},
}, nil
}
func makeURL(base, username, password string) (string, error) {
u, err := url.Parse(base)
if err != nil {
return "", err
}
u.User = url.UserPassword(username, password)
return u.String(), nil
}

View File

@@ -0,0 +1,53 @@
package batches
import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/config"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
)
func TestTransformRecord(t *testing.T) {
testBatchSpec := `batchSpec: yeah`
index := &btypes.BatchSpecExecution{
ID: 42,
BatchSpec: testBatchSpec,
}
config := &Config{
Shared: &config.SharedConfig{
FrontendURL: "https://test.io",
FrontendUsername: "test*",
FrontendPassword: "hunter2",
},
}
job, err := transformRecord(index, config)
if err != nil {
t.Fatalf("unexpected error transforming record: %s", err)
}
expected := apiclient.Job{
ID: 42,
VirtualMachineFiles: map[string]string{"spec.yml": testBatchSpec},
CliSteps: []apiclient.CliStep{
{
Commands: []string{
"batch", "preview", "-f", "spec.yml",
},
Dir: ".",
Env: []string{"SRC_ENDPOINT=https://test%2A:hunter2@test.io"},
},
},
RedactedValues: map[string]string{
"https://test%2A:hunter2@test.io": "https://USERNAME_REMOVED:PASSWORD_REMOVED@test.io",
"test*": "USERNAME_REMOVED",
"hunter2": "PASSWORD_REMOVED",
},
}
if diff := cmp.Diff(expected, job); diff != "" {
t.Errorf("unexpected job (-want +got):\n%s", diff)
}
}

View File

@@ -1,17 +1,14 @@
package codeintel
import "github.com/sourcegraph/sourcegraph/internal/env"
import (
"github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/config"
"github.com/sourcegraph/sourcegraph/internal/env"
)
type Config struct {
env.BaseConfig
FrontendURL string
FrontendUsername string
FrontendPassword string
Shared *config.SharedConfig
}
func (c *Config) Load() {
c.FrontendURL = c.Get("EXECUTOR_FRONTEND_URL", "", "The external URL of the sourcegraph instance.")
c.FrontendUsername = c.Get("EXECUTOR_FRONTEND_USERNAME", "", "The username supplied to the frontend.")
c.FrontendPassword = c.Get("EXECUTOR_FRONTEND_PASSWORD", "", "The password supplied to the frontend.")
}
func (c *Config) Load() {}

View File

@@ -33,12 +33,12 @@ func transformRecord(index store.Index, config *Config) (apiclient.Job, error) {
})
}
srcEndpoint, err := makeURL(config.FrontendURL, config.FrontendUsername, config.FrontendPassword)
srcEndpoint, err := makeURL(config.Shared.FrontendURL, config.Shared.FrontendUsername, config.Shared.FrontendPassword)
if err != nil {
return apiclient.Job{}, err
}
redactedSrcEndpoint, err := makeURL(config.FrontendURL, "USERNAME_REMOVED", "PASSWORD_REMOVED")
redactedSrcEndpoint, err := makeURL(config.Shared.FrontendURL, "USERNAME_REMOVED", "PASSWORD_REMOVED")
if err != nil {
return apiclient.Job{}, err
}
@@ -86,8 +86,8 @@ func transformRecord(index store.Index, config *Config) (apiclient.Job, error) {
// (in src-cli). We only pass the constructed URL to src-cli, which we trust not to
// ship the values to a third party, but not to trust to ensure the values are absent
// from the command's stdout or stderr streams.
config.FrontendUsername: "USERNAME_REMOVED",
config.FrontendPassword: "PASSWORD_REMOVED",
config.Shared.FrontendUsername: "USERNAME_REMOVED",
config.Shared.FrontendPassword: "PASSWORD_REMOVED",
},
}, nil
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/config"
store "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/stores/dbstore"
apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
)
@@ -27,9 +28,11 @@ func TestTransformRecord(t *testing.T) {
Outfile: "",
}
config := &Config{
FrontendURL: "https://test.io",
FrontendUsername: "test*",
FrontendPassword: "hunter2",
Shared: &config.SharedConfig{
FrontendURL: "https://test.io",
FrontendUsername: "test*",
FrontendPassword: "hunter2",
},
}
job, err := transformRecord(index, config)
@@ -104,9 +107,11 @@ func TestTransformRecordWithoutIndexer(t *testing.T) {
Outfile: "other/path/lsif.dump",
}
config := &Config{
FrontendURL: "https://test.io",
FrontendUsername: "test*",
FrontendPassword: "hunter2",
Shared: &config.SharedConfig{
FrontendURL: "https://test.io",
FrontendUsername: "test*",
FrontendPassword: "hunter2",
},
}
job, err := transformRecord(index, config)

View File

@@ -9,6 +9,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/queues/batches"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/queues/codeintel"
apiserver "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/server"
"github.com/sourcegraph/sourcegraph/internal/conf"
@@ -30,7 +31,8 @@ type config interface {
func main() {
serviceConfig := &Config{}
codeintelConfig := &codeintel.Config{}
configs := []config{serviceConfig, codeintelConfig}
batchesConfig := &batches.Config{}
configs := []config{serviceConfig, codeintelConfig, batchesConfig}
for _, config := range configs {
config.Load()
@@ -71,6 +73,7 @@ func main() {
// Initialize queues
queueOptions := map[string]apiserver.QueueOptions{
"codeintel": codeintel.QueueOptions(db, codeintelConfig, observationContext),
"batches": batches.QueueOptions(db, batchesConfig, observationContext),
}
for queueName, options := range queueOptions {

View File

@@ -30,6 +30,7 @@ type Config struct {
FirecrackerDiskSpace string
ImageArchivesPath string
DisableHealthServer bool
HealthServerPort int
MaximumRuntimePerJob time.Duration
}
@@ -48,6 +49,7 @@ func (c *Config) Load() {
c.FirecrackerDiskSpace = c.Get("EXECUTOR_FIRECRACKER_DISK_SPACE", "20G", "How much disk space to allocate to each virtual machine or container.")
c.ImageArchivesPath = c.Get("EXECUTOR_IMAGE_ARCHIVE_PATH", "", "Where to store tar archives of docker images shared by virtual machines.")
c.DisableHealthServer = c.GetBool("EXECUTOR_DISABLE_HEALTHSERVER", "false", "Whether or not to disable the health server.")
c.HealthServerPort = c.GetInt("EXECUTOR_HEALTH_SERVER_PORT", "3192", "The port to listen on for the health server.")
c.MaximumRuntimePerJob = c.GetInterval("EXECUTOR_MAXIMUM_RUNTIME_PER_JOB", "30m", "The maximum wall time that can be spent on a single job.")
}

View File

@@ -2,6 +2,7 @@ package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
@@ -21,8 +22,6 @@ import (
"github.com/sourcegraph/sourcegraph/internal/workerutil"
)
const addr = ":3192"
func main() {
config := &Config{}
config.Load()
@@ -53,7 +52,7 @@ func main() {
worker.NewWorker(config.APIWorkerOptions(nil), observationContext),
}
if !config.DisableHealthServer {
routines = append(routines, httpserver.NewFromAddr(addr, &http.Server{
routines = append(routines, httpserver.NewFromAddr(fmt.Sprintf(":%d", config.HealthServerPort), &http.Server{
ReadTimeout: 75 * time.Second,
WriteTimeout: 10 * time.Minute,
Handler: httpserver.NewHandler(nil),

View File

@@ -3,17 +3,28 @@ package background
import (
"context"
"github.com/inconshreveable/log15"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/scheduler"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/sources"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
"github.com/sourcegraph/sourcegraph/internal/gitserver"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/httpcli"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/trace"
)
func Routines(ctx context.Context, batchesStore *store.Store, cf *httpcli.Factory) []goroutine.BackgroundRoutine {
sourcer := sources.NewSourcer(cf)
metrics := newMetrics()
observationContext := &observation.Context{
Logger: log15.Root(),
Tracer: &trace.Tracer{Tracer: opentracing.GlobalTracer()},
Registerer: prometheus.DefaultRegisterer,
}
metrics := newMetrics(observationContext)
routines := []goroutine.BackgroundRoutine{
newReconcilerWorker(ctx, batchesStore, gitserver.DefaultClient, sourcer, metrics),
@@ -25,6 +36,8 @@ func Routines(ctx context.Context, batchesStore *store.Store, cf *httpcli.Factory) []goroutine.BackgroundRoutine {
newBulkOperationWorker(ctx, batchesStore, sourcer, metrics),
newBulkOperationWorkerResetter(batchesStore, metrics),
newBatchSpecExecutionResetter(batchesStore, observationContext, metrics),
}
return routines
}

View File

@@ -0,0 +1,24 @@
package background
import (
"time"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
)
// newBatchSpecExecutionResetter creates a dbworker.Resetter that re-enqueues
// lost batch_spec_execution jobs for processing.
func newBatchSpecExecutionResetter(s *store.Store, observationContext *observation.Context, metrics batchChangesMetrics) *dbworker.Resetter {
workerStore := NewExecutorStore(s, observationContext)
options := dbworker.ResetterOptions{
Name: "batch_spec_executor_resetter",
Interval: 1 * time.Minute,
Metrics: metrics.executionResetterMetrics,
}
resetter := dbworker.NewResetter(workerStore, options)
return resetter
}

View File

@@ -0,0 +1,48 @@
package background
import (
"database/sql"
"time"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)
// executorStalledJobMaximumAge is the maximum allowable duration between updating the state of a
// job as "processing" and locking the record during processing. An unlocked row that is
// marked as processing likely indicates that the executor that dequeued the job has died.
// There should be a nearly-zero delay between these states during normal operation.
const executorStalledJobMaximumAge = time.Second * 5
// executorMaximumNumResets is the maximum number of times a job can be reset. If a job's failed
// attempts counter reaches this threshold, it will be moved into "errored" rather than
// "queued" on its next reset.
const executorMaximumNumResets = 3
var executorWorkerStoreOptions = dbworkerstore.Options{
Name: "batch_spec_executor_worker_store",
TableName: "batch_spec_executions",
ColumnExpressions: store.BatchSpecExecutionColumns,
Scan: scanFirstExecutionRecord,
OrderByExpression: sqlf.Sprintf("batch_spec_executions.created_at, batch_spec_executions.id"),
StalledMaxAge: executorStalledJobMaximumAge,
MaxNumResets: executorMaximumNumResets,
// Explicitly disable retries.
MaxNumRetries: 0,
}
// NewExecutorStore creates a dbworker store that wraps the batch_spec_executions
// table.
func NewExecutorStore(s basestore.ShareableStore, observationContext *observation.Context) dbworkerstore.Store {
return dbworkerstore.NewWithMetrics(s.Handle(), executorWorkerStoreOptions, observationContext)
}
// scanFirstExecutionRecord scans a slice of batch change executions and returns the first.
func scanFirstExecutionRecord(rows *sql.Rows, err error) (workerutil.Record, bool, error) {
return store.ScanFirstBatchSpecExecution(rows, err)
}

View File

@@ -3,12 +3,9 @@ package background
import (
"fmt"
"github.com/inconshreveable/log15"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
)
@@ -18,20 +15,16 @@ type batchChangesMetrics struct {
bulkProcessorWorkerMetrics workerutil.WorkerMetrics
reconcilerWorkerResetterMetrics dbworker.ResetterMetrics
bulkProcessorWorkerResetterMetrics dbworker.ResetterMetrics
executionResetterMetrics dbworker.ResetterMetrics
}
func newMetrics() batchChangesMetrics {
observationContext := &observation.Context{
Logger: log15.Root(),
Tracer: &trace.Tracer{Tracer: opentracing.GlobalTracer()},
Registerer: prometheus.DefaultRegisterer,
}
func newMetrics(observationContext *observation.Context) batchChangesMetrics {
return batchChangesMetrics{
reconcilerWorkerMetrics: workerutil.NewMetrics(observationContext, "batch_changes_reconciler", nil),
bulkProcessorWorkerMetrics: workerutil.NewMetrics(observationContext, "batch_changes_bulk_processor", nil),
reconcilerWorkerResetterMetrics: makeResetterMetrics(observationContext, "batch_changes_reconciler"),
bulkProcessorWorkerResetterMetrics: makeResetterMetrics(observationContext, "batch_changes_bulk_processor"),
executionResetterMetrics: makeResetterMetrics(observationContext, "batch_spec_executor"),
}
}

View File

@@ -0,0 +1,174 @@
package store
import (
"context"
"database/sql"
"github.com/keegancsmith/sqlf"
"github.com/lib/pq"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)
var BatchSpecExecutionColumns = []*sqlf.Query{
sqlf.Sprintf("batch_spec_executions.id"),
sqlf.Sprintf("batch_spec_executions.state"),
sqlf.Sprintf("batch_spec_executions.failure_message"),
sqlf.Sprintf("batch_spec_executions.started_at"),
sqlf.Sprintf("batch_spec_executions.finished_at"),
sqlf.Sprintf("batch_spec_executions.process_after"),
sqlf.Sprintf("batch_spec_executions.num_resets"),
sqlf.Sprintf("batch_spec_executions.num_failures"),
sqlf.Sprintf(`batch_spec_executions.execution_logs`),
sqlf.Sprintf("batch_spec_executions.worker_hostname"),
sqlf.Sprintf(`batch_spec_executions.created_at`),
sqlf.Sprintf(`batch_spec_executions.updated_at`),
sqlf.Sprintf(`batch_spec_executions.batch_spec`),
sqlf.Sprintf(`batch_spec_executions.batch_spec_id`),
}
var batchSpecExecutionInsertColumns = []*sqlf.Query{
sqlf.Sprintf("batch_spec"),
sqlf.Sprintf("created_at"),
sqlf.Sprintf("updated_at"),
}
// CreateBatchSpecExecution creates the given BatchSpecExecution.
func (s *Store) CreateBatchSpecExecution(ctx context.Context, b *btypes.BatchSpecExecution) error {
if b.CreatedAt.IsZero() {
b.CreatedAt = s.now()
}
if b.UpdatedAt.IsZero() {
b.UpdatedAt = b.CreatedAt
}
q, err := createBatchSpecExecutionQuery(b)
if err != nil {
return err
}
return s.query(ctx, q, func(sc scanner) error { return scanBatchSpecExecution(b, sc) })
}
var createBatchSpecExecutionQueryFmtstr = `
-- source: enterprise/internal/batches/store/batch_spec_executions.go:CreateBatchSpecExecution
INSERT INTO batch_spec_executions (%s)
VALUES (%s, %s, %s)
RETURNING %s`
func createBatchSpecExecutionQuery(c *btypes.BatchSpecExecution) (*sqlf.Query, error) {
return sqlf.Sprintf(
createBatchSpecExecutionQueryFmtstr,
sqlf.Join(batchSpecExecutionInsertColumns, ", "),
c.BatchSpec,
c.CreatedAt,
c.UpdatedAt,
sqlf.Join(BatchSpecExecutionColumns, ", "),
), nil
}
// GetBatchSpecExecutionOpts captures the query options needed for getting a BatchSpecExecution.
type GetBatchSpecExecutionOpts struct {
ID int64
}
// GetBatchSpecExecution gets a BatchSpecExecution matching the given options.
func (s *Store) GetBatchSpecExecution(ctx context.Context, opts GetBatchSpecExecutionOpts) (*btypes.BatchSpecExecution, error) {
q := getBatchSpecExecutionQuery(&opts)
var b btypes.BatchSpecExecution
err := s.query(ctx, q, func(sc scanner) (err error) {
return scanBatchSpecExecution(&b, sc)
})
if err != nil {
return nil, err
}
if b.ID == 0 {
return nil, ErrNoResults
}
return &b, nil
}
var getBatchSpecExecutionQueryFmtstr = `
-- source: enterprise/internal/batches/store/batch_spec_executions.go:GetBatchSpecExecution
SELECT %s FROM batch_spec_executions
WHERE %s
LIMIT 1
`
func getBatchSpecExecutionQuery(opts *GetBatchSpecExecutionOpts) *sqlf.Query {
preds := []*sqlf.Query{
sqlf.Sprintf("id = %s", opts.ID),
}
return sqlf.Sprintf(
getBatchSpecExecutionQueryFmtstr,
sqlf.Join(BatchSpecExecutionColumns, ", "),
sqlf.Join(preds, "\n AND "),
)
}
func scanBatchSpecExecution(b *btypes.BatchSpecExecution, sc scanner) error {
var executionLogs []dbworkerstore.ExecutionLogEntry
if err := sc.Scan(
&b.ID,
&b.State,
&b.FailureMessage,
&b.StartedAt,
&b.FinishedAt,
&b.ProcessAfter,
&b.NumResets,
&b.NumFailures,
pq.Array(&executionLogs),
&b.WorkerHostname,
&b.CreatedAt,
&b.UpdatedAt,
&b.BatchSpec,
&dbutil.NullInt64{N: &b.BatchSpecID},
); err != nil {
return err
}
for _, entry := range executionLogs {
b.ExecutionLogs = append(b.ExecutionLogs, workerutil.ExecutionLogEntry(entry))
}
return nil
}
// scanBatchSpecExecutions scans a slice of batch spec executions from the rows.
func scanBatchSpecExecutions(rows *sql.Rows, queryErr error) (_ []*btypes.BatchSpecExecution, err error) {
if queryErr != nil {
return nil, queryErr
}
defer func() { err = basestore.CloseRows(rows, err) }()
var execs []*btypes.BatchSpecExecution
for rows.Next() {
exec := &btypes.BatchSpecExecution{}
if err := scanBatchSpecExecution(exec, rows); err != nil {
return nil, err
}
execs = append(execs, exec)
}
return execs, nil
}
// ScanFirstBatchSpecExecution scans a slice of batch spec executions from the
// rows and returns the first.
func ScanFirstBatchSpecExecution(rows *sql.Rows, err error) (*btypes.BatchSpecExecution, bool, error) {
execs, err := scanBatchSpecExecutions(rows, err)
if err != nil || len(execs) == 0 {
return &btypes.BatchSpecExecution{}, false, err
}
return execs[0], true, nil
}

View File

@@ -0,0 +1,38 @@
package types
import (
"time"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
)
type BatchSpecExecutionState string
const (
BatchSpecExecutionStateQueued BatchSpecExecutionState = "queued"
BatchSpecExecutionStateErrored BatchSpecExecutionState = "errored"
BatchSpecExecutionStateFailed BatchSpecExecutionState = "failed"
BatchSpecExecutionStateCompleted BatchSpecExecutionState = "completed"
BatchSpecExecutionStateProcessing BatchSpecExecutionState = "processing"
)
type BatchSpecExecution struct {
ID int64
State BatchSpecExecutionState
FailureMessage *string
StartedAt *time.Time
FinishedAt *time.Time
ProcessAfter *time.Time
NumResets int64
NumFailures int64
ExecutionLogs []workerutil.ExecutionLogEntry
WorkerHostname string
CreatedAt time.Time
UpdatedAt time.Time
BatchSpec string
BatchSpecID int64
}
func (i BatchSpecExecution) RecordID() int {
return int(i.ID)
}

View File

@@ -76,6 +76,31 @@ Indexes:
```
# Table "public.batch_spec_executions"
```
Column | Type | Collation | Nullable | Default
-----------------+--------------------------+-----------+----------+---------------------------------------------------
id | bigint | | not null | nextval('batch_spec_executions_id_seq'::regclass)
state | text | | | 'queued'::text
failure_message | text | | |
started_at | timestamp with time zone | | |
finished_at | timestamp with time zone | | |
process_after | timestamp with time zone | | |
num_resets | integer | | not null | 0
num_failures | integer | | not null | 0
execution_logs | json[] | | |
worker_hostname | text | | not null | ''::text
created_at | timestamp with time zone | | not null | now()
updated_at | timestamp with time zone | | not null | now()
batch_spec | text | | not null |
batch_spec_id | integer | | |
Indexes:
"batch_spec_executions_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
"batch_spec_executions_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id)
```
# Table "public.batch_specs"
```
Column | Type | Collation | Nullable | Default
@@ -98,6 +123,7 @@ Foreign-key constraints:
"batch_specs_user_id_fkey" FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL DEFERRABLE
Referenced by:
TABLE "batch_changes" CONSTRAINT "batch_changes_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) DEFERRABLE
TABLE "batch_spec_executions" CONSTRAINT "batch_spec_executions_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id)
TABLE "changeset_specs" CONSTRAINT "changeset_specs_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) DEFERRABLE
```

View File

@@ -0,0 +1,5 @@
BEGIN;
DROP TABLE IF EXISTS batch_spec_executions;
COMMIT;

View File

@@ -0,0 +1,22 @@
BEGIN;
CREATE TABLE IF NOT EXISTS batch_spec_executions (
id BIGSERIAL PRIMARY KEY,
state TEXT DEFAULT 'queued',
failure_message TEXT,
started_at TIMESTAMP WITH TIME ZONE,
finished_at TIMESTAMP WITH TIME ZONE,
process_after TIMESTAMP WITH TIME ZONE,
num_resets INTEGER NOT NULL DEFAULT 0,
num_failures INTEGER NOT NULL DEFAULT 0,
execution_logs JSON[],
worker_hostname TEXT NOT NULL DEFAULT '',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
batch_spec TEXT NOT NULL,
batch_spec_id integer REFERENCES batch_specs(id)
);
COMMIT;

View File

@@ -513,6 +513,7 @@ commandsets:
- zoekt-indexserver-1
- zoekt-webserver-0
- zoekt-webserver-1
- executor-queue
enterprise-codeintel:
- enterprise-frontend