Delete BatchSpecExecutions (#25178)

This commit is contained in:
Thorsten Ball 2021-09-22 12:03:57 +02:00 committed by GitHub
parent dfefa4796f
commit 98286a052a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 62 additions and 1414 deletions

View File

@ -8,6 +8,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/cmd/frontend/enterprise"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/internal/codeintel"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/internal/executorqueue/config"
@ -15,7 +17,6 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/internal/executorqueue/metrics"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/internal/executorqueue/queues/batches"
codeintelqueue "github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/internal/executorqueue/queues/codeintel"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/trace"
@ -63,9 +64,8 @@ func Init(ctx context.Context, db dbutil.DB, outOfBandMigrationRunner *oobmigrat
// Register queues. If this set changes, be sure to also update the list of valid
// queue names in ./metrics/queue_allocation.go.
queueOptions := map[string]handler.QueueOptions{
"codeintel": codeintelqueue.QueueOptions(db, codeintelConfig, observationContext),
"batches": batches.QueueOptions(db, batchesConfig, observationContext),
"batch-spec-workspaces": batches.WorkspaceExecutionQueueOptions(db, batchesConfig, observationContext),
"codeintel": codeintelqueue.QueueOptions(db, codeintelConfig, observationContext),
"batches": batches.QueueOptions(db, batchesConfig, observationContext),
}
handler, err := codeintel.NewCodeIntelUploadHandler(ctx, db, true)

View File

@ -16,19 +16,6 @@ import (
)
// QueueOptions builds the executor-queue handler options for the batch
// spec execution queue: a dbworker store wrapping the batch_spec_executions
// table plus a transformer that turns a dequeued record into an
// apiclient.Job the executor can run.
func QueueOptions(db dbutil.DB, config *Config, observationContext *observation.Context) handler.QueueOptions {
// Every record dequeued from this queue is a *btypes.BatchSpecExecution;
// the type assertion panicking would indicate a wiring bug.
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return transformRecord(ctx, db, record.(*btypes.BatchSpecExecution), config)
}
store := background.NewExecutorStore(basestore.NewHandleWithDB(db, sql.TxOptions{}), observationContext)
return handler.QueueOptions{
Store: store,
RecordTransformer: recordTransformer,
// FetchCanceled lets a polling executor learn which of its
// in-flight jobs were canceled by a user.
CanceledRecordsFetcher: store.FetchCanceled,
}
}
func WorkspaceExecutionQueueOptions(db dbutil.DB, config *Config, observationContext *observation.Context) handler.QueueOptions {
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
batchesStore := store.New(db, observationContext, nil)
return transformBatchSpecWorkspaceExecutionJobRecord(ctx, batchesStore, record.(*btypes.BatchSpecWorkspaceExecutionJob), config)

View File

@ -18,97 +18,6 @@ import (
batcheslib "github.com/sourcegraph/sourcegraph/lib/batches"
)
// transformRecord transforms a *btypes.BatchSpecExecution into an apiclient.Job.
//
// The resulting job runs `src batch preview` against the stored batch spec,
// authenticated as the user who created the execution. All credentials that
// appear in the job's environment or endpoint URL are registered in
// RedactedValues so they never leak into persisted logs.
func transformRecord(ctx context.Context, db dbutil.DB, exec *btypes.BatchSpecExecution, config *Config) (apiclient.Job, error) {
// TODO: createAccessToken is a bit of technical debt until we figure out a
// better solution. The problem is that src-cli needs to make requests to
// the Sourcegraph instance *on behalf of the user*.
//
// Ideally we'd have something like one-time tokens that
// * we could hand to src-cli
// * are not visible to the user in the Sourcegraph web UI
// * valid only for the duration of the batch spec execution
// * and cleaned up after batch spec is executed
//
// Until then we create a fresh access token every time.
//
// GetOrCreate doesn't work because once an access token has been created
// in the database Sourcegraph can't access the plain-text token anymore.
// Only a hash for verification is kept in the database.
token, err := createAccessToken(ctx, db, exec.UserID)
if err != nil {
return apiclient.Job{}, err
}
// Build the endpoint src-cli talks to, embedding the shared frontend
// credentials, plus a credential-free twin used only for redaction.
frontendURL := conf.Get().ExternalURL
srcEndpoint, err := makeURL(frontendURL, config.Shared.FrontendUsername, config.Shared.FrontendPassword)
if err != nil {
return apiclient.Job{}, err
}
redactedSrcEndpoint, err := makeURL(frontendURL, "USERNAME_REMOVED", "PASSWORD_REMOVED")
if err != nil {
return apiclient.Job{}, err
}
cliEnv := []string{
fmt.Sprintf("SRC_ENDPOINT=%s", srcEndpoint),
fmt.Sprintf("SRC_ACCESS_TOKEN=%s", token),
}
// Resolve the namespace name to pass to `src batch preview -n`: a user
// namespace when NamespaceUserID is set, otherwise an org namespace.
var namespaceName string
if exec.NamespaceUserID != 0 {
user, err := database.Users(db).GetByID(ctx, exec.NamespaceUserID)
if err != nil {
return apiclient.Job{}, err
}
namespaceName = user.Username
} else {
org, err := database.Orgs(db).GetByID(ctx, exec.NamespaceOrgID)
if err != nil {
return apiclient.Job{}, err
}
namespaceName = org.Name
}
// The batch spec YAML is shipped to the VM as spec.yml and previewed
// from there; the job ID mirrors the execution's database ID.
return apiclient.Job{
ID: int(exec.ID),
VirtualMachineFiles: map[string]string{"spec.yml": exec.BatchSpec},
CliSteps: []apiclient.CliStep{
{
Commands: []string{
"batch",
"preview",
"-f", "spec.yml",
"-text-only",
"-skip-errors",
"-n", namespaceName,
},
Dir: ".",
Env: cliEnv,
},
},
RedactedValues: map[string]string{
// 🚨 SECURITY: Catch leak of upload endpoint. This is necessary in addition
// to the below in case the username or password contains illegal URL characters,
// which are then urlencoded and are not replaceable via byte comparison.
srcEndpoint: redactedSrcEndpoint,
// 🚨 SECURITY: Catch uses of fragments pulled from URL to construct another target
// (in src-cli). We only pass the constructed URL to src-cli, which we trust not to
// ship the values to a third party, but not to trust to ensure the values are absent
// from the command's stdout or stderr streams.
config.Shared.FrontendUsername: "USERNAME_REMOVED",
config.Shared.FrontendPassword: "PASSWORD_REMOVED",
// 🚨 SECURITY: Redact the access token used for src-cli to talk to
// Sourcegraph instance.
token: "SRC_ACCESS_TOKEN_REMOVED",
},
}, nil
}
const (
accessTokenNote = "batch-spec-execution"
accessTokenScope = "user:all"

View File

@ -22,76 +22,6 @@ import (
"github.com/sourcegraph/sourcegraph/schema"
)
// TestTransformRecord checks that transformRecord assembles the expected
// apiclient.Job from a BatchSpecExecution: CLI invocation, environment with
// endpoint and access token, and the redaction map covering every credential.
func TestTransformRecord(t *testing.T) {
// Stub token creation so the test controls the plaintext token value.
accessToken := "thisissecret-dont-tell-anyone"
database.Mocks.AccessTokens.Create = func(subjectUserID int32, scopes []string, note string, creatorID int32) (int64, string, error) {
return 1234, accessToken, nil
}
t.Cleanup(func() { database.Mocks.AccessTokens.Create = nil })
testBatchSpec := `batchSpec: yeah`
index := &btypes.BatchSpecExecution{
ID: 42,
UserID: 1,
NamespaceUserID: 1,
BatchSpec: testBatchSpec,
}
// Pin the external URL so the expected SRC_ENDPOINT is deterministic.
conf.Mock(&conf.Unified{SiteConfiguration: schema.SiteConfiguration{ExternalURL: "https://test.io"}})
t.Cleanup(func() {
conf.Mock(nil)
})
// The "*" in the username exercises URL-encoding in the endpoint.
config := &Config{
Shared: &config.SharedConfig{
FrontendUsername: "test*",
FrontendPassword: "hunter2",
},
}
// Namespace lookup is mocked; the job must use this username for `-n`.
database.Mocks.Users.GetByID = func(ctx context.Context, id int32) (*types.User, error) {
return &types.User{
Username: "john_namespace",
}, nil
}
t.Cleanup(func() {
database.Mocks.Users.GetByID = nil
})
job, err := transformRecord(context.Background(), &dbtesting.MockDB{}, index, config)
if err != nil {
t.Fatalf("unexpected error transforming record: %s", err)
}
expected := apiclient.Job{
ID: 42,
VirtualMachineFiles: map[string]string{"spec.yml": testBatchSpec},
CliSteps: []apiclient.CliStep{
{
Commands: []string{
"batch", "preview",
"-f", "spec.yml",
"-text-only",
"-skip-errors",
"-n", "john_namespace",
},
Dir: ".",
Env: []string{
"SRC_ENDPOINT=https://test%2A:hunter2@test.io",
"SRC_ACCESS_TOKEN=" + accessToken,
},
},
},
RedactedValues: map[string]string{
"https://test%2A:hunter2@test.io": "https://USERNAME_REMOVED:PASSWORD_REMOVED@test.io",
"test*": "USERNAME_REMOVED",
"hunter2": "PASSWORD_REMOVED",
accessToken: "SRC_ACCESS_TOKEN_REMOVED",
},
}
if diff := cmp.Diff(expected, job); diff != "" {
t.Errorf("unexpected job (-want +got):\n%s", diff)
}
}
func TestTransformBatchSpecWorkspaceExecutionJobRecord(t *testing.T) {
accessToken := "thisissecret-dont-tell-anyone"
database.Mocks.AccessTokens.Create = func(subjectUserID int32, scopes []string, note string, creatorID int32) (int64, string, error) {

View File

@ -18,7 +18,6 @@ func Routines(ctx context.Context, batchesStore *store.Store, cf *httpcli.Factor
reconcilerWorkerStore := NewReconcilerDBWorkerStore(batchesStore.Handle(), observationContext)
bulkProcessorWorkerStore := NewBulkOperationDBWorkerStore(batchesStore.Handle(), observationContext)
specExecutionWorkerStore := NewExecutorStore(batchesStore.Handle(), observationContext)
batchSpecWorkspaceExecutionWorkerStore := NewBatchSpecWorkspaceExecutionWorkerStore(batchesStore.Handle(), observationContext)
batchSpecResolutionWorkerStore := newBatchSpecResolutionWorkerStore(batchesStore.Handle(), observationContext)
@ -34,8 +33,6 @@ func Routines(ctx context.Context, batchesStore *store.Store, cf *httpcli.Factor
newBulkOperationWorker(ctx, batchesStore, bulkProcessorWorkerStore, sourcer, metrics),
newBulkOperationWorkerResetter(bulkProcessorWorkerStore, metrics),
newBatchSpecExecutionResetter(specExecutionWorkerStore, metrics),
newBatchSpecResolutionWorker(ctx, batchesStore, batchSpecResolutionWorkerStore, metrics),
newBatchSpecResolutionWorkerResetter(batchSpecResolutionWorkerStore, metrics),

View File

@ -22,7 +22,17 @@ import (
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)
// batchSpecWorkspaceExecutionJobStalledJobMaximumAge is the maximum allowable
// duration between updating the state of a job as "processing" and locking the
// record during processing. An unlocked row that is marked as processing
// likely indicates that the executor that dequeued the job has died. There
// should be a nearly-zero delay between these states during normal operation.
const batchSpecWorkspaceExecutionJobStalledJobMaximumAge = time.Second * 25
// batchSpecWorkspaceExecutionJobMaximumNumResets is the maximum number of
// times a job can be reset. If a job's failed attempts counter reaches this
// threshold, it will be moved into "errored" rather than "queued" on its next
// reset.
const batchSpecWorkspaceExecutionJobMaximumNumResets = 3
func scanFirstBatchSpecWorkspaceExecutionJobRecord(rows *sql.Rows, err error) (workerutil.Record, bool, error) {

View File

@ -249,3 +249,5 @@ stdout: {"operation":"UPLOADING_CHANGESET_SPECS","timestamp":"2021-09-09T13:20:3
})
}
}
// intptr returns a pointer to a copy of the given int, handy for filling
// optional *int fields in test fixtures.
func intptr(i int) *int {
	v := i
	return &v
}

View File

@ -1,21 +0,0 @@
package background
import (
"time"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)
// newBatchSpecExecutionResetter creates a dbworker.Resetter that re-enqueues
// lost batch_spec_execution jobs for processing.
// newBatchSpecExecutionResetter creates a dbworker.Resetter that
// re-enqueues lost batch_spec_execution jobs — rows stuck in "processing"
// whose executor has died — once per minute.
func newBatchSpecExecutionResetter(workerStore dbworkerstore.Store, metrics batchChangesMetrics) *dbworker.Resetter {
	return dbworker.NewResetter(workerStore, dbworker.ResetterOptions{
		Name:     "batch_spec_executor_resetter",
		Interval: 1 * time.Minute,
		Metrics:  metrics.executionResetterMetrics,
	})
}

View File

@ -1,194 +0,0 @@
package background
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/cockroachdb/errors"
"github.com/graph-gophers/graphql-go"
"github.com/graph-gophers/graphql-go/relay"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
batcheslib "github.com/sourcegraph/sourcegraph/lib/batches"
)
// executorStalledJobMaximumAge is the maximum allowable duration between updating the state of a
// job as "processing" and locking the record during processing. An unlocked row that is
// marked as processing likely indicates that the executor that dequeued the job has died.
// There should be a nearly-zero delay between these states during normal operation.
const executorStalledJobMaximumAge = time.Second * 25
// executorMaximumNumResets is the maximum number of times a job can be reset. If a job's failed
// attempts counter reaches this threshold, it will be moved into "errored" rather than
// "queued" on its next reset.
const executorMaximumNumResets = 3
// executorWorkerStoreOptions configures the dbworker store wrapping the
// batch_spec_executions table. Jobs are dequeued oldest-first
// (created_at, then id as a tiebreaker).
var executorWorkerStoreOptions = dbworkerstore.Options{
Name: "batch_spec_executor_worker_store",
TableName: "batch_spec_executions",
ColumnExpressions: store.BatchSpecExecutionColumns,
Scan: scanFirstExecutionRecord,
OrderByExpression: sqlf.Sprintf("batch_spec_executions.created_at, batch_spec_executions.id"),
StalledMaxAge: executorStalledJobMaximumAge,
MaxNumResets: executorMaximumNumResets,
// Explicitly disable retries: a failed execution stays failed and is
// never re-dequeued automatically.
MaxNumRetries: 0,
}
// ExecutorStore is a dbworker store extended with the ability to report
// which of an executor's in-flight jobs have been canceled, keyed by the
// executor's worker hostname.
type ExecutorStore interface {
dbworkerstore.Store
FetchCanceled(ctx context.Context, executorName string) (canceledIDs []int, err error)
}
// NewExecutorStore creates a dbworker store that wraps the
// batch_spec_executions table, instrumented with metrics from the given
// observation context.
func NewExecutorStore(handle *basestore.TransactableHandle, observationContext *observation.Context) ExecutorStore {
	inner := dbworkerstore.NewWithMetrics(handle, executorWorkerStoreOptions, observationContext)
	return &executorStore{
		Store:              inner,
		observationContext: observationContext,
	}
}
// Compile-time check that executorStore satisfies dbworkerstore.Store.
var _ dbworkerstore.Store = &executorStore{}
// executorStore is a thin wrapper around dbworkerstore.Store that allows us to
// extract information out of the ExecutionLogEntry field and persist it to
// separate columns when marking a job as complete.
type executorStore struct {
dbworkerstore.Store
// observationContext is needed to construct batches store instances
// on demand inside the overridden methods.
observationContext *observation.Context
}
// markCompleteQuery is taken from internal/workerutil/dbworker/store/store.go
//
// If that one changes we need to update this one here too. It differs from
// the upstream query only in that it additionally sets batch_spec_id from
// the batch spec's rand ID.
const markCompleteQuery = `
UPDATE batch_spec_executions
SET state = 'completed', finished_at = clock_timestamp(), batch_spec_id = (SELECT id FROM batch_specs WHERE rand_id = %s)
WHERE id = %s AND state = 'processing' AND worker_hostname = %s
RETURNING id
`
// MarkComplete overrides the embedded dbworker store's MarkComplete: before
// completing the job it extracts the created batch spec's rand ID from the
// execution logs and links it into batch_spec_id. If extraction fails, the
// job is marked failed instead of completed.
func (s *executorStore) MarkComplete(ctx context.Context, id int, options dbworkerstore.MarkFinalOptions) (_ bool, err error) {
batchesStore := store.New(s.Store.Handle().DB(), s.observationContext, nil)
batchSpecRandID, err := loadAndExtractBatchSpecRandID(ctx, batchesStore, int64(id))
if err != nil {
// If we couldn't extract the batch spec rand id, we mark the job as failed
return s.Store.MarkFailed(ctx, id, fmt.Sprintf("failed to extract batch spec ID: %s", err), options)
}
_, ok, err := basestore.ScanFirstInt(batchesStore.Query(ctx, sqlf.Sprintf(markCompleteQuery, batchSpecRandID, id, options.WorkerHostname)))
return ok, err
}
// FetchCanceled returns the record IDs of all batch spec executions that
// are currently processing on the given executor but have been flagged for
// cancelation by a user.
func (s *executorStore) FetchCanceled(ctx context.Context, executorName string) (canceledIDs []int, err error) {
	batchesStore := store.New(s.Store.Handle().DB(), s.observationContext, nil)

	canceled := true
	execs, _, err := batchesStore.ListBatchSpecExecutions(ctx, store.ListBatchSpecExecutionsOpts{
		Cancel:         &canceled,
		State:          btypes.BatchSpecExecutionStateProcessing,
		WorkerHostname: executorName,
	})
	if err != nil {
		return nil, err
	}

	canceledIDs = make([]int, 0, len(execs))
	for _, e := range execs {
		canceledIDs = append(canceledIDs, e.RecordID())
	}
	return canceledIDs, nil
}
// loadAndExtractBatchSpecRandID fetches the batch spec execution with the
// given ID and extracts, from its execution logs, the rand ID of the batch
// spec the execution created.
func loadAndExtractBatchSpecRandID(ctx context.Context, s *store.Store, id int64) (string, error) {
	execution, err := s.GetBatchSpecExecution(ctx, store.GetBatchSpecExecutionOpts{ID: id})
	if err != nil {
		return "", err
	}

	logs := execution.ExecutionLogs
	if len(logs) < 1 {
		return "", errors.New("no execution logs")
	}

	return extractBatchSpecRandID(logs)
}
// ErrNoBatchSpecRandID is returned when the execution logs don't contain a
// parseable batch spec URL from which a rand ID could be extracted.
var ErrNoBatchSpecRandID = errors.New("no batch spec rand id found in execution logs")
// extractBatchSpecRandID scans the "step.src.0" log entry for a successful
// CREATING_BATCH_SPEC event, takes the batchSpecURL from its metadata, and
// unmarshals the URL's last path segment (a GraphQL ID) into the batch
// spec's rand ID.
func extractBatchSpecRandID(logs []workerutil.ExecutionLogEntry) (string, error) {
var (
entry workerutil.ExecutionLogEntry
found bool
)
// Only the first src-cli step's output can contain the event we want.
for _, e := range logs {
if e.Key == "step.src.0" {
entry = e
found = true
break
}
}
if !found {
return "", ErrNoBatchSpecRandID
}
var batchSpecRandID string
// The step output interleaves "stdout: "/"stderr: " prefixed lines; only
// stdout lines carry the JSON log events emitted by src-cli.
for _, l := range strings.Split(entry.Out, "\n") {
const outputLinePrefix = "stdout: "
if !strings.HasPrefix(l, outputLinePrefix) {
continue
}
jsonPart := l[len(outputLinePrefix):]
var e batcheslib.LogEvent
if err := json.Unmarshal([]byte(jsonPart), &e); err != nil {
// If we can't unmarshal the line as JSON we skip it
continue
}
if e.Operation == batcheslib.LogEventOperationCreatingBatchSpec && e.Status == batcheslib.LogEventStatusSuccess {
url, ok := e.Metadata["batchSpecURL"]
if !ok {
return "", ErrNoBatchSpecRandID
}
urlStr, ok := url.(string)
if !ok {
return "", ErrNoBatchSpecRandID
}
parts := strings.Split(urlStr, "/")
// NOTE(review): strings.Split never returns an empty slice for any
// input, so this guard is effectively unreachable; kept as a safety net.
if len(parts) == 0 {
return "", ErrNoBatchSpecRandID
}
batchSpecGraphQLID := graphql.ID(parts[len(parts)-1])
if err := relay.UnmarshalSpec(batchSpecGraphQLID, &batchSpecRandID); err != nil {
// If we can't extract the ID we simply return our main error
return "", ErrNoBatchSpecRandID
}
return batchSpecRandID, nil
}
}
// No successful CREATING_BATCH_SPEC event was found on any stdout line.
return batchSpecRandID, ErrNoBatchSpecRandID
}
// scanFirstExecutionRecord scans the first batch spec execution from the
// given rows, adapting the store's scanner to the workerutil.Record shape
// required by dbworkerstore.Options.Scan.
func scanFirstExecutionRecord(rows *sql.Rows, err error) (workerutil.Record, bool, error) {
return store.ScanFirstBatchSpecExecution(rows, err)
}

View File

@ -1,230 +0,0 @@
package background
import (
"context"
"testing"
"time"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
ct "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/testing"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
)
// TestLoadAndExtractBatchSpecRandID is a database-backed test: it creates a
// real batch spec execution, attaches execution log entries through the
// dbworker store, and asserts the batch spec rand ID is extracted from the
// persisted logs.
func TestLoadAndExtractBatchSpecRandID(t *testing.T) {
db := dbtest.NewDB(t, "")
user := ct.CreateTestUser(t, db, true)
s := store.New(db, &observation.TestContext, nil)
workStore := dbworkerstore.NewWithMetrics(s.Handle(), executorWorkerStoreOptions, &observation.TestContext)
t.Run("success", func(t *testing.T) {
specExec := &btypes.BatchSpecExecution{
State: btypes.BatchSpecExecutionStateProcessing,
BatchSpec: `name: testing`,
UserID: user.ID,
NamespaceUserID: user.ID,
}
if err := s.CreateBatchSpecExecution(context.Background(), specExec); err != nil {
t.Fatal(err)
}
// Two log entries: a non-src step that must be ignored, and the
// "step.src.0" entry whose stdout carries the batchSpecURL metadata.
entries := []workerutil.ExecutionLogEntry{
{
Key: "setup.firecracker.start",
Command: []string{"ignite", "run"},
StartTime: time.Now().Add(-5 * time.Second),
Out: `stdout: cool`,
DurationMs: intptr(200),
},
{
Key: "step.src.0",
Command: []string{"src", "batch", "preview", "-f", "spec.yml", "-text-only"},
StartTime: time.Now().Add(-5 * time.Second),
Out: `stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"STARTED"}
stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"SUCCESS"}
stdout: {"operation":"CREATING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.528Z","status":"STARTED"}
stdout: {"operation":"CREATING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.535Z","status":"SUCCESS","metadata":{"batchSpecURL":"http://USERNAME_REMOVED:PASSWORD_REMOVED@localhost:3080/users/mrnugget/batch-changes/apply/QmF0Y2hTcGVjOiJBZFBMTDU5SXJmWCI="}}
`,
DurationMs: intptr(200),
},
}
for i, e := range entries {
entryID, err := workStore.AddExecutionLogEntry(context.Background(), int(specExec.ID), e, dbworkerstore.ExecutionLogEntryOptions{})
if err != nil {
t.Fatal(err)
}
if entryID != i+1 {
t.Fatalf("AddExecutionLogEntry returned wrong entryID. want=%d, have=%d", i+1, entryID)
}
}
// "AdPLL59IrfX" is the base64-decoded payload of the GraphQL ID above.
want := "AdPLL59IrfX"
have, err := loadAndExtractBatchSpecRandID(context.Background(), s, specExec.ID)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if have != want {
t.Fatalf("wrong rand id extracted. want=%s, have=%s", want, have)
}
})
t.Run("without log entry", func(t *testing.T) {
specExec := &btypes.BatchSpecExecution{
State: btypes.BatchSpecExecutionStateProcessing,
BatchSpec: `name: testing`,
UserID: user.ID,
NamespaceUserID: user.ID,
}
if err := s.CreateBatchSpecExecution(context.Background(), specExec); err != nil {
t.Fatal(err)
}
_, err := loadAndExtractBatchSpecRandID(context.Background(), s, specExec.ID)
if err == nil {
t.Fatalf("expected error but got none")
}
if err.Error() != "no execution logs" {
t.Fatalf("wrong error: %q", err.Error())
}
})
}
// TestExtractBatchSpecRandID is a table-driven test of the pure log-parsing
// path: each case feeds raw execution-log output and expects either an
// extracted rand ID or ErrNoBatchSpecRandID.
func TestExtractBatchSpecRandID(t *testing.T) {
tests := []struct {
name string
entries []workerutil.ExecutionLogEntry
wantRandID string
wantErr error
}{
{
name: "success",
entries: []workerutil.ExecutionLogEntry{
{Key: "setup.firecracker.start"},
// Reduced log output because we don't care about _all_ lines
{
Key: "step.src.0",
Out: `stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"STARTED"}
stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"SUCCESS"}
stdout: {"operation":"CREATING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.528Z","status":"STARTED"}
stdout: {"operation":"CREATING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.535Z","status":"SUCCESS","metadata":{"batchSpecURL":"http://USERNAME_REMOVED:PASSWORD_REMOVED@localhost:3080/users/mrnugget/batch-changes/apply/QmF0Y2hTcGVjOiJBZFBMTDU5SXJmWCI="}}
`,
},
},
// Run `echo "QmF0Y2hTcGVjOiJBZFBMTDU5SXJmWCI=" |base64 -d` to get this
wantRandID: "AdPLL59IrfX",
},
{
name: "no step.src.0 log entry",
entries: []workerutil.ExecutionLogEntry{},
wantErr: ErrNoBatchSpecRandID,
},
{
name: "no url in the output",
entries: []workerutil.ExecutionLogEntry{
{
Key: "step.src.0",
Out: `stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"STARTED"}
stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"SUCCESS"}
`,
},
},
wantErr: ErrNoBatchSpecRandID,
},
{
name: "invalid url in the output",
entries: []workerutil.ExecutionLogEntry{
{
Key: "step.src.0",
Out: `stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"STARTED"}
stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"SUCCESS"}
stdout: {"operation":"CREATING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.528Z","status":"STARTED"}
stdout: {"operation":"CREATING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.535Z","status":"SUCCESS","metadata":{"batchSpecURL":"http://horse.txt"}}
`,
},
},
wantErr: ErrNoBatchSpecRandID,
},
{
name: "additional text in log output",
entries: []workerutil.ExecutionLogEntry{
{
Key: "step.src.0",
Out: `stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"STARTED"}
stderr: HORSE
stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"SUCCESS"}
stderr: HORSE
stdout: {"operation":"CREATING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.528Z","status":"STARTED"}
stderr: HORSE
stdout: {"operation":"CREATING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.535Z","status":"SUCCESS","metadata":{"batchSpecURL":"http://USERNAME_REMOVED:PASSWORD_REMOVED@localhost:3080/users/mrnugget/batch-changes/apply/QmF0Y2hTcGVjOiJBZFBMTDU5SXJmWCI="}}
`,
},
},
wantRandID: "AdPLL59IrfX",
},
{
name: "invalid json",
entries: []workerutil.ExecutionLogEntry{
{
Key: "step.src.0",
Out: `stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.481Z","status":"STARTED"}
stdout: {HOOOORSE}
stdout: {HORSE}
stdout: {HORSE}
`,
},
},
wantErr: ErrNoBatchSpecRandID,
},
{
name: "non-json output inbetween valid json",
entries: []workerutil.ExecutionLogEntry{
{
Key: "step.src.0",
Out: `stdout: {"operation":"PARSING_BATCH_SPEC","timestamp":"2021-07-12T12:25:33.965Z","status":"STARTED"}
stdout: No changeset specs created
stdout: {"operation":"CREATING_BATCH_SPEC","timestamp":"2021-07-12T12:26:01.165Z","status":"SUCCESS","metadata":{"batchSpecURL":"https://example.com/users/erik/batch-changes/apply/QmF0Y2hTcGVjOiI5cFZPcHJyTUhNQiI="}}`,
},
},
wantRandID: "9pVOprrMHMB",
},
}
// Error cases assert identity with the sentinel error; success cases
// assert the exact extracted rand ID.
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
have, err := extractBatchSpecRandID(tt.entries)
if tt.wantErr != nil {
if err != tt.wantErr {
t.Fatalf("wrong error. want=%s, got=%s", tt.wantErr, err)
}
return
}
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if have != tt.wantRandID {
t.Fatalf("wrong batch spec rand id extracted. want=%q, have=%q", tt.wantRandID, have)
}
})
}
}
// intptr copies v onto the heap and returns its address; used to populate
// optional *int fields in test fixtures.
func intptr(v int) *int {
	copied := v
	return &copied
}

View File

@ -15,7 +15,6 @@ type batchChangesMetrics struct {
bulkProcessorWorkerMetrics workerutil.WorkerMetrics
reconcilerWorkerResetterMetrics dbworker.ResetterMetrics
bulkProcessorWorkerResetterMetrics dbworker.ResetterMetrics
executionResetterMetrics dbworker.ResetterMetrics
batchSpecResolutionWorkerMetrics workerutil.WorkerMetrics
batchSpecResolutionWorkerResetterMetrics dbworker.ResetterMetrics
@ -29,7 +28,6 @@ func newMetrics(observationContext *observation.Context) batchChangesMetrics {
bulkProcessorWorkerMetrics: workerutil.NewMetrics(observationContext, "batch_changes_bulk_processor", nil),
reconcilerWorkerResetterMetrics: makeResetterMetrics(observationContext, "batch_changes_reconciler"),
bulkProcessorWorkerResetterMetrics: makeResetterMetrics(observationContext, "batch_changes_bulk_processor"),
executionResetterMetrics: makeResetterMetrics(observationContext, "batch_spec_executor"),
batchSpecResolutionWorkerMetrics: workerutil.NewMetrics(observationContext, "batch_changes_batch_spec_resolution_worker", nil),
batchSpecResolutionWorkerResetterMetrics: makeResetterMetrics(observationContext, "batch_changes_batch_spec_resolution_worker_resetter"),

View File

@ -1,385 +0,0 @@
package store
import (
"context"
"database/sql"
"github.com/cockroachdb/errors"
"github.com/keegancsmith/sqlf"
"github.com/lib/pq"
"github.com/opentracing/opentracing-go/log"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)
// BatchSpecExecutionColumns is the full set of batch_spec_executions
// columns selected by the Store's batch spec execution methods and by the
// dbworker store (via executorWorkerStoreOptions.ColumnExpressions). The
// order must match the corresponding row scanner.
//
// Previously this list mixed double-quoted and backquoted string literals
// for no reason; they are normalized to double quotes for consistency.
var BatchSpecExecutionColumns = []*sqlf.Query{
	sqlf.Sprintf("batch_spec_executions.id"),
	sqlf.Sprintf("batch_spec_executions.rand_id"),
	sqlf.Sprintf("batch_spec_executions.state"),
	sqlf.Sprintf("batch_spec_executions.failure_message"),
	sqlf.Sprintf("batch_spec_executions.started_at"),
	sqlf.Sprintf("batch_spec_executions.finished_at"),
	sqlf.Sprintf("batch_spec_executions.process_after"),
	sqlf.Sprintf("batch_spec_executions.num_resets"),
	sqlf.Sprintf("batch_spec_executions.num_failures"),
	sqlf.Sprintf("batch_spec_executions.execution_logs"),
	sqlf.Sprintf("batch_spec_executions.worker_hostname"),
	sqlf.Sprintf("batch_spec_executions.created_at"),
	sqlf.Sprintf("batch_spec_executions.updated_at"),
	sqlf.Sprintf("batch_spec_executions.batch_spec"),
	sqlf.Sprintf("batch_spec_executions.batch_spec_id"),
	sqlf.Sprintf("batch_spec_executions.user_id"),
	sqlf.Sprintf("batch_spec_executions.namespace_user_id"),
	sqlf.Sprintf("batch_spec_executions.namespace_org_id"),
	sqlf.Sprintf("batch_spec_executions.cancel"),
}
// batchSpecExecutionInsertColumns is the list of columns written by
// CreateBatchSpecExecution; its order must stay in sync with the positional
// VALUES placeholders in createBatchSpecExecutionQueryFmtstr. Worker-state
// columns (state, num_resets, ...) are not set here — presumably they rely
// on database defaults; confirm against the table schema.
var batchSpecExecutionInsertColumns = []*sqlf.Query{
sqlf.Sprintf("rand_id"),
sqlf.Sprintf("batch_spec"),
sqlf.Sprintf("batch_spec_id"),
sqlf.Sprintf("user_id"),
sqlf.Sprintf("namespace_user_id"),
sqlf.Sprintf("namespace_org_id"),
sqlf.Sprintf("created_at"),
sqlf.Sprintf("updated_at"),
}
// CreateBatchSpecExecution creates the given BatchSpecExecution. Zero
// timestamps are defaulted (CreatedAt from the store clock, UpdatedAt from
// CreatedAt), and the inserted row is scanned back into b, populating
// DB-assigned fields via the query's RETURNING clause.
func (s *Store) CreateBatchSpecExecution(ctx context.Context, b *btypes.BatchSpecExecution) (err error) {
ctx, endObservation := s.operations.createBatchSpecExecution.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})
// Only consult the clock when CreatedAt is actually unset; test clocks
// may count calls.
if b.CreatedAt.IsZero() {
b.CreatedAt = s.now()
}
if b.UpdatedAt.IsZero() {
b.UpdatedAt = b.CreatedAt
}
q, err := createBatchSpecExecutionQuery(b)
if err != nil {
return err
}
return s.query(ctx, q, func(sc scanner) error { return scanBatchSpecExecution(b, sc) })
}
// createBatchSpecExecutionQueryFmtstr has eight VALUES placeholders that
// must stay in sync, positionally, with batchSpecExecutionInsertColumns.
var createBatchSpecExecutionQueryFmtstr = `
-- source: enterprise/internal/batches/store/batch_spec_executions.go:CreateBatchSpecExecution
INSERT INTO batch_spec_executions (%s)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING %s`
// createBatchSpecExecutionQuery builds the INSERT for the given execution,
// generating (and assigning to c) a fresh RandID when none is set.
func createBatchSpecExecutionQuery(c *btypes.BatchSpecExecution) (*sqlf.Query, error) {
if c.RandID == "" {
var err error
if c.RandID, err = RandomID(); err != nil {
return nil, errors.Wrap(err, "creating RandID failed")
}
}
// Argument order mirrors batchSpecExecutionInsertColumns; nullable FK and
// namespace columns are wrapped so zero values become SQL NULL.
return sqlf.Sprintf(
createBatchSpecExecutionQueryFmtstr,
sqlf.Join(batchSpecExecutionInsertColumns, ", "),
c.RandID,
c.BatchSpec,
nullInt64Column(c.BatchSpecID),
c.UserID,
nullInt32Column(c.NamespaceUserID),
nullInt32Column(c.NamespaceOrgID),
c.CreatedAt,
c.UpdatedAt,
sqlf.Join(BatchSpecExecutionColumns, ", "),
), nil
}
// CancelBatchSpecExecution cancels the BatchSpecExecution with the given
// rand ID and returns the updated record. If no row matched — e.g. the
// execution already completed and is therefore not cancelable — it returns
// ErrNoResults.
func (s *Store) CancelBatchSpecExecution(ctx context.Context, randID string) (exec *btypes.BatchSpecExecution, err error) {
ctx, endObservation := s.operations.cancelBatchSpecExecution.With(ctx, &err, observation.Args{LogFields: []log.Field{log.String("randID", randID)}})
defer endObservation(1, observation.Args{})
q := s.cancelBatchSpecExecutionQuery(randID)
var b btypes.BatchSpecExecution
err = s.query(ctx, q, func(sc scanner) error { return scanBatchSpecExecution(&b, sc) })
if err != nil {
return nil, err
}
// A zero ID means the UPDATE matched no candidate row.
if b.ID == 0 {
return nil, ErrNoResults
}
return &b, err
}
// cancelBatchSpecExecutionQueryFmtstr sets cancel = TRUE on a queued or
// processing execution. The CASE expressions distinguish the two: a queued
// job is aborted immediately (state -> failed, finished_at set), while a
// processing job keeps its state so the executor can tear down and mark it
// failed itself later.
var cancelBatchSpecExecutionQueryFmtstr = `
-- source: enterprise/internal/batches/store/batch_spec_executions.go:CancelBatchSpecExecution
WITH candidate AS (
SELECT
id
FROM
batch_spec_executions
WHERE
rand_id = %s
AND
-- It must be queued or processing, we cannot cancel jobs that have already completed.
state IN (%s, %s)
FOR UPDATE
)
UPDATE
batch_spec_executions
SET
cancel = TRUE,
-- If the execution is still queued, we directly abort, otherwise we keep the
-- state, so the worker can to teardown and, at some point, mark it failed itself.
state = CASE WHEN batch_spec_executions.state = %s THEN batch_spec_executions.state ELSE %s END,
finished_at = CASE WHEN batch_spec_executions.state = %s THEN batch_spec_executions.finished_at ELSE %s END,
updated_at = %s
WHERE
id IN (SELECT id FROM candidate)
RETURNING %s
`
// cancelBatchSpecExecutionQuery binds the positional placeholders above, in
// order: rand_id; the two cancelable states (queued, processing); the state
// CASE (keep when processing, else failed); the finished_at CASE (keep when
// processing, else now); updated_at; and the RETURNING column list.
func (s *Store) cancelBatchSpecExecutionQuery(randID string) *sqlf.Query {
return sqlf.Sprintf(
cancelBatchSpecExecutionQueryFmtstr,
randID,
btypes.BatchSpecExecutionStateQueued,
btypes.BatchSpecExecutionStateProcessing,
btypes.BatchSpecExecutionStateProcessing,
btypes.BatchSpecExecutionStateFailed,
btypes.BatchSpecExecutionStateProcessing,
s.now(),
s.now(),
sqlf.Join(BatchSpecExecutionColumns, ", "),
)
}
// GetBatchSpecExecutionOpts captures the query options needed for getting a
// BatchSpecExecution. At least one of ID and RandID must be set.
type GetBatchSpecExecutionOpts struct {
ID int64
RandID string
}
// GetBatchSpecExecution gets a BatchSpecExecution matching the given
// options, returning ErrNoResults when no row matches.
func (s *Store) GetBatchSpecExecution(ctx context.Context, opts GetBatchSpecExecutionOpts) (exec *btypes.BatchSpecExecution, err error) {
ctx, endObservation := s.operations.getBatchSpecExecution.With(ctx, &err, observation.Args{LogFields: []log.Field{
log.Int("ID", int(opts.ID)),
log.String("randID", opts.RandID),
}})
defer endObservation(1, observation.Args{})
q, err := getBatchSpecExecutionQuery(&opts)
if err != nil {
return nil, err
}
var b btypes.BatchSpecExecution
err = s.query(ctx, q, func(sc scanner) (err error) {
return scanBatchSpecExecution(&b, sc)
})
if err != nil {
return nil, err
}
// A zero ID means the LIMIT 1 query scanned no row.
if b.ID == 0 {
return nil, ErrNoResults
}
return &b, nil
}
// getBatchSpecExecutionQueryFmtstr is the query backing GetBatchSpecExecution.
// The first %s receives the selected columns, the second the WHERE predicates.
var getBatchSpecExecutionQueryFmtstr = `
-- source: enterprise/internal/batches/store/batch_spec_executions.go:GetBatchSpecExecution
SELECT %s FROM batch_spec_executions
WHERE %s
LIMIT 1
`
// getBatchSpecExecutionQuery turns opts into an executable query. It fails
// when neither ID nor RandID is set, since that would match every row.
func getBatchSpecExecutionQuery(opts *GetBatchSpecExecutionOpts) (*sqlf.Query, error) {
	preds := make([]*sqlf.Query, 0, 2)

	if opts.ID != 0 {
		preds = append(preds, sqlf.Sprintf("id = %s", opts.ID))
	}
	if opts.RandID != "" {
		preds = append(preds, sqlf.Sprintf("rand_id = %s", opts.RandID))
	}

	if len(preds) == 0 {
		return nil, errors.New("no predicates given")
	}

	cols := sqlf.Join(BatchSpecExecutionColumns, ", ")
	where := sqlf.Join(preds, "\n AND ")
	return sqlf.Sprintf(getBatchSpecExecutionQueryFmtstr, cols, where), nil
}
// ListBatchSpecExecutionsOpts captures the query options needed for
// listing batch spec executions. The zero value lists everything.
type ListBatchSpecExecutionsOpts struct {
	LimitOpts
	// Cursor, when non-zero, restricts results to IDs <= Cursor. Results are
	// ordered by ID descending, so this implements keyset pagination.
	Cursor int64
	// Cancel, when non-nil, filters on the cancel flag.
	Cancel *bool
	// State, when non-empty, filters on the execution state.
	State btypes.BatchSpecExecutionState
	// WorkerHostname, when non-empty, filters on the worker that picked up
	// the execution.
	WorkerHostname string
}
// ListBatchSpecExecutions lists batch spec executions with the given filters,
// ordered by descending ID. When more results exist than opts.Limit, it
// returns a non-zero next cursor to pass as opts.Cursor for the next page.
func (s *Store) ListBatchSpecExecutions(ctx context.Context, opts ListBatchSpecExecutionsOpts) (cs []*btypes.BatchSpecExecution, next int64, err error) {
	ctx, endObservation := s.operations.listBatchSpecExecutions.With(ctx, &err, observation.Args{})
	defer endObservation(1, observation.Args{})

	q := listBatchSpecExecutionsQuery(opts)

	cs = make([]*btypes.BatchSpecExecution, 0, opts.DBLimit())
	err = s.query(ctx, q, func(sc scanner) error {
		var c btypes.BatchSpecExecution
		if err := scanBatchSpecExecution(&c, sc); err != nil {
			return err
		}
		cs = append(cs, &c)
		return nil
	})

	// Overfetch detection: DBLimit (presumably Limit+1 — see LimitOpts) rows
	// coming back means there is a next page. The extra row's ID becomes the
	// cursor and the row itself is dropped from the result.
	if opts.Limit != 0 && len(cs) == opts.DBLimit() {
		next = cs[len(cs)-1].ID
		cs = cs[:len(cs)-1]
	}

	return cs, next, err
}
// listBatchSpecExecutionsQueryFmtstr is the query backing
// ListBatchSpecExecutions. The first %s receives the selected columns, the
// second the WHERE predicates; a LIMIT clause is appended at build time.
var listBatchSpecExecutionsQueryFmtstr = `
-- source: enterprise/internal/batches/store/batch_spec_execution.go:ListBatchSpecExecutions
SELECT %s FROM batch_spec_executions
WHERE %s
ORDER BY id DESC
`
// listBatchSpecExecutionsQuery builds the SELECT statement for
// ListBatchSpecExecutions from the given options.
func listBatchSpecExecutionsQuery(opts ListBatchSpecExecutionsOpts) *sqlf.Query {
	var conds []*sqlf.Query
	add := func(format string, args ...interface{}) {
		conds = append(conds, sqlf.Sprintf(format, args...))
	}

	if opts.Cursor != 0 {
		add("batch_spec_executions.id <= %s", opts.Cursor)
	}
	if opts.Cancel != nil {
		add("batch_spec_executions.cancel = %s", *opts.Cancel)
	}
	if opts.State != "" {
		add("batch_spec_executions.state = %s", opts.State)
	}
	if opts.WorkerHostname != "" {
		add("batch_spec_executions.worker_hostname = %s", opts.WorkerHostname)
	}

	// The WHERE clause may not be empty; fall back to a tautology.
	if len(conds) == 0 {
		add("TRUE")
	}

	fmtstr := listBatchSpecExecutionsQueryFmtstr + opts.LimitOpts.ToDB()
	return sqlf.Sprintf(
		fmtstr,
		sqlf.Join(BatchSpecExecutionColumns, ", "),
		sqlf.Join(conds, "\n AND "),
	)
}
// scanBatchSpecExecution populates b from a single result row. The scan
// targets must stay in the same order as the columns selected via
// BatchSpecExecutionColumns.
func scanBatchSpecExecution(b *btypes.BatchSpecExecution, sc scanner) error {
	// Logs are scanned into the dbworker representation first and converted
	// below — presumably because that type carries the sql scanning support;
	// confirm against dbworkerstore.ExecutionLogEntry.
	var executionLogs []dbworkerstore.ExecutionLogEntry

	if err := sc.Scan(
		&b.ID,
		&b.RandID,
		&b.State,
		&b.FailureMessage,
		&b.StartedAt,
		&b.FinishedAt,
		&b.ProcessAfter,
		&b.NumResets,
		&b.NumFailures,
		pq.Array(&executionLogs),
		&b.WorkerHostname,
		&b.CreatedAt,
		&b.UpdatedAt,
		&b.BatchSpec,
		// Nullable FK columns go through dbutil wrappers so a SQL NULL
		// becomes the Go zero value.
		&dbutil.NullInt64{N: &b.BatchSpecID},
		&b.UserID,
		&dbutil.NullInt32{N: &b.NamespaceUserID},
		&dbutil.NullInt32{N: &b.NamespaceOrgID},
		&b.Cancel,
	); err != nil {
		return err
	}

	// Convert each log entry to the workerutil type used on the domain struct.
	for _, entry := range executionLogs {
		b.ExecutionLogs = append(b.ExecutionLogs, workerutil.ExecutionLogEntry(entry))
	}

	return nil
}
// scanBatchSpecExecutions scans a slice of batch spec executions from the
// rows. It always closes rows (folding any close error into the returned
// error) and passes through a non-nil queryErr untouched.
func scanBatchSpecExecutions(rows *sql.Rows, queryErr error) (_ []*btypes.BatchSpecExecution, err error) {
	if queryErr != nil {
		return nil, queryErr
	}
	defer func() { err = basestore.CloseRows(rows, err) }()

	var result []*btypes.BatchSpecExecution
	for rows.Next() {
		var e btypes.BatchSpecExecution
		if err := scanBatchSpecExecution(&e, rows); err != nil {
			return nil, err
		}
		result = append(result, &e)
	}
	return result, nil
}
// ScanFirstBatchSpecExecution scans a slice of batch spec executions from the
// rows and returns the first. The boolean reports whether a row was found; on
// error or an empty result a non-nil zero-value execution is returned.
func ScanFirstBatchSpecExecution(rows *sql.Rows, err error) (*btypes.BatchSpecExecution, bool, error) {
	execs, scanErr := scanBatchSpecExecutions(rows, err)
	if scanErr != nil {
		return &btypes.BatchSpecExecution{}, false, scanErr
	}
	if len(execs) == 0 {
		return &btypes.BatchSpecExecution{}, false, nil
	}
	return execs[0], true, nil
}
// CountBatchSpecExecutionsOpts captures the query options needed for
// counting batch spec executions. It is currently empty — no filters are
// supported yet — but exists so the CountBatchSpecExecutions signature stays
// stable when filters are added.
type CountBatchSpecExecutionsOpts struct {
	// Nothing yet.
}
// CountBatchSpecExecutions returns the number of batch spec executions in the database.
// The opts parameter is currently unused (no filters exist yet).
//
// NOTE(review): unlike the other store methods, this one is not wrapped in an
// operation/observation — confirm whether tracing/metrics are wanted here.
func (s *Store) CountBatchSpecExecutions(ctx context.Context, opts CountBatchSpecExecutionsOpts) (count int, err error) {
	return s.queryCount(ctx, countBatchSpecExecutionsQuery(&opts))
}
// countBatchSpecExecutionsQueryFmtstr is the query backing
// CountBatchSpecExecutions; the single %s receives the WHERE predicates.
var countBatchSpecExecutionsQueryFmtstr = `
-- source: enterprise/internal/batches/store/batch_spec_execution.go:CountBatchSpecExecutions
SELECT COUNT(batch_spec_executions.id)
FROM batch_spec_executions
WHERE %s
`
// countBatchSpecExecutionsQuery builds the COUNT query for
// CountBatchSpecExecutions. No filter options exist yet, so the WHERE clause
// is always a tautology.
func countBatchSpecExecutionsQuery(opts *CountBatchSpecExecutionsOpts) *sqlf.Query {
	// Nothing to filter on yet. The tautology must be wrapped in a
	// *sqlf.Query: passing a plain string would bind it as a query parameter
	// ("WHERE $1") instead of emitting literal SQL, unlike the identical
	// fallback in listBatchSpecExecutionsQuery.
	return sqlf.Sprintf(countBatchSpecExecutionsQueryFmtstr, sqlf.Sprintf("TRUE"))
}

View File

@ -1,280 +0,0 @@
package store
import (
"context"
"strconv"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/keegancsmith/sqlf"
ct "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/testing"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
)
// testStoreBatchSpecExecutions exercises the BatchSpecExecution store
// methods (create, get, list, cancel) against a real database. The subtests
// build on one another's state — Create inserts the fixtures and List and
// CancelBatchSpecExecution mutate them — so they must run in this order.
func testStoreBatchSpecExecutions(t *testing.T, ctx context.Context, s *Store, clock ct.Clock) {
	testBatchSpec := `theSpec: yeah`

	// Two fixture executions with distinguishable user/namespace/spec IDs.
	execs := make([]*btypes.BatchSpecExecution, 0, 2)
	for i := 0; i < cap(execs); i++ {
		c := &btypes.BatchSpecExecution{
			State:           btypes.BatchSpecExecutionStateQueued,
			BatchSpec:       testBatchSpec,
			UserID:          int32(i + 123),
			NamespaceUserID: int32(i + 345),
			BatchSpecID:     int64(i + 567),
		}

		execs = append(execs, c)
	}

	t.Run("Create", func(t *testing.T) {
		for _, exec := range execs {
			if err := s.CreateBatchSpecExecution(ctx, exec); err != nil {
				t.Fatal(err)
			}

			have := exec
			// The store is expected to fill in ID, RandID and the timestamps
			// on create; everything else must round-trip unchanged.
			want := &btypes.BatchSpecExecution{
				ID:              have.ID,
				RandID:          have.RandID,
				CreatedAt:       clock.Now(),
				UpdatedAt:       clock.Now(),
				State:           btypes.BatchSpecExecutionStateQueued,
				BatchSpec:       have.BatchSpec,
				UserID:          have.UserID,
				NamespaceUserID: have.NamespaceUserID,
				BatchSpecID:     have.BatchSpecID,
			}

			if have.ID == 0 {
				t.Fatal("ID should not be zero")
			}

			if have.RandID == "" {
				t.Fatal("RandID should not be empty")
			}

			if diff := cmp.Diff(have, want); diff != "" {
				t.Fatal(diff)
			}
		}
	})

	t.Run("Get", func(t *testing.T) {
		t.Run("GetByID", func(t *testing.T) {
			for i, exec := range execs {
				t.Run(strconv.Itoa(i), func(t *testing.T) {
					have, err := s.GetBatchSpecExecution(ctx, GetBatchSpecExecutionOpts{ID: exec.ID})
					if err != nil {
						t.Fatal(err)
					}

					if diff := cmp.Diff(have, exec); diff != "" {
						t.Fatal(diff)
					}
				})
			}
		})

		t.Run("GetByRandID", func(t *testing.T) {
			for i, exec := range execs {
				t.Run(strconv.Itoa(i), func(t *testing.T) {
					have, err := s.GetBatchSpecExecution(ctx, GetBatchSpecExecutionOpts{RandID: exec.RandID})
					if err != nil {
						t.Fatal(err)
					}

					if diff := cmp.Diff(have, exec); diff != "" {
						t.Fatal(diff)
					}
				})
			}
		})

		t.Run("NoResults", func(t *testing.T) {
			// An ID that certainly doesn't exist must yield ErrNoResults.
			opts := GetBatchSpecExecutionOpts{ID: 0xdeadbeef}

			_, have := s.GetBatchSpecExecution(ctx, opts)
			want := ErrNoResults

			if have != want {
				t.Fatalf("have err %v, want %v", have, want)
			}
		})
	})

	t.Run("List", func(t *testing.T) {
		// Give the fixtures distinct hostname/cancel/state values so each
		// filter subtest below matches exactly one execution.
		execs[0].WorkerHostname = "asdf-host"
		execs[0].Cancel = true
		execs[0].State = btypes.BatchSpecExecutionStateProcessing
		if err := s.Exec(ctx, sqlf.Sprintf("UPDATE batch_spec_executions SET worker_hostname = %s, cancel = %s, state = %s WHERE id = %s", execs[0].WorkerHostname, execs[0].Cancel, execs[0].State, execs[0].ID)); err != nil {
			t.Fatal(err)
		}

		execs[1].WorkerHostname = "nvm-host"
		if err := s.Exec(ctx, sqlf.Sprintf("UPDATE batch_spec_executions SET worker_hostname = %s WHERE id = %s", execs[1].WorkerHostname, execs[1].ID)); err != nil {
			t.Fatal(err)
		}

		// The batch spec execution store returns the executions in reversed order.
		reversedBatchSpecExecutions := make([]*btypes.BatchSpecExecution, len(execs))
		for i, c := range execs {
			reversedBatchSpecExecutions[len(execs)-i-1] = c
		}

		t.Run("All", func(t *testing.T) {
			have, _, err := s.ListBatchSpecExecutions(ctx, ListBatchSpecExecutionsOpts{})
			if err != nil {
				t.Fatal(err)
			}
			if diff := cmp.Diff(have, reversedBatchSpecExecutions); diff != "" {
				t.Fatalf("invalid executions returned: %s", diff)
			}
		})

		t.Run("WorkerHostname", func(t *testing.T) {
			for _, exec := range reversedBatchSpecExecutions {
				have, _, err := s.ListBatchSpecExecutions(ctx, ListBatchSpecExecutionsOpts{
					WorkerHostname: exec.WorkerHostname,
				})
				if err != nil {
					t.Fatal(err)
				}
				if diff := cmp.Diff(have, []*btypes.BatchSpecExecution{exec}); diff != "" {
					t.Fatalf("invalid executions returned: %s", diff)
				}
			}
		})

		t.Run("State", func(t *testing.T) {
			for _, exec := range reversedBatchSpecExecutions {
				have, _, err := s.ListBatchSpecExecutions(ctx, ListBatchSpecExecutionsOpts{
					State: exec.State,
				})
				if err != nil {
					t.Fatal(err)
				}
				if diff := cmp.Diff(have, []*btypes.BatchSpecExecution{exec}); diff != "" {
					t.Fatalf("invalid executions returned: %s", diff)
				}
			}
		})

		t.Run("Cancel", func(t *testing.T) {
			for _, exec := range reversedBatchSpecExecutions {
				have, _, err := s.ListBatchSpecExecutions(ctx, ListBatchSpecExecutionsOpts{
					Cancel: &exec.Cancel,
				})
				if err != nil {
					t.Fatal(err)
				}
				if diff := cmp.Diff(have, []*btypes.BatchSpecExecution{exec}); diff != "" {
					t.Fatalf("invalid executions returned: %s", diff)
				}
			}
		})

		t.Run("With Limit", func(t *testing.T) {
			for i := 1; i <= len(reversedBatchSpecExecutions); i++ {
				cs, next, err := s.ListBatchSpecExecutions(ctx, ListBatchSpecExecutionsOpts{LimitOpts: LimitOpts{Limit: i}})
				if err != nil {
					t.Fatal(err)
				}

				{
					// next must point at the first element of the following
					// page and be zero on the last page.
					have, want := next, int64(0)
					if i < len(reversedBatchSpecExecutions) {
						want = reversedBatchSpecExecutions[i].ID
					}

					if have != want {
						t.Fatalf("limit: %v: have next %v, want %v", i, have, want)
					}
				}

				{
					have, want := cs, reversedBatchSpecExecutions[:i]
					if len(have) != len(want) {
						t.Fatalf("listed %d batch changes, want: %d", len(have), len(want))
					}

					if diff := cmp.Diff(have, want); diff != "" {
						t.Fatal(diff)
					}
				}
			}
		})

		t.Run("With Cursor", func(t *testing.T) {
			// Walk the full result set one element at a time, feeding each
			// returned cursor into the next request.
			var cursor int64
			for i := 1; i <= len(reversedBatchSpecExecutions); i++ {
				opts := ListBatchSpecExecutionsOpts{Cursor: cursor, LimitOpts: LimitOpts{Limit: 1}}
				have, next, err := s.ListBatchSpecExecutions(ctx, opts)
				if err != nil {
					t.Fatal(err)
				}

				want := reversedBatchSpecExecutions[i-1 : i]
				if diff := cmp.Diff(have, want); diff != "" {
					t.Fatalf("opts: %+v, diff: %s", opts, diff)
				}

				cursor = next
			}
		})
	})

	t.Run("CancelBatchSpecExecution", func(t *testing.T) {
		t.Run("Queued", func(t *testing.T) {
			// Canceling a queued execution (execs[1] — still queued) aborts
			// it immediately: state failed and finished_at set.
			record, err := s.CancelBatchSpecExecution(ctx, execs[1].RandID)
			if err != nil {
				t.Fatal(err)
			}

			if have, want := record.State, btypes.BatchSpecExecutionStateFailed; have != want {
				t.Errorf("invalid state: have=%q want=%q", have, want)
			}

			if have, want := record.Cancel, true; have != want {
				t.Errorf("invalid cancel value: have=%t want=%t", have, want)
			}

			if record.FinishedAt == nil {
				t.Error("finished_at not set")
			} else if have, want := *record.FinishedAt, s.now(); !have.Equal(want) {
				t.Errorf("invalid finished_at: have=%s want=%s", have, want)
			}

			if have, want := record.UpdatedAt, s.now(); !have.Equal(want) {
				t.Errorf("invalid updated_at: have=%s want=%s", have, want)
			}
		})

		t.Run("Processing", func(t *testing.T) {
			// execs[0] was set to processing in the List subtest above.
			// Canceling it only sets the cancel flag; the worker is
			// responsible for tearing down and finishing the execution.
			record, err := s.CancelBatchSpecExecution(ctx, execs[0].RandID)
			if err != nil {
				t.Fatal(err)
			}

			if have, want := record.State, btypes.BatchSpecExecutionStateProcessing; have != want {
				t.Errorf("invalid state: have=%q want=%q", have, want)
			}

			if have, want := record.Cancel, true; have != want {
				t.Errorf("invalid cancel value: have=%t want=%t", have, want)
			}

			if record.FinishedAt != nil {
				t.Error("finished_at set")
			}

			if have, want := record.UpdatedAt, s.now(); !have.Equal(want) {
				t.Errorf("invalid updated_at: have=%s want=%s", have, want)
			}
		})

		t.Run("Invalid current state", func(t *testing.T) {
			// Completed executions cannot be canceled; the store reports
			// that as ErrNoResults.
			if err := s.Exec(ctx, sqlf.Sprintf("UPDATE batch_spec_executions SET state = 'completed' WHERE id = %s", execs[0].ID)); err != nil {
				t.Fatal(err)
			}

			_, err := s.CancelBatchSpecExecution(ctx, execs[0].RandID)
			if err == nil {
				t.Fatal("got unexpected nil error")
			}

			if err != ErrNoResults {
				t.Fatal(err)
			}
		})
	})
}

View File

@ -375,9 +375,6 @@ AND NOT EXISTS (
AND NOT EXISTS (
SELECT 1 FROM changeset_specs WHERE batch_spec_id = batch_specs.id
)
AND NOT EXISTS (
SELECT 1 FROM batch_spec_executions WHERE batch_spec_id = batch_specs.id
);
`
func scanBatchSpec(c *btypes.BatchSpec, s scanner) error {

View File

@ -290,11 +290,10 @@ func testStoreBatchSpecs(t *testing.T, ctx context.Context, s *Store, clock ct.C
overTTL := clock.Now().Add(-btypes.BatchSpecTTL - 1*time.Minute)
tests := []struct {
createdAt time.Time
hasBatchChange bool
hasChangesetSpecs bool
hasBatchSpecExecution bool
wantDeleted bool
createdAt time.Time
hasBatchChange bool
hasChangesetSpecs bool
wantDeleted bool
}{
{createdAt: underTTL, wantDeleted: false},
{createdAt: overTTL, wantDeleted: true},
@ -305,11 +304,8 @@ func testStoreBatchSpecs(t *testing.T, ctx context.Context, s *Store, clock ct.C
{hasBatchChange: true, hasChangesetSpecs: true, createdAt: underTTL, wantDeleted: false},
{hasBatchChange: true, hasChangesetSpecs: true, createdAt: overTTL, wantDeleted: false},
{hasBatchSpecExecution: true, createdAt: underTTL, wantDeleted: false},
{hasBatchSpecExecution: true, createdAt: overTTL, wantDeleted: false},
{hasBatchChange: true, hasBatchSpecExecution: true, hasChangesetSpecs: true, createdAt: underTTL, wantDeleted: false},
{hasBatchChange: true, hasBatchSpecExecution: true, hasChangesetSpecs: true, createdAt: overTTL, wantDeleted: false},
{hasBatchChange: true, hasChangesetSpecs: true, createdAt: underTTL, wantDeleted: false},
{hasBatchChange: true, hasChangesetSpecs: true, createdAt: overTTL, wantDeleted: false},
}
for _, tc := range tests {
@ -347,16 +343,6 @@ func testStoreBatchSpecs(t *testing.T, ctx context.Context, s *Store, clock ct.C
}
}
if tc.hasBatchSpecExecution {
batchSpecExecution := &btypes.BatchSpecExecution{
NamespaceUserID: 1,
BatchSpecID: batchSpec.ID,
}
if err := s.CreateBatchSpecExecution(ctx, batchSpecExecution); err != nil {
t.Fatal(err)
}
}
if err := s.DeleteExpiredBatchSpecs(ctx); err != nil {
t.Fatal(err)
}

View File

@ -34,7 +34,6 @@ func TestIntegration(t *testing.T) {
t.Run("UserDeleteCascades", storeTest(db, nil, testUserDeleteCascades))
t.Run("ChangesetJobs", storeTest(db, nil, testStoreChangesetJobs))
t.Run("BulkOperations", storeTest(db, nil, testStoreBulkOperations))
t.Run("BatchSpecExecutions", storeTest(db, nil, testStoreBatchSpecExecutions))
t.Run("BatchSpecWorkspaces", storeTest(db, nil, testStoreBatchSpecWorkspaces))
t.Run("BatchSpecWorkspaceExecutionJobs", storeTest(db, nil, testStoreBatchSpecWorkspaceExecutionJobs))
t.Run("BatchSpecResolutionJobs", storeTest(db, nil, testStoreBatchSpecResolutionJobs))

View File

@ -1,54 +0,0 @@
package types
import (
"strings"
"time"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
)
// BatchSpecExecutionState defines the possible states of a batch spec
// execution.
type BatchSpecExecutionState string

// BatchSpecExecutionState constants. The lowercase values are what is stored
// in the database; presumably they mirror the standard worker-queue states —
// confirm against the dbworker integration.
const (
	BatchSpecExecutionStateQueued     BatchSpecExecutionState = "queued"
	BatchSpecExecutionStateErrored    BatchSpecExecutionState = "errored"
	BatchSpecExecutionStateFailed     BatchSpecExecutionState = "failed"
	BatchSpecExecutionStateCompleted  BatchSpecExecutionState = "completed"
	BatchSpecExecutionStateProcessing BatchSpecExecutionState = "processing"
)
// BatchSpecExecution is a record of the execution of a batch spec by an
// executor. It carries the standard worker bookkeeping fields alongside the
// batch-changes payload and ownership columns.
type BatchSpecExecution struct {
	ID int64
	// RandID is an opaque identifier used instead of ID in user-facing contexts.
	RandID string

	// Worker bookkeeping fields.
	State          BatchSpecExecutionState
	FailureMessage *string
	StartedAt      *time.Time
	FinishedAt     *time.Time
	ProcessAfter   *time.Time
	NumResets      int64
	NumFailures    int64
	ExecutionLogs  []workerutil.ExecutionLogEntry
	WorkerHostname string

	CreatedAt time.Time
	UpdatedAt time.Time

	// BatchSpec is the raw content of the batch spec to execute.
	BatchSpec string
	// BatchSpecID references the batch_specs row associated with this
	// execution; zero when the nullable column is NULL.
	BatchSpecID int64
	UserID      int32
	// Exactly one of NamespaceUserID/NamespaceOrgID is set — enforced by the
	// batch_spec_executions_has_1_namespace check constraint.
	NamespaceUserID int32
	NamespaceOrgID  int32

	// Cancel is set when a user has requested cancelation of the execution.
	Cancel bool
}
// RecordID returns the database ID of the execution as an int — presumably to
// satisfy the worker Record interface (see the workerutil import); confirm.
func (e BatchSpecExecution) RecordID() int {
	return int(e.ID)
}
// GQLState returns the GraphQL representation of the execution state, taking
// a pending cancelation into account.
func (e BatchSpecExecution) GQLState() string {
	if !e.Cancel {
		// No cancelation requested: the plain state, uppercased.
		return strings.ToUpper(string(e.State))
	}
	// Cancelation was requested: a failed execution has finished canceling,
	// anything else is still winding down.
	if e.State == BatchSpecExecutionStateFailed {
		return "CANCELED"
	}
	return "CANCELING"
}

View File

@ -76,43 +76,6 @@ Indexes:
```
# Table "public.batch_spec_executions"
```
Column | Type | Collation | Nullable | Default
-------------------+--------------------------+-----------+----------+---------------------------------------------------
id | bigint | | not null | nextval('batch_spec_executions_id_seq'::regclass)
state | text | | | 'queued'::text
failure_message | text | | |
started_at | timestamp with time zone | | |
finished_at | timestamp with time zone | | |
process_after | timestamp with time zone | | |
num_resets | integer | | not null | 0
num_failures | integer | | not null | 0
execution_logs | json[] | | |
worker_hostname | text | | not null | ''::text
created_at | timestamp with time zone | | not null | now()
updated_at | timestamp with time zone | | not null | now()
batch_spec | text | | not null |
batch_spec_id | integer | | |
user_id | integer | | |
namespace_user_id | integer | | |
namespace_org_id | integer | | |
rand_id | text | | not null |
last_heartbeat_at | timestamp with time zone | | |
cancel | boolean | | | false
Indexes:
"batch_spec_executions_pkey" PRIMARY KEY, btree (id)
"batch_spec_executions_rand_id" btree (rand_id)
Check constraints:
"batch_spec_executions_has_1_namespace" CHECK ((namespace_user_id IS NULL) <> (namespace_org_id IS NULL))
Foreign-key constraints:
"batch_spec_executions_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) DEFERRABLE
"batch_spec_executions_namespace_org_id_fkey" FOREIGN KEY (namespace_org_id) REFERENCES orgs(id) DEFERRABLE
"batch_spec_executions_namespace_user_id_fkey" FOREIGN KEY (namespace_user_id) REFERENCES users(id) DEFERRABLE
"batch_spec_executions_user_id_fkey" FOREIGN KEY (user_id) REFERENCES users(id) DEFERRABLE
```
# Table "public.batch_spec_resolution_jobs"
```
Column | Type | Collation | Nullable | Default
@ -217,7 +180,6 @@ Foreign-key constraints:
"batch_specs_user_id_fkey" FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL DEFERRABLE
Referenced by:
TABLE "batch_changes" CONSTRAINT "batch_changes_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) DEFERRABLE
TABLE "batch_spec_executions" CONSTRAINT "batch_spec_executions_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) DEFERRABLE
TABLE "batch_spec_resolution_jobs" CONSTRAINT "batch_spec_resolution_jobs_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) ON DELETE CASCADE DEFERRABLE
TABLE "batch_spec_workspaces" CONSTRAINT "batch_spec_workspaces_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) ON DELETE CASCADE DEFERRABLE
TABLE "changeset_specs" CONSTRAINT "changeset_specs_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) DEFERRABLE
@ -1440,7 +1402,6 @@ Check constraints:
"orgs_name_valid_chars" CHECK (name ~ '^[a-zA-Z0-9](?:[a-zA-Z0-9]|[-.](?=[a-zA-Z0-9]))*-?$'::citext)
Referenced by:
TABLE "batch_changes" CONSTRAINT "batch_changes_namespace_org_id_fkey" FOREIGN KEY (namespace_org_id) REFERENCES orgs(id) ON DELETE CASCADE DEFERRABLE
TABLE "batch_spec_executions" CONSTRAINT "batch_spec_executions_namespace_org_id_fkey" FOREIGN KEY (namespace_org_id) REFERENCES orgs(id) DEFERRABLE
TABLE "cm_monitors" CONSTRAINT "cm_monitors_org_id_fk" FOREIGN KEY (namespace_org_id) REFERENCES orgs(id) ON DELETE CASCADE
TABLE "cm_recipients" CONSTRAINT "cm_recipients_org_id_fk" FOREIGN KEY (namespace_org_id) REFERENCES orgs(id) ON DELETE CASCADE
TABLE "feature_flag_overrides" CONSTRAINT "feature_flag_overrides_namespace_org_id_fkey" FOREIGN KEY (namespace_org_id) REFERENCES orgs(id) ON DELETE CASCADE
@ -2094,8 +2055,6 @@ Referenced by:
TABLE "batch_changes" CONSTRAINT "batch_changes_initial_applier_id_fkey" FOREIGN KEY (initial_applier_id) REFERENCES users(id) ON DELETE SET NULL DEFERRABLE
TABLE "batch_changes" CONSTRAINT "batch_changes_last_applier_id_fkey" FOREIGN KEY (last_applier_id) REFERENCES users(id) ON DELETE SET NULL DEFERRABLE
TABLE "batch_changes" CONSTRAINT "batch_changes_namespace_user_id_fkey" FOREIGN KEY (namespace_user_id) REFERENCES users(id) ON DELETE CASCADE DEFERRABLE
TABLE "batch_spec_executions" CONSTRAINT "batch_spec_executions_namespace_user_id_fkey" FOREIGN KEY (namespace_user_id) REFERENCES users(id) DEFERRABLE
TABLE "batch_spec_executions" CONSTRAINT "batch_spec_executions_user_id_fkey" FOREIGN KEY (user_id) REFERENCES users(id) DEFERRABLE
TABLE "batch_specs" CONSTRAINT "batch_specs_user_id_fkey" FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL DEFERRABLE
TABLE "changeset_jobs" CONSTRAINT "changeset_jobs_user_id_fkey" FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE DEFERRABLE
TABLE "changeset_specs" CONSTRAINT "changeset_specs_user_id_fkey" FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL DEFERRABLE

View File

@ -0,0 +1,33 @@
BEGIN;

-- batch_spec_executions tracks the execution of batch specs by executors.
-- The first group of columns (state .. cancel) is the usual worker-queue
-- bookkeeping; the remaining columns carry the batch-changes payload and
-- ownership information.
CREATE TABLE IF NOT EXISTS batch_spec_executions (
    id BIGSERIAL PRIMARY KEY,
    -- Opaque identifier exposed to users instead of the serial id.
    rand_id TEXT NOT NULL,
    state TEXT DEFAULT 'queued',
    failure_message TEXT,
    process_after TIMESTAMP WITH TIME ZONE,
    started_at TIMESTAMP WITH TIME ZONE,
    finished_at TIMESTAMP WITH TIME ZONE,
    last_heartbeat_at TIMESTAMP WITH TIME ZONE,
    num_resets INTEGER NOT NULL DEFAULT 0,
    num_failures INTEGER NOT NULL DEFAULT 0,
    execution_logs JSON[],
    worker_hostname TEXT NOT NULL DEFAULT '',
    cancel BOOL DEFAULT FALSE,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    -- The raw batch spec content to execute.
    batch_spec TEXT NOT NULL,
    batch_spec_id integer REFERENCES batch_specs(id) DEFERRABLE,
    user_id INTEGER REFERENCES users(id),
    namespace_org_id INTEGER REFERENCES orgs(id),
    namespace_user_id INTEGER REFERENCES users(id)
);

-- An execution lives in exactly one namespace: user XOR org.
ALTER TABLE IF EXISTS batch_spec_executions ADD CONSTRAINT batch_spec_executions_has_1_namespace CHECK ((namespace_user_id IS NULL) <> (namespace_org_id IS NULL));

-- Executions are looked up by rand_id (e.g. for cancelation).
CREATE INDEX IF NOT EXISTS batch_spec_executions_rand_id ON batch_spec_executions USING btree (rand_id);

COMMIT;

View File

@ -0,0 +1,5 @@
BEGIN;

-- Remove the batch_spec_executions table entirely; no data is preserved.
DROP TABLE IF EXISTS batch_spec_executions;

COMMIT;

View File

@ -430,7 +430,7 @@ commands:
cmd: |
env TMPDIR="$HOME/.sourcegraph/batches-executor-temp" .bin/executor
env:
EXECUTOR_QUEUE_NAME: batch-spec-workspaces
EXECUTOR_QUEUE_NAME: batches
SRC_PROF_HTTP: ":6093"
# If you want to use this, either start it with `sg run batches-executor-firecracker` or
@ -443,7 +443,7 @@ commands:
.bin/executor
env:
EXECUTOR_USE_FIRECRACKER: true
EXECUTOR_QUEUE_NAME: batch-spec-workspaces
EXECUTOR_QUEUE_NAME: batches
SRC_PROF_HTTP: ":6093"
minio: