fix: Add Exists method to dbworker Store to avoid COUNT(*) (#64297)

Running `COUNT(*)` on hot tables like lsif_indexes on Sourcegraph.com
shows up on profiles when the number of executors is bumped to
30+, and when the table has 100K+ jobs. So avoid `COUNT(*)`
where possible.
This commit is contained in:
Varun Gandhi 2024-08-06 22:13:09 +08:00 committed by GitHub
parent 99f7d97dc6
commit bf4eb26bdf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 274 additions and 58 deletions

View File

@ -282,18 +282,19 @@ func (m *MultiHandler) SelectNonEmptyQueues(ctx context.Context, queueNames []st
var nonEmptyQueues []string
for _, queue := range queueNames {
var err error
var count int
var isNonEmpty bool
statesBitset := dbworkerstore.StateQueued | dbworkerstore.StateErrored
switch queue {
case m.BatchesQueueHandler.Name:
count, err = m.BatchesQueueHandler.Store.QueuedCount(ctx, false)
isNonEmpty, err = m.BatchesQueueHandler.Store.Exists(ctx, statesBitset)
case m.AutoIndexQueueHandler.Name:
count, err = m.AutoIndexQueueHandler.Store.QueuedCount(ctx, false)
isNonEmpty, err = m.AutoIndexQueueHandler.Store.Exists(ctx, statesBitset)
}
if err != nil {
m.logger.Error("fetching queue size", log.Error(err), log.String("queue", queue))
return nil, err
}
if count != 0 {
if isNonEmpty {
nonEmptyQueues = append(nonEmptyQueues, queue)
}
}

View File

@ -63,14 +63,11 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Dequeue one record for each queue",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["codeintel", "batches"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
// QueuedCount gets called for each queue in queues on every invocation of HandleDequeue to filter empty queues,
// so two calls are mocked for two dequeue events. Functionally it doesn't really matter what these return, but
// for the sake of accuracy, the codeintel store returns 1 less. The batches store returns the same value because
// the batches job isn't dequeued until after the second call to QueuedCount.
codeintelMockStore.QueuedCountFunc.PushReturn(2, nil)
codeintelMockStore.QueuedCountFunc.PushReturn(1, nil)
batchesMockStore.QueuedCountFunc.PushReturn(2, nil)
batchesMockStore.QueuedCountFunc.PushReturn(2, nil)
// Initialize both with queue count = 2
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
batchesMockStore.ExistsFunc.PushReturn(true, nil)
batchesMockStore.ExistsFunc.PushReturn(true, nil)
codeintelMockStore.DequeueFunc.PushReturn(uploadsshared.AutoIndexJob{ID: 1}, true, nil)
jobTokenStore.CreateFunc.PushReturn("token1", nil)
@ -80,8 +77,8 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, jobTokenStore.CreateFunc.History(), 2)
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 2)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 2)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 2)
require.Len(t, batchesMockStore.ExistsFunc.History(), 2)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 1)
assert.Equal(t, "test-executor", codeintelMockStore.DequeueFunc.History()[0].Arg1)
assert.Nil(t, codeintelMockStore.DequeueFunc.History()[0].Arg2)
@ -112,10 +109,10 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["codeintel"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
// On the second event, the queue will be empty and return an empty job
codeintelMockStore.QueuedCountFunc.PushReturn(1, nil)
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
codeintelMockStore.DequeueFunc.PushReturn(uploadsshared.AutoIndexJob{ID: 1}, true, nil)
// Mock a non-empty queue that will never be reached because it's not requested in the dequeue body
batchesMockStore.QueuedCountFunc.PushReturn(1, nil)
batchesMockStore.ExistsFunc.PushReturn(true, nil)
batchesMockStore.DequeueFunc.PushReturn(&btypes.BatchSpecWorkspaceExecutionJob{ID: 2}, true, nil)
jobTokenStore.CreateFunc.PushReturn("token1", nil)
},
@ -129,7 +126,7 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
assert.Equal(t, 1, jobTokenStore.CreateFunc.History()[0].Arg1)
assert.Equal(t, "codeintel", jobTokenStore.CreateFunc.History()[0].Arg2)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 0)
require.Len(t, batchesMockStore.ExistsFunc.History(), 0)
require.Len(t, batchesMockStore.DequeueFunc.History(), 0)
},
dequeueEvents: []dequeueEvent{
@ -148,17 +145,17 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Dequeue only codeintel record when requesting both queues and batches record doesn't exists",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["codeintel", "batches"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
codeintelMockStore.QueuedCountFunc.PushReturn(1, nil)
codeintelMockStore.QueuedCountFunc.PushReturn(0, nil)
batchesMockStore.QueuedCountFunc.PushReturn(0, nil)
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
codeintelMockStore.ExistsFunc.PushReturn(false, nil)
batchesMockStore.ExistsFunc.PushReturn(false, nil)
codeintelMockStore.DequeueFunc.PushReturn(uploadsshared.AutoIndexJob{ID: 1}, true, nil)
jobTokenStore.CreateFunc.PushReturn("token1", nil)
},
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, jobTokenStore.CreateFunc.History(), 1)
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 2)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 2)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 2)
require.Len(t, batchesMockStore.ExistsFunc.History(), 2)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 1)
assert.Equal(t, "test-executor", codeintelMockStore.DequeueFunc.History()[0].Arg1)
assert.Nil(t, codeintelMockStore.DequeueFunc.History()[0].Arg2)
@ -197,8 +194,8 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "No queue names provided",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": []}`,
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 0)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 0)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 0)
require.Len(t, batchesMockStore.ExistsFunc.History(), 0)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 0)
require.Len(t, batchesMockStore.DequeueFunc.History(), 0)
require.Len(t, jobTokenStore.CreateFunc.History(), 0)
@ -209,8 +206,8 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Invalid queue name",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["invalidqueue"]}`,
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 0)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 0)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 0)
require.Len(t, batchesMockStore.ExistsFunc.History(), 0)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 0)
require.Len(t, batchesMockStore.DequeueFunc.History(), 0)
require.Len(t, jobTokenStore.CreateFunc.History(), 0)
@ -226,8 +223,8 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Invalid version",
body: `{"executorName": "test-executor", "version":"\n1.2", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["codeintel","batches"]}`,
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 0)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 0)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 0)
require.Len(t, batchesMockStore.ExistsFunc.History(), 0)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 0)
require.Len(t, batchesMockStore.DequeueFunc.History(), 0)
require.Len(t, jobTokenStore.CreateFunc.History(), 0)
@ -243,12 +240,12 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Dequeue error codeintel",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["codeintel"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
codeintelMockStore.QueuedCountFunc.PushReturn(1, nil)
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
codeintelMockStore.DequeueFunc.PushReturn(uploadsshared.AutoIndexJob{}, false, errors.New("failed to dequeue"))
},
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 1)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 0)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 1)
require.Len(t, batchesMockStore.ExistsFunc.History(), 0)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 1)
require.Len(t, batchesMockStore.DequeueFunc.History(), 0)
require.Len(t, jobTokenStore.CreateFunc.History(), 0)
@ -265,12 +262,12 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Dequeue error batches",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["batches"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
batchesMockStore.QueuedCountFunc.PushReturn(1, nil)
batchesMockStore.ExistsFunc.PushReturn(true, nil)
batchesMockStore.DequeueFunc.PushReturn(&btypes.BatchSpecWorkspaceExecutionJob{}, false, errors.New("failed to dequeue"))
},
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 0)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 1)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 0)
require.Len(t, batchesMockStore.ExistsFunc.History(), 1)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 0)
require.Len(t, batchesMockStore.DequeueFunc.History(), 1)
require.Len(t, jobTokenStore.CreateFunc.History(), 0)
@ -287,13 +284,13 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Failed to transform record codeintel",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["codeintel"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
codeintelMockStore.QueuedCountFunc.PushReturn(1, nil)
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
codeintelMockStore.DequeueFunc.PushReturn(uploadsshared.AutoIndexJob{ID: 1}, true, nil)
codeintelMockStore.MarkFailedFunc.PushReturn(true, nil)
},
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 1)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 0)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 1)
require.Len(t, batchesMockStore.ExistsFunc.History(), 0)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 1)
require.Len(t, batchesMockStore.DequeueFunc.History(), 0)
require.Len(t, codeintelMockStore.MarkFailedFunc.History(), 1)
@ -317,13 +314,13 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Failed to transform record batches",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["batches"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
batchesMockStore.QueuedCountFunc.PushReturn(1, nil)
batchesMockStore.ExistsFunc.PushReturn(true, nil)
batchesMockStore.DequeueFunc.PushReturn(&btypes.BatchSpecWorkspaceExecutionJob{ID: 1}, true, nil)
batchesMockStore.MarkFailedFunc.PushReturn(true, nil)
},
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 0)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 1)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 0)
require.Len(t, batchesMockStore.ExistsFunc.History(), 1)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 0)
require.Len(t, batchesMockStore.DequeueFunc.History(), 1)
require.Len(t, batchesMockStore.MarkFailedFunc.History(), 1)
@ -347,13 +344,13 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Failed to mark record as failed codeintel",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["codeintel"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
codeintelMockStore.QueuedCountFunc.PushReturn(1, nil)
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
codeintelMockStore.DequeueFunc.PushReturn(uploadsshared.AutoIndexJob{ID: 1}, true, nil)
codeintelMockStore.MarkFailedFunc.PushReturn(true, errors.New("failed to mark"))
},
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 1)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 0)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 1)
require.Len(t, batchesMockStore.ExistsFunc.History(), 0)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 1)
require.Len(t, batchesMockStore.DequeueFunc.History(), 0)
require.Len(t, codeintelMockStore.MarkFailedFunc.History(), 1)
@ -377,13 +374,13 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Failed to mark record as failed batches",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["batches"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
batchesMockStore.QueuedCountFunc.PushReturn(1, nil)
batchesMockStore.ExistsFunc.PushReturn(true, nil)
batchesMockStore.DequeueFunc.PushReturn(&btypes.BatchSpecWorkspaceExecutionJob{ID: 1}, true, nil)
batchesMockStore.MarkFailedFunc.PushReturn(true, errors.New("failed to mark"))
},
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 0)
require.Len(t, batchesMockStore.QueuedCountFunc.History(), 1)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 0)
require.Len(t, batchesMockStore.ExistsFunc.History(), 1)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 0)
require.Len(t, batchesMockStore.DequeueFunc.History(), 1)
assert.Equal(t, 1, batchesMockStore.MarkFailedFunc.History()[0].Arg1)
@ -406,12 +403,12 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Failed to create job token",
body: `{"executorName": "test-executor", "numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["codeintel","batches"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
codeintelMockStore.QueuedCountFunc.PushReturn(1, nil)
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
codeintelMockStore.DequeueFunc.PushReturn(uploadsshared.AutoIndexJob{ID: 1}, true, nil)
jobTokenStore.CreateFunc.PushReturn("", errors.New("failed to create token"))
},
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 1)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 1)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 1)
require.Len(t, jobTokenStore.CreateFunc.History(), 1)
require.Len(t, jobTokenStore.RegenerateFunc.History(), 0)
@ -428,13 +425,13 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Job token already exists",
body: `{"executorName": "test-executor","numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["codeintel","batches"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
codeintelMockStore.QueuedCountFunc.PushReturn(1, nil)
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
codeintelMockStore.DequeueFunc.PushReturn(uploadsshared.AutoIndexJob{ID: 1}, true, nil)
jobTokenStore.CreateFunc.PushReturn("", executorstore.ErrJobTokenAlreadyCreated)
jobTokenStore.RegenerateFunc.PushReturn("somenewtoken", nil)
},
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 1)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 1)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 1)
require.Len(t, jobTokenStore.CreateFunc.History(), 1)
require.Len(t, jobTokenStore.RegenerateFunc.History(), 1)
@ -453,13 +450,13 @@ func TestMultiHandler_HandleDequeue(t *testing.T) {
name: "Failed to regenerate token",
body: `{"executorName": "test-executor","numCPUs": 1, "memory": "1GB", "diskSpace": "10GB","queues": ["codeintel","batches"]}`,
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
codeintelMockStore.QueuedCountFunc.PushReturn(1, nil)
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
codeintelMockStore.DequeueFunc.PushReturn(uploadsshared.AutoIndexJob{ID: 1}, true, nil)
jobTokenStore.CreateFunc.PushReturn("", executorstore.ErrJobTokenAlreadyCreated)
jobTokenStore.RegenerateFunc.PushReturn("", errors.New("failed to regen token"))
},
assertionFunc: func(t *testing.T, codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob], jobTokenStore *executorstore.MockJobTokenStore) {
require.Len(t, codeintelMockStore.QueuedCountFunc.History(), 1)
require.Len(t, codeintelMockStore.ExistsFunc.History(), 1)
require.Len(t, codeintelMockStore.DequeueFunc.History(), 1)
require.Len(t, jobTokenStore.CreateFunc.History(), 1)
require.Len(t, jobTokenStore.RegenerateFunc.History(), 1)
@ -1069,8 +1066,8 @@ func TestMultiHandler_SelectNonEmptyQueues(t *testing.T) {
name: "Both contain jobs",
queueNames: []string{"batches", "codeintel"},
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob]) {
codeintelMockStore.QueuedCountFunc.PushReturn(5, nil)
batchesMockStore.QueuedCountFunc.PushReturn(5, nil)
codeintelMockStore.ExistsFunc.PushReturn(true, nil)
batchesMockStore.ExistsFunc.PushReturn(true, nil)
},
expectedQueues: []string{"batches", "codeintel"},
},
@ -1078,8 +1075,8 @@ func TestMultiHandler_SelectNonEmptyQueues(t *testing.T) {
name: "Only batches contains jobs",
queueNames: []string{"batches", "codeintel"},
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob]) {
codeintelMockStore.QueuedCountFunc.PushReturn(0, nil)
batchesMockStore.QueuedCountFunc.PushReturn(5, nil)
codeintelMockStore.ExistsFunc.PushReturn(false, nil)
batchesMockStore.ExistsFunc.PushReturn(true, nil)
},
expectedQueues: []string{"batches"},
},
@ -1087,8 +1084,8 @@ func TestMultiHandler_SelectNonEmptyQueues(t *testing.T) {
name: "None contain jobs",
queueNames: []string{"batches", "codeintel"},
mockFunc: func(codeintelMockStore *dbworkerstoremocks.MockStore[uploadsshared.AutoIndexJob], batchesMockStore *dbworkerstoremocks.MockStore[*btypes.BatchSpecWorkspaceExecutionJob]) {
codeintelMockStore.QueuedCountFunc.PushReturn(0, nil)
batchesMockStore.QueuedCountFunc.PushReturn(0, nil)
codeintelMockStore.ExistsFunc.PushReturn(false, nil)
batchesMockStore.ExistsFunc.PushReturn(false, nil)
},
expectedQueues: nil,
},

