mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 16:51:55 +00:00
This PR is a result/followup of the improvements we've made in the [SAMS repo](https://github.com/sourcegraph/sourcegraph-accounts/pull/199) that allows call sites to pass down a context (primarily to indicate deadline, and of course, cancellation if desired) and collects the error returned from `background.Routine`s `Stop` method. Note that I did not adopt returning error from `Stop` method because I realize in monorepo, the more common (and arguably the desired) pattern is to hang on the call of `Start` method until `Stop` is called, so it is meaningless to collect errors from `Start` methods as return values anyway, and doing that would also complicate the design and semantics more than necessary. All usages of the the `background.Routine` and `background.CombinedRoutines` are updated, I DID NOT try to interpret the code logic and make anything better other than fixing compile and test errors. The only file that contains the core change is the [`lib/background/background.go`](https://github.com/sourcegraph/sourcegraph/pull/62136/files#diff-65c3228388620e91f8c22d91c18faac3f985fc67d64b08612df18fa7c04fafcd).
465 lines
11 KiB
Go
465 lines
11 KiB
Go
package goroutine
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/derision-test/glock"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/sourcegraph/sourcegraph/lib/errors"
|
|
)
|
|
|
|
func withClock(clock glock.Clock) Option {
|
|
return func(p *PeriodicGoroutine) { p.clock = clock }
|
|
}
|
|
|
|
func withConcurrencyClock(clock glock.Clock) Option {
|
|
return func(p *PeriodicGoroutine) { p.concurrencyClock = clock }
|
|
}
|
|
|
|
func TestPeriodicGoroutine(t *testing.T) {
|
|
clock := glock.NewMockClock()
|
|
handler := NewMockHandler()
|
|
called := make(chan struct{}, 1)
|
|
|
|
handler.HandleFunc.SetDefaultHook(func(ctx context.Context) error {
|
|
called <- struct{}{}
|
|
return nil
|
|
})
|
|
|
|
goroutine := NewPeriodicGoroutine(
|
|
context.Background(),
|
|
handler,
|
|
WithName(t.Name()),
|
|
WithInterval(time.Second),
|
|
withClock(clock),
|
|
)
|
|
go goroutine.Start()
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
err := goroutine.Stop(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
if calls := len(handler.HandleFunc.History()); calls != 4 {
|
|
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls)
|
|
}
|
|
}
|
|
|
|
func TestPeriodicGoroutineReinvoke(t *testing.T) {
|
|
clock := glock.NewMockClock()
|
|
handler := NewMockHandler()
|
|
called := make(chan struct{}, 1)
|
|
|
|
handler.HandleFunc.SetDefaultHook(func(ctx context.Context) error {
|
|
called <- struct{}{}
|
|
return ErrReinvokeImmediately
|
|
})
|
|
|
|
witnessHandler := func() {
|
|
for range maxConsecutiveReinvocations {
|
|
<-called
|
|
}
|
|
}
|
|
|
|
goroutine := NewPeriodicGoroutine(
|
|
context.Background(),
|
|
handler,
|
|
WithName(t.Name()),
|
|
WithInterval(time.Second),
|
|
withClock(clock),
|
|
)
|
|
go goroutine.Start()
|
|
witnessHandler()
|
|
clock.BlockingAdvance(time.Second)
|
|
witnessHandler()
|
|
clock.BlockingAdvance(time.Second)
|
|
witnessHandler()
|
|
clock.BlockingAdvance(time.Second)
|
|
witnessHandler()
|
|
err := goroutine.Stop(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
if calls := len(handler.HandleFunc.History()); calls != 4*maxConsecutiveReinvocations {
|
|
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4*maxConsecutiveReinvocations, calls)
|
|
}
|
|
}
|
|
|
|
func TestPeriodicGoroutineWithDynamicInterval(t *testing.T) {
|
|
clock := glock.NewMockClock()
|
|
handler := NewMockHandler()
|
|
called := make(chan struct{}, 1)
|
|
|
|
handler.HandleFunc.SetDefaultHook(func(ctx context.Context) error {
|
|
called <- struct{}{}
|
|
return nil
|
|
})
|
|
|
|
seconds := 1
|
|
|
|
// intervals: 1 -> 2 -> 3 ...
|
|
getInterval := func() time.Duration {
|
|
duration := time.Duration(seconds) * time.Second
|
|
seconds += 1
|
|
return duration
|
|
}
|
|
|
|
goroutine := NewPeriodicGoroutine(
|
|
context.Background(),
|
|
handler,
|
|
WithName(t.Name()),
|
|
WithIntervalFunc(getInterval),
|
|
withClock(clock),
|
|
)
|
|
go goroutine.Start()
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
clock.BlockingAdvance(2 * time.Second)
|
|
<-called
|
|
clock.BlockingAdvance(3 * time.Second)
|
|
<-called
|
|
err := goroutine.Stop(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
if calls := len(handler.HandleFunc.History()); calls != 4 {
|
|
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls)
|
|
}
|
|
}
|
|
|
|
func TestPeriodicGoroutineWithInitialDelay(t *testing.T) {
|
|
clock := glock.NewMockClock()
|
|
handler := NewMockHandler()
|
|
called := make(chan struct{}, 1)
|
|
|
|
handler.HandleFunc.SetDefaultHook(func(ctx context.Context) error {
|
|
called <- struct{}{}
|
|
return nil
|
|
})
|
|
|
|
goroutine := NewPeriodicGoroutine(
|
|
context.Background(),
|
|
handler,
|
|
WithName(t.Name()),
|
|
WithInterval(time.Second),
|
|
WithInitialDelay(2*time.Second),
|
|
withClock(clock),
|
|
)
|
|
go goroutine.Start()
|
|
clock.BlockingAdvance(time.Second)
|
|
select {
|
|
case <-called:
|
|
t.Error("unexpected handler invocation")
|
|
default:
|
|
}
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
err := goroutine.Stop(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
if calls := len(handler.HandleFunc.History()); calls != 3 {
|
|
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 3, calls)
|
|
}
|
|
}
|
|
|
|
func TestPeriodicGoroutineConcurrency(t *testing.T) {
|
|
clock := glock.NewMockClock()
|
|
handler := NewMockHandler()
|
|
called := make(chan struct{})
|
|
concurrency := 4
|
|
|
|
handler.HandleFunc.SetDefaultHook(func(ctx context.Context) error {
|
|
called <- struct{}{}
|
|
return nil
|
|
})
|
|
|
|
goroutine := NewPeriodicGoroutine(
|
|
context.Background(),
|
|
handler,
|
|
WithName(t.Name()),
|
|
WithConcurrency(concurrency),
|
|
withClock(clock),
|
|
)
|
|
go goroutine.Start()
|
|
|
|
for range concurrency {
|
|
<-called
|
|
clock.BlockingAdvance(time.Second)
|
|
}
|
|
|
|
for range concurrency {
|
|
<-called
|
|
clock.BlockingAdvance(time.Second)
|
|
}
|
|
|
|
for range concurrency {
|
|
<-called
|
|
}
|
|
|
|
err := goroutine.Stop(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
if calls := len(handler.HandleFunc.History()); calls != 3*concurrency {
|
|
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 3*concurrency, calls)
|
|
}
|
|
}
|
|
|
|
func TestPeriodicGoroutineWithDynamicConcurrency(t *testing.T) {
|
|
clock := glock.NewMockClock()
|
|
concurrencyClock := glock.NewMockClock()
|
|
handler := NewMockHandler()
|
|
called := make(chan struct{})
|
|
exit := make(chan struct{})
|
|
|
|
handler.HandleFunc.SetDefaultHook(func(ctx context.Context) error {
|
|
select {
|
|
case called <- struct{}{}:
|
|
return nil
|
|
|
|
case <-ctx.Done():
|
|
select {
|
|
case exit <- struct{}{}:
|
|
default:
|
|
}
|
|
|
|
return ctx.Err()
|
|
}
|
|
})
|
|
|
|
concurrency := 0
|
|
|
|
// concurrency: 1 -> 2 -> 3 ...
|
|
getConcurrency := func() int {
|
|
concurrency += 1
|
|
return concurrency
|
|
}
|
|
|
|
goroutine := NewPeriodicGoroutine(
|
|
context.Background(),
|
|
handler,
|
|
WithName(t.Name()),
|
|
WithConcurrencyFunc(getConcurrency),
|
|
withClock(clock),
|
|
withConcurrencyClock(concurrencyClock),
|
|
)
|
|
go goroutine.Start()
|
|
|
|
for poolSize := 1; poolSize < 3; poolSize++ {
|
|
// Ensure each of the handlers can be called independently.
|
|
// Adding an additional channel read would block as each of
|
|
// the monitor routines would be waiting on the clock tick.
|
|
for range poolSize {
|
|
<-called
|
|
}
|
|
|
|
// Resize the pool
|
|
clock.BlockingAdvance(time.Second) // invoke but block one handler
|
|
concurrencyClock.BlockingAdvance(concurrencyRecheckInterval) // trigger drain of the old pool
|
|
<-exit // wait for blocked handler to exit
|
|
}
|
|
|
|
err := goroutine.Stop(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
// N.B.: no need for assertions here as getting through the test at all to this
|
|
// point without some permanent blockage shows that each of the pool sizes behave
|
|
// as expected.
|
|
}
|
|
|
|
func TestPeriodicGoroutineError(t *testing.T) {
|
|
clock := glock.NewMockClock()
|
|
handler := NewMockHandlerWithErrorHandler()
|
|
|
|
calls := 0
|
|
called := make(chan struct{}, 1)
|
|
handler.HandleFunc.SetDefaultHook(func(ctx context.Context) (err error) {
|
|
if calls == 0 {
|
|
err = errors.New("oops")
|
|
}
|
|
|
|
calls++
|
|
called <- struct{}{}
|
|
return err
|
|
})
|
|
|
|
goroutine := NewPeriodicGoroutine(
|
|
context.Background(),
|
|
handler,
|
|
WithName(t.Name()),
|
|
WithInterval(time.Second),
|
|
withClock(clock),
|
|
)
|
|
go goroutine.Start()
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
err := goroutine.Stop(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
if calls := len(handler.HandleFunc.History()); calls != 4 {
|
|
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls)
|
|
}
|
|
|
|
if calls := len(handler.HandleErrorFunc.History()); calls != 1 {
|
|
t.Errorf("unexpected number of error handler invocations. want=%d have=%d", 1, calls)
|
|
}
|
|
}
|
|
|
|
func TestPeriodicGoroutinePanic(t *testing.T) {
|
|
clock := glock.NewMockClock()
|
|
handler := NewMockHandlerWithErrorHandler()
|
|
|
|
calls := 0
|
|
called := make(chan struct{}, 1)
|
|
handler.HandleFunc.SetDefaultHook(func(ctx context.Context) error {
|
|
calls++
|
|
defer func() {
|
|
called <- struct{}{}
|
|
}()
|
|
|
|
if calls == 1 {
|
|
panic("oops")
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
goroutine := NewPeriodicGoroutine(
|
|
context.Background(),
|
|
handler,
|
|
WithName(t.Name()),
|
|
WithInterval(time.Second),
|
|
withClock(clock),
|
|
)
|
|
go goroutine.Start()
|
|
|
|
clock.BlockingAdvance(time.Second)
|
|
select {
|
|
case <-called:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("first call didn't happen within 1s")
|
|
}
|
|
// Run a second time to make sure it actually is invoked again after the
|
|
// panic. Periodic goroutines turn panics into errors (analogous to
|
|
// goroutine.Go which silences panics), and we expect to keep running a periodic
|
|
// routine after a panic.
|
|
clock.BlockingAdvance(time.Second)
|
|
select {
|
|
case <-called:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("second call didn't happen within 1s")
|
|
}
|
|
err := goroutine.Stop(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
if calls := len(handler.HandleFunc.History()); calls != 2 {
|
|
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls)
|
|
}
|
|
if calls := len(handler.HandleErrorFunc.History()); calls != 1 {
|
|
t.Errorf("unexpected number of error handler invocations. want=%d have=%d", 4, calls)
|
|
}
|
|
}
|
|
|
|
func TestPeriodicGoroutineContextError(t *testing.T) {
|
|
clock := glock.NewMockClock()
|
|
handler := NewMockHandlerWithErrorHandler()
|
|
|
|
called := make(chan struct{}, 1)
|
|
handler.HandleFunc.SetDefaultHook(func(ctx context.Context) error {
|
|
called <- struct{}{}
|
|
<-ctx.Done()
|
|
return ctx.Err()
|
|
})
|
|
|
|
goroutine := NewPeriodicGoroutine(
|
|
context.Background(),
|
|
handler,
|
|
WithName(t.Name()),
|
|
WithInterval(time.Second),
|
|
withClock(clock),
|
|
)
|
|
go goroutine.Start()
|
|
<-called
|
|
err := goroutine.Stop(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
if calls := len(handler.HandleFunc.History()); calls != 1 {
|
|
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 1, calls)
|
|
}
|
|
|
|
if calls := len(handler.HandleErrorFunc.History()); calls != 0 {
|
|
t.Errorf("unexpected number of error handler invocations. want=%d have=%d", 0, calls)
|
|
}
|
|
}
|
|
|
|
func TestPeriodicGoroutineFinalizer(t *testing.T) {
|
|
clock := glock.NewMockClock()
|
|
handler := NewMockHandlerWithFinalizer()
|
|
|
|
called := make(chan struct{}, 1)
|
|
handler.HandleFunc.SetDefaultHook(func(ctx context.Context) error {
|
|
called <- struct{}{}
|
|
return nil
|
|
})
|
|
|
|
goroutine := NewPeriodicGoroutine(
|
|
context.Background(),
|
|
handler,
|
|
WithName(t.Name()),
|
|
WithInterval(time.Second),
|
|
withClock(clock),
|
|
)
|
|
go goroutine.Start()
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
clock.BlockingAdvance(time.Second)
|
|
<-called
|
|
err := goroutine.Stop(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
if calls := len(handler.HandleFunc.History()); calls != 4 {
|
|
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls)
|
|
}
|
|
|
|
if calls := len(handler.OnShutdownFunc.History()); calls != 1 {
|
|
t.Errorf("unexpected number of finalizer invocations. want=%d have=%d", 1, calls)
|
|
}
|
|
}
|
|
|
|
type MockHandlerWithErrorHandler struct {
|
|
*MockHandler
|
|
*MockErrorHandler
|
|
}
|
|
|
|
func NewMockHandlerWithErrorHandler() *MockHandlerWithErrorHandler {
|
|
return &MockHandlerWithErrorHandler{
|
|
MockHandler: NewMockHandler(),
|
|
MockErrorHandler: NewMockErrorHandler(),
|
|
}
|
|
}
|
|
|
|
type MockHandlerWithFinalizer struct {
|
|
*MockHandler
|
|
*MockFinalizer
|
|
}
|
|
|
|
func NewMockHandlerWithFinalizer() *MockHandlerWithFinalizer {
|
|
return &MockHandlerWithFinalizer{
|
|
MockHandler: NewMockHandler(),
|
|
MockFinalizer: NewMockFinalizer(),
|
|
}
|
|
}
|