worker: Fix test after go 1.22 upgrade (#61048)

Seems like this test was always wrong, thanks @camdencheek!
The numHandlers loop variable was never captured, so by the time the parallel subtests ran it had always ended up equal to NumTestRecords.
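
For reference, a minimal standalone sketch (not the worker test itself; names are illustrative) of the pre-Go-1.22 capture behaviour the message refers to. Before Go 1.22 every closure shares the single loop variable, and with `t.Parallel()` subtests the effect is deterministic because the closures only run after the loop has finished; Go 1.22 gives each iteration its own variable.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for n := 1; n < 5; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Before Go 1.22 every goroutine reads the same shared n, which
			// has typically advanced to its final value (5) by the time the
			// goroutines run. With Go 1.22+ each iteration has its own n,
			// so 1..4 are printed.
			fmt.Println(n)
		}()
	}
	wg.Wait()
}
```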

This revealed that tracking job start/end times at millisecond precision, as this test previously did, is simply not reliable enough to avoid frequent flakes.
So I switched the approach here to tracking the actual concurrency observed instead.
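
As a rough standalone sketch of that idea (the goroutines here merely stand in for the worker's handler routines), the test now bumps a counter when a handler starts, decrements it when it finishes, and records the peak; the assertion is only that the peak never exceeds the configured number of handlers:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var (
		mu             sync.Mutex
		currentRunning int
		maxConcurrency int
	)
	// collect mirrors the helper in the diff below: called with true when a
	// handler starts and false when it completes, tracking the peak overlap.
	collect := func(start bool) {
		mu.Lock()
		defer mu.Unlock()
		if start {
			currentRunning++
			if currentRunning > maxConcurrency {
				maxConcurrency = currentRunning
			}
		} else {
			currentRunning--
		}
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			collect(true)
			time.Sleep(time.Millisecond) // stand-in for handler work
			collect(false)
		}()
	}
	wg.Wait()
	fmt.Println("max concurrency observed:", maxConcurrency)
}
```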

## Test plan

Ran the test locally with --count=100; no more flakes.
Erik Seliger 2024-03-13 15:32:34 +01:00 committed by GitHub
parent 61f59e06e9
commit 677824aa09


@@ -151,13 +151,11 @@ func TestWorkerHandlerNonRetryableFailure(t *testing.T) {
}
}
const NumTestRecords = 50
func TestWorkerConcurrent(t *testing.T) {
NumTestRecords := 50
for numHandlers := 1; numHandlers < NumTestRecords; numHandlers++ {
name := fmt.Sprintf("numHandlers=%d", numHandlers)
t.Run(name, func(t *testing.T) {
runTest := func(numHandlers int) {
t.Run(fmt.Sprintf("numHandlers=%d", numHandlers), func(t *testing.T) {
t.Parallel()
store := NewMockStore[*TestRecord]()
@@ -180,17 +178,23 @@ func TestWorkerConcurrent(t *testing.T) {
store.DequeueFunc.SetDefaultReturn(nil, false, nil)
var m sync.Mutex
times := map[int][2]time.Time{}
markTime := func(recordID, index int) {
maxConcurrency := 0
currentRunning := 0
collect := func(start bool) {
m.Lock()
pair := times[recordID]
pair[index] = time.Now()
times[recordID] = pair
if start {
currentRunning++
} else {
currentRunning--
}
if currentRunning > maxConcurrency {
maxConcurrency = currentRunning
}
m.Unlock()
}
handler.PreHandleFunc.SetDefaultHook(func(ctx context.Context, _ log.Logger, record *TestRecord) { markTime(record.RecordID(), 0) })
handler.PostHandleFunc.SetDefaultHook(func(ctx context.Context, _ log.Logger, record *TestRecord) { markTime(record.RecordID(), 1) })
handler.PreHandleFunc.SetDefaultHook(func(ctx context.Context, _ log.Logger, record *TestRecord) { collect(true) })
handler.PostHandleFunc.SetDefaultHook(func(ctx context.Context, _ log.Logger, record *TestRecord) { collect(false) })
handler.HandleFunc.SetDefaultHook(func(context.Context, log.Logger, *TestRecord) error {
// Do a _very_ small sleep to make it very unlikely that the scheduler
// will happen to invoke all of the handlers sequentially.
@@ -205,47 +209,19 @@ func TestWorkerConcurrent(t *testing.T) {
}
worker.Stop()
intersecting := 0
for i := range NumTestRecords {
for j := i + 1; j < NumTestRecords; j++ {
if !times[i][1].Before(times[j][0]) {
if j-i > 2*numHandlers-1 {
// The greatest distance between two "batches" that can overlap is
// just under 2x the number of concurrent handler routines. For example
// if n=3:
//
// t1: dequeue A (1 active) *
// t2: dequeue B (2 active)
// t3: dequeue C (3 active)
// t4: process C (2 active)
// t5: dequeue D (3 active)
// t6: process B (2 active)
// t7: dequeue E (3 active) *
// t8: process A (2 active) *
//
// Here, A finishes after E is dequeued, which has a distance of 5 (2*3-1).
t.Errorf(
"times %[1]d (%[3]s-%[4]s) and %[2]d (%[5]s-%[6]s) failed validation",
i,
j,
times[i][0],
times[i][1],
times[j][0],
times[j][1],
)
}
intersecting++
}
}
}
if numHandlers > 1 && intersecting == 0 {
t.Errorf("no handler routines were concurrent")
// We never want to see more concurrently running jobs than configured.
// Ideally, we would also want to see that the number is exactly equal,
// but due to the nature of the goroutine scheduler, we can't guarantee that
// and it causes flakes in our tests.
if maxConcurrency > numHandlers {
t.Errorf("unexpected max concurrency. want=%d have=%d", numHandlers, maxConcurrency)
}
})
}
for numHandlers := 1; numHandlers < NumTestRecords; numHandlers++ {
runTest(numHandlers)
}
}
func TestWorkerBlockingPreDequeueHook(t *testing.T) {