View File

@ -29,6 +29,9 @@ type MockStore[T workerutil.Record] struct {
// DequeueFunc is an instance of a mock function object controlling the
// behavior of the method Dequeue.
DequeueFunc *StoreDequeueFunc[T]
// ExistsFunc is an instance of a mock function object controlling the
// behavior of the method Exists.
ExistsFunc *StoreExistsFunc[T]
// HandleFunc is an instance of a mock function object controlling the
// behavior of the method Handle.
HandleFunc *StoreHandleFunc[T]
@ -78,6 +81,11 @@ func NewMockStore[T workerutil.Record]() *MockStore[T] {
return
},
},
ExistsFunc: &StoreExistsFunc[T]{
defaultHook: func(context.Context, store.RecordState) (r0 bool, r1 error) {
return
},
},
HandleFunc: &StoreHandleFunc[T]{
defaultHook: func() (r0 basestore.TransactableHandle) {
return
@ -150,6 +158,11 @@ func NewStrictMockStore[T workerutil.Record]() *MockStore[T] {
panic("unexpected invocation of MockStore.Dequeue")
},
},
ExistsFunc: &StoreExistsFunc[T]{
defaultHook: func(context.Context, store.RecordState) (bool, error) {
panic("unexpected invocation of MockStore.Exists")
},
},
HandleFunc: &StoreHandleFunc[T]{
defaultHook: func() basestore.TransactableHandle {
panic("unexpected invocation of MockStore.Handle")
@ -218,6 +231,9 @@ func NewMockStoreFrom[T workerutil.Record](i store.Store[T]) *MockStore[T] {
DequeueFunc: &StoreDequeueFunc[T]{
defaultHook: i.Dequeue,
},
ExistsFunc: &StoreExistsFunc[T]{
defaultHook: i.Exists,
},
HandleFunc: &StoreHandleFunc[T]{
defaultHook: i.Handle,
},
@ -481,6 +497,113 @@ func (c StoreDequeueFuncCall[T]) Results() []interface{} {
return []interface{}{c.Result0, c.Result1, c.Result2}
}
// StoreExistsFunc describes the behavior when the Exists method of the
// parent MockStore instance is invoked.
type StoreExistsFunc[T workerutil.Record] struct {
defaultHook func(context.Context, store.RecordState) (bool, error)
hooks []func(context.Context, store.RecordState) (bool, error)
history []StoreExistsFuncCall[T]
mutex sync.Mutex
}
// Exists delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockStore[T]) Exists(v0 context.Context, v1 store.RecordState) (bool, error) {
r0, r1 := m.ExistsFunc.nextHook()(v0, v1)
m.ExistsFunc.appendCall(StoreExistsFuncCall[T]{v0, v1, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the Exists method of the
// parent MockStore instance is invoked and the hook queue is empty.
func (f *StoreExistsFunc[T]) SetDefaultHook(hook func(context.Context, store.RecordState) (bool, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// Exists method of the parent MockStore instance invokes the hook at the
// front of the queue and discards it. After the queue is empty, the default
// hook function is invoked for any future action.
func (f *StoreExistsFunc[T]) PushHook(hook func(context.Context, store.RecordState) (bool, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *StoreExistsFunc[T]) SetDefaultReturn(r0 bool, r1 error) {
f.SetDefaultHook(func(context.Context, store.RecordState) (bool, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *StoreExistsFunc[T]) PushReturn(r0 bool, r1 error) {
f.PushHook(func(context.Context, store.RecordState) (bool, error) {
return r0, r1
})
}
func (f *StoreExistsFunc[T]) nextHook() func(context.Context, store.RecordState) (bool, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *StoreExistsFunc[T]) appendCall(r0 StoreExistsFuncCall[T]) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of StoreExistsFuncCall objects describing the
// invocations of this function.
func (f *StoreExistsFunc[T]) History() []StoreExistsFuncCall[T] {
f.mutex.Lock()
history := make([]StoreExistsFuncCall[T], len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// StoreExistsFuncCall is an object that describes an invocation of method
// Exists on an instance of MockStore.
type StoreExistsFuncCall[T workerutil.Record] struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 store.RecordState
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 bool
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c StoreExistsFuncCall[T]) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c StoreExistsFuncCall[T]) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// StoreHandleFunc describes the behavior when the Handle method of the
// parent MockStore instance is invoked.
type StoreHandleFunc[T workerutil.Record] struct {

View File

@ -17,6 +17,7 @@ type operations struct {
markFailed *observation.Operation
maxDurationInQueue *observation.Operation
queuedCount *observation.Operation
exists *observation.Operation
requeue *observation.Operation
resetStalled *observation.Operation
updateExecutionLogEntry *observation.Operation
@ -66,6 +67,7 @@ func newOperations(observationCtx *observation.Context, storeName string) *opera
markFailed: op("MarkFailed"),
maxDurationInQueue: op("MaxDurationInQueue"),
queuedCount: op("QueuedCount"),
exists: op("Exists"),
requeue: op("Requeue"),
resetStalled: op("ResetStalled"),
updateExecutionLogEntry: op("UpdateExecutionLogEntry"),

View File

@ -83,7 +83,17 @@ type Store[T workerutil.Record] interface {
// QueuedCount returns the number of queued and errored records. If includeProcessing
// is true it returns the number of queued _and_ processing records.
//
// If possible, prefer using Exists over this function.
QueuedCount(ctx context.Context, includeProcessing bool) (int, error)
// TODO: Change above API to also use a bitset like below for greater
// clarity at call-sites.
// Exists checks if there is at least one record in one of the given states
// in stateBitset.
//
// Pre-condition: stateBitset must be one or more RecordState values or-ed together.
Exists(ctx context.Context, stateBitset RecordState) (bool, error)
// MaxDurationInQueue returns the maximum age of queued records in this store. Returns 0 if there are no queued records.
MaxDurationInQueue(ctx context.Context) (time.Duration, error)
@ -135,6 +145,28 @@ type Store[T workerutil.Record] interface {
ResetStalled(ctx context.Context) (resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs map[int]time.Duration, err error)
}
type RecordState uint
const (
StateQueued RecordState = 1 << 0
StateErrored RecordState = 1 << 1
StateProcessing RecordState = 1 << 2
)
func (bitset RecordState) toList() []*sqlf.Query {
fragments := []*sqlf.Query{}
if (bitset & StateQueued) != 0 {
fragments = append(fragments, sqlf.Sprintf("%s", "queued"))
}
if (bitset & StateErrored) != 0 {
fragments = append(fragments, sqlf.Sprintf("%s", "errored"))
}
if (bitset & StateProcessing) != 0 {
fragments = append(fragments, sqlf.Sprintf("%s", "processing"))
}
return fragments
}
type store[T workerutil.Record] struct {
*basestore.Store
options Options[T]
@ -371,6 +403,32 @@ WHERE
{state} IN (%s)
`
func (s *store[T]) Exists(ctx context.Context, statesBitset RecordState) (_ bool, err error) {
ctx, _, endObservation := s.operations.exists.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})
fragments := statesBitset.toList()
if len(fragments) == 0 {
return false, errors.New("pre-condition failure: statesBitset should contain at least one state")
}
exists, _, err := basestore.ScanFirstBool(s.Query(ctx, s.formatQuery(
existsQuery,
quote(s.options.ViewName),
sqlf.Join(fragments, ","),
)))
return exists, err
}
const existsQuery = `
SELECT EXISTS (
SELECT *
FROM %s
WHERE {state} IN (%s)
)
`
// MaxDurationInQueue returns the longest duration for which a job associated with this store instance has
// been in the queued state (including errored records that can be retried in the future). This method returns
// a duration of zero if there are no jobs ready for processing.

View File

@ -65,6 +65,41 @@ func TestStoreQueuedCountIncludeProcessing(t *testing.T) {
}
}
func TestStoreExists(t *testing.T) {
db := setupStoreTest(t)
if _, err := db.ExecContext(context.Background(), `
INSERT INTO workerutil_test (id, state, created_at)
VALUES
(1, 'queued', NOW() - '1 minute'::interval),
(2, 'queued', NOW() - '2 minute'::interval),
(3, 'state2', NOW() - '3 minute'::interval),
(4, 'queued', NOW() - '4 minute'::interval),
(5, 'processing', NOW() - '5 minute'::interval)
`); err != nil {
t.Fatalf("unexpected error inserting records: %s", err)
}
myStore := testStore(db, defaultTestStoreOptions(nil, testScanRecord))
ctx := context.Background()
hasQueued, err := myStore.Exists(ctx, StateQueued)
require.NoError(t, err)
require.True(t, hasQueued)
hasErrored, err := myStore.Exists(ctx, StateErrored)
require.NoError(t, err)
require.False(t, hasErrored)
hasQueuedOrErrored, err := myStore.Exists(ctx, StateQueued|StateErrored)
require.NoError(t, err)
require.True(t, hasQueuedOrErrored)
_, err = myStore.Exists(ctx, 0)
require.Error(t, err)
require.Contains(t, err.Error(), "should contain at least one state")
}
func TestStoreQueuedCountFailed(t *testing.T) {
db := setupStoreTest(t)