From 3f05dde6d4010205a4d40ce5de20dd5fe8106edb Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Tue, 12 Oct 2021 16:03:28 +0200 Subject: [PATCH] Implement batchSpec.State in GraphQL layer (#25894) --- cmd/frontend/graphqlbackend/batches.go | 2 +- cmd/frontend/graphqlbackend/batches.graphql | 14 +- .../internal/batches/resolvers/batch_spec.go | 88 ++++++++++- .../batches/resolvers/batch_spec_test.go | 140 +++++++++++++++--- 4 files changed, 211 insertions(+), 33 deletions(-) diff --git a/cmd/frontend/graphqlbackend/batches.go b/cmd/frontend/graphqlbackend/batches.go index 27e62136089..edc541158db 100644 --- a/cmd/frontend/graphqlbackend/batches.go +++ b/cmd/frontend/graphqlbackend/batches.go @@ -436,7 +436,7 @@ type BatchSpecResolver interface { ActAsCampaignSpec() bool AutoApplyEnabled() bool - State(context.Context) string + State(context.Context) (string, error) StartedAt(ctx context.Context) (*DateTime, error) FinishedAt(ctx context.Context) (*DateTime, error) FailureMessage(ctx context.Context) (*string, error) diff --git a/cmd/frontend/graphqlbackend/batches.graphql b/cmd/frontend/graphqlbackend/batches.graphql index 2d34f9c8925..45931e74806 100644 --- a/cmd/frontend/graphqlbackend/batches.graphql +++ b/cmd/frontend/graphqlbackend/batches.graphql @@ -2611,10 +2611,6 @@ enum BatchSpecWorkspaceState { """ PROCESSING """ - An error occured while executing. Will be retried eventually. - """ - ERRORED - """ A fatal error occured while executing. No retries will be made. """ FAILED @@ -3188,16 +3184,16 @@ type ChangesetJobError { The possible states of a batch spec. """ enum BatchSpecState { + """ + The spec is not yet enqueued for processing. + """ + PENDING + """ This spec is being processed. """ PROCESSING - """ - This spec errored while processing. - """ - ERRORED - """ This spec failed to be processed. """ diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec.go b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec.go index 24b1a8e3fcc..65c2c54a41c 100644 --- a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec.go +++ b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec.go @@ -53,6 +53,10 @@ type batchSpecResolver struct { workspaces []*btypes.BatchSpecWorkspace workspacesErr error + executionJobsOnce sync.Once + executionJobs []*btypes.BatchSpecWorkspaceExecutionJob + executionJobsErr error + validateSpecsOnce sync.Once validateSpecsErr error @@ -336,17 +340,82 @@ func (r *batchSpecResolver) AutoApplyEnabled() bool { return false } -func (r *batchSpecResolver) State(ctx context.Context) string { +func (r *batchSpecResolver) State(ctx context.Context) (string, error) { if !r.batchSpec.CreatedFromRaw { - return "COMPLETED" + return "COMPLETED", nil } validationErr := r.validateChangesetSpecs(ctx) if validationErr != nil { - return "FAILED" + return "FAILED", nil + } + + workspaces, err := r.computeBatchSpecWorkspaces(ctx) + if err != nil { + return "", err + } + if len(workspaces) == 0 { + return "PENDING", nil + } + + var ids []int64 + for _, ws := range workspaces { + ids = append(ids, ws.ID) + } + + jobs, err := r.computeExecutionJobs(ctx, ids) + if err != nil { + return "", err + } + if len(jobs) == 0 { + return "PENDING", nil + } + + var ( + processing bool + failed bool + canceled bool + + canceling bool + allFinished bool + ) + + for _, j := range jobs { + switch j.State { + case btypes.BatchSpecWorkspaceExecutionJobStateProcessing: + if j.Cancel { + canceling = true + } else { + processing = true + } + + allFinished = false + case btypes.BatchSpecWorkspaceExecutionJobStateCompleted: + allFinished = true + case btypes.BatchSpecWorkspaceExecutionJobStateFailed: + if j.Cancel { + canceled = true + } else { + failed = true + } + allFinished = true + } + } + + switch { + case canceling: + return "CANCELING", nil + case canceled && allFinished: + return "CANCELED", nil + case processing: + return "PROCESSING", nil + case allFinished && !failed: + return "COMPLETED", nil + case allFinished && failed: + return "FAILED", nil + default: + return "QUEUED", nil } - // TODO(ssbc): not implemented - return "PROCESSING" } func (r *batchSpecResolver) StartedAt(ctx context.Context) (*graphqlbackend.DateTime, error) { @@ -514,3 +583,12 @@ func (r *batchSpecResolver) computeBatchSpecWorkspaces(ctx context.Context) ([]* }) return r.workspaces, r.workspacesErr } + +func (r *batchSpecResolver) computeExecutionJobs(ctx context.Context, workspaceIDs []int64) ([]*btypes.BatchSpecWorkspaceExecutionJob, error) { + r.executionJobsOnce.Do(func() { + opts := store.ListBatchSpecWorkspaceExecutionJobsOpts{BatchSpecWorkspaceIDs: workspaceIDs} + + r.executionJobs, r.executionJobsErr = r.store.ListBatchSpecWorkspaceExecutionJobs(ctx, opts) + }) + return r.executionJobs, r.workspacesErr +} diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_test.go b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_test.go index 8a4a1b1be9c..a504840a44f 100644 --- a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_test.go +++ b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_test.go @@ -8,6 +8,8 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/graph-gophers/graphql-go" + "github.com/keegancsmith/sqlf" "github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend" "github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/internal/batches/resolvers/apitest" @@ -20,6 +22,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/database/dbtest" "github.com/sourcegraph/sourcegraph/internal/extsvc" "github.com/sourcegraph/sourcegraph/internal/observation" + batcheslib "github.com/sourcegraph/sourcegraph/lib/batches" "github.com/sourcegraph/sourcegraph/lib/batches/schema" "github.com/sourcegraph/sourcegraph/lib/batches/yaml" ) @@ -261,9 +264,11 @@ func TestBatchSpecResolver_BatchSpecCreatedFromRaw(t *testing.T) { admin := ct.CreateTestUser(t, db, true) adminCtx := actor.WithActor(ctx, actor.FromUser(admin.ID)) - cstore := store.New(db, &observation.TestContext, nil) + rs, _ := ct.CreateTestRepos(t, ctx, db, 3) - svc := service.New(cstore) + bstore := store.New(db, &observation.TestContext, nil) + + svc := service.New(bstore) spec, err := svc.CreateBatchSpecFromRaw(adminCtx, service.CreateBatchSpecFromRawOpts{ RawSpec: ct.TestRawBatchSpecYAML, NamespaceUserID: admin.ID, @@ -272,7 +277,7 @@ func TestBatchSpecResolver_BatchSpecCreatedFromRaw(t *testing.T) { t.Fatal(err) } - s, err := graphqlbackend.NewSchema(db, &Resolver{store: cstore}, nil, nil, nil, nil, nil, nil, nil) + s, err := graphqlbackend.NewSchema(db, &Resolver{store: bstore}, nil, nil, nil, nil, nil, nil, nil) if err != nil { t.Fatal(err) } @@ -287,6 +292,12 @@ func TestBatchSpecResolver_BatchSpecCreatedFromRaw(t *testing.T) { adminAPIID := string(graphqlbackend.MarshalUserID(admin.ID)) applyUrl := fmt.Sprintf("/users/%s/batch-changes/apply/%s", admin.Username, apiID) + codeHosts := apitest.BatchChangesCodeHostsConnection{ + TotalCount: 1, + Nodes: []apitest.BatchChangesCodeHost{ + {ExternalServiceKind: "GITHUB", ExternalServiceURL: "https://github.com/"}, + }, + } want := apitest.BatchSpec{ Typename: "BatchSpec", ID: apiID, @@ -299,6 +310,9 @@ func TestBatchSpecResolver_BatchSpecCreatedFromRaw(t *testing.T) { Creator: &apitest.User{ID: adminAPIID, DatabaseID: admin.ID, SiteAdmin: true}, ViewerCanAdminister: true, + AllCodeHosts: codeHosts, + OnlyWithoutCredential: codeHosts, + CreatedAt: graphqlbackend.DateTime{Time: spec.CreatedAt.Truncate(time.Second)}, ExpiresAt: &graphqlbackend.DateTime{Time: spec.ExpiresAt().Truncate(time.Second)}, @@ -306,25 +320,115 @@ func TestBatchSpecResolver_BatchSpecCreatedFromRaw(t *testing.T) { Nodes: []apitest.ChangesetSpec{}, }, - AllCodeHosts: apitest.BatchChangesCodeHostsConnection{ - Nodes: []apitest.BatchChangesCodeHost{}, - }, - OnlyWithoutCredential: apitest.BatchChangesCodeHostsConnection{ - Nodes: []apitest.BatchChangesCodeHost{}, - }, - - // TODO(ssbc): not implemented yet - State: "PROCESSING", + State: "PENDING", } - input := map[string]interface{}{"batchSpec": apiID} - { - var response struct{ Node apitest.BatchSpec } - apitest.MustExec(adminCtx, t, s, input, &response, queryBatchSpecNode) + queryAndAssertBatchSpec(t, adminCtx, s, apiID, want) - if diff := cmp.Diff(want, response.Node); diff != "" { - t.Fatalf("unexpected response (-want +got):\n%s", diff) + // Now enqueue jobs + var jobs []*btypes.BatchSpecWorkspaceExecutionJob + for _, repo := range rs { + ws := &btypes.BatchSpecWorkspace{BatchSpecID: spec.ID, RepoID: repo.ID, Steps: []batcheslib.Step{}} + if err := bstore.CreateBatchSpecWorkspace(ctx, ws); err != nil { + t.Fatal(err) } + + job := &btypes.BatchSpecWorkspaceExecutionJob{BatchSpecWorkspaceID: ws.ID} + if err := bstore.CreateBatchSpecWorkspaceExecutionJob(ctx, job); err != nil { + t.Fatal(err) + } + jobs = append(jobs, job) + } + + want.State = "QUEUED" + queryAndAssertBatchSpec(t, adminCtx, s, apiID, want) + + // 1/3 jobs processing + setJobState(t, ctx, bstore, jobs[1], btypes.BatchSpecWorkspaceExecutionJobStateProcessing) + want.State = "PROCESSING" + queryAndAssertBatchSpec(t, adminCtx, s, apiID, want) + + // 3/3 processing + setJobState(t, ctx, bstore, jobs[0], btypes.BatchSpecWorkspaceExecutionJobStateProcessing) + setJobState(t, ctx, bstore, jobs[2], btypes.BatchSpecWorkspaceExecutionJobStateProcessing) + // Expect same state + queryAndAssertBatchSpec(t, adminCtx, s, apiID, want) + + // 1/3 jobs complete, 2/3 processing + setJobState(t, ctx, bstore, jobs[2], btypes.BatchSpecWorkspaceExecutionJobStateCompleted) + // Expect same state + queryAndAssertBatchSpec(t, adminCtx, s, apiID, want) + + // 3/3 jobs complete + setJobState(t, ctx, bstore, jobs[0], btypes.BatchSpecWorkspaceExecutionJobStateCompleted) + setJobState(t, ctx, bstore, jobs[1], btypes.BatchSpecWorkspaceExecutionJobStateCompleted) + want.State = "COMPLETED" + queryAndAssertBatchSpec(t, adminCtx, s, apiID, want) + + // 1/3 jobs is failed, 2/3 completed + setJobState(t, ctx, bstore, jobs[1], btypes.BatchSpecWorkspaceExecutionJobStateFailed) + want.State = "FAILED" + queryAndAssertBatchSpec(t, adminCtx, s, apiID, want) + + // 1/3 jobs is failed, 2/3 still processing + setJobState(t, ctx, bstore, jobs[0], btypes.BatchSpecWorkspaceExecutionJobStateProcessing) + setJobState(t, ctx, bstore, jobs[2], btypes.BatchSpecWorkspaceExecutionJobStateProcessing) + want.State = "PROCESSING" + queryAndAssertBatchSpec(t, adminCtx, s, apiID, want) + + // 3/3 jobs canceling and processing + setJobState(t, ctx, bstore, jobs[0], btypes.BatchSpecWorkspaceExecutionJobStateProcessing) + setJobState(t, ctx, bstore, jobs[1], btypes.BatchSpecWorkspaceExecutionJobStateProcessing) + setJobState(t, ctx, bstore, jobs[2], btypes.BatchSpecWorkspaceExecutionJobStateProcessing) + setJobCancel(t, ctx, bstore, jobs[0]) + setJobCancel(t, ctx, bstore, jobs[1]) + setJobCancel(t, ctx, bstore, jobs[2]) + + want.State = "CANCELING" + queryAndAssertBatchSpec(t, adminCtx, s, apiID, want) + + // 3/3 canceling and failed + setJobState(t, ctx, bstore, jobs[0], btypes.BatchSpecWorkspaceExecutionJobStateFailed) + setJobState(t, ctx, bstore, jobs[1], btypes.BatchSpecWorkspaceExecutionJobStateFailed) + setJobState(t, ctx, bstore, jobs[2], btypes.BatchSpecWorkspaceExecutionJobStateFailed) + + want.State = "CANCELED" + queryAndAssertBatchSpec(t, adminCtx, s, apiID, want) +} + +func queryAndAssertBatchSpec(t *testing.T, ctx context.Context, s *graphql.Schema, id string, want apitest.BatchSpec) { + t.Helper() + + input := map[string]interface{}{"batchSpec": id} + + var response struct{ Node apitest.BatchSpec } + + apitest.MustExec(ctx, t, s, input, &response, queryBatchSpecNode) + + if diff := cmp.Diff(want, response.Node); diff != "" { + t.Fatalf("unexpected batch spec (-want +got):\n%s", diff) + } +} + +func setJobState(t *testing.T, ctx context.Context, s *store.Store, job *btypes.BatchSpecWorkspaceExecutionJob, state btypes.BatchSpecWorkspaceExecutionJobState) { + t.Helper() + + job.State = state + + err := s.Exec(ctx, sqlf.Sprintf("UPDATE batch_spec_workspace_execution_jobs SET state = %s WHERE id = %s", job.State, job.ID)) + if err != nil { + t.Fatalf("failed to set job state: %s", err) + } +} + +func setJobCancel(t *testing.T, ctx context.Context, s *store.Store, job *btypes.BatchSpecWorkspaceExecutionJob) { + t.Helper() + + job.Cancel = true + + err := s.Exec(ctx, sqlf.Sprintf("UPDATE batch_spec_workspace_execution_jobs SET cancel = true WHERE id = %s", job.ID)) + if err != nil { + t.Fatalf("failed to set job state: %s", err) } }