diff --git a/internal/workerutil/worker_test.go b/internal/workerutil/worker_test.go index b4f9a21f2ff..27a7a723a1f 100644 --- a/internal/workerutil/worker_test.go +++ b/internal/workerutil/worker_test.go @@ -151,13 +151,11 @@ func TestWorkerHandlerNonRetryableFailure(t *testing.T) { } } +const NumTestRecords = 50 + func TestWorkerConcurrent(t *testing.T) { - NumTestRecords := 50 - - for numHandlers := 1; numHandlers < NumTestRecords; numHandlers++ { - name := fmt.Sprintf("numHandlers=%d", numHandlers) - - t.Run(name, func(t *testing.T) { + runTest := func(numHandlers int) { + t.Run(fmt.Sprintf("numHandlers=%d", numHandlers), func(t *testing.T) { t.Parallel() store := NewMockStore[*TestRecord]() @@ -180,17 +178,23 @@ func TestWorkerConcurrent(t *testing.T) { store.DequeueFunc.SetDefaultReturn(nil, false, nil) var m sync.Mutex - times := map[int][2]time.Time{} - markTime := func(recordID, index int) { + maxConcurrency := 0 + currentRunning := 0 + collect := func(start bool) { m.Lock() - pair := times[recordID] - pair[index] = time.Now() - times[recordID] = pair + if start { + currentRunning++ + } else { + currentRunning-- + } + if currentRunning > maxConcurrency { + maxConcurrency = currentRunning + } m.Unlock() } - handler.PreHandleFunc.SetDefaultHook(func(ctx context.Context, _ log.Logger, record *TestRecord) { markTime(record.RecordID(), 0) }) - handler.PostHandleFunc.SetDefaultHook(func(ctx context.Context, _ log.Logger, record *TestRecord) { markTime(record.RecordID(), 1) }) + handler.PreHandleFunc.SetDefaultHook(func(ctx context.Context, _ log.Logger, record *TestRecord) { collect(true) }) + handler.PostHandleFunc.SetDefaultHook(func(ctx context.Context, _ log.Logger, record *TestRecord) { collect(false) }) handler.HandleFunc.SetDefaultHook(func(context.Context, log.Logger, *TestRecord) error { // Do a _very_ small sleep to make it very unlikely that the scheduler // will happen to invoke all of the handlers sequentially. @@ -205,47 +209,19 @@ func TestWorkerConcurrent(t *testing.T) { } worker.Stop() - intersecting := 0 - for i := range NumTestRecords { - for j := i + 1; j < NumTestRecords; j++ { - if !times[i][1].Before(times[j][0]) { - if j-i > 2*numHandlers-1 { - // The greatest distance between two "batches" that can overlap is - // just under 2x the number of concurrent handler routines. For example - // if n=3: - // - // t1: dequeue A (1 active) * - // t2: dequeue B (2 active) - // t3: dequeue C (3 active) - // t4: process C (2 active) - // t5: dequeue D (3 active) - // t6: process B (2 active) - // t7: dequeue E (3 active) * - // t8: process A (2 active) * - // - // Here, A finishes after E is dequeued, which has a distance of 5 (2*3-1). - - t.Errorf( - "times %[1]d (%[3]s-%[4]s) and %[2]d (%[5]s-%[6]s) failed validation", - i, - j, - times[i][0], - times[i][1], - times[j][0], - times[j][1], - ) - } - - intersecting++ - } - } - } - - if numHandlers > 1 && intersecting == 0 { - t.Errorf("no handler routines were concurrent") + // We never want to see more concurrently running jobs than configured. + // Ideally, we would also want to see that the number is exactly equal, + // but due to the nature of the goroutine scheduler, we can't guarantee that + // and it causes flakes in our tests. + if maxConcurrency > numHandlers { + t.Errorf("unexpected max concurrency. want=%d have=%d", numHandlers, maxConcurrency) } }) } + + for numHandlers := 1; numHandlers < NumTestRecords; numHandlers++ { + runTest(numHandlers) + } } func TestWorkerBlockingPreDequeueHook(t *testing.T) {