Make batch_spec_id on resolution jobs unique (#25832)

This commit is contained in:
Thorsten Ball 2021-10-08 16:12:05 +02:00 committed by GitHub
parent ad860d5781
commit 414e3f5105
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 113 additions and 47 deletions

View File

@ -3,13 +3,13 @@ package store
import (
"context"
"database/sql"
"fmt"
"github.com/keegancsmith/sqlf"
"github.com/lib/pq"
"github.com/opentracing/opentracing-go/log"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
"github.com/sourcegraph/sourcegraph/internal/database/batch"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
@ -17,8 +17,8 @@ import (
)
// batchSpecResolutionJobInsertColumns is the list of changeset_jobs columns that are
// modified in CreateChangesetJob.
var batchSpecResolutionJobInsertColumns = []string{
// modified in CreateBatchSpecResolutionJob.
var batchSpecResolutionJobInsertColumns = SQLColumns{
"batch_spec_id",
"allow_unsupported",
"allow_ignored",
@ -29,6 +29,8 @@ var batchSpecResolutionJobInsertColumns = []string{
"updated_at",
}
const batchSpecResolutionJobInsertColsFmt = `(%s, %s, %s, %s, %s, %s)`
// ChangesetJobColumns are used by the changeset job related Store methods to query
// and create changeset jobs.
var BatchSpecResolutionJobColums = SQLColumns{
@ -52,55 +54,64 @@ var BatchSpecResolutionJobColums = SQLColumns{
"batch_spec_resolution_jobs.updated_at",
}
// ErrResolutionJobAlreadyExists can be returned by
// CreateBatchSpecResolutionJob if a BatchSpecResolutionJob pointing at the
// same BatchSpec already exists.
type ErrResolutionJobAlreadyExists struct {
BatchSpecID int64
}
func (e ErrResolutionJobAlreadyExists) Error() string {
return fmt.Sprintf("a resolution job for batch spec %d already exists", e.BatchSpecID)
}
// CreateBatchSpecResolutionJob creates the given batch spec resolutionjob jobs.
func (s *Store) CreateBatchSpecResolutionJob(ctx context.Context, ws ...*btypes.BatchSpecResolutionJob) (err error) {
ctx, endObservation := s.operations.createBatchSpecResolutionJob.With(ctx, &err, observation.Args{LogFields: []log.Field{
log.Int("count", len(ws)),
}})
func (s *Store) CreateBatchSpecResolutionJob(ctx context.Context, wj *btypes.BatchSpecResolutionJob) (err error) {
ctx, endObservation := s.operations.createBatchSpecResolutionJob.With(ctx, &err, observation.Args{LogFields: []log.Field{}})
defer endObservation(1, observation.Args{})
inserter := func(inserter *batch.Inserter) error {
for _, wj := range ws {
if wj.CreatedAt.IsZero() {
wj.CreatedAt = s.now()
}
q := s.createBatchSpecResolutionJobQuery(wj)
if wj.UpdatedAt.IsZero() {
wj.UpdatedAt = wj.CreatedAt
}
state := string(wj.State)
if state == "" {
state = string(btypes.BatchSpecResolutionJobStateQueued)
}
if err := inserter.Insert(
ctx,
wj.BatchSpecID,
wj.AllowUnsupported,
wj.AllowIgnored,
state,
wj.CreatedAt,
wj.UpdatedAt,
); err != nil {
return err
}
}
return nil
err = s.query(ctx, q, func(sc scanner) (err error) {
return scanBatchSpecResolutionJob(wj, sc)
})
if err != nil && isUniqueConstraintViolation(err, "batch_spec_resolution_jobs_batch_spec_id_unique") {
return ErrResolutionJobAlreadyExists{BatchSpecID: wj.BatchSpecID}
}
i := -1
return batch.WithInserterWithReturn(
ctx,
s.Handle().DB(),
"batch_spec_resolution_jobs",
batchSpecResolutionJobInsertColumns,
BatchSpecResolutionJobColums,
func(rows *sql.Rows) error {
i++
return scanBatchSpecResolutionJob(ws[i], rows)
},
inserter,
return err
}
var createBatchSpecResolutionJobQueryFmtstr = `
-- source: enterprise/internal/batches/store/batch_spec_resolution_jobs.go:CreateBatchSpecResolutionJob
INSERT INTO batch_spec_resolution_jobs (%s)
VALUES ` + batchSpecResolutionJobInsertColsFmt + `
RETURNING %s
`
func (s *Store) createBatchSpecResolutionJobQuery(wj *btypes.BatchSpecResolutionJob) *sqlf.Query {
if wj.CreatedAt.IsZero() {
wj.CreatedAt = s.now()
}
if wj.UpdatedAt.IsZero() {
wj.UpdatedAt = wj.CreatedAt
}
state := string(wj.State)
if state == "" {
state = string(btypes.BatchSpecResolutionJobStateQueued)
}
return sqlf.Sprintf(
createBatchSpecResolutionJobQueryFmtstr,
sqlf.Join(batchSpecResolutionJobInsertColumns.ToSqlf(), ", "),
wj.BatchSpecID,
wj.AllowUnsupported,
wj.AllowIgnored,
state,
wj.CreatedAt,
wj.UpdatedAt,
sqlf.Join(BatchSpecResolutionJobColums.ToSqlf(), ", "),
)
}

View File

@ -8,8 +8,12 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/keegancsmith/sqlf"
ct "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/testing"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
)
func testStoreBatchSpecResolutionJobs(t *testing.T, ctx context.Context, s *Store, clock ct.Clock) {
@ -144,3 +148,35 @@ func testStoreBatchSpecResolutionJobs(t *testing.T, ctx context.Context, s *Stor
})
})
}
func TestBatchSpecResolutionJobs_BatchSpecIDUnique(t *testing.T) {
// This test is a separate test so we can test the database constraints,
// because in the store tests the constraints are all deferred.
ctx := context.Background()
c := &ct.TestClock{Time: timeutil.Now()}
db := dbtest.NewDB(t, "")
s := NewWithClock(db, &observation.TestContext, nil, c.Now)
user := ct.CreateTestUser(t, db, true)
batchSpec := &btypes.BatchSpec{
UserID: user.ID,
NamespaceUserID: user.ID,
}
if err := s.CreateBatchSpec(ctx, batchSpec); err != nil {
t.Fatal(err)
}
job1 := &btypes.BatchSpecResolutionJob{BatchSpecID: batchSpec.ID}
if err := s.CreateBatchSpecResolutionJob(ctx, job1); err != nil {
t.Fatal(err)
}
job2 := &btypes.BatchSpecResolutionJob{BatchSpecID: batchSpec.ID}
err := s.CreateBatchSpecResolutionJob(ctx, job2)
wantErr := ErrResolutionJobAlreadyExists{BatchSpecID: batchSpec.ID}
if err != wantErr {
t.Fatalf("wrong error. want=%s, have=%s", wantErr, err)
}
}

View File

@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/dineshappavoo/basex"
"github.com/jackc/pgconn"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/sourcegraph/internal/database"
@ -460,3 +461,8 @@ func (o LimitOpts) ToDB() string {
}
return limitClause
}
func isUniqueConstraintViolation(err error, constraintName string) bool {
var e *pgconn.PgError
return errors.As(err, &e) && e.Code == "23505" && e.ConstraintName == constraintName
}

View File

@ -101,6 +101,7 @@ Indexes:
updated_at | timestamp with time zone | | not null | now()
Indexes:
"batch_spec_resolution_jobs_pkey" PRIMARY KEY, btree (id)
"batch_spec_resolution_jobs_batch_spec_id_unique" UNIQUE CONSTRAINT, btree (batch_spec_id)
Foreign-key constraints:
"batch_spec_resolution_jobs_batch_spec_id_fkey" FOREIGN KEY (batch_spec_id) REFERENCES batch_specs(id) ON DELETE CASCADE DEFERRABLE

View File

@ -0,0 +1,6 @@
BEGIN;
ALTER TABLE IF EXISTS batch_spec_resolution_jobs
DROP CONSTRAINT IF EXISTS batch_spec_resolution_jobs_batch_spec_id_unique;
COMMIT;

View File

@ -0,0 +1,6 @@
BEGIN;
ALTER TABLE IF EXISTS batch_spec_resolution_jobs
ADD CONSTRAINT batch_spec_resolution_jobs_batch_spec_id_unique UNIQUE (batch_spec_id);
COMMIT;