From d5efc9330247814f8f263f169e85d7c998cefa35 Mon Sep 17 00:00:00 2001
From: Erik Seliger
Date: Wed, 29 Sep 2021 14:17:18 +0200
Subject: [PATCH] ssbc: Implement a subset of resolvers (#25182)
---
.../create/NewCreateBatchChangeContent.tsx | 11 +-
.../src/enterprise/batches/create/backend.ts | 17 +-
.../batches/create/examples/ExampleTabs.tsx | 21 +-
.../batches/create/examples/backend.ts | 115 +++++---
.../enterprise/batches/execution/backend.ts | 9 -
cmd/frontend/graphqlbackend/batches.go | 20 +-
cmd/frontend/graphqlbackend/batches.graphql | 22 +-
.../batches/resolvers/batch_change.go | 26 +-
.../internal/batches/resolvers/batch_spec.go | 212 +++++++++++----
.../resolvers/batch_spec_connection.go | 15 +-
.../batches/resolvers/batch_spec_workspace.go | 253 ++++++++++++------
.../batch_spec_workspace_connection.go | 89 ++++++
.../batch_spec_workspace_resolution.go | 81 ++++++
.../resolvers/batch_spec_workspace_step.go | 229 ++++++++++++++++
.../resolvers/changeset_spec_connection.go | 7 +-
.../internal/batches/resolvers/resolver.go | 162 ++++++++++-
.../batches/resolvers/resolver_test.go | 1 +
.../executorqueue/metrics/queue_allocation.go | 2 +-
.../batch_spec_workspace_creator_test.go | 2 +-
.../internal/batches/service/service.go | 148 ++++++++++
.../internal/batches/service/service_test.go | 170 ++++++++++++
.../store/batch_spec_resolution_jobs.go | 7 +-
.../batches/store/batch_spec_workspace.go | 22 +-
.../batch_spec_workspace_execution_jobs.go | 35 ++-
.../store/batch_spec_workspace_test.go | 4 +-
.../internal/batches/store/batch_specs.go | 61 ++++-
.../batches/store/batch_specs_test.go | 4 +-
.../internal/batches/store/changeset_specs.go | 22 ++
enterprise/internal/batches/store/store.go | 18 +-
lib/batches/json_logs.go | 208 +++++++++++++-
sg.config.yaml | 1 +
31 files changed, 1724 insertions(+), 270 deletions(-)
create mode 100644 enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_connection.go
create mode 100644 enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_resolution.go
create mode 100644 enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_step.go
diff --git a/client/web/src/enterprise/batches/create/NewCreateBatchChangeContent.tsx b/client/web/src/enterprise/batches/create/NewCreateBatchChangeContent.tsx
index 6a894970b35..5e79efeebb0 100644
--- a/client/web/src/enterprise/batches/create/NewCreateBatchChangeContent.tsx
+++ b/client/web/src/enterprise/batches/create/NewCreateBatchChangeContent.tsx
@@ -3,6 +3,7 @@ import React, { useCallback, useEffect, useMemo, useState } from 'react'
import { useHistory } from 'react-router'
import { CodeSnippet } from '@sourcegraph/branded/src/components/CodeSnippet'
+import { Scalars } from '@sourcegraph/shared/src/graphql-operations'
import {
SettingsCascadeProps,
SettingsOrgSubject,
@@ -30,16 +31,20 @@ export const NewCreateBatchChangeContent: React.FunctionComponent({ fileName: '', code: '' })
const [isLoading, setIsLoading] = useState(false)
const [selectedNamespace, setSelectedNamespace] = useState('')
+ const [previewID, setPreviewID] = useState()
const submitBatchSpec = useCallback(async () => {
+ if (!previewID) {
+ return
+ }
setIsLoading(true)
try {
- const execution = await createBatchSpec(spec.code)
+ const execution = await createBatchSpec(previewID)
history.push(`${execution.namespace.url}/batch-changes/executions/${execution.id}`)
} catch (error) {
setIsLoading(error)
}
- }, [spec.code, history])
+ }, [previewID, history])
return (
<>
@@ -56,7 +61,7 @@ export const NewCreateBatchChangeContent: React.FunctionComponent{' '}
for more options.
-
+
2. Execute the batch spec
Execute the batch spec to preview your batch change before publishing the results. There are two ways to
diff --git a/client/web/src/enterprise/batches/create/backend.ts b/client/web/src/enterprise/batches/create/backend.ts
index 7a64724d030..813ac43ebc9 100644
--- a/client/web/src/enterprise/batches/create/backend.ts
+++ b/client/web/src/enterprise/batches/create/backend.ts
@@ -1,13 +1,18 @@
import { dataOrThrowErrors, gql } from '@sourcegraph/shared/src/graphql/graphql'
import { requestGraphQL } from '../../../backend/graphql'
-import { BatchSpecCreateFields, CreateBatchSpecResult, CreateBatchSpecVariables } from '../../../graphql-operations'
+import {
+ BatchSpecCreateFields,
+ CreateBatchSpecResult,
+ CreateBatchSpecVariables,
+ Scalars,
+} from '../../../graphql-operations'
-export async function createBatchSpec(spec: string): Promise {
+export async function createBatchSpec(spec: Scalars['ID']): Promise {
const result = await requestGraphQL(
gql`
- mutation CreateBatchSpec($spec: String!) {
- createBatchSpecFromRaw(batchSpec: $spec) {
+ mutation CreateBatchSpec($id: ID!) {
+ executeBatchSpec(batchSpec: $id) {
...BatchSpecCreateFields
}
}
@@ -19,7 +24,7 @@ export async function createBatchSpec(spec: string): Promise void
+ setPreviewID: (id: Scalars['ID']) => void
}
-export const ExampleTabs: React.FunctionComponent = ({ isLightTheme, updateSpec }) => (
+export const ExampleTabs: React.FunctionComponent = ({ isLightTheme, updateSpec, setPreviewID }) => (
@@ -69,6 +70,7 @@ export const ExampleTabs: React.FunctionComponent = ({ isLight
isLightTheme={isLightTheme}
index={index}
updateSpec={updateSpec}
+ setPreviewID={setPreviewID}
/>
))}
@@ -97,6 +99,7 @@ const ExampleTab: React.FunctionComponent<{ index: number }> = ({ children, inde
interface ExampleTabPanelProps extends ThemeProps {
example: Example
updateSpec: (spec: Spec) => void
+ setPreviewID: (id: Scalars['ID']) => void
index: number
}
@@ -105,6 +108,7 @@ const ExampleTabPanel: React.FunctionComponent = ({
isLightTheme,
index,
updateSpec,
+ setPreviewID,
...props
}) => {
const { selectedIndex } = useTabsContext()
@@ -145,6 +149,9 @@ const ExampleTabPanel: React.FunctionComponent = ({
startWith(code),
debounceTime(5000),
switchMap(code => createBatchSpecFromRaw(code)),
+ switchMap(spec =>
+ fetchBatchSpec(spec.id).pipe(repeatWhen(completed => completed.pipe(delay(5000))))
+ ),
catchError(error => [asError(error)])
),
// Don't want to trigger on changes to code, it's just the initial value.
@@ -153,6 +160,12 @@ const ExampleTabPanel: React.FunctionComponent = ({
)
)
+ useEffect(() => {
+ if (preview && !isErrorLike(preview)) {
+ setPreviewID(preview.id)
+ }
+ }, [preview, setPreviewID])
+
// Update the spec in parent state whenever the code changes
useEffect(() => {
if (isSelected) {
diff --git a/client/web/src/enterprise/batches/create/examples/backend.ts b/client/web/src/enterprise/batches/create/examples/backend.ts
index 0c2223d37a5..06c90ee1124 100644
--- a/client/web/src/enterprise/batches/create/examples/backend.ts
+++ b/client/web/src/enterprise/batches/create/examples/backend.ts
@@ -5,11 +5,55 @@ import { dataOrThrowErrors, gql } from '@sourcegraph/shared/src/graphql/graphql'
import { requestGraphQL } from '../../../../backend/graphql'
import {
+ BatchSpecByID2Result,
+ BatchSpecByID2Variables,
BatchSpecWorkspacesFields,
CreateBatchSpecFromRawResult,
CreateBatchSpecFromRawVariables,
+ Scalars,
} from '../../../../graphql-operations'
+const fragment = gql`
+ fragment BatchSpecWorkspacesFields on BatchSpec {
+ id
+ originalInput
+ workspaceResolution {
+ workspaces {
+ nodes {
+ ...BatchSpecWorkspaceFields
+ }
+ }
+ state
+ allowIgnored
+ allowUnsupported
+ }
+ }
+
+ fragment BatchSpecWorkspaceFields on BatchSpecWorkspace {
+ repository {
+ id
+ name
+ url
+ }
+ ignored
+ branch {
+ id
+ abbrevName
+ displayName
+ target {
+ oid
+ }
+ }
+ path
+ onlyFetchWorkspace
+ steps {
+ run
+ container
+ }
+ searchResultPaths
+ }
+`
+
export function createBatchSpecFromRaw(spec: string): Observable {
return requestGraphQL(
gql`
@@ -19,49 +63,7 @@ export function createBatchSpecFromRaw(spec: string): Observable result.createBatchSpecFromRaw)
)
}
+
+export function fetchBatchSpec(id: Scalars['ID']): Observable {
+ return requestGraphQL(
+ gql`
+ query BatchSpecByID2($id: ID!) {
+ node(id: $id) {
+ __typename
+ ...BatchSpecWorkspacesFields
+ }
+ }
+
+ ${fragment}
+ `,
+ { id }
+ ).pipe(
+ map(dataOrThrowErrors),
+ map(data => {
+ if (!data.node) {
+ throw new Error('Not found')
+ }
+ if (data.node.__typename !== 'BatchSpec') {
+ throw new Error(`Node is a ${data.node.__typename}, not a BatchSpec`)
+ }
+ return data.node
+ })
+ )
+}
diff --git a/client/web/src/enterprise/batches/execution/backend.ts b/client/web/src/enterprise/batches/execution/backend.ts
index d11e1a3e209..f1fc204e4a2 100644
--- a/client/web/src/enterprise/batches/execution/backend.ts
+++ b/client/web/src/enterprise/batches/execution/backend.ts
@@ -34,15 +34,6 @@ const batchSpecExecutionFieldsFragment = gql`
namespaceName
}
}
-
- fragment BatchSpecExecutionLogEntryFields on ExecutionLogEntry {
- key
- command
- startTime
- exitCode
- durationMilliseconds
- out
- }
`
export const fetchBatchSpecExecution = (id: Scalars['ID']): Observable =>
diff --git a/cmd/frontend/graphqlbackend/batches.go b/cmd/frontend/graphqlbackend/batches.go
index 5a4dc54557b..7fb868a1828 100644
--- a/cmd/frontend/graphqlbackend/batches.go
+++ b/cmd/frontend/graphqlbackend/batches.go
@@ -165,6 +165,7 @@ type CreateBatchSpecFromRawArgs struct {
AllowUnsupported bool
Execute bool
NoCache bool
+ Namespace *graphql.ID
}
type ReplaceBatchSpecInputArgs struct {
@@ -181,7 +182,6 @@ type DeleteBatchSpecArgs struct {
}
type ExecuteBatchSpecArgs struct {
- Namespace graphql.ID
BatchSpec graphql.ID
NoCache bool
AutoApply bool
@@ -437,9 +437,9 @@ type BatchSpecResolver interface {
AutoApplyEnabled() bool
State() string
- StartedAt() *DateTime
- FinishedAt() *DateTime
- FailureMessage() *string
+ StartedAt(ctx context.Context) (*DateTime, error)
+ FinishedAt(ctx context.Context) (*DateTime, error)
+ FailureMessage(ctx context.Context) (*string, error)
WorkspaceResolution(ctx context.Context) (BatchSpecWorkspaceResolutionResolver, error)
ImportingChangesets(ctx context.Context, args *ListImportingChangesetsArgs) (ChangesetSpecConnectionResolver, error)
}
@@ -854,7 +854,7 @@ type BatchSpecWorkspaceResolutionResolver interface {
AllowIgnored() bool
AllowUnsupported() bool
- Workspaces(ctx context.Context, args *ListWorkspacesArgs) BatchSpecWorkspaceConnectionResolver
+ Workspaces(ctx context.Context, args *ListWorkspacesArgs) (BatchSpecWorkspaceConnectionResolver, error)
Unsupported(ctx context.Context) RepositoryConnectionResolver
RecentlyCompleted(ctx context.Context, args *ListRecentlyCompletedWorkspacesArgs) BatchSpecWorkspaceConnectionResolver
@@ -885,20 +885,20 @@ type BatchSpecWorkspaceResolver interface {
FailureMessage() *string
CachedResultFound() bool
- Stages() (BatchSpecWorkspaceStagesResolver, error)
+ Stages() BatchSpecWorkspaceStagesResolver
Repository(ctx context.Context) (*RepositoryResolver, error)
BatchSpec(ctx context.Context) (BatchSpecResolver, error)
Branch(ctx context.Context) (*GitRefResolver, error)
Path() string
- Steps() []BatchSpecWorkspaceStepResolver
+ Steps(ctx context.Context) ([]BatchSpecWorkspaceStepResolver, error)
SearchResultPaths() []string
OnlyFetchWorkspace() bool
Ignored() bool
- ChangesetSpecs() *[]ChangesetSpecResolver
+ ChangesetSpecs(ctx context.Context) (*[]ChangesetSpecResolver, error)
PlaceInQueue() *int32
}
@@ -919,10 +919,10 @@ type BatchSpecWorkspaceStepResolver interface {
FinishedAt() *DateTime
ExitCode() *int32
- Environment() []BatchSpecWorkspaceEnvironmentVariableResolver
+ Environment() ([]BatchSpecWorkspaceEnvironmentVariableResolver, error)
OutputVariables() *[]BatchSpecWorkspaceOutputVariableResolver
- DiffStat() *DiffStat
+ DiffStat(ctx context.Context) (*DiffStat, error)
Diff(ctx context.Context) (PreviewRepositoryComparisonResolver, error)
}
diff --git a/cmd/frontend/graphqlbackend/batches.graphql b/cmd/frontend/graphqlbackend/batches.graphql
index fa2e915c60b..2d34f9c8925 100644
--- a/cmd/frontend/graphqlbackend/batches.graphql
+++ b/cmd/frontend/graphqlbackend/batches.graphql
@@ -2075,6 +2075,14 @@ extend type Mutation {
TODO: Not implemented yet.
"""
noCache: Boolean = false
+
+ """
+ The namespace (either a user or organization). A batch spec can only be applied to (or
+ used to create) batch changes in this namespace.
+
+ Defaults to the user who created the batch spec.
+ """
+ namespace: ID
): BatchSpec!
"""
@@ -2131,8 +2139,9 @@ extend type Mutation {
deleteBatchSpec(batchSpec: ID!): EmptyResponse!
"""
- Enqueue the workspaces that resulted from evaluation in `createBatchSpecFromRaw`to be executed. These will eventually be moved into running state.
- resolution is done, to support fast edits.
+ Enqueue the workspaces that resulted from evaluation in
+ `createBatchSpecFromRaw`to be executed. These will eventually be moved into
+ running state. resolution is done, to support fast edits.
Once the workspace resolution is done, workspace jobs are move to state QUEUED.
If resolving is already done by the time this mutation is called, they are
enqueued immediately.
@@ -2145,7 +2154,9 @@ extend type Mutation {
TODO: This might be blocking with an error for now.
"""
executeBatchSpec(
- namespace: ID!
+ """
+ The ID of the batch spec.
+ """
batchSpec: ID!
"""
Don't use cache entries.
@@ -2670,9 +2681,10 @@ type BatchSpecWorkspace implements Node {
cachedResultFound: Boolean!
"""
- Executor stages of running in this workspace.
+ Executor stages of running in this workspace. Null, if the execution hasn't
+ started yet.
"""
- stages: BatchSpecWorkspaceStages!
+ stages: BatchSpecWorkspaceStages
"""
List of steps that will need to run over this workspace.
diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/batch_change.go b/enterprise/cmd/frontend/internal/batches/resolvers/batch_change.go
index 2e43a75157b..6250c3ccd99 100644
--- a/enterprise/cmd/frontend/internal/batches/resolvers/batch_change.go
+++ b/enterprise/cmd/frontend/internal/batches/resolvers/batch_change.go
@@ -11,6 +11,7 @@ import (
"github.com/graph-gophers/graphql-go"
"github.com/graph-gophers/graphql-go/relay"
+ "github.com/sourcegraph/sourcegraph/cmd/frontend/backend"
"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/state"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
@@ -291,6 +292,27 @@ func (r *batchChangeResolver) BatchSpecs(
ctx context.Context,
args *graphqlbackend.ListBatchSpecArgs,
) (graphqlbackend.BatchSpecConnectionResolver, error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented yet")
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
+
+ if err := validateFirstParamDefaults(args.First); err != nil {
+ return nil, err
+ }
+ opts := store.ListBatchSpecsOpts{
+ BatchChangeID: r.batchChange.ID,
+ LimitOpts: store.LimitOpts{
+ Limit: int(args.First),
+ },
+ }
+ if args.After != nil {
+ id, err := strconv.Atoi(*args.After)
+ if err != nil {
+ return nil, err
+ }
+ opts.Cursor = int64(id)
+ }
+
+ return &batchSpecConnectionResolver{store: r.store, opts: opts}, nil
}
diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec.go b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec.go
index 6bd2512b8fa..7e2b6e28d88 100644
--- a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec.go
+++ b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec.go
@@ -18,6 +18,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/errcode"
+ "github.com/sourcegraph/sourcegraph/lib/batches"
)
const batchSpecIDKind = "BatchSpec"
@@ -44,6 +45,14 @@ type batchSpecResolver struct {
namespace *graphqlbackend.NamespaceResolver
namespaceErr error
+ resolutionOnce sync.Once
+ resolution *btypes.BatchSpecResolutionJob
+ resolutionErr error
+
+ workspacesOnce sync.Once
+ workspaces []*btypes.BatchSpecWorkspace
+ workspacesErr error
+
// TODO(campaigns-deprecation): This should be removed once we remove campaigns completely
shouldActAsCampaignSpec bool
}
@@ -67,7 +76,9 @@ func (r *batchSpecResolver) ParsedInput() (graphqlbackend.JSONValue, error) {
}
func (r *batchSpecResolver) ChangesetSpecs(ctx context.Context, args *graphqlbackend.ChangesetSpecsConnectionArgs) (graphqlbackend.ChangesetSpecConnectionResolver, error) {
- opts := store.ListChangesetSpecsOpts{}
+ opts := store.ListChangesetSpecsOpts{
+ BatchSpecID: r.batchSpec.ID,
+ }
if err := validateFirstParamDefaults(args.First); err != nil {
return nil, err
}
@@ -81,9 +92,8 @@ func (r *batchSpecResolver) ChangesetSpecs(ctx context.Context, args *graphqlbac
}
return &changesetSpecConnectionResolver{
- store: r.store,
- opts: opts,
- batchSpecID: r.batchSpec.ID,
+ store: r.store,
+ opts: opts,
}, nil
}
@@ -154,35 +164,6 @@ func (r *batchSpecResolver) Namespace(ctx context.Context) (*graphqlbackend.Name
return r.computeNamespace(ctx)
}
-func (r *batchSpecResolver) computeNamespace(ctx context.Context) (*graphqlbackend.NamespaceResolver, error) {
- r.namespaceOnce.Do(func() {
- if r.preloadedNamespace != nil {
- r.namespace = r.preloadedNamespace
- return
- }
- var (
- err error
- n = &graphqlbackend.NamespaceResolver{}
- )
-
- if r.batchSpec.NamespaceUserID != 0 {
- n.Namespace, err = graphqlbackend.UserByIDInt32(ctx, r.store.DB(), r.batchSpec.NamespaceUserID)
- } else {
- n.Namespace, err = graphqlbackend.OrgByIDInt32(ctx, r.store.DB(), r.batchSpec.NamespaceOrgID)
- }
-
- if errcode.IsNotFound(err) {
- r.namespace = nil
- r.namespaceErr = errors.New("namespace of batch spec has been deleted")
- return
- }
-
- r.namespace = n
- r.namespaceErr = err
- })
- return r.namespace, r.namespaceErr
-}
-
func (r *batchSpecResolver) ApplyURL(ctx context.Context) (*string, error) {
// TODO(ssbc): not implemented
@@ -220,8 +201,8 @@ func (r *batchChangeDescriptionResolver) Description() string {
func (r *batchSpecResolver) DiffStat(ctx context.Context) (*graphqlbackend.DiffStat, error) {
specsConnection := &changesetSpecConnectionResolver{
- store: r.store,
- batchSpecID: r.batchSpec.ID,
+ store: r.store,
+ opts: store.ListChangesetSpecsOpts{BatchSpecID: r.batchSpec.ID},
}
specs, err := specsConnection.Nodes(ctx)
@@ -282,8 +263,13 @@ func (r *batchSpecResolver) SupersedingBatchSpec(ctx context.Context) (graphqlba
return nil, err
}
+ a := actor.FromContext(ctx)
+ if !a.IsAuthenticated() {
+ return nil, errors.New("user is not authenticated")
+ }
+
svc := service.New(r.store)
- newest, err := svc.GetNewestBatchSpec(ctx, r.store, r.batchSpec, actor.FromContext(ctx).UID)
+ newest, err := svc.GetNewestBatchSpec(ctx, r.store, r.batchSpec, a.UID)
if err != nil {
return nil, err
}
@@ -349,30 +335,156 @@ func (r *batchSpecResolver) AutoApplyEnabled() bool {
func (r *batchSpecResolver) State() string {
// TODO(ssbc): not implemented
- return "not implemented"
+ return "PROCESSING"
}
-func (r *batchSpecResolver) StartedAt() *graphqlbackend.DateTime {
- // TODO(ssbc): not implemented
- return nil
+func (r *batchSpecResolver) StartedAt(ctx context.Context) (*graphqlbackend.DateTime, error) {
+ resolution, err := r.computeResolutionJob(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if resolution == nil {
+ return nil, nil
+ }
+ workspaces, err := r.computeBatchSpecWorkspaces(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if len(workspaces) == 0 {
+ return nil, nil
+ }
+ // TODO: Look at earliest started_at time among all workspaces.
+ return nil, nil
}
-func (r *batchSpecResolver) FinishedAt() *graphqlbackend.DateTime {
- // TODO(ssbc): not implemented
- return nil
+func (r *batchSpecResolver) FinishedAt(ctx context.Context) (*graphqlbackend.DateTime, error) {
+ resolution, err := r.computeResolutionJob(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if resolution == nil {
+ return nil, nil
+ }
+ workspaces, err := r.computeBatchSpecWorkspaces(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if len(workspaces) == 0 {
+ return nil, nil
+ }
+ // TODO: Look at latest finished_at time among all workspaces, and ensure all are in a final state.
+ return nil, nil
}
-func (r *batchSpecResolver) FailureMessage() *string {
- // TODO(ssbc): not implemented
- return nil
+func (r *batchSpecResolver) FailureMessage(ctx context.Context) (*string, error) {
+ resolution, err := r.computeResolutionJob(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if resolution != nil {
+ return resolution.FailureMessage, nil
+ }
+ // TODO: look at execution jobs.
+ return nil, nil
}
func (r *batchSpecResolver) ImportingChangesets(ctx context.Context, args *graphqlbackend.ListImportingChangesetsArgs) (graphqlbackend.ChangesetSpecConnectionResolver, error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented")
+ workspaces, err := r.computeBatchSpecWorkspaces(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ uniqueCSIDs := make(map[int64]struct{})
+ for _, w := range workspaces {
+ for _, id := range w.ChangesetSpecIDs {
+ if _, ok := uniqueCSIDs[id]; !ok {
+ uniqueCSIDs[id] = struct{}{}
+ }
+ }
+ }
+ specIDs := make([]int64, 0, len(uniqueCSIDs))
+ for id := range uniqueCSIDs {
+ specIDs = append(specIDs, id)
+ }
+
+ opts := store.ListChangesetSpecsOpts{
+ IDs: specIDs,
+ BatchSpecID: r.batchSpec.ID,
+ Type: batches.ChangesetSpecDescriptionTypeExisting,
+ }
+ if err := validateFirstParamDefaults(args.First); err != nil {
+ return nil, err
+ }
+ opts.Limit = int(args.First)
+ if args.After != nil {
+ id, err := strconv.Atoi(*args.After)
+ if err != nil {
+ return nil, err
+ }
+ opts.Cursor = int64(id)
+ }
+
+ return &changesetSpecConnectionResolver{store: r.store, opts: opts}, nil
}
func (r *batchSpecResolver) WorkspaceResolution(ctx context.Context) (graphqlbackend.BatchSpecWorkspaceResolutionResolver, error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented")
+ resolution, err := r.computeResolutionJob(ctx)
+ if err != nil {
+ return nil, err
+ }
+ // TODO: switch to full error, once we can distinguish server side batch specs.
+ if resolution == nil {
+ return nil, nil
+ }
+ return &batchSpecWorkspaceResolutionResolver{store: r.store, resolution: resolution}, nil
+}
+
+func (r *batchSpecResolver) computeNamespace(ctx context.Context) (*graphqlbackend.NamespaceResolver, error) {
+ r.namespaceOnce.Do(func() {
+ if r.preloadedNamespace != nil {
+ r.namespace = r.preloadedNamespace
+ return
+ }
+ var (
+ err error
+ n = &graphqlbackend.NamespaceResolver{}
+ )
+
+ if r.batchSpec.NamespaceUserID != 0 {
+ n.Namespace, err = graphqlbackend.UserByIDInt32(ctx, r.store.DB(), r.batchSpec.NamespaceUserID)
+ } else {
+ n.Namespace, err = graphqlbackend.OrgByIDInt32(ctx, r.store.DB(), r.batchSpec.NamespaceOrgID)
+ }
+
+ if errcode.IsNotFound(err) {
+ r.namespace = nil
+ r.namespaceErr = errors.New("namespace of batch spec has been deleted")
+ return
+ }
+
+ r.namespace = n
+ r.namespaceErr = err
+ })
+ return r.namespace, r.namespaceErr
+}
+
+func (r *batchSpecResolver) computeResolutionJob(ctx context.Context) (*btypes.BatchSpecResolutionJob, error) {
+ r.resolutionOnce.Do(func() {
+ var err error
+ r.resolution, err = r.store.GetBatchSpecResolutionJob(ctx, store.GetBatchSpecResolutionJobOpts{BatchSpecID: r.batchSpec.ID})
+ if err != nil {
+ if err == store.ErrNoResults {
+ return
+ }
+ r.resolutionErr = err
+ }
+ })
+ return r.resolution, r.resolutionErr
+}
+
+func (r *batchSpecResolver) computeBatchSpecWorkspaces(ctx context.Context) ([]*btypes.BatchSpecWorkspace, error) {
+ r.workspacesOnce.Do(func() {
+ r.workspaces, _, r.workspacesErr = r.store.ListBatchSpecWorkspaces(ctx, store.ListBatchSpecWorkspacesOpts{BatchSpecID: r.batchSpec.ID})
+ })
+ return r.workspaces, r.workspacesErr
}
diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_connection.go b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_connection.go
index a20fabf83ec..e3af50ec413 100644
--- a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_connection.go
+++ b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_connection.go
@@ -11,21 +11,20 @@ import (
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
)
-var _ graphqlbackend.BatchSpecConnectionResolver = &batchSpecConnectionResolver{}
-
type batchSpecConnectionResolver struct {
store *store.Store
opts store.ListBatchSpecsOpts
- // cache results because they are used by multiple fields
+ // Cache results because they are used by multiple fields.
once sync.Once
batchSpecs []*btypes.BatchSpec
next int64
err error
}
+var _ graphqlbackend.BatchSpecConnectionResolver = &batchSpecConnectionResolver{}
+
func (r *batchSpecConnectionResolver) Nodes(ctx context.Context) ([]graphqlbackend.BatchSpecResolver, error) {
- // TODO(ssbc): not implemented
nodes, _, err := r.compute(ctx)
if err != nil {
return nil, err
@@ -38,14 +37,13 @@ func (r *batchSpecConnectionResolver) Nodes(ctx context.Context) ([]graphqlbacke
}
func (r *batchSpecConnectionResolver) TotalCount(ctx context.Context) (int32, error) {
- // TODO(ssbc): not implemented
- //
- count, err := r.store.CountBatchSpecs(ctx)
+ count, err := r.store.CountBatchSpecs(ctx, store.CountBatchSpecsOpts{
+ BatchChangeID: r.opts.BatchChangeID,
+ })
return int32(count), err
}
func (r *batchSpecConnectionResolver) PageInfo(ctx context.Context) (*graphqlutil.PageInfo, error) {
- // TODO(ssbc): not implemented
_, next, err := r.compute(ctx)
if err != nil {
return nil, err
@@ -57,7 +55,6 @@ func (r *batchSpecConnectionResolver) PageInfo(ctx context.Context) (*graphqluti
}
func (r *batchSpecConnectionResolver) compute(ctx context.Context) ([]*btypes.BatchSpec, int64, error) {
- // TODO(ssbc): not implemented
r.once.Do(func() {
r.batchSpecs, r.next, r.err = r.store.ListBatchSpecs(ctx, r.opts)
})
diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace.go b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace.go
index fc110e6a678..ec2b6de2b50 100644
--- a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace.go
+++ b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace.go
@@ -2,59 +2,122 @@ package resolvers
import (
"context"
+ "encoding/json"
+ "strings"
+ "sync"
- "github.com/cockroachdb/errors"
"github.com/graph-gophers/graphql-go"
+ "github.com/graph-gophers/graphql-go/relay"
"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
- "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/service"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
+ btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
+ "github.com/sourcegraph/sourcegraph/internal/types"
+ "github.com/sourcegraph/sourcegraph/internal/workerutil"
batcheslib "github.com/sourcegraph/sourcegraph/lib/batches"
)
+const batchSpecWorkspaceIDKind = "BatchSpecWorkspace"
+
+func marshalBatchSpecWorkspaceID(id int64) graphql.ID {
+ return relay.MarshalID(batchSpecWorkspaceIDKind, id)
+}
+
+func unmarshalBatchSpecWorkspaceID(id graphql.ID) (batchSpecWorkspaceID int64, err error) {
+ err = relay.UnmarshalSpec(id, &batchSpecWorkspaceID)
+ return
+}
+
type batchSpecWorkspaceResolver struct {
- store *store.Store
- node *service.RepoWorkspace
+ store *store.Store
+ workspace *btypes.BatchSpecWorkspace
+ execution *btypes.BatchSpecWorkspaceExecutionJob
+
+ repoOnce sync.Once
+ repo *graphqlbackend.RepositoryResolver
+ repoErr error
}
var _ graphqlbackend.BatchSpecWorkspaceResolver = &batchSpecWorkspaceResolver{}
func (r *batchSpecWorkspaceResolver) ID() graphql.ID {
- // TODO(ssbc): not implemented
- return graphql.ID("not implemented")
+ return marshalBatchSpecWorkspaceID(r.workspace.ID)
}
+
+func (r *batchSpecWorkspaceResolver) computeRepo(ctx context.Context) (*graphqlbackend.RepositoryResolver, error) {
+ r.repoOnce.Do(func() {
+ var repo *types.Repo
+ repo, r.repoErr = r.store.Repos().Get(ctx, r.workspace.RepoID)
+ r.repo = graphqlbackend.NewRepositoryResolver(r.store.DB(), repo)
+ })
+ return r.repo, r.repoErr
+}
+
func (r *batchSpecWorkspaceResolver) Repository(ctx context.Context) (*graphqlbackend.RepositoryResolver, error) {
- return graphqlbackend.NewRepositoryResolver(r.store.DB(), r.node.Repo), nil
+ return r.computeRepo(ctx)
}
func (r *batchSpecWorkspaceResolver) Branch(ctx context.Context) (*graphqlbackend.GitRefResolver, error) {
- repo, _ := r.Repository(ctx)
- return graphqlbackend.NewGitRefResolver(repo, r.node.Branch, graphqlbackend.GitObjectID(r.node.Commit)), nil
+ repo, err := r.computeRepo(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return graphqlbackend.NewGitRefResolver(repo, r.workspace.Branch, graphqlbackend.GitObjectID(r.workspace.Commit)), nil
}
func (r *batchSpecWorkspaceResolver) Path() string {
- return r.node.Path
+ return r.workspace.Path
}
func (r *batchSpecWorkspaceResolver) OnlyFetchWorkspace() bool {
- return r.node.OnlyFetchWorkspace
+ return r.workspace.OnlyFetchWorkspace
}
func (r *batchSpecWorkspaceResolver) SearchResultPaths() []string {
- return r.node.FileMatches
+ return r.workspace.FileMatches
}
-func (r *batchSpecWorkspaceResolver) Steps() []graphqlbackend.BatchSpecWorkspaceStepResolver {
- resolvers := make([]graphqlbackend.BatchSpecWorkspaceStepResolver, 0, len(r.node.Steps))
- for _, step := range r.node.Steps {
- resolvers = append(resolvers, &batchSpecWorkspaceStepResolver{step})
+func (r *batchSpecWorkspaceResolver) Steps(ctx context.Context) ([]graphqlbackend.BatchSpecWorkspaceStepResolver, error) {
+ logLines := []batcheslib.LogEvent{}
+ if r.execution != nil {
+ entry, ok := findExecutionLogEntry(r.execution, "step.src.0")
+ if ok {
+ for _, line := range strings.Split(entry.Out, "\n") {
+ if !strings.HasPrefix(line, "stdout: ") {
+ continue
+ }
+ line = line[len("stdout: "):]
+ var parsed batcheslib.LogEvent
+ err := json.Unmarshal([]byte(line), &parsed)
+ if err != nil {
+ return nil, err
+ }
+ logLines = append(logLines, parsed)
+ }
+ }
}
- return resolvers
+
+ repo, err := r.computeRepo(ctx)
+ if err != nil {
+ return nil, err
+ }
+ resolvers := make([]graphqlbackend.BatchSpecWorkspaceStepResolver, 0, len(r.workspace.Steps))
+ for idx, step := range r.workspace.Steps {
+ resolvers = append(resolvers, &batchSpecWorkspaceStepResolver{index: idx, step: step, logLines: logLines, store: r.store, repo: repo, baseRev: r.workspace.Commit})
+ }
+
+ return resolvers, nil
}
-func (r *batchSpecWorkspaceResolver) BatchSpec(context.Context) (graphqlbackend.BatchSpecResolver, error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented")
+func (r *batchSpecWorkspaceResolver) BatchSpec(ctx context.Context) (graphqlbackend.BatchSpecResolver, error) {
+ if r.workspace.BatchSpecID == 0 {
+ return nil, nil
+ }
+ batchSpec, err := r.store.GetBatchSpec(ctx, store.GetBatchSpecOpts{ID: r.workspace.BatchSpecID})
+ if err != nil {
+ return nil, err
+ }
+ return &batchSpecResolver{store: r.store, batchSpec: batchSpec}, nil
}
func (r *batchSpecWorkspaceResolver) Ignored() bool {
@@ -67,102 +130,114 @@ func (r *batchSpecWorkspaceResolver) CachedResultFound() bool {
return false
}
-func (r *batchSpecWorkspaceResolver) Stages() (graphqlbackend.BatchSpecWorkspaceStagesResolver, error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented yet")
+func (r *batchSpecWorkspaceResolver) Stages() graphqlbackend.BatchSpecWorkspaceStagesResolver {
+ if r.execution == nil {
+ return nil
+ }
+ return &batchSpecWorkspaceStagesResolver{store: r.store, execution: r.execution}
}
func (r *batchSpecWorkspaceResolver) StartedAt() *graphqlbackend.DateTime {
- // TODO(ssbc): not implemented
- return nil
+ if r.execution == nil {
+ return nil
+ }
+ if r.execution.StartedAt.IsZero() {
+ return nil
+ }
+ return &graphqlbackend.DateTime{Time: r.execution.StartedAt}
}
func (r *batchSpecWorkspaceResolver) FinishedAt() *graphqlbackend.DateTime {
- // TODO(ssbc): not implemented
- return nil
+ if r.execution == nil {
+ return nil
+ }
+ if r.execution.FinishedAt.IsZero() {
+ return nil
+ }
+ return &graphqlbackend.DateTime{Time: r.execution.FinishedAt}
}
func (r *batchSpecWorkspaceResolver) FailureMessage() *string {
- // TODO(ssbc): not implemented
- return nil
+ if r.execution == nil {
+ return nil
+ }
+ return r.execution.FailureMessage
}
func (r *batchSpecWorkspaceResolver) State() string {
- // TODO(ssbc): not implemented
- return "FAILED"
+ if r.execution == nil {
+ return "QUEUED"
+ }
+ return r.execution.State.ToGraphQL()
}
-func (r *batchSpecWorkspaceResolver) ChangesetSpecs() *[]graphqlbackend.ChangesetSpecResolver {
- // TODO(ssbc): not implemented
- return nil
+func (r *batchSpecWorkspaceResolver) ChangesetSpecs(ctx context.Context) (*[]graphqlbackend.ChangesetSpecResolver, error) {
+ if len(r.workspace.ChangesetSpecIDs) == 0 {
+ none := []graphqlbackend.ChangesetSpecResolver{}
+ return &none, nil
+ }
+ specs, _, err := r.store.ListChangesetSpecs(ctx, store.ListChangesetSpecsOpts{IDs: r.workspace.ChangesetSpecIDs})
+ if err != nil {
+ return nil, err
+ }
+ repos, err := r.store.Repos().GetReposSetByIDs(ctx, specs.RepoIDs()...)
+ if err != nil {
+ return nil, err
+ }
+ resolvers := make([]graphqlbackend.ChangesetSpecResolver, 0, len(specs))
+ for _, spec := range specs {
+ resolvers = append(resolvers, NewChangesetSpecResolverWithRepo(r.store, repos[spec.RepoID], spec))
+ }
+ return &resolvers, nil
}
func (r *batchSpecWorkspaceResolver) PlaceInQueue() *int32 {
- // TODO(ssbc): not implemented
- var p int32 = 9999
- return &p
-}
-
-// -----------------------------------------------
-
-type batchSpecWorkspaceStepResolver struct {
- step batcheslib.Step
-}
-
-func (r *batchSpecWorkspaceStepResolver) Run() string {
- return r.step.Run
-}
-
-func (r *batchSpecWorkspaceStepResolver) Container() string {
- return r.step.Container
-}
-
-func (r *batchSpecWorkspaceStepResolver) CachedResultFound() bool {
- // TODO(ssbc): not implemented
- return false
-}
-
-func (r *batchSpecWorkspaceStepResolver) Skipped() bool {
- // TODO(ssbc): not implemented
- return false
-}
-
-func (r *batchSpecWorkspaceStepResolver) OutputLines(ctx context.Context, args *graphqlbackend.BatchSpecWorkspaceStepOutputLinesArgs) (*[]string, error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented yet")
-}
-
-func (r *batchSpecWorkspaceStepResolver) StartedAt() *graphqlbackend.DateTime {
// TODO(ssbc): not implemented
return nil
}
-func (r *batchSpecWorkspaceStepResolver) FinishedAt() *graphqlbackend.DateTime {
- // TODO(ssbc): not implemented
+type batchSpecWorkspaceStagesResolver struct {
+ store *store.Store
+ execution *btypes.BatchSpecWorkspaceExecutionJob
+}
+
+var _ graphqlbackend.BatchSpecWorkspaceStagesResolver = &batchSpecWorkspaceStagesResolver{}
+
+func (r *batchSpecWorkspaceStagesResolver) Setup() []graphqlbackend.ExecutionLogEntryResolver {
+ return r.executionLogEntryResolversWithPrefix("setup.")
+}
+
+func (r *batchSpecWorkspaceStagesResolver) SrcExec() graphqlbackend.ExecutionLogEntryResolver {
+ if entry, ok := findExecutionLogEntry(r.execution, "step.src.0"); ok {
+ return graphqlbackend.NewExecutionLogEntryResolver(r.store.DB(), entry)
+ }
+
return nil
}
-func (r *batchSpecWorkspaceStepResolver) ExitCode() *int32 {
- // TODO(ssbc): not implemented
- return nil
+func (r *batchSpecWorkspaceStagesResolver) Teardown() []graphqlbackend.ExecutionLogEntryResolver {
+ return r.executionLogEntryResolversWithPrefix("teardown.")
}
-func (r *batchSpecWorkspaceStepResolver) Environment() []graphqlbackend.BatchSpecWorkspaceEnvironmentVariableResolver {
- // TODO(ssbc): not implemented
- return nil
+func (r *batchSpecWorkspaceStagesResolver) executionLogEntryResolversWithPrefix(prefix string) []graphqlbackend.ExecutionLogEntryResolver {
+ var resolvers []graphqlbackend.ExecutionLogEntryResolver
+ for _, entry := range r.execution.ExecutionLogs {
+ if !strings.HasPrefix(entry.Key, prefix) {
+ continue
+ }
+ r := graphqlbackend.NewExecutionLogEntryResolver(r.store.DB(), entry)
+ resolvers = append(resolvers, r)
+ }
+
+ return resolvers
}
-func (r *batchSpecWorkspaceStepResolver) OutputVariables() *[]graphqlbackend.BatchSpecWorkspaceOutputVariableResolver {
- // TODO(ssbc): not implemented
- return nil
-}
+func findExecutionLogEntry(execution *btypes.BatchSpecWorkspaceExecutionJob, key string) (workerutil.ExecutionLogEntry, bool) {
+ for _, entry := range execution.ExecutionLogs {
+ if entry.Key == key {
+ return entry, true
+ }
+ }
-func (r *batchSpecWorkspaceStepResolver) DiffStat() *graphqlbackend.DiffStat {
- // TODO(ssbc): not implemented
- return nil
-}
-
-func (r *batchSpecWorkspaceStepResolver) Diff(ctx context.Context) (graphqlbackend.PreviewRepositoryComparisonResolver, error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented yet")
+ return workerutil.ExecutionLogEntry{}, false
}
diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_connection.go b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_connection.go
new file mode 100644
index 00000000000..b0baae9e9db
--- /dev/null
+++ b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_connection.go
@@ -0,0 +1,89 @@
+package resolvers
+
+import (
+ "context"
+ "strconv"
+ "sync"
+
+ "github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
+ "github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend/graphqlutil"
+ "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
+ btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
+)
+
+type batchSpecWorkspaceConnectionResolver struct {
+ store *store.Store
+ opts store.ListBatchSpecWorkspacesOpts
+
+ // Cache results because they are used by multiple fields.
+ once sync.Once
+ workspaces []*btypes.BatchSpecWorkspace
+ next int64
+ err error
+}
+
+var _ graphqlbackend.BatchSpecWorkspaceConnectionResolver = &batchSpecWorkspaceConnectionResolver{}
+
+func (r *batchSpecWorkspaceConnectionResolver) Nodes(ctx context.Context) ([]graphqlbackend.BatchSpecWorkspaceResolver, error) {
+ nodes, _, err := r.compute(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(nodes) == 0 {
+ return []graphqlbackend.BatchSpecWorkspaceResolver{}, nil
+ }
+
+ nodeIDs := make([]int64, 0, len(nodes))
+ for _, n := range nodes {
+ nodeIDs = append(nodeIDs, n.ID)
+ }
+ executions, err := r.store.ListBatchSpecWorkspaceExecutionJobs(ctx, store.ListBatchSpecWorkspaceExecutionJobsOpts{BatchSpecWorkspaceIDs: nodeIDs})
+ if err != nil {
+ return nil, err
+ }
+ executionsByWorkspaceID := make(map[int64]*btypes.BatchSpecWorkspaceExecutionJob)
+ for _, e := range executions {
+ executionsByWorkspaceID[e.BatchSpecWorkspaceID] = e
+ }
+ resolvers := make([]graphqlbackend.BatchSpecWorkspaceResolver, 0, len(nodes))
+ for _, w := range nodes {
+ res := &batchSpecWorkspaceResolver{
+ store: r.store,
+ workspace: w,
+ }
+ if ex, ok := executionsByWorkspaceID[w.ID]; ok {
+ res.execution = ex
+ }
+ resolvers = append(resolvers, res)
+ }
+ return resolvers, nil
+}
+
+func (r *batchSpecWorkspaceConnectionResolver) TotalCount(ctx context.Context) (int32, error) {
+ // TODO(ssbc): not implemented
+ return 0, nil
+}
+
+func (r *batchSpecWorkspaceConnectionResolver) PageInfo(ctx context.Context) (*graphqlutil.PageInfo, error) {
+ _, next, err := r.compute(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if next != 0 {
+ return graphqlutil.NextPageCursor(strconv.Itoa(int(next))), nil
+ }
+ return graphqlutil.HasNextPage(false), nil
+}
+
+func (r *batchSpecWorkspaceConnectionResolver) compute(ctx context.Context) ([]*btypes.BatchSpecWorkspace, int64, error) {
+ r.once.Do(func() {
+ r.workspaces, r.next, r.err = r.store.ListBatchSpecWorkspaces(ctx, r.opts)
+ })
+ return r.workspaces, r.next, r.err
+}
+
+func (r *batchSpecWorkspaceConnectionResolver) Stats(ctx context.Context) graphqlbackend.BatchSpecWorkspacesStatsResolver {
+ // TODO(ssbc): not implemented
+ return nil
+}
diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_resolution.go b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_resolution.go
new file mode 100644
index 00000000000..8468c052e14
--- /dev/null
+++ b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_resolution.go
@@ -0,0 +1,81 @@
+package resolvers
+
+import (
+ "context"
+ "strconv"
+
+ "github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
+ "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
+ btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
+)
+
+type batchSpecWorkspaceResolutionResolver struct {
+ store *store.Store
+ resolution *btypes.BatchSpecResolutionJob
+}
+
+var _ graphqlbackend.BatchSpecWorkspaceResolutionResolver = &batchSpecWorkspaceResolutionResolver{}
+
+func (r *batchSpecWorkspaceResolutionResolver) State() string {
+ return r.resolution.State.ToGraphQL()
+}
+
+func (r *batchSpecWorkspaceResolutionResolver) StartedAt() *graphqlbackend.DateTime {
+ if r.resolution.StartedAt.IsZero() {
+ return nil
+ }
+ return &graphqlbackend.DateTime{Time: r.resolution.StartedAt}
+}
+
+func (r *batchSpecWorkspaceResolutionResolver) FinishedAt() *graphqlbackend.DateTime {
+ if r.resolution.FinishedAt.IsZero() {
+ return nil
+ }
+ return &graphqlbackend.DateTime{Time: r.resolution.FinishedAt}
+}
+
+func (r *batchSpecWorkspaceResolutionResolver) FailureMessage() *string {
+ return r.resolution.FailureMessage
+}
+
+func (r *batchSpecWorkspaceResolutionResolver) AllowIgnored() bool {
+ return r.resolution.AllowIgnored
+}
+
+func (r *batchSpecWorkspaceResolutionResolver) AllowUnsupported() bool {
+ return r.resolution.AllowUnsupported
+}
+
+func (r *batchSpecWorkspaceResolutionResolver) Workspaces(ctx context.Context, args *graphqlbackend.ListWorkspacesArgs) (graphqlbackend.BatchSpecWorkspaceConnectionResolver, error) {
+ opts := store.ListBatchSpecWorkspacesOpts{
+ BatchSpecID: r.resolution.BatchSpecID,
+ }
+ if err := validateFirstParamDefaults(args.First); err != nil {
+ return nil, err
+ }
+ opts.Limit = int(args.First)
+ if args.After != nil {
+ id, err := strconv.Atoi(*args.After)
+ if err != nil {
+ return nil, err
+ }
+ opts.Cursor = int64(id)
+ }
+
+ return &batchSpecWorkspaceConnectionResolver{store: r.store, opts: opts}, nil
+}
+
+func (r *batchSpecWorkspaceResolutionResolver) Unsupported(ctx context.Context) graphqlbackend.RepositoryConnectionResolver {
+ // TODO(ssbc): not implemented
+ return nil
+}
+
+func (r *batchSpecWorkspaceResolutionResolver) RecentlyCompleted(ctx context.Context, args *graphqlbackend.ListRecentlyCompletedWorkspacesArgs) graphqlbackend.BatchSpecWorkspaceConnectionResolver {
+ // TODO(ssbc): not implemented
+ return nil
+}
+
+func (r *batchSpecWorkspaceResolutionResolver) RecentlyErrored(ctx context.Context, args *graphqlbackend.ListRecentlyErroredWorkspacesArgs) graphqlbackend.BatchSpecWorkspaceConnectionResolver {
+ // TODO(ssbc): not implemented
+ return nil
+}
diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_step.go b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_step.go
new file mode 100644
index 00000000000..cc96e117cc7
--- /dev/null
+++ b/enterprise/cmd/frontend/internal/batches/resolvers/batch_spec_workspace_step.go
@@ -0,0 +1,229 @@
+package resolvers
+
+import (
+ "context"
+
+ "github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
+ "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
+ batcheslib "github.com/sourcegraph/sourcegraph/lib/batches"
+)
+
+type batchSpecWorkspaceStepResolver struct {
+ store *store.Store
+ repo *graphqlbackend.RepositoryResolver
+ baseRev string
+ index int
+ step batcheslib.Step
+ logLines []batcheslib.LogEvent
+}
+
+func (r *batchSpecWorkspaceStepResolver) Run() string {
+ return r.step.Run
+}
+
+func (r *batchSpecWorkspaceStepResolver) Container() string {
+ return r.step.Container
+}
+
+func (r *batchSpecWorkspaceStepResolver) CachedResultFound() bool {
+ // TODO(ssbc): not implemented
+ return false
+}
+
+func (r *batchSpecWorkspaceStepResolver) Skipped() bool {
+ for _, l := range r.logLines {
+ if m, ok := l.Metadata.(*batcheslib.TaskSkippingStepsMetadata); ok {
+ if m.StartStep-1 > r.index {
+ return true
+ }
+ }
+ if m, ok := l.Metadata.(*batcheslib.TaskStepSkippedMetadata); ok {
+ if m.Step-1 == r.index {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+func (r *batchSpecWorkspaceStepResolver) OutputLines(ctx context.Context, args *graphqlbackend.BatchSpecWorkspaceStepOutputLinesArgs) (*[]string, error) {
+ lines := []string{}
+ for _, l := range r.logLines {
+ if l.Status != batcheslib.LogEventStatusProgress {
+ continue
+ }
+ if m, ok := l.Metadata.(*batcheslib.TaskStepMetadata); ok {
+ if m.Step-1 != r.index {
+ continue
+ }
+ if m.Out == "" {
+ continue
+ }
+ lines = append(lines, m.Out)
+ }
+ }
+ if args.After != nil {
+ lines = lines[*args.After:]
+ }
+ if int(args.First) < len(lines) {
+ lines = lines[:args.First]
+ }
+ // TODO: Should sometimes return nil.
+ return &lines, nil
+}
+
+func (r *batchSpecWorkspaceStepResolver) StartedAt() *graphqlbackend.DateTime {
+ for _, l := range r.logLines {
+ if l.Status != batcheslib.LogEventStatusStarted {
+ continue
+ }
+ if m, ok := l.Metadata.(*batcheslib.TaskPreparingStepMetadata); ok {
+ if m.Step-1 == r.index {
+ return &graphqlbackend.DateTime{Time: l.Timestamp}
+ }
+ }
+ }
+ return nil
+}
+
+func (r *batchSpecWorkspaceStepResolver) FinishedAt() *graphqlbackend.DateTime {
+ for _, l := range r.logLines {
+ if l.Status != batcheslib.LogEventStatusSuccess && l.Status != batcheslib.LogEventStatusFailure {
+ continue
+ }
+ if m, ok := l.Metadata.(*batcheslib.TaskStepMetadata); ok {
+ if m.Step-1 == r.index {
+ return &graphqlbackend.DateTime{Time: l.Timestamp}
+ }
+ }
+ }
+ return nil
+}
+
+func (r *batchSpecWorkspaceStepResolver) ExitCode() *int32 {
+ for _, l := range r.logLines {
+ if l.Status != batcheslib.LogEventStatusSuccess && l.Status != batcheslib.LogEventStatusFailure {
+ continue
+ }
+ if m, ok := l.Metadata.(*batcheslib.TaskStepMetadata); ok {
+ if m.Step-1 == r.index {
+ code := int32(m.ExitCode)
+ return &code
+ }
+ }
+ }
+ return nil
+}
+
+func (r *batchSpecWorkspaceStepResolver) Environment() ([]graphqlbackend.BatchSpecWorkspaceEnvironmentVariableResolver, error) {
+ // The environment is dependent on environment of the executor and template variables, that aren't
+ // known at the time when we resolve the workspace. If the step already started, src cli has logged
+ // the final env. Otherwise, we fall back to the preliminary set of env vars as determined by the
+ // resolve workspaces step.
+ found := false
+ var env map[string]string
+ for _, l := range r.logLines {
+ if l.Status != batcheslib.LogEventStatusStarted {
+ continue
+ }
+ if m, ok := l.Metadata.(*batcheslib.TaskStepMetadata); ok {
+ if m.Step-1 == r.index {
+ if m.Env != nil {
+ found = true
+ env = m.Env
+ }
+ }
+ }
+ }
+
+ if !found {
+ var err error
+ env, err = r.step.Env.Resolve([]string{})
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ resolvers := make([]graphqlbackend.BatchSpecWorkspaceEnvironmentVariableResolver, 0, len(env))
+ for k, v := range env {
+ resolvers = append(resolvers, &batchSpecWorkspaceEnvironmentVariableResolver{key: k, value: v})
+ }
+ return resolvers, nil
+}
+
+func (r *batchSpecWorkspaceStepResolver) OutputVariables() *[]graphqlbackend.BatchSpecWorkspaceOutputVariableResolver {
+ for _, l := range r.logLines {
+ if l.Status != batcheslib.LogEventStatusSuccess {
+ continue
+ }
+ if m, ok := l.Metadata.(*batcheslib.TaskStepMetadata); ok {
+ if m.Step-1 == r.index {
+ resolvers := make([]graphqlbackend.BatchSpecWorkspaceOutputVariableResolver, 0, len(m.Outputs))
+ for k, v := range m.Outputs {
+ resolvers = append(resolvers, &batchSpecWorkspaceOutputVariableResolver{key: k, value: v})
+ }
+ return &resolvers
+ }
+ }
+ }
+ return nil
+}
+
+func (r *batchSpecWorkspaceStepResolver) DiffStat(ctx context.Context) (*graphqlbackend.DiffStat, error) {
+ diffRes, err := r.Diff(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if diffRes != nil {
+ fd, err := diffRes.FileDiffs(ctx, &graphqlbackend.FileDiffsConnectionArgs{})
+ if err != nil {
+ return nil, err
+ }
+ return fd.DiffStat(ctx)
+ }
+ return nil, nil
+}
+
+func (r *batchSpecWorkspaceStepResolver) Diff(ctx context.Context) (graphqlbackend.PreviewRepositoryComparisonResolver, error) {
+ for _, l := range r.logLines {
+ if l.Status != batcheslib.LogEventStatusSuccess {
+ continue
+ }
+ if m, ok := l.Metadata.(*batcheslib.TaskStepMetadata); ok {
+ if m.Step-1 == r.index {
+ return graphqlbackend.NewPreviewRepositoryComparisonResolver(ctx, r.store.DB(), r.repo, r.baseRev, m.Diff)
+ }
+ }
+ }
+
+ return nil, nil
+}
+
+type batchSpecWorkspaceEnvironmentVariableResolver struct {
+ key string
+ value string
+}
+
+var _ graphqlbackend.BatchSpecWorkspaceEnvironmentVariableResolver = &batchSpecWorkspaceEnvironmentVariableResolver{}
+
+func (r *batchSpecWorkspaceEnvironmentVariableResolver) Name() string {
+ return r.key
+}
+func (r *batchSpecWorkspaceEnvironmentVariableResolver) Value() string {
+ return r.value
+}
+
+type batchSpecWorkspaceOutputVariableResolver struct {
+ key string
+ value interface{}
+}
+
+var _ graphqlbackend.BatchSpecWorkspaceOutputVariableResolver = &batchSpecWorkspaceOutputVariableResolver{}
+
+func (r *batchSpecWorkspaceOutputVariableResolver) Name() string {
+ return r.key
+}
+func (r *batchSpecWorkspaceOutputVariableResolver) Value() graphqlbackend.JSONValue {
+ return graphqlbackend.JSONValue{Value: r.value}
+}
diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/changeset_spec_connection.go b/enterprise/cmd/frontend/internal/batches/resolvers/changeset_spec_connection.go
index 3e69b810858..ddf58e056f3 100644
--- a/enterprise/cmd/frontend/internal/batches/resolvers/changeset_spec_connection.go
+++ b/enterprise/cmd/frontend/internal/batches/resolvers/changeset_spec_connection.go
@@ -18,8 +18,7 @@ var _ graphqlbackend.ChangesetSpecConnectionResolver = &changesetSpecConnectionR
type changesetSpecConnectionResolver struct {
store *store.Store
- opts store.ListChangesetSpecsOpts
- batchSpecID int64
+ opts store.ListChangesetSpecsOpts
// Cache results because they are used by multiple fields
once sync.Once
@@ -31,7 +30,8 @@ type changesetSpecConnectionResolver struct {
func (r *changesetSpecConnectionResolver) TotalCount(ctx context.Context) (int32, error) {
count, err := r.store.CountChangesetSpecs(ctx, store.CountChangesetSpecsOpts{
- BatchSpecID: r.batchSpecID,
+ BatchSpecID: r.opts.BatchSpecID,
+ Type: r.opts.Type,
})
if err != nil {
return 0, err
@@ -77,7 +77,6 @@ func (r *changesetSpecConnectionResolver) Nodes(ctx context.Context) ([]graphqlb
func (r *changesetSpecConnectionResolver) compute(ctx context.Context) (btypes.ChangesetSpecs, map[api.RepoID]*types.Repo, int64, error) {
r.once.Do(func() {
opts := r.opts
- opts.BatchSpecID = r.batchSpecID
r.changesetSpecs, r.next, r.err = r.store.ListChangesetSpecs(ctx, opts)
if r.err != nil {
return
diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/resolver.go b/enterprise/cmd/frontend/internal/batches/resolvers/resolver.go
index 3ef5a151034..0cf41bc7acd 100644
--- a/enterprise/cmd/frontend/internal/batches/resolvers/resolver.go
+++ b/enterprise/cmd/frontend/internal/batches/resolvers/resolver.go
@@ -132,6 +132,9 @@ func (r *Resolver) NodeResolvers() map[string]graphqlbackend.NodeByIDFunc {
bulkOperationIDKind: func(ctx context.Context, id graphql.ID) (graphqlbackend.Node, error) {
return r.bulkOperationByID(ctx, id)
},
+ batchSpecWorkspaceIDKind: func(ctx context.Context, id graphql.ID) (graphqlbackend.Node, error) {
+ return r.batchSpecWorkspaceByID(ctx, id)
+ },
}
}
@@ -349,6 +352,36 @@ func (r *Resolver) bulkOperationByIDString(ctx context.Context, id string) (grap
return &bulkOperationResolver{store: r.store, bulkOperation: bulkOperation}, nil
}
+func (r *Resolver) batchSpecWorkspaceByID(ctx context.Context, gqlID graphql.ID) (graphqlbackend.BatchSpecWorkspaceResolver, error) {
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
+
+ id, err := unmarshalBatchSpecWorkspaceID(gqlID)
+ if err != nil {
+ return nil, err
+ }
+
+ if id == 0 {
+ return nil, ErrIDIsZero{}
+ }
+
+ w, err := r.store.GetBatchSpecWorkspace(ctx, store.GetBatchSpecWorkspaceOpts{ID: id})
+ if err != nil {
+ if err == store.ErrNoResults {
+ return nil, nil
+ }
+ return nil, err
+ }
+ ex, err := r.store.GetBatchSpecWorkspaceExecutionJob(ctx, store.GetBatchSpecWorkspaceExecutionJobOpts{BatchSpecWorkspaceID: w.ID})
+ if err != nil && err != store.ErrNoResults {
+ return nil, err
+ }
+
+ return &batchSpecWorkspaceResolver{store: r.store, workspace: w, execution: ex}, nil
+}
+
func (r *Resolver) CreateBatchChange(ctx context.Context, args *graphqlbackend.CreateBatchChangeArgs) (graphqlbackend.BatchChangeResolver, error) {
var err error
tr, _ := trace.New(ctx, "Resolver.CreateBatchChange", fmt.Sprintf("BatchSpec %s", args.BatchSpec))
@@ -1396,58 +1429,171 @@ func (r *Resolver) PublishChangesets(ctx context.Context, args *graphqlbackend.P
}
func (r *Resolver) BatchSpecs(ctx context.Context, args *graphqlbackend.ListBatchSpecArgs) (_ graphqlbackend.BatchSpecConnectionResolver, err error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented yet")
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
+
+ if err := validateFirstParamDefaults(args.First); err != nil {
+ return nil, err
+ }
+ opts := store.ListBatchSpecsOpts{
+ LimitOpts: store.LimitOpts{
+ Limit: int(args.First),
+ },
+ }
+ if args.After != nil {
+ id, err := strconv.Atoi(*args.After)
+ if err != nil {
+ return nil, err
+ }
+ opts.Cursor = int64(id)
+ }
+
+ return &batchSpecConnectionResolver{store: r.store, opts: opts}, nil
}
func (r *Resolver) CreateBatchSpecFromRaw(ctx context.Context, args *graphqlbackend.CreateBatchSpecFromRawArgs) (graphqlbackend.BatchSpecResolver, error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented yet")
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
+
+ svc := service.New(r.store)
+ batchSpec, err := svc.CreateBatchSpec(ctx, service.CreateBatchSpecOpts{
+ NamespaceUserID: actor.FromContext(ctx).UID,
+ RawSpec: args.BatchSpec,
+ })
+ if err != nil {
+ return nil, err
+ }
+ err = svc.EnqueueBatchSpecResolution(ctx, service.EnqueueBatchSpecResolutionOpts{
+ BatchSpecID: batchSpec.ID,
+ AllowIgnored: args.AllowIgnored,
+ AllowUnsupported: args.AllowUnsupported,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return r.batchSpecByID(ctx, marshalBatchSpecRandID(batchSpec.RandID))
}
func (r *Resolver) DeleteBatchSpec(ctx context.Context, args *graphqlbackend.DeleteBatchSpecArgs) (*graphqlbackend.EmptyResponse, error) {
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
// TODO(ssbc): not implemented
return nil, errors.New("not implemented yet")
}
func (r *Resolver) ExecuteBatchSpec(ctx context.Context, args *graphqlbackend.ExecuteBatchSpecArgs) (graphqlbackend.BatchSpecResolver, error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented yet")
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
+
+ batchSpecRandID, err := unmarshalBatchSpecID(args.BatchSpec)
+ if err != nil {
+ return nil, err
+ }
+
+ if batchSpecRandID == "" {
+ return nil, ErrIDIsZero{}
+ }
+
+ svc := service.New(r.store)
+ batchSpec, err := svc.ExecuteBatchSpec(ctx, service.ExecuteBatchSpecOpts{
+ BatchSpecRandID: batchSpecRandID,
+ // TODO: args not yet implemented: NoCache, AutoApply
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return &batchSpecResolver{store: r.store, batchSpec: batchSpec}, nil
}
func (r *Resolver) CancelBatchSpecExecution(ctx context.Context, args *graphqlbackend.CancelBatchSpecExecutionArgs) (graphqlbackend.BatchSpecResolver, error) {
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
// TODO(ssbc): not implemented
return nil, errors.New("not implemented yet")
}
func (r *Resolver) CancelBatchSpecWorkspaceExecution(ctx context.Context, args *graphqlbackend.CancelBatchSpecWorkspaceExecutionArgs) (*graphqlbackend.EmptyResponse, error) {
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
// TODO(ssbc): not implemented
return nil, errors.New("not implemented yet")
}
func (r *Resolver) RetryBatchSpecWorkspaceExecution(ctx context.Context, args *graphqlbackend.RetryBatchSpecWorkspaceExecutionArgs) (*graphqlbackend.EmptyResponse, error) {
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
// TODO(ssbc): not implemented
return nil, errors.New("not implemented yet")
}
func (r *Resolver) RetryBatchSpecExecution(ctx context.Context, args *graphqlbackend.RetryBatchSpecExecutionArgs) (*graphqlbackend.EmptyResponse, error) {
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
// TODO(ssbc): not implemented
return nil, errors.New("not implemented yet")
}
func (r *Resolver) EnqueueBatchSpecWorkspaceExecution(ctx context.Context, args *graphqlbackend.EnqueueBatchSpecWorkspaceExecutionArgs) (*graphqlbackend.EmptyResponse, error) {
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
// TODO(ssbc): not implemented
return nil, errors.New("not implemented yet")
}
func (r *Resolver) ToggleBatchSpecAutoApply(ctx context.Context, args *graphqlbackend.ToggleBatchSpecAutoApplyArgs) (graphqlbackend.BatchSpecResolver, error) {
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
// TODO(ssbc): not implemented
return nil, errors.New("not implemented yet")
}
func (r *Resolver) ReplaceBatchSpecInput(ctx context.Context, args *graphqlbackend.ReplaceBatchSpecInputArgs) (graphqlbackend.BatchSpecResolver, error) {
- // TODO(ssbc): not implemented
- return nil, errors.New("not implemented yet")
+ // TODO(ssbc): currently admin only.
+ if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.store.DB()); err != nil {
+ return nil, err
+ }
+
+ batchSpecRandID, err := unmarshalBatchSpecID(args.PreviousSpec)
+ if err != nil {
+ return nil, err
+ }
+
+ if batchSpecRandID == "" {
+ return nil, ErrIDIsZero{}
+ }
+
+ svc := service.New(r.store)
+ batchSpec, err := svc.ReplaceBatchSpecInput(ctx, service.ReplaceBatchSpecInputOpts{
+ BatchSpecRandID: batchSpecRandID,
+ RawSpec: args.BatchSpec,
+ AllowIgnored: args.AllowIgnored,
+ AllowUnsupported: args.AllowUnsupported,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return &batchSpecResolver{store: r.store, batchSpec: batchSpec}, nil
}
func parseBatchChangeState(s *string) (btypes.BatchChangeState, error) {
diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/resolver_test.go b/enterprise/cmd/frontend/internal/batches/resolvers/resolver_test.go
index d1a691b023c..3bd26e2aa06 100644
--- a/enterprise/cmd/frontend/internal/batches/resolvers/resolver_test.go
+++ b/enterprise/cmd/frontend/internal/batches/resolvers/resolver_test.go
@@ -91,6 +91,7 @@ func TestNullIDResilience(t *testing.T) {
fmt.Sprintf(`mutation { closeChangesets(batchChange: %q, changesets: [%q]) { id } }`, marshalBatchChangeID(1), marshalChangesetID(0)),
fmt.Sprintf(`mutation { publishChangesets(batchChange: %q, changesets: []) { id } }`, marshalBatchChangeID(0)),
fmt.Sprintf(`mutation { publishChangesets(batchChange: %q, changesets: [%q]) { id } }`, marshalBatchChangeID(1), marshalChangesetID(0)),
+ fmt.Sprintf(`mutation { executeBatchSpec(batchSpec: %q) { id } }`, marshalBatchSpecRandID("")),
}
for _, m := range mutations {
diff --git a/enterprise/cmd/frontend/internal/executorqueue/metrics/queue_allocation.go b/enterprise/cmd/frontend/internal/executorqueue/metrics/queue_allocation.go
index fedd1626b30..437746e0098 100644
--- a/enterprise/cmd/frontend/internal/executorqueue/metrics/queue_allocation.go
+++ b/enterprise/cmd/frontend/internal/executorqueue/metrics/queue_allocation.go
@@ -11,7 +11,7 @@ type QueueAllocation struct {
}
var (
- validQueueNames = []string{"batches", "codeintel", "batch-spec-workspaces"}
+ validQueueNames = []string{"batches", "codeintel"}
validCloudProviderNames = []string{"aws", "gcp"}
)
diff --git a/enterprise/internal/batches/background/batch_spec_workspace_creator_test.go b/enterprise/internal/batches/background/batch_spec_workspace_creator_test.go
index 76e2e9ec85b..46c480af874 100644
--- a/enterprise/internal/batches/background/batch_spec_workspace_creator_test.go
+++ b/enterprise/internal/batches/background/batch_spec_workspace_creator_test.go
@@ -76,7 +76,7 @@ func TestBatchSpecWorkspaceCreatorProcess(t *testing.T) {
t.Fatalf("proces failed: %s", err)
}
- have, err := s.ListBatchSpecWorkspaces(context.Background(), store.ListBatchSpecWorkspacesOpts{BatchSpecID: batchSpec.ID})
+ have, _, err := s.ListBatchSpecWorkspaces(context.Background(), store.ListBatchSpecWorkspacesOpts{BatchSpecID: batchSpec.ID})
if err != nil {
t.Fatalf("listing workspaces failed: %s", err)
}
diff --git a/enterprise/internal/batches/service/service.go b/enterprise/internal/batches/service/service.go
index 06385beb6b2..135f084adac 100644
--- a/enterprise/internal/batches/service/service.go
+++ b/enterprise/internal/batches/service/service.go
@@ -154,12 +154,160 @@ func (s *Service) EnqueueBatchSpecResolution(ctx context.Context, opts EnqueueBa
}()
return s.store.CreateBatchSpecResolutionJob(ctx, &btypes.BatchSpecResolutionJob{
+ State: btypes.BatchSpecResolutionJobStateQueued,
BatchSpecID: opts.BatchSpecID,
AllowIgnored: opts.AllowIgnored,
AllowUnsupported: opts.AllowUnsupported,
})
}
+type ErrBatchSpecResolutionErrored struct {
+ failureMessage *string
+}
+
+func (e ErrBatchSpecResolutionErrored) Error() string {
+ if e.failureMessage != nil && *e.failureMessage != "" {
+ return fmt.Sprintf("cannot execute batch spec, workspace resolution failed: %s", *e.failureMessage)
+ }
+ return "cannot execute batch spec, workspace resolution failed"
+}
+
+var ErrBatchSpecResolutionIncomplete = errors.New("cannot execute batch spec, workspaces still being resolved")
+
+type ExecuteBatchSpecOpts struct {
+ BatchSpecRandID string
+}
+
+// ExecuteBatchSpec creates BatchSpecWorkspaceExecutionJobs for every created
+// BatchSpecWorkspace.
+//
+// It returns an error if the batchSpecWorkspaceResolutionJob didn't finish
+// successfully.
+func (s *Service) ExecuteBatchSpec(ctx context.Context, opts ExecuteBatchSpecOpts) (batchSpec *btypes.BatchSpec, err error) {
+ actor := actor.FromContext(ctx)
+ tr, ctx := trace.New(ctx, "Service.ExecuteBatchSpec", fmt.Sprintf("Actor %d", actor.UID))
+ defer func() {
+ tr.SetError(err)
+ tr.Finish()
+ }()
+
+ batchSpec, err = s.store.GetBatchSpec(ctx, store.GetBatchSpecOpts{RandID: opts.BatchSpecRandID})
+ if err != nil {
+ return nil, err
+ }
+
+ // Check whether the current user has access to either one of the namespaces.
+ err = s.CheckNamespaceAccess(ctx, batchSpec.NamespaceUserID, batchSpec.NamespaceOrgID)
+ if err != nil {
+ return nil, err
+ }
+
+ // TODO: In the future we want to block here until the resolution is done
+ // and only then check whether it failed or not.
+ //
+ // TODO: We also want to check that whether there was already an
+ // execution.
+ tx, err := s.store.Transact(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer func() { err = tx.Done(err) }()
+
+ resolutionJob, err := tx.GetBatchSpecResolutionJob(ctx, store.GetBatchSpecResolutionJobOpts{BatchSpecID: batchSpec.ID})
+ if err != nil {
+ return nil, err
+ }
+
+ switch resolutionJob.State {
+ case btypes.BatchSpecResolutionJobStateErrored, btypes.BatchSpecResolutionJobStateFailed:
+ return nil, ErrBatchSpecResolutionErrored{resolutionJob.FailureMessage}
+
+ case btypes.BatchSpecResolutionJobStateCompleted:
+ return batchSpec, tx.CreateBatchSpecWorkspaceExecutionJobs(ctx, batchSpec.ID)
+
+ default:
+ return nil, ErrBatchSpecResolutionIncomplete
+ }
+}
+
+type ReplaceBatchSpecInputOpts struct {
+ BatchSpecRandID string
+ RawSpec string
+ AllowIgnored bool
+ AllowUnsupported bool
+}
+
+// ReplaceBatchSpecInput creates BatchSpecWorkspaceExecutionJobs for every created
+// BatchSpecWorkspace.
+//
+// It returns an error if the batchSpecWorkspaceResolutionJob didn't finish
+// successfully.
+func (s *Service) ReplaceBatchSpecInput(ctx context.Context, opts ReplaceBatchSpecInputOpts) (batchSpec *btypes.BatchSpec, err error) {
+ actor := actor.FromContext(ctx)
+ tr, ctx := trace.New(ctx, "Service.ReplaceBatchSpecInput", fmt.Sprintf("Actor %d", actor.UID))
+ defer func() {
+ tr.SetError(err)
+ tr.Finish()
+ }()
+
+ // Before we hit the database, validate the new spec.
+ newSpec, err := btypes.NewBatchSpecFromRaw(opts.RawSpec)
+ if err != nil {
+ return nil, err
+ }
+
+ // Make sure the user has access.
+ batchSpec, err = s.store.GetBatchSpec(ctx, store.GetBatchSpecOpts{RandID: opts.BatchSpecRandID})
+ if err != nil {
+ return nil, err
+ }
+
+ // Check whether the current user has access to either one of the namespaces.
+ err = s.CheckNamespaceAccess(ctx, batchSpec.NamespaceUserID, batchSpec.NamespaceOrgID)
+ if err != nil {
+ return nil, err
+ }
+
+ // Start transaction.
+ tx, err := s.store.Transact(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer func() { err = tx.Done(err) }()
+
+ // Delete the previous batch spec, which should delete
+ // - batch_spec_resolution_jobs
+ // - batch_spec_workspaces
+ // associated with it
+ if err := tx.DeleteBatchSpec(ctx, batchSpec.ID); err != nil {
+ return nil, err
+ }
+
+ // We keep the RandID so the user-visible GraphQL ID is stable
+ newSpec.RandID = batchSpec.RandID
+
+ newSpec.NamespaceOrgID = batchSpec.NamespaceOrgID
+ newSpec.NamespaceUserID = batchSpec.NamespaceUserID
+ newSpec.UserID = batchSpec.UserID
+
+ if err := tx.CreateBatchSpec(ctx, newSpec); err != nil {
+ return nil, err
+ }
+
+ // Create a new resolution job now in the transaction so that we switch the
+ // resolution jobs essentially.
+ err = tx.CreateBatchSpecResolutionJob(ctx, &btypes.BatchSpecResolutionJob{
+ BatchSpecID: newSpec.ID,
+ AllowIgnored: opts.AllowIgnored,
+ AllowUnsupported: opts.AllowUnsupported,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return newSpec, nil
+}
+
// CreateChangesetSpec validates the given raw spec input and creates the ChangesetSpec.
func (s *Service) CreateChangesetSpec(ctx context.Context, rawSpec string, userID int32) (spec *btypes.ChangesetSpec, err error) {
tr, ctx := trace.New(ctx, "Service.CreateChangesetSpec", fmt.Sprintf("User %d", userID))
diff --git a/enterprise/internal/batches/service/service_test.go b/enterprise/internal/batches/service/service_test.go
index 029d8100293..d359ac73a9c 100644
--- a/enterprise/internal/batches/service/service_test.go
+++ b/enterprise/internal/batches/service/service_test.go
@@ -168,6 +168,21 @@ func TestServicePermissionLevels(t *testing.T) {
_, err := svc.CreateChangesetJobs(currentUserCtx, batchChange.ID, []int64{changeset.ID}, btypes.ChangesetJobTypeComment, btypes.ChangesetJobCommentPayload{Message: "test"}, store.ListChangesetsOpts{})
tc.assertFunc(t, err)
})
+
+ t.Run("ExecuteBatchSpec", func(t *testing.T) {
+ _, err := svc.ExecuteBatchSpec(currentUserCtx, ExecuteBatchSpecOpts{
+ BatchSpecRandID: batchSpec.RandID,
+ })
+ tc.assertFunc(t, err)
+ })
+
+ t.Run("ReplaceBatchSpecInput", func(t *testing.T) {
+ _, err := svc.ReplaceBatchSpecInput(currentUserCtx, ReplaceBatchSpecInputOpts{
+ BatchSpecRandID: batchSpec.RandID,
+ RawSpec: ct.TestRawBatchSpecYAML,
+ })
+ tc.assertFunc(t, err)
+ })
})
}
}
@@ -1101,6 +1116,161 @@ func TestService(t *testing.T) {
})
})
+
+ t.Run("ExecuteBatchSpec", func(t *testing.T) {
+ t.Run("success", func(t *testing.T) {
+ spec := testBatchSpec(admin.ID)
+ if err := s.CreateBatchSpec(ctx, spec); err != nil {
+ t.Fatal(err)
+ }
+
+ // Simulate successful resolution.
+ job := &btypes.BatchSpecResolutionJob{
+ State: btypes.BatchSpecResolutionJobStateCompleted,
+ BatchSpecID: spec.ID,
+ }
+
+ if err := s.CreateBatchSpecResolutionJob(ctx, job); err != nil {
+ t.Fatal(err)
+ }
+
+ var workspaceIDs []int64
+ for _, repo := range rs {
+ ws := &btypes.BatchSpecWorkspace{BatchSpecID: spec.ID, RepoID: repo.ID}
+ if err := s.CreateBatchSpecWorkspace(ctx, ws); err != nil {
+ t.Fatal(err)
+ }
+ workspaceIDs = append(workspaceIDs, ws.ID)
+ }
+
+ // Execute BatchSpec by creating execution jobs
+ if _, err := svc.ExecuteBatchSpec(ctx, ExecuteBatchSpecOpts{BatchSpecRandID: spec.RandID}); err != nil {
+ t.Fatal(err)
+ }
+
+ jobs, err := s.ListBatchSpecWorkspaceExecutionJobs(ctx, store.ListBatchSpecWorkspaceExecutionJobsOpts{
+ BatchSpecWorkspaceIDs: workspaceIDs,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if len(jobs) != len(rs) {
+ t.Fatalf("wrong number of execution jobs created. want=%d, have=%d", len(rs), len(jobs))
+ }
+ })
+ t.Run("resolution not completed", func(t *testing.T) {
+ spec := testBatchSpec(admin.ID)
+ if err := s.CreateBatchSpec(ctx, spec); err != nil {
+ t.Fatal(err)
+ }
+
+ job := &btypes.BatchSpecResolutionJob{
+ State: btypes.BatchSpecResolutionJobStateQueued,
+ BatchSpecID: spec.ID,
+ }
+
+ if err := s.CreateBatchSpecResolutionJob(ctx, job); err != nil {
+ t.Fatal(err)
+ }
+
+ // Execute BatchSpec by creating execution jobs
+ _, err := svc.ExecuteBatchSpec(ctx, ExecuteBatchSpecOpts{BatchSpecRandID: spec.RandID})
+ if !errors.Is(err, ErrBatchSpecResolutionIncomplete) {
+ t.Fatalf("error has wrong type: %T", err)
+ }
+ })
+
+ t.Run("resolution failed", func(t *testing.T) {
+ spec := testBatchSpec(admin.ID)
+ if err := s.CreateBatchSpec(ctx, spec); err != nil {
+ t.Fatal(err)
+ }
+
+ failureMessage := "cat ate the homework"
+ job := &btypes.BatchSpecResolutionJob{
+ State: btypes.BatchSpecResolutionJobStateFailed,
+ FailureMessage: &failureMessage,
+ BatchSpecID: spec.ID,
+ }
+
+ if err := s.CreateBatchSpecResolutionJob(ctx, job); err != nil {
+ t.Fatal(err)
+ }
+
+ // Execute BatchSpec by creating execution jobs
+ _, err := svc.ExecuteBatchSpec(ctx, ExecuteBatchSpecOpts{BatchSpecRandID: spec.RandID})
+ if !errors.HasType(err, ErrBatchSpecResolutionErrored{}) {
+ t.Fatalf("error has wrong type: %T", err)
+ }
+ })
+ })
+
+ t.Run("ReplaceBatchSpecInput", func(t *testing.T) {
+ t.Run("success", func(t *testing.T) {
+ spec := testBatchSpec(admin.ID)
+ if err := s.CreateBatchSpec(ctx, spec); err != nil {
+ t.Fatal(err)
+ }
+
+ job := &btypes.BatchSpecResolutionJob{
+ State: btypes.BatchSpecResolutionJobStateCompleted,
+ BatchSpecID: spec.ID,
+ }
+
+ if err := s.CreateBatchSpecResolutionJob(ctx, job); err != nil {
+ t.Fatal(err)
+ }
+
+ for _, repo := range rs {
+ ws := &btypes.BatchSpecWorkspace{BatchSpecID: spec.ID, RepoID: repo.ID}
+ if err := s.CreateBatchSpecWorkspace(ctx, ws); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ newSpec, err := svc.ReplaceBatchSpecInput(ctx, ReplaceBatchSpecInputOpts{
+ BatchSpecRandID: spec.RandID,
+ RawSpec: ct.TestRawBatchSpecYAML,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if newSpec.ID == spec.ID {
+ t.Fatalf("new batch spec has same ID as old one: %d", newSpec.ID)
+ }
+
+ if newSpec.RandID != spec.RandID {
+ t.Fatalf("new batch spec has different RandID. new=%s, old=%s", newSpec.RandID, spec.RandID)
+ }
+ if newSpec.UserID != spec.UserID {
+ t.Fatalf("new batch spec has different UserID. new=%d, old=%d", newSpec.UserID, spec.UserID)
+ }
+ if newSpec.NamespaceUserID != spec.NamespaceUserID {
+ t.Fatalf("new batch spec has different NamespaceUserID. new=%d, old=%d", newSpec.NamespaceUserID, spec.NamespaceUserID)
+ }
+ if newSpec.NamespaceOrgID != spec.NamespaceOrgID {
+ t.Fatalf("new batch spec has different NamespaceOrgID. new=%d, old=%d", newSpec.NamespaceOrgID, spec.NamespaceOrgID)
+ }
+
+ resolutionJob, err := s.GetBatchSpecResolutionJob(ctx, store.GetBatchSpecResolutionJobOpts{
+ BatchSpecID: newSpec.ID,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ if want, have := btypes.BatchSpecResolutionJobStateQueued, resolutionJob.State; have != want {
+ t.Fatalf("resolution job has wrong state. want=%s, have=%s", want, have)
+ }
+
+ // Assert that old batch spec is deleted
+ _, err = s.GetBatchSpec(ctx, store.GetBatchSpecOpts{ID: spec.ID})
+ if err != store.ErrNoResults {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ })
+ })
}
func testBatchChange(user int32, spec *btypes.BatchSpec) *btypes.BatchChange {
diff --git a/enterprise/internal/batches/store/batch_spec_resolution_jobs.go b/enterprise/internal/batches/store/batch_spec_resolution_jobs.go
index 47a7dd90ef7..c07f58998ba 100644
--- a/enterprise/internal/batches/store/batch_spec_resolution_jobs.go
+++ b/enterprise/internal/batches/store/batch_spec_resolution_jobs.go
@@ -69,12 +69,17 @@ func (s *Store) CreateBatchSpecResolutionJob(ctx context.Context, ws ...*btypes.
wj.UpdatedAt = wj.CreatedAt
}
+ state := string(wj.State)
+ if state == "" {
+ state = string(btypes.BatchSpecResolutionJobStateQueued)
+ }
+
if err := inserter.Insert(
ctx,
wj.BatchSpecID,
wj.AllowUnsupported,
wj.AllowIgnored,
- wj.State,
+ state,
wj.CreatedAt,
wj.UpdatedAt,
); err != nil {
diff --git a/enterprise/internal/batches/store/batch_spec_workspace.go b/enterprise/internal/batches/store/batch_spec_workspace.go
index a410a749e60..36ca506adac 100644
--- a/enterprise/internal/batches/store/batch_spec_workspace.go
+++ b/enterprise/internal/batches/store/batch_spec_workspace.go
@@ -14,6 +14,7 @@ import (
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
"github.com/sourcegraph/sourcegraph/internal/database/batch"
"github.com/sourcegraph/sourcegraph/internal/observation"
+ batcheslib "github.com/sourcegraph/sourcegraph/lib/batches"
)
// batchSpecWorkspaceInsertColumns is the list of batch_spec_workspaces columns
@@ -85,6 +86,10 @@ func (s *Store) CreateBatchSpecWorkspace(ctx context.Context, ws ...*btypes.Batc
wj.FileMatches = []string{}
}
+ if wj.Steps == nil {
+ wj.Steps = []batcheslib.Step{}
+ }
+
marshaledSteps, err := json.Marshal(wj.Steps)
if err != nil {
return err
@@ -177,11 +182,13 @@ func getBatchSpecWorkspaceQuery(opts *GetBatchSpecWorkspaceOpts) *sqlf.Query {
// ListBatchSpecWorkspacesOpts captures the query options needed for
// listing batch spec workspace jobs.
type ListBatchSpecWorkspacesOpts struct {
+ LimitOpts
+ Cursor int64
BatchSpecID int64
}
// ListBatchSpecWorkspaces lists batch changes with the given filters.
-func (s *Store) ListBatchSpecWorkspaces(ctx context.Context, opts ListBatchSpecWorkspacesOpts) (cs []*btypes.BatchSpecWorkspace, err error) {
+func (s *Store) ListBatchSpecWorkspaces(ctx context.Context, opts ListBatchSpecWorkspacesOpts) (cs []*btypes.BatchSpecWorkspace, next int64, err error) {
ctx, endObservation := s.operations.listBatchSpecWorkspaces.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})
@@ -197,7 +204,12 @@ func (s *Store) ListBatchSpecWorkspaces(ctx context.Context, opts ListBatchSpecW
return nil
})
- return cs, err
+ if opts.Limit != 0 && len(cs) == opts.DBLimit() {
+ next = cs[len(cs)-1].ID
+ cs = cs[:len(cs)-1]
+ }
+
+ return cs, next, err
}
var listBatchSpecWorkspacesQueryFmtstr = `
@@ -217,8 +229,12 @@ func listBatchSpecWorkspacesQuery(opts ListBatchSpecWorkspacesOpts) *sqlf.Query
preds = append(preds, sqlf.Sprintf("batch_spec_workspaces.batch_spec_id = %d", opts.BatchSpecID))
}
+ if opts.Cursor > 0 {
+ preds = append(preds, sqlf.Sprintf("batch_spec_workspaces.id >= %s", opts.Cursor))
+ }
+
return sqlf.Sprintf(
- listBatchSpecWorkspacesQueryFmtstr,
+ listBatchSpecWorkspacesQueryFmtstr+opts.LimitOpts.ToDB(),
sqlf.Join(BatchSpecWorkspaceColums.ToSqlf(), ", "),
sqlf.Join(preds, "\n AND "),
)
diff --git a/enterprise/internal/batches/store/batch_spec_workspace_execution_jobs.go b/enterprise/internal/batches/store/batch_spec_workspace_execution_jobs.go
index 3ec94ca10e7..aacb8539f5a 100644
--- a/enterprise/internal/batches/store/batch_spec_workspace_execution_jobs.go
+++ b/enterprise/internal/batches/store/batch_spec_workspace_execution_jobs.go
@@ -7,6 +7,7 @@ import (
"github.com/keegancsmith/sqlf"
"github.com/lib/pq"
"github.com/opentracing/opentracing-go/log"
+
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
"github.com/sourcegraph/sourcegraph/internal/database/batch"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
@@ -42,6 +43,29 @@ var BatchSpecWorkspaceExecutionJobColums = SQLColumns{
"batch_spec_workspace_execution_jobs.updated_at",
}
+const createBatchSpecWorkspaceExecutionJobsQueryFmtstr = `
+-- source: enterprise/internal/batches/store/batch_spec_workspace_execution_jobs.go:CreateBatchSpecWorkspaceExecutionJobs
+INSERT INTO
+ batch_spec_workspace_execution_jobs (batch_spec_workspace_id)
+SELECT
+ id
+FROM
+ batch_spec_workspaces
+WHERE
+ batch_spec_id = %s
+`
+
+// CreateBatchSpecWorkspaceExecutionJob creates the given batch spec workspace jobs.
+func (s *Store) CreateBatchSpecWorkspaceExecutionJobs(ctx context.Context, batchSpecID int64) (err error) {
+ ctx, endObservation := s.operations.createBatchSpecWorkspaceExecutionJob.With(ctx, &err, observation.Args{LogFields: []log.Field{
+ log.Int("batchSpecID", int(batchSpecID)),
+ }})
+ defer endObservation(1, observation.Args{})
+
+ q := sqlf.Sprintf(createBatchSpecWorkspaceExecutionJobsQueryFmtstr, batchSpecID)
+ return s.Exec(ctx, q)
+}
+
// CreateBatchSpecWorkspaceExecutionJob creates the given batch spec workspace jobs.
func (s *Store) CreateBatchSpecWorkspaceExecutionJob(ctx context.Context, jobs ...*btypes.BatchSpecWorkspaceExecutionJob) (err error) {
ctx, endObservation := s.operations.createBatchSpecWorkspaceExecutionJob.With(ctx, &err, observation.Args{LogFields: []log.Field{
@@ -144,9 +168,10 @@ func getBatchSpecWorkspaceExecutionJobQuery(opts *GetBatchSpecWorkspaceExecution
// ListBatchSpecWorkspaceExecutionJobsOpts captures the query options needed for
// listing batch spec workspace execution jobs.
type ListBatchSpecWorkspaceExecutionJobsOpts struct {
- Cancel *bool
- State btypes.BatchSpecWorkspaceExecutionJobState
- WorkerHostname string
+ Cancel *bool
+ State btypes.BatchSpecWorkspaceExecutionJobState
+ WorkerHostname string
+ BatchSpecWorkspaceIDs []int64
}
// ListBatchSpecWorkspaceExecutionJobs lists batch changes with the given filters.
@@ -191,6 +216,10 @@ func listBatchSpecWorkspaceExecutionJobsQuery(opts ListBatchSpecWorkspaceExecuti
preds = append(preds, sqlf.Sprintf("batch_spec_workspace_execution_jobs.cancel = %s", *opts.Cancel))
}
+ if len(opts.BatchSpecWorkspaceIDs) != 0 {
+ preds = append(preds, sqlf.Sprintf("batch_spec_workspace_execution_jobs.batch_spec_workspace_id = ANY (%s)", pq.Array(opts.BatchSpecWorkspaceIDs)))
+ }
+
if len(preds) == 0 {
preds = append(preds, sqlf.Sprintf("TRUE"))
}
diff --git a/enterprise/internal/batches/store/batch_spec_workspace_test.go b/enterprise/internal/batches/store/batch_spec_workspace_test.go
index a93e9d687bd..8a157fc5581 100644
--- a/enterprise/internal/batches/store/batch_spec_workspace_test.go
+++ b/enterprise/internal/batches/store/batch_spec_workspace_test.go
@@ -125,7 +125,7 @@ func testStoreBatchSpecWorkspaces(t *testing.T, ctx context.Context, s *Store, c
t.Run("List", func(t *testing.T) {
t.Run("All", func(t *testing.T) {
- have, err := s.ListBatchSpecWorkspaces(ctx, ListBatchSpecWorkspacesOpts{})
+ have, _, err := s.ListBatchSpecWorkspaces(ctx, ListBatchSpecWorkspacesOpts{})
if err != nil {
t.Fatal(err)
}
@@ -136,7 +136,7 @@ func testStoreBatchSpecWorkspaces(t *testing.T, ctx context.Context, s *Store, c
t.Run("ByBatchSpecID", func(t *testing.T) {
for _, ws := range workspaces {
- have, err := s.ListBatchSpecWorkspaces(ctx, ListBatchSpecWorkspacesOpts{
+ have, _, err := s.ListBatchSpecWorkspaces(ctx, ListBatchSpecWorkspacesOpts{
BatchSpecID: ws.BatchSpecID,
})
diff --git a/enterprise/internal/batches/store/batch_specs.go b/enterprise/internal/batches/store/batch_specs.go
index 1a9d77346af..038d1d06a15 100644
--- a/enterprise/internal/batches/store/batch_specs.go
+++ b/enterprise/internal/batches/store/batch_specs.go
@@ -159,20 +159,57 @@ var deleteBatchSpecQueryFmtstr = `
DELETE FROM batch_specs WHERE id = %s
`
+// CountBatchSpecsOpts captures the query options needed for
+// counting batch specs.
+type CountBatchSpecsOpts struct {
+ BatchChangeID int64
+}
+
// CountBatchSpecs returns the number of code mods in the database.
-func (s *Store) CountBatchSpecs(ctx context.Context) (count int, err error) {
+func (s *Store) CountBatchSpecs(ctx context.Context, opts CountBatchSpecsOpts) (count int, err error) {
ctx, endObservation := s.operations.countBatchSpecs.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})
- return s.queryCount(ctx, sqlf.Sprintf(countBatchSpecsQueryFmtstr))
+ q := countBatchSpecsQuery(opts)
+
+ return s.queryCount(ctx, q)
}
var countBatchSpecsQueryFmtstr = `
-- source: enterprise/internal/batches/store/batch_specs.go:CountBatchSpecs
SELECT COUNT(id)
FROM batch_specs
+-- Joins go here:
+%s
+WHERE %s
`
+func countBatchSpecsQuery(opts CountBatchSpecsOpts) *sqlf.Query {
+ preds := []*sqlf.Query{}
+ joins := []*sqlf.Query{}
+
+ if opts.BatchChangeID != 0 {
+ joins = append(joins, sqlf.Sprintf(`INNER JOIN batch_changes
+ON
+ batch_changes.name = batch_specs.spec->>'name'
+ AND
+ batch_changes.namespace_user_id IS NOT DISTINCT FROM batch_specs.namespace_user_id
+ AND
+ batch_changes.namespace_org_id IS NOT DISTINCT FROM batch_specs.namespace_org_id`))
+ preds = append(preds, sqlf.Sprintf("batch_changes.id = %s", opts.BatchChangeID))
+ }
+
+ if len(preds) == 0 {
+ preds = append(preds, sqlf.Sprintf("TRUE"))
+ }
+
+ return sqlf.Sprintf(
+ countBatchSpecsQueryFmtstr,
+ sqlf.Join(joins, "\n"),
+ sqlf.Join(preds, "\n AND "),
+ )
+}
+
// GetBatchSpecOpts captures the query options needed for getting a BatchSpec
type GetBatchSpecOpts struct {
ID int64
@@ -304,7 +341,8 @@ func getNewestBatchSpecQuery(opts *GetNewestBatchSpecOpts) *sqlf.Query {
// listing batch specs.
type ListBatchSpecsOpts struct {
LimitOpts
- Cursor int64
+ Cursor int64
+ BatchChangeID int64
}
// ListBatchSpecs lists BatchSpecs with the given filters.
@@ -335,18 +373,33 @@ func (s *Store) ListBatchSpecs(ctx context.Context, opts ListBatchSpecsOpts) (cs
var listBatchSpecsQueryFmtstr = `
-- source: enterprise/internal/batches/store/batch_specs.go:ListBatchSpecs
SELECT %s FROM batch_specs
+-- Joins go here:
+%s
WHERE %s
ORDER BY id ASC
`
func listBatchSpecsQuery(opts *ListBatchSpecsOpts) *sqlf.Query {
preds := []*sqlf.Query{
- sqlf.Sprintf("id >= %s", opts.Cursor),
+ sqlf.Sprintf("batch_specs.id >= %s", opts.Cursor),
+ }
+ joins := []*sqlf.Query{}
+
+ if opts.BatchChangeID != 0 {
+ joins = append(joins, sqlf.Sprintf(`INNER JOIN batch_changes
+ON
+ batch_changes.name = batch_specs.spec->>'name'
+ AND
+ batch_changes.namespace_user_id IS NOT DISTINCT FROM batch_specs.namespace_user_id
+ AND
+ batch_changes.namespace_org_id IS NOT DISTINCT FROM batch_specs.namespace_org_id`))
+ preds = append(preds, sqlf.Sprintf("batch_changes.id = %s", opts.BatchChangeID))
}
return sqlf.Sprintf(
listBatchSpecsQueryFmtstr+opts.LimitOpts.ToDB(),
sqlf.Join(batchSpecColumns, ", "),
+ sqlf.Join(joins, "\n"),
sqlf.Join(preds, "\n AND "),
)
}
diff --git a/enterprise/internal/batches/store/batch_specs_test.go b/enterprise/internal/batches/store/batch_specs_test.go
index b98d9c81a38..9b698c3f282 100644
--- a/enterprise/internal/batches/store/batch_specs_test.go
+++ b/enterprise/internal/batches/store/batch_specs_test.go
@@ -78,7 +78,7 @@ func testStoreBatchSpecs(t *testing.T, ctx context.Context, s *Store, clock ct.C
}
t.Run("Count", func(t *testing.T) {
- count, err := s.CountBatchSpecs(ctx)
+ count, err := s.CountBatchSpecs(ctx, CountBatchSpecsOpts{})
if err != nil {
t.Fatal(err)
}
@@ -274,7 +274,7 @@ func testStoreBatchSpecs(t *testing.T, ctx context.Context, s *Store, clock ct.C
t.Fatal(err)
}
- count, err := s.CountBatchSpecs(ctx)
+ count, err := s.CountBatchSpecs(ctx, CountBatchSpecsOpts{})
if err != nil {
t.Fatal(err)
}
diff --git a/enterprise/internal/batches/store/changeset_specs.go b/enterprise/internal/batches/store/changeset_specs.go
index 0601b4630d0..a65a2a01aa4 100644
--- a/enterprise/internal/batches/store/changeset_specs.go
+++ b/enterprise/internal/batches/store/changeset_specs.go
@@ -217,6 +217,7 @@ DELETE FROM changeset_specs WHERE id = %s
// ChangesetSpecs.
type CountChangesetSpecsOpts struct {
BatchSpecID int64
+ Type batcheslib.ChangesetSpecDescriptionType
}
// CountChangesetSpecs returns the number of changeset specs in the database.
@@ -247,6 +248,16 @@ func countChangesetSpecsQuery(opts *CountChangesetSpecsOpts) *sqlf.Query {
preds = append(preds, cond)
}
+ if opts.Type != "" {
+ if opts.Type == batcheslib.ChangesetSpecDescriptionTypeExisting {
+ // Check that externalID is not empty.
+ preds = append(preds, sqlf.Sprintf("COALESCE(changeset_specs.spec->>'externalID', NULL) IS NOT NULL", opts.BatchSpecID))
+ } else {
+ // Check that externalID is empty.
+ preds = append(preds, sqlf.Sprintf("COALESCE(changeset_specs.spec->>'externalID', NULL) IS NULL", opts.BatchSpecID))
+ }
+ }
+
if len(preds) == 0 {
preds = append(preds, sqlf.Sprintf("TRUE"))
}
@@ -331,6 +342,7 @@ type ListChangesetSpecsOpts struct {
BatchSpecID int64
RandIDs []string
IDs []int64
+ Type batcheslib.ChangesetSpecDescriptionType
}
// ListChangesetSpecs lists ChangesetSpecs with the given filters.
@@ -384,6 +396,16 @@ func listChangesetSpecsQuery(opts *ListChangesetSpecsOpts) *sqlf.Query {
preds = append(preds, sqlf.Sprintf("changeset_specs.id = ANY (%s)", pq.Array(opts.IDs)))
}
+ if opts.Type != "" {
+ if opts.Type == batcheslib.ChangesetSpecDescriptionTypeExisting {
+ // Check that externalID is not empty.
+ preds = append(preds, sqlf.Sprintf("COALESCE(changeset_specs.spec->>'externalID', NULL) IS NOT NULL", opts.BatchSpecID))
+ } else {
+ // Check that externalID is empty.
+ preds = append(preds, sqlf.Sprintf("COALESCE(changeset_specs.spec->>'externalID', NULL) IS NULL", opts.BatchSpecID))
+ }
+ }
+
return sqlf.Sprintf(
listChangesetSpecsQueryFmtstr+opts.LimitOpts.ToDB(),
sqlf.Join(changesetSpecColumns, ", "),
diff --git a/enterprise/internal/batches/store/store.go b/enterprise/internal/batches/store/store.go
index 1fa3be8b1af..d8dd0858144 100644
--- a/enterprise/internal/batches/store/store.go
+++ b/enterprise/internal/batches/store/store.go
@@ -214,10 +214,11 @@ type operations struct {
getBatchSpecWorkspace *observation.Operation
listBatchSpecWorkspaces *observation.Operation
- createBatchSpecWorkspaceExecutionJob *observation.Operation
- getBatchSpecWorkspaceExecutionJob *observation.Operation
- listBatchSpecWorkspaceExecutionJobs *observation.Operation
- cancelBatchSpecWorkspaceExecutionJob *observation.Operation
+ createBatchSpecWorkspaceExecutionJob *observation.Operation
+ createBatchSpecWorkspaceExecutionJobs *observation.Operation
+ getBatchSpecWorkspaceExecutionJob *observation.Operation
+ listBatchSpecWorkspaceExecutionJobs *observation.Operation
+ cancelBatchSpecWorkspaceExecutionJob *observation.Operation
createBatchSpecResolutionJob *observation.Operation
getBatchSpecResolutionJob *observation.Operation
@@ -332,10 +333,11 @@ func newOperations(observationContext *observation.Context) *operations {
getBatchSpecWorkspace: op("GetBatchSpecWorkspace"),
listBatchSpecWorkspaces: op("ListBatchSpecWorkspaces"),
- createBatchSpecWorkspaceExecutionJob: op("CreateBatchSpecWorkspaceExecutionJob"),
- getBatchSpecWorkspaceExecutionJob: op("GetBatchSpecWorkspaceExecutionJob"),
- listBatchSpecWorkspaceExecutionJobs: op("ListBatchSpecWorkspaceExecutionJobs"),
- cancelBatchSpecWorkspaceExecutionJob: op("CancelBatchSpecWorkspaceExecutionJob"),
+ createBatchSpecWorkspaceExecutionJob: op("CreateBatchSpecWorkspaceExecutionJob"),
+ createBatchSpecWorkspaceExecutionJobs: op("CreateBatchSpecWorkspaceExecutionJobs"),
+ getBatchSpecWorkspaceExecutionJob: op("GetBatchSpecWorkspaceExecutionJob"),
+ listBatchSpecWorkspaceExecutionJobs: op("ListBatchSpecWorkspaceExecutionJobs"),
+ cancelBatchSpecWorkspaceExecutionJob: op("CancelBatchSpecWorkspaceExecutionJob"),
createBatchSpecResolutionJob: op("CreateBatchSpecResolutionJob"),
getBatchSpecResolutionJob: op("GetBatchSpecResolutionJob"),
diff --git a/lib/batches/json_logs.go b/lib/batches/json_logs.go
index 3687494bbd3..604291fcc52 100644
--- a/lib/batches/json_logs.go
+++ b/lib/batches/json_logs.go
@@ -1,14 +1,92 @@
package batches
-import "time"
+import (
+ "encoding/json"
+ "time"
+
+ "github.com/cockroachdb/errors"
+)
type LogEvent struct {
Operation LogEventOperation `json:"operation"`
Timestamp time.Time `json:"timestamp"`
- Status LogEventStatus `json:"status"`
- Metadata map[string]interface{} `json:"metadata,omitempty"`
+ Status LogEventStatus `json:"status"`
+ Metadata interface{} `json:"metadata,omitempty"`
+}
+
+type logEventJSON struct {
+ Operation LogEventOperation `json:"operation"`
+ Timestamp time.Time `json:"timestamp"`
+ Status LogEventStatus `json:"status"`
+}
+
+func (l *LogEvent) UnmarshalJSON(data []byte) error {
+ var j *logEventJSON
+ if err := json.Unmarshal(data, &j); err != nil {
+ return err
+ }
+ l.Operation = j.Operation
+ l.Timestamp = j.Timestamp
+ l.Status = j.Status
+
+ switch l.Operation {
+ case LogEventOperationParsingBatchSpec:
+ l.Metadata = new(ParsingBatchSpecMetadata)
+ case LogEventOperationResolvingNamespace:
+ l.Metadata = new(ResolvingNamespaceMetadata)
+ case LogEventOperationPreparingDockerImages:
+ l.Metadata = new(PreparingDockerImagesMetadata)
+ case LogEventOperationDeterminingWorkspaceType:
+ l.Metadata = new(DeterminingWorkspaceTypeMetadata)
+ case LogEventOperationResolvingRepositories:
+ l.Metadata = new(ResolvingRepositoriesMetadata)
+ case LogEventOperationDeterminingWorkspaces:
+ l.Metadata = new(DeterminingWorkspacesMetadata)
+ case LogEventOperationCheckingCache:
+ l.Metadata = new(CheckingCacheMetadata)
+ case LogEventOperationExecutingTasks:
+ l.Metadata = new(ExecutingTasksMetadata)
+ case LogEventOperationLogFileKept:
+ l.Metadata = new(LogFileKeptMetadata)
+ case LogEventOperationUploadingChangesetSpecs:
+ l.Metadata = new(UploadingChangesetSpecsMetadata)
+ case LogEventOperationCreatingBatchSpec:
+ l.Metadata = new(CreatingBatchSpecMetadata)
+ case LogEventOperationApplyingBatchSpec:
+ l.Metadata = new(ApplyingBatchSpecMetadata)
+ case LogEventOperationBatchSpecExecution:
+ l.Metadata = new(BatchSpecExecutionMetadata)
+ case LogEventOperationExecutingTask:
+ l.Metadata = new(ExecutingTaskMetadata)
+ case LogEventOperationTaskBuildChangesetSpecs:
+ l.Metadata = new(TaskBuildChangesetSpecsMetadata)
+ case LogEventOperationTaskDownloadingArchive:
+ l.Metadata = new(TaskDownloadingArchiveMetadata)
+ case LogEventOperationTaskInitializingWorkspace:
+ l.Metadata = new(TaskInitializingWorkspaceMetadata)
+ case LogEventOperationTaskSkippingSteps:
+ l.Metadata = new(TaskSkippingStepsMetadata)
+ case LogEventOperationTaskStepSkipped:
+ l.Metadata = new(TaskStepSkippedMetadata)
+ case LogEventOperationTaskPreparingStep:
+ l.Metadata = new(TaskPreparingStepMetadata)
+ case LogEventOperationTaskStep:
+ l.Metadata = new(TaskStepMetadata)
+ case LogEventOperationTaskCalculatingDiff:
+ l.Metadata = new(TaskCalculatingDiffMetadata)
+ default:
+ return errors.Newf("invalid event type %s", l.Operation)
+ }
+
+ wrapper := struct {
+ Metadata interface{} `json:"metadata"`
+ }{
+ Metadata: l.Metadata,
+ }
+
+ return json.Unmarshal(data, &wrapper)
}
type LogEventOperation string
@@ -46,3 +124,127 @@ const (
LogEventStatusFailure LogEventStatus = "FAILURE"
LogEventStatusProgress LogEventStatus = "PROGRESS"
)
+
+type ParsingBatchSpecMetadata struct {
+ Error string `json:"error,omitempty"`
+}
+
+type ResolvingNamespaceMetadata struct {
+ NamespaceID string `json:"namespaceID,omitempty"`
+}
+
+type PreparingDockerImagesMetadata struct {
+ Done int `json:"done,omitempty"`
+ Total int `json:"total,omitempty"`
+}
+
+type DeterminingWorkspaceTypeMetadata struct {
+ Type string `json:"type,omitempty"`
+}
+
+type ResolvingRepositoriesMetadata struct {
+ Unsupported int `json:"unsupported,omitempty"`
+ Ignored int `json:"ignored,omitempty"`
+ Count int `json:"count,omitempty"`
+}
+
+type DeterminingWorkspacesMetadata struct {
+ Count int `json:"count,omitempty"`
+}
+
+type CheckingCacheMetadata struct {
+ CachedSpecsFound int `json:"cachedSpecsFound,omitempty"`
+ TasksToExecute int `json:"tasksToExecute,omitempty"`
+}
+
+type JSONLinesTask struct {
+ ID string `json:"id"`
+ Repository string `json:"repository"`
+ Workspace string `json:"workspace"`
+ Steps []Step `json:"steps"`
+ CachedStepResultsFound bool `json:"cachedStepResultFound"`
+ StartStep int `json:"startStep"`
+}
+
+type ExecutingTasksMetadata struct {
+ Tasks []JSONLinesTask `json:"tasks,omitempty"`
+ Skipped bool `json:"skipped,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+type LogFileKeptMetadata struct {
+ Path string `json:"path,omitempty"`
+}
+
+type UploadingChangesetSpecsMetadata struct {
+ Done int `json:"done,omitempty"`
+ Total int `json:"total,omitempty"`
+ // IDs is the slice of GraphQL IDs of the created changeset specs.
+ IDs []string `json:"ids,omitempty"`
+}
+
+type CreatingBatchSpecMetadata struct {
+ PreviewURL string `json:"previewURL,omitempty"`
+}
+
+type ApplyingBatchSpecMetadata struct {
+ BatchChangeURL string `json:"batchChangeURL,omitempty"`
+}
+
+type BatchSpecExecutionMetadata struct {
+ Error string `json:"error,omitempty"`
+}
+
+type ExecutingTaskMetadata struct {
+ TaskID string `json:"taskID,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+type TaskBuildChangesetSpecsMetadata struct {
+ TaskID string `json:"taskID,omitempty"`
+}
+
+type TaskDownloadingArchiveMetadata struct {
+ TaskID string `json:"taskID,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+type TaskInitializingWorkspaceMetadata struct {
+ TaskID string `json:"taskID,omitempty"`
+}
+
+type TaskSkippingStepsMetadata struct {
+ TaskID string `json:"taskID,omitempty"`
+ StartStep int `json:"startStep,omitempty"`
+}
+
+type TaskStepSkippedMetadata struct {
+ TaskID string `json:"taskID,omitempty"`
+ Step int `json:"step,omitempty"`
+}
+
+type TaskPreparingStepMetadata struct {
+ TaskID string `json:"taskID,omitempty"`
+ Step int `json:"step,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+type TaskStepMetadata struct {
+ TaskID string `json:"taskID,omitempty"`
+ Step int `json:"step,omitempty"`
+
+ RunScript string `json:"runScript,omitempty"`
+ Env map[string]string `json:"env,omitempty"`
+
+ Out string `json:"out,omitempty"`
+
+ Diff string `json:"diff,omitempty"`
+ Outputs map[string]interface{} `json:"outputs,omitempty"`
+
+ ExitCode int `json:"exitCode,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+type TaskCalculatingDiffMetadata struct {
+ TaskID string `json:"taskID,omitempty"`
+}
diff --git a/sg.config.yaml b/sg.config.yaml
index 3ce85885e09..cb3c085bf92 100644
--- a/sg.config.yaml
+++ b/sg.config.yaml
@@ -438,6 +438,7 @@ commands:
env TMPDIR="$HOME/.sourcegraph/batches-executor-temp" .bin/executor
env:
EXECUTOR_QUEUE_NAME: batches
+ EXECUTOR_MAXIMUM_NUM_JOBS: 8
SRC_PROF_HTTP: ":6093"
# If you want to use this, either start it with `sg run batches-executor-firecracker` or