Add ctx to QueueOptions.RecordTransformer (#22592)

This commit is contained in:
Thorsten Ball 2021-07-05 17:06:13 +02:00 committed by GitHub
parent add53ea7a8
commit 2a97512ba6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 31 additions and 15 deletions

View File

@ -1,6 +1,7 @@
package batches
import (
"context"
"database/sql"
apiserver "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/server"
@ -14,8 +15,8 @@ import (
)
func QueueOptions(db dbutil.DB, config *Config, observationContext *observation.Context) apiserver.QueueOptions {
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) {
return transformRecord(record.(*btypes.BatchSpecExecution), config)
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return transformRecord(ctx, db, record.(*btypes.BatchSpecExecution), config)
}
return apiserver.QueueOptions{

View File

@ -1,15 +1,17 @@
package batches
import (
"context"
"fmt"
"net/url"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
)
// transformRecord transforms a *btypes.BatchSpecExecution into an apiclient.Job.
func transformRecord(exec *btypes.BatchSpecExecution, config *Config) (apiclient.Job, error) {
func transformRecord(ctx context.Context, db dbutil.DB, exec *btypes.BatchSpecExecution, config *Config) (apiclient.Job, error) {
srcEndpoint, err := makeURL(config.Shared.FrontendURL, config.Shared.FrontendUsername, config.Shared.FrontendPassword)
if err != nil {
return apiclient.Job{}, err

View File

@ -1,6 +1,7 @@
package batches
import (
"context"
"testing"
"github.com/google/go-cmp/cmp"
@ -8,6 +9,7 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/config"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
"github.com/sourcegraph/sourcegraph/internal/database/dbtesting"
)
func TestTransformRecord(t *testing.T) {
@ -24,7 +26,7 @@ func TestTransformRecord(t *testing.T) {
},
}
job, err := transformRecord(index, config)
job, err := transformRecord(context.Background(), &dbtesting.MockDB{}, index, config)
if err != nil {
t.Fatalf("unexpected error transforming record: %s", err)
}

View File

@ -1,6 +1,7 @@
package codeintel
import (
"context"
"database/sql"
"time"
@ -28,7 +29,7 @@ const StalledJobMaximumAge = time.Second * 5
const MaximumNumResets = 3
func QueueOptions(db dbutil.DB, config *Config, observationContext *observation.Context) apiserver.QueueOptions {
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) {
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return transformRecord(record.(store.Index), config)
}

View File

@ -59,7 +59,7 @@ type QueueOptions struct {
// RecordTransformer is a required hook for each registered queue that transforms a generic
// record from that queue into the job to be given to an executor.
RecordTransformer func(record workerutil.Record) (apiclient.Job, error)
RecordTransformer func(ctx context.Context, record workerutil.Record) (apiclient.Job, error)
}
type executorMeta struct {
@ -129,7 +129,7 @@ func (m *handler) dequeue(ctx context.Context, queueName, executorName, executor
return apiclient.Job{}, false, nil
}
job, err := queueOptions.RecordTransformer(record)
job, err := queueOptions.RecordTransformer(ctx, record)
if err != nil {
return apiclient.Job{}, false, tx.Done(err)
}

View File

@ -25,7 +25,7 @@ func TestDequeue(t *testing.T) {
store := workerstoremocks.NewMockStore()
store.DequeueWithIndependentTransactionContextFunc.SetDefaultReturn(testRecord{ID: 42, Payload: "secret"}, store, true, nil)
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) {
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
if tr, ok := record.(testRecord); !ok {
t.Errorf("mismatched record type.")
} else if tr.Payload != "secret" {
@ -90,7 +90,9 @@ func TestDequeueMaxTransactions(t *testing.T) {
store.DequeueWithIndependentTransactionContextFunc.PushReturn(testRecord{ID: 41}, store, true, nil)
store.DequeueWithIndependentTransactionContextFunc.PushReturn(testRecord{ID: 42}, store, true, nil)
store.DequeueWithIndependentTransactionContextFunc.PushReturn(testRecord{ID: 43}, store, true, nil)
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) { return apiclient.Job{}, nil }
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{}, nil
}
options := Options{
QueueOptions: map[string]QueueOptions{
@ -140,7 +142,9 @@ func TestDequeueMaxTransactions(t *testing.T) {
func TestAddExecutionLogEntry(t *testing.T) {
store := workerstoremocks.NewMockStore()
store.DequeueWithIndependentTransactionContextFunc.SetDefaultReturn(testRecord{ID: 42}, store, true, nil)
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) { return apiclient.Job{ID: 42}, nil }
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: 42}, nil
}
options := Options{
QueueOptions: map[string]QueueOptions{
@ -211,7 +215,9 @@ func TestAddExecutionLogEntryUnknownJob(t *testing.T) {
func TestMarkComplete(t *testing.T) {
store := workerstoremocks.NewMockStore()
store.DequeueWithIndependentTransactionContextFunc.SetDefaultReturn(testRecord{ID: 42}, store, true, nil)
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) { return apiclient.Job{ID: 42}, nil }
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: 42}, nil
}
options := Options{
QueueOptions: map[string]QueueOptions{
@ -267,7 +273,9 @@ func TestMarkCompleteUnknownQueue(t *testing.T) {
func TestMarkErrored(t *testing.T) {
store := workerstoremocks.NewMockStore()
store.DequeueWithIndependentTransactionContextFunc.SetDefaultReturn(testRecord{ID: 42}, store, true, nil)
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) { return apiclient.Job{ID: 42}, nil }
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: 42}, nil
}
options := Options{
QueueOptions: map[string]QueueOptions{
@ -326,7 +334,9 @@ func TestMarkErroredUnknownQueue(t *testing.T) {
func TestMarkFailed(t *testing.T) {
store := workerstoremocks.NewMockStore()
store.DequeueWithIndependentTransactionContextFunc.SetDefaultReturn(testRecord{ID: 42}, store, true, nil)
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) { return apiclient.Job{ID: 42}, nil }
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: 42}, nil
}
options := Options{
QueueOptions: map[string]QueueOptions{

View File

@ -16,7 +16,7 @@ import (
func TestHeartbeat(t *testing.T) {
store1 := workerstoremocks.NewMockStore()
store2 := workerstoremocks.NewMockStore()
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) {
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: record.RecordID()}, nil
}
@ -110,7 +110,7 @@ func TestHeartbeat(t *testing.T) {
func TestCleanup(t *testing.T) {
store1 := workerstoremocks.NewMockStore()
store2 := workerstoremocks.NewMockStore()
recordTransformer := func(record workerutil.Record) (apiclient.Job, error) {
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: record.RecordID()}, nil
}