Implement batchSpec.State in GraphQL layer (#25894)

This commit is contained in:
Thorsten Ball 2021-10-12 16:03:28 +02:00 committed by GitHub
parent 44e1fa1e86
commit 3f05dde6d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 211 additions and 33 deletions

View File

@ -436,7 +436,7 @@ type BatchSpecResolver interface {
ActAsCampaignSpec() bool
AutoApplyEnabled() bool
State(context.Context) string
State(context.Context) (string, error)
StartedAt(ctx context.Context) (*DateTime, error)
FinishedAt(ctx context.Context) (*DateTime, error)
FailureMessage(ctx context.Context) (*string, error)

View File

@ -2611,10 +2611,6 @@ enum BatchSpecWorkspaceState {
"""
PROCESSING
"""
An error occured while executing. Will be retried eventually.
"""
ERRORED
"""
A fatal error occured while executing. No retries will be made.
"""
FAILED
@ -3188,16 +3184,16 @@ type ChangesetJobError {
The possible states of a batch spec.
"""
enum BatchSpecState {
"""
The spec is not yet enqueued for processing.
"""
PENDING
"""
This spec is being processed.
"""
PROCESSING
"""
This spec errored while processing.
"""
ERRORED
"""
This spec failed to be processed.
"""

View File

@ -53,6 +53,10 @@ type batchSpecResolver struct {
workspaces []*btypes.BatchSpecWorkspace
workspacesErr error
executionJobsOnce sync.Once
executionJobs []*btypes.BatchSpecWorkspaceExecutionJob
executionJobsErr error
validateSpecsOnce sync.Once
validateSpecsErr error
@ -336,17 +340,82 @@ func (r *batchSpecResolver) AutoApplyEnabled() bool {
return false
}
func (r *batchSpecResolver) State(ctx context.Context) string {
func (r *batchSpecResolver) State(ctx context.Context) (string, error) {
if !r.batchSpec.CreatedFromRaw {
return "COMPLETED"
return "COMPLETED", nil
}
validationErr := r.validateChangesetSpecs(ctx)
if validationErr != nil {
return "FAILED"
return "FAILED", nil
}
workspaces, err := r.computeBatchSpecWorkspaces(ctx)
if err != nil {
return "", err
}
if len(workspaces) == 0 {
return "PENDING", nil
}
var ids []int64
for _, ws := range workspaces {
ids = append(ids, ws.ID)
}
jobs, err := r.computeExecutionJobs(ctx, ids)
if err != nil {
return "", err
}
if len(jobs) == 0 {
return "PENDING", nil
}
var (
processing bool
failed bool
canceled bool
canceling bool
allFinished bool
)
for _, j := range jobs {
switch j.State {
case btypes.BatchSpecWorkspaceExecutionJobStateProcessing:
if j.Cancel {
canceling = true
} else {
processing = true
}
allFinished = false
case btypes.BatchSpecWorkspaceExecutionJobStateCompleted:
allFinished = true
case btypes.BatchSpecWorkspaceExecutionJobStateFailed:
if j.Cancel {
canceled = true
} else {
failed = true
}
allFinished = true
}
}
switch {
case canceling:
return "CANCELING", nil
case canceled && allFinished:
return "CANCELED", nil
case processing:
return "PROCESSING", nil
case allFinished && !failed:
return "COMPLETED", nil
case allFinished && failed:
return "FAILED", nil
default:
return "QUEUED", nil
}
// TODO(ssbc): not implemented
return "PROCESSING"
}
func (r *batchSpecResolver) StartedAt(ctx context.Context) (*graphqlbackend.DateTime, error) {
@ -514,3 +583,12 @@ func (r *batchSpecResolver) computeBatchSpecWorkspaces(ctx context.Context) ([]*
})
return r.workspaces, r.workspacesErr
}
func (r *batchSpecResolver) computeExecutionJobs(ctx context.Context, workspaceIDs []int64) ([]*btypes.BatchSpecWorkspaceExecutionJob, error) {
r.executionJobsOnce.Do(func() {
opts := store.ListBatchSpecWorkspaceExecutionJobsOpts{BatchSpecWorkspaceIDs: workspaceIDs}
r.executionJobs, r.executionJobsErr = r.store.ListBatchSpecWorkspaceExecutionJobs(ctx, opts)
})
return r.executionJobs, r.workspacesErr
}

View File

@ -8,6 +8,8 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/graph-gophers/graphql-go"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/internal/batches/resolvers/apitest"
@ -20,6 +22,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/extsvc"
"github.com/sourcegraph/sourcegraph/internal/observation"
batcheslib "github.com/sourcegraph/sourcegraph/lib/batches"
"github.com/sourcegraph/sourcegraph/lib/batches/schema"
"github.com/sourcegraph/sourcegraph/lib/batches/yaml"
)
@ -261,9 +264,11 @@ func TestBatchSpecResolver_BatchSpecCreatedFromRaw(t *testing.T) {
admin := ct.CreateTestUser(t, db, true)
adminCtx := actor.WithActor(ctx, actor.FromUser(admin.ID))
cstore := store.New(db, &observation.TestContext, nil)
rs, _ := ct.CreateTestRepos(t, ctx, db, 3)
svc := service.New(cstore)
bstore := store.New(db, &observation.TestContext, nil)
svc := service.New(bstore)
spec, err := svc.CreateBatchSpecFromRaw(adminCtx, service.CreateBatchSpecFromRawOpts{
RawSpec: ct.TestRawBatchSpecYAML,
NamespaceUserID: admin.ID,
@ -272,7 +277,7 @@ func TestBatchSpecResolver_BatchSpecCreatedFromRaw(t *testing.T) {
t.Fatal(err)
}
s, err := graphqlbackend.NewSchema(db, &Resolver{store: cstore}, nil, nil, nil, nil, nil, nil, nil)
s, err := graphqlbackend.NewSchema(db, &Resolver{store: bstore}, nil, nil, nil, nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
@ -287,6 +292,12 @@ func TestBatchSpecResolver_BatchSpecCreatedFromRaw(t *testing.T) {
adminAPIID := string(graphqlbackend.MarshalUserID(admin.ID))
applyUrl := fmt.Sprintf("/users/%s/batch-changes/apply/%s", admin.Username, apiID)
codeHosts := apitest.BatchChangesCodeHostsConnection{
TotalCount: 1,
Nodes: []apitest.BatchChangesCodeHost{
{ExternalServiceKind: "GITHUB", ExternalServiceURL: "https://github.com/"},
},
}
want := apitest.BatchSpec{
Typename: "BatchSpec",
ID: apiID,
@ -299,6 +310,9 @@ func TestBatchSpecResolver_BatchSpecCreatedFromRaw(t *testing.T) {
Creator: &apitest.User{ID: adminAPIID, DatabaseID: admin.ID, SiteAdmin: true},
ViewerCanAdminister: true,
AllCodeHosts: codeHosts,
OnlyWithoutCredential: codeHosts,
CreatedAt: graphqlbackend.DateTime{Time: spec.CreatedAt.Truncate(time.Second)},
ExpiresAt: &graphqlbackend.DateTime{Time: spec.ExpiresAt().Truncate(time.Second)},
@ -306,25 +320,115 @@ func TestBatchSpecResolver_BatchSpecCreatedFromRaw(t *testing.T) {
Nodes: []apitest.ChangesetSpec{},
},
AllCodeHosts: apitest.BatchChangesCodeHostsConnection{
Nodes: []apitest.BatchChangesCodeHost{},
},
OnlyWithoutCredential: apitest.BatchChangesCodeHostsConnection{
Nodes: []apitest.BatchChangesCodeHost{},
},
// TODO(ssbc): not implemented yet
State: "PROCESSING",
State: "PENDING",
}
input := map[string]interface{}{"batchSpec": apiID}
{
var response struct{ Node apitest.BatchSpec }
apitest.MustExec(adminCtx, t, s, input, &response, queryBatchSpecNode)
queryAndAssertBatchSpec(t, adminCtx, s, apiID, want)
if diff := cmp.Diff(want, response.Node); diff != "" {
t.Fatalf("unexpected response (-want +got):\n%s", diff)
// Now enqueue jobs
var jobs []*btypes.BatchSpecWorkspaceExecutionJob
for _, repo := range rs {
ws := &btypes.BatchSpecWorkspace{BatchSpecID: spec.ID, RepoID: repo.ID, Steps: []batcheslib.Step{}}
if err := bstore.CreateBatchSpecWorkspace(ctx, ws); err != nil {
t.Fatal(err)
}
job := &btypes.BatchSpecWorkspaceExecutionJob{BatchSpecWorkspaceID: ws.ID}
if err := bstore.CreateBatchSpecWorkspaceExecutionJob(ctx, job); err != nil {
t.Fatal(err)
}
jobs = append(jobs, job)
}
want.State = "QUEUED"
queryAndAssertBatchSpec(t, adminCtx, s, apiID, want)
// 1/3 jobs processing
setJobState(t, ctx, bstore, jobs[1], btypes.BatchSpecWorkspaceExecutionJobStateProcessing)
want.State = "PROCESSING"
queryAndAssertBatchSpec(t, adminCtx, s, apiID, want)
// 3/3 processing
setJobState(t, ctx, bstore, jobs[0], btypes.BatchSpecWorkspaceExecutionJobStateProcessing)
setJobState(t, ctx, bstore, jobs[2], btypes.BatchSpecWorkspaceExecutionJobStateProcessing)
// Expect same state
queryAndAssertBatchSpec(t, adminCtx, s, apiID, want)
// 1/3 jobs complete, 2/3 processing
setJobState(t, ctx, bstore, jobs[2], btypes.BatchSpecWorkspaceExecutionJobStateCompleted)
// Expect same state
queryAndAssertBatchSpec(t, adminCtx, s, apiID, want)
// 3/3 jobs complete
setJobState(t, ctx, bstore, jobs[0], btypes.BatchSpecWorkspaceExecutionJobStateCompleted)
setJobState(t, ctx, bstore, jobs[1], btypes.BatchSpecWorkspaceExecutionJobStateCompleted)
want.State = "COMPLETED"
queryAndAssertBatchSpec(t, adminCtx, s, apiID, want)
// 1/3 jobs is failed, 2/3 completed
setJobState(t, ctx, bstore, jobs[1], btypes.BatchSpecWorkspaceExecutionJobStateFailed)
want.State = "FAILED"
queryAndAssertBatchSpec(t, adminCtx, s, apiID, want)
// 1/3 jobs is failed, 2/3 still processing
setJobState(t, ctx, bstore, jobs[0], btypes.BatchSpecWorkspaceExecutionJobStateProcessing)
setJobState(t, ctx, bstore, jobs[2], btypes.BatchSpecWorkspaceExecutionJobStateProcessing)
want.State = "PROCESSING"
queryAndAssertBatchSpec(t, adminCtx, s, apiID, want)
// 3/3 jobs canceling and processing
setJobState(t, ctx, bstore, jobs[0], btypes.BatchSpecWorkspaceExecutionJobStateProcessing)
setJobState(t, ctx, bstore, jobs[1], btypes.BatchSpecWorkspaceExecutionJobStateProcessing)
setJobState(t, ctx, bstore, jobs[2], btypes.BatchSpecWorkspaceExecutionJobStateProcessing)
setJobCancel(t, ctx, bstore, jobs[0])
setJobCancel(t, ctx, bstore, jobs[1])
setJobCancel(t, ctx, bstore, jobs[2])
want.State = "CANCELING"
queryAndAssertBatchSpec(t, adminCtx, s, apiID, want)
// 3/3 canceling and failed
setJobState(t, ctx, bstore, jobs[0], btypes.BatchSpecWorkspaceExecutionJobStateFailed)
setJobState(t, ctx, bstore, jobs[1], btypes.BatchSpecWorkspaceExecutionJobStateFailed)
setJobState(t, ctx, bstore, jobs[2], btypes.BatchSpecWorkspaceExecutionJobStateFailed)
want.State = "CANCELED"
queryAndAssertBatchSpec(t, adminCtx, s, apiID, want)
}
func queryAndAssertBatchSpec(t *testing.T, ctx context.Context, s *graphql.Schema, id string, want apitest.BatchSpec) {
t.Helper()
input := map[string]interface{}{"batchSpec": id}
var response struct{ Node apitest.BatchSpec }
apitest.MustExec(ctx, t, s, input, &response, queryBatchSpecNode)
if diff := cmp.Diff(want, response.Node); diff != "" {
t.Fatalf("unexpected batch spec (-want +got):\n%s", diff)
}
}
func setJobState(t *testing.T, ctx context.Context, s *store.Store, job *btypes.BatchSpecWorkspaceExecutionJob, state btypes.BatchSpecWorkspaceExecutionJobState) {
t.Helper()
job.State = state
err := s.Exec(ctx, sqlf.Sprintf("UPDATE batch_spec_workspace_execution_jobs SET state = %s WHERE id = %s", job.State, job.ID))
if err != nil {
t.Fatalf("failed to set job state: %s", err)
}
}
func setJobCancel(t *testing.T, ctx context.Context, s *store.Store, job *btypes.BatchSpecWorkspaceExecutionJob) {
t.Helper()
job.Cancel = true
err := s.Exec(ctx, sqlf.Sprintf("UPDATE batch_spec_workspace_execution_jobs SET cancel = true WHERE id = %s", job.ID))
if err != nil {
t.Fatalf("failed to set job state: %s", err)
}
}