lib/background: upgrade Routine interface with context and errors (#62136)

This PR is a result/followup of the improvements we've made in the [SAMS repo](https://github.com/sourcegraph/sourcegraph-accounts/pull/199) that allows call sites to pass down a context (primarily to indicate deadline, and of course, cancellation if desired) and collects the error returned from `background.Routine`s `Stop` method.

Note that I did not adopt returning error from `Stop` method because I realize in monorepo, the more common (and arguably the desired) pattern is to hang on the call of `Start` method until `Stop` is called, so it is meaningless to collect errors from `Start` methods as return values anyway, and doing that would also complicate the design and semantics more than necessary.

All usages of the the `background.Routine` and `background.CombinedRoutines` are updated, I DID NOT try to interpret the code logic and make anything better other than fixing compile and test errors.

The only file that contains the core change is the [`lib/background/background.go`](https://github.com/sourcegraph/sourcegraph/pull/62136/files#diff-65c3228388620e91f8c22d91c18faac3f985fc67d64b08612df18fa7c04fafcd).
This commit is contained in:
Joe Chen 2024-05-24 10:04:55 -04:00 committed by GitHub
parent 53c3d3cfd1
commit 2589fef13e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
63 changed files with 838 additions and 233 deletions

View File

@ -201,6 +201,10 @@ type redisLockedBackgroundRoutine struct {
routine goroutine.BackgroundRoutine
}
func (s *redisLockedBackgroundRoutine) Name() string {
return s.routine.Name()
}
func (s *redisLockedBackgroundRoutine) Start() {
s.logger.Info("Starting background sync routine")
@ -220,10 +224,10 @@ func (s *redisLockedBackgroundRoutine) Start() {
s.routine.Start()
}
func (s *redisLockedBackgroundRoutine) Stop() {
func (s *redisLockedBackgroundRoutine) Stop(ctx context.Context) error {
start := time.Now()
s.logger.Info("Stopping background sync routine")
s.routine.Stop()
stopErr := s.routine.Stop(ctx)
// If we have the lock, release it and let somebody else work
if expire := s.rmux.Until(); !expire.IsZero() && expire.After(time.Now()) {
@ -247,6 +251,7 @@ func (s *redisLockedBackgroundRoutine) Stop() {
s.logger.Info("Background sync successfully stopped",
log.Duration("elapsed", time.Since(start)))
return stopErr
}
// sourcesSyncHandler is a handler for NewPeriodicGoroutine

View File

@ -74,7 +74,8 @@ func TestSourcesWorkers(t *testing.T) {
w := NewSources(s1).Worker(observation.NewContext(logger), sourceWorkerMutex1, time.Millisecond)
go func() {
<-stop1
w.Stop()
err := w.Stop(context.Background())
require.NoError(t, err)
}()
w.Start()
})
@ -89,7 +90,8 @@ func TestSourcesWorkers(t *testing.T) {
w := NewSources(s2).Worker(observation.NewContext(logger), sourceWorkerMutex, time.Millisecond)
go func() {
<-stop2
w.Stop()
err := w.Stop(context.Background())
require.NoError(t, err)
}()
w.Start()
})

View File

@ -132,7 +132,11 @@ func (l *BufferedLogger) LogEvent(spanCtx context.Context, event Event) error {
}
}
// Start begins working by procssing the logger's buffer, blocking until stop
func (l *BufferedLogger) Name() string {
return "BufferedLogger"
}
// Start begins working by processing the logger's buffer, blocking until stop
// is called and the backlog is cleared.
func (l *BufferedLogger) Start() {
var wg sync.WaitGroup
@ -157,7 +161,7 @@ func (l *BufferedLogger) Start() {
}
// Stop stops buffered logger's background processing job and flushes its buffer.
func (l *BufferedLogger) Stop() {
func (l *BufferedLogger) Stop(context.Context) error {
l.bufferClosed.Store(true)
close(l.bufferC)
l.log.Info("buffer closed - waiting for events to flush")
@ -175,4 +179,5 @@ func (l *BufferedLogger) Stop() {
l.log.Error("failed to shut down within shutdown deadline",
log.Error(errors.Newf("unflushed events: %d", len(l.bufferC)))) // real error for Sentry
}
return nil
}

View File

@ -43,7 +43,8 @@ func TestBufferedLogger(t *testing.T) {
// Stop the worker and wait for it to finish so that events flush before
// making any assertions
b.Stop()
err := b.Stop(ctx)
require.NoError(t, err)
wg.Wait()
autogold.Expect([]string{"bar", "baz", "foo"}).Equal(t, asSortedIdentifiers(handler.ReceivedEvents))
@ -100,7 +101,8 @@ func TestBufferedLogger(t *testing.T) {
// Indicate close and stop the worker so that the buffer can flush
close(blockEventSubmissionC)
b.Stop()
err = b.Stop(ctx)
require.NoError(t, err)
wg.Wait()
// All backlogged events get submitted. Note the "buffer-full" event is
@ -133,7 +135,8 @@ func TestBufferedLogger(t *testing.T) {
assert.NoError(t, b.LogEvent(ctx, events.Event{Identifier: "foo"}))
// Stop the worker and wait for it to finish
b.Stop()
err := b.Stop(ctx)
require.NoError(t, err)
wg.Wait()
// Submit an additional event - this should immediately attempt to

View File

@ -235,9 +235,7 @@ func Main(ctx context.Context, obctx *observation.Context, ready service.ReadyFu
backgroundRoutines = append(backgroundRoutines, w)
}
// Block until done
goroutine.MonitorBackgroundRoutines(ctx, backgroundRoutines...)
return nil
return goroutine.MonitorBackgroundRoutines(ctx, backgroundRoutines...)
}
func newRedisStore(store redispool.KeyValue) limiter.RedisStore {

View File

@ -91,9 +91,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
// Mark health server as ready and go!
ready()
goroutine.MonitorBackgroundRoutines(ctx, server)
return nil
return goroutine.MonitorBackgroundRoutines(ctx, server)
}
func NewHandler(

View File

@ -127,8 +127,7 @@ func StandaloneRun(ctx context.Context, runner util.CmdRunner, logger log.Logger
cancel()
}()
goroutine.MonitorBackgroundRoutines(ctx, routines...)
return nil
return goroutine.MonitorBackgroundRoutines(ctx, routines...)
}
func mustRegisterVMCountMetric(observationCtx *observation.Context, runner util.CmdRunner, logger log.Logger, prefix string) {

View File

@ -9,6 +9,7 @@ import (
gcontext "github.com/gorilla/context"
"github.com/gorilla/mux"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/cmd/frontend/internal/app/assetsutil"
"github.com/sourcegraph/sourcegraph/cmd/frontend/internal/httpapi"
@ -70,7 +71,12 @@ func serveInternalServer(obsvCtx *observation.Context) (context.CancelFunc, erro
confServer.Start()
})
return confServer.Stop, nil
return func() {
err := confServer.Stop(context.Background())
if err != nil {
obsvCtx.Logger.Error("failed to stop conf server", log.Error(err))
}
}, nil
}
func serveExternalServer(obsvCtx *observation.Context, sqlDB *sql.DB, db database.DB) (context.CancelFunc, error) {
@ -100,5 +106,10 @@ func serveExternalServer(obsvCtx *observation.Context, sqlDB *sql.DB, db databas
progressServer.Start()
})
return progressServer.Stop, nil
return func() {
err := progressServer.Stop(context.Background())
if err != nil {
obsvCtx.Logger.Error("failed to stop progress server", log.Error(err))
}
}, nil
}

View File

@ -321,8 +321,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
logger.Info(fmt.Sprintf("✱ Sourcegraph is ready at: %s", globals.ExternalURL()))
ready()
goroutine.MonitorBackgroundRoutines(context.Background(), routines...)
return nil
return goroutine.MonitorBackgroundRoutines(context.Background(), routines...)
}
func makeExternalAPI(db database.DB, logger sglog.Logger, schema *graphql.Schema, enterprise enterprise.Services, rateLimiter graphqlbackend.LimitWatcher) (goroutine.BackgroundRoutine, error) {

View File

@ -58,7 +58,10 @@ func NewHandler(logger log.Logger) http.Handler {
// If we already have an updater goroutine running, we need to stop it.
if handler.updater != nil {
handler.updater.Stop()
err := handler.updater.Stop(ctx)
if err != nil {
logger.Error("failed to stop updater routine", log.Error(err))
}
handler.updater = nil
}
@ -78,7 +81,12 @@ func NewHandler(logger log.Logger) http.Handler {
goroutine.WithDescription("caches src-cli versions polled periodically"),
goroutine.WithInterval(config.interval),
)
go goroutine.MonitorBackgroundRoutines(ctx, handler.updater)
go func() {
err := goroutine.MonitorBackgroundRoutines(ctx, handler.updater)
if err != nil {
logger.Error("error monitoring updater routine", log.Error(err))
}
}()
handler.rc = rc
handler.webhookSecret = config.webhookSecret
@ -145,7 +153,7 @@ func (h *handler) handleWebhook(w http.ResponseWriter, r *http.Request) {
type signatureValidator func(signature string, payload []byte, secret []byte) error
func (h *handler) doHandleWebhook(w http.ResponseWriter, r *http.Request, signatureValidator signatureValidator) {
defer r.Body.Close()
defer func() { _ = r.Body.Close() }()
payload, err := io.ReadAll(r.Body)
if err != nil {
h.logger.Warn("error reading payload", log.Error(err))

View File

@ -227,7 +227,10 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
ready()
// Launch all routines!
goroutine.MonitorBackgroundRoutines(ctx, routines...)
err = goroutine.MonitorBackgroundRoutines(ctx, routines...)
if err != nil {
logger.Error("error monitoring background routines", log.Error(err))
}
// The most important thing this does is kill all our clones. If we just
// shutdown they will be orphaned and continue running.

View File

@ -166,18 +166,21 @@ type httpRoutine struct {
*http.Server
}
func (s *httpRoutine) Name() string { return "http" }
func (s *httpRoutine) Start() {
if err := s.Server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.log.Error("error stopping server", log.Error(err))
}
}
func (s *httpRoutine) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
func (s *httpRoutine) Stop(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := s.Server.Shutdown(ctx); err != nil {
s.log.Error("error shutting down server", log.Error(err))
} else {
s.log.Info("server stopped")
return errors.Wrap(err, "shutdown")
}
s.log.Info("server stopped")
return nil
}

View File

@ -94,9 +94,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
})
// Go!
goroutine.MonitorBackgroundRoutines(ctx, append(worker, server)...)
return nil
return goroutine.MonitorBackgroundRoutines(ctx, append(worker, server)...)
}
func mustInitializeDB(observationCtx *observation.Context) *sql.DB {

View File

@ -93,6 +93,10 @@ func NewUpdateScheduler(logger log.Logger, db database.DB, gitserverClient gitse
}
}
func (s *UpdateScheduler) Name() string {
return "UpdateScheduler"
}
func (s *UpdateScheduler) Start() {
// Make sure the update scheduler acts as an internal actor, so it can see all
// repos.
@ -103,10 +107,11 @@ func (s *UpdateScheduler) Start() {
go s.runScheduleLoop(ctx)
}
func (s *UpdateScheduler) Stop() {
func (s *UpdateScheduler) Stop(context.Context) error {
if s.cancelCtx != nil {
s.cancelCtx()
}
return nil
}
// runScheduleLoop starts the loop that schedules updates by enqueuing them into the updateQueue.

View File

@ -180,9 +180,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
// after being unblocked.
ready()
goroutine.MonitorBackgroundRoutines(ctx, routines...)
return nil
return goroutine.MonitorBackgroundRoutines(ctx, routines...)
}
func getDB(observationCtx *observation.Context) (database.DB, error) {

View File

@ -109,9 +109,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
// Mark health server as ready and go!
ready()
goroutine.MonitorBackgroundRoutines(ctx, routines...)
return nil
return goroutine.MonitorBackgroundRoutines(ctx, routines...)
}
func mustInitializeFrontendDB(observationCtx *observation.Context) *sql.DB {

View File

@ -3,7 +3,6 @@ package shared
import (
"context"
"database/sql"
"net/http"
"time"
@ -50,9 +49,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
})
// Go!
goroutine.MonitorBackgroundRoutines(ctx, server, indexingWorker)
return nil
return goroutine.MonitorBackgroundRoutines(ctx, server, indexingWorker)
}
func initDB(observationCtx *observation.Context, name string) *sql.DB {

View File

@ -130,9 +130,9 @@ func (Service) Initialize(ctx context.Context, logger log.Logger, contract runti
),
background.CallbackRoutine{
// No Start - serving is handled by httpserver
StopFunc: func() {
StopFunc: func(ctx context.Context) error {
grpcServer.GracefulStop()
eventsTopic.Stop()
return eventsTopic.Stop(ctx)
},
},
}, nil

View File

@ -105,7 +105,10 @@ func TestPermsSyncerWorker_RepoSyncJobs(t *testing.T) {
workerStore := makeStore(observationCtx, db.Handle(), syncTypeRepo)
worker := MakeTestWorker(ctx, observationCtx, workerStore, dummySyncer, syncTypeRepo, syncJobsStore)
go worker.Start()
t.Cleanup(worker.Stop)
t.Cleanup(func() {
err := worker.Stop(ctx)
require.NoError(t, err)
})
// Adding repo perms sync jobs.
err = syncJobsStore.CreateRepoSyncJob(ctx, api.RepoID(1), database.PermissionSyncJobOpts{Reason: database.ReasonManualRepoSync, Priority: database.MediumPriorityPermissionsSync, TriggeredByUserID: user1.ID})
@ -248,7 +251,10 @@ func TestPermsSyncerWorker_UserSyncJobs(t *testing.T) {
workerStore := makeStore(observationCtx, db.Handle(), syncTypeUser)
worker := MakeTestWorker(ctx, observationCtx, workerStore, dummySyncer, syncTypeUser, syncJobsStore)
go worker.Start()
t.Cleanup(worker.Stop)
t.Cleanup(func() {
err := worker.Stop(ctx)
require.NoError(t, err)
})
// Adding user perms sync jobs.
err = syncJobsStore.CreateUserSyncJob(ctx, user1.ID,

View File

@ -44,6 +44,10 @@ type usageRoutine struct {
cancel context.CancelFunc
}
func (j *usageRoutine) Name() string {
return "CodyGatewayUsageWorker"
}
func (j *usageRoutine) Start() {
var ctx context.Context
ctx, j.cancel = context.WithCancel(context.Background())
@ -100,8 +104,9 @@ func (j *usageRoutine) Start() {
})
}
func (j *usageRoutine) Stop() {
func (j *usageRoutine) Stop(context.Context) error {
if j.cancel != nil {
j.cancel()
}
return nil
}

View File

@ -199,6 +199,11 @@ func StartLicenseCheck(originalCtx context.Context, logger log.Logger, db databa
goroutine.WithInterval(licensing.LicenseCheckInterval),
goroutine.WithInitialDelay(initialWaitInterval),
)
go goroutine.MonitorBackgroundRoutines(ctxWithCancel, routine)
go func() {
err := goroutine.MonitorBackgroundRoutines(ctxWithCancel, routine)
if err != nil {
logger.Error("error monitoring background routines", log.Error(err))
}
}()
})
}

View File

@ -6,14 +6,13 @@ import (
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/cmd/worker/job"
workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/dotcom"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/licensing"
"github.com/sourcegraph/sourcegraph/internal/observation"
workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db"
)
type licenseWorker struct{}
@ -51,6 +50,10 @@ type licenseChecksWrapper struct {
db database.DB
}
func (l *licenseChecksWrapper) Name() string {
return "LicenseChecks"
}
func (l *licenseChecksWrapper) Start() {
goroutine.Go(func() {
licensing.StartMaxUserCount(l.logger, &usersStore{
@ -62,9 +65,7 @@ func (l *licenseChecksWrapper) Start() {
}
}
func (l *licenseChecksWrapper) Stop() {
// no-op
}
func (l *licenseChecksWrapper) Stop(context.Context) error { return nil }
type usersStore struct {
db database.DB

View File

@ -102,7 +102,10 @@ func TestExhaustiveSearch(t *testing.T) {
require.NoError(err)
for _, routine := range routines {
go routine.Start()
defer routine.Stop()
defer func() {
err := routine.Stop(context.Background())
require.NoError(err)
}()
}
require.Eventually(func() bool {
return !searchJob.hasWork(workerCtx)

View File

@ -205,8 +205,7 @@ func Start(ctx context.Context, observationCtx *observation.Context, ready servi
allRoutines = append(allRoutines, r.Routine)
}
goroutine.MonitorBackgroundRoutines(ctx, allRoutines...)
return nil
return goroutine.MonitorBackgroundRoutines(ctx, allRoutines...)
}
// loadConfigs calls Load on the configs of each of the jobs registered in this binary.

View File

@ -63,6 +63,7 @@ go_test(
"@com_github_buildkite_go_buildkite_v3//buildkite",
"@com_github_gorilla_mux//:mux",
"@com_github_sourcegraph_log//logtest",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_google_cloud_go_bigquery//:bigquery",
],

View File

@ -69,7 +69,7 @@ func NewServer(addr string, logger log.Logger, c config.Config, bqWriter BigQuer
bkClient: buildkite.NewClient(bk.Client()),
}
// Register routes the the server will be responding too
// Register routes the server will be responding too
r := mux.NewRouter()
r.Path("/buildkite").HandlerFunc(server.handleEvent).Methods(http.MethodPost)
r.Path("/-/healthz").HandlerFunc(server.handleHealthz).Methods(http.MethodGet)
@ -85,20 +85,23 @@ func NewServer(addr string, logger log.Logger, c config.Config, bqWriter BigQuer
return server
}
func (s *Server) Name() string { return "build-tracker" }
func (s *Server) Start() {
if err := s.http.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.logger.Error("error stopping server", log.Error(err))
}
}
func (s *Server) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
func (s *Server) Stop(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := s.http.Shutdown(ctx); err != nil {
s.logger.Error("error shutting down server", log.Error(err))
} else {
s.logger.Info("server stopped")
return errors.Wrap(err, "shutdown server")
}
s.logger.Info("server stopped")
return nil
}
func (s *Server) handleGetBuild(w http.ResponseWriter, req *http.Request) {
@ -116,7 +119,6 @@ func (s *Server) handleGetBuild(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
return
}
}
vars := mux.Vars(req)

View File

@ -13,6 +13,7 @@ import (
"github.com/buildkite/go-buildkite/v3/buildkite"
"github.com/gorilla/mux"
"github.com/sourcegraph/log/logtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/sourcegraph/dev/build-tracker/build"
@ -145,7 +146,13 @@ func TestOldBuildsGetDeleted(t *testing.T) {
server.store.Set(b)
ctx, cancel := context.WithCancel(context.Background())
go goroutine.MonitorBackgroundRoutines(ctx, deleteOldBuilds(logger, server.store, 10*time.Millisecond, 24*time.Hour))
go func() {
err := goroutine.MonitorBackgroundRoutines(
ctx,
deleteOldBuilds(logger, server.store, 10*time.Millisecond, 24*time.Hour),
)
assert.EqualError(t, err, "unable to stop routines gracefully: context canceled")
}()
time.Sleep(20 * time.Millisecond)
cancel()
@ -167,7 +174,13 @@ func TestOldBuildsGetDeleted(t *testing.T) {
server.store.Set(b)
ctx, cancel := context.WithCancel(context.Background())
go goroutine.MonitorBackgroundRoutines(ctx, deleteOldBuilds(logger, server.store, 10*time.Millisecond, 24*time.Hour))
go func() {
err := goroutine.MonitorBackgroundRoutines(
ctx,
deleteOldBuilds(logger, server.store, 10*time.Millisecond, 24*time.Hour),
)
assert.EqualError(t, err, "unable to stop routines gracefully: context canceled")
}()
time.Sleep(20 * time.Millisecond)
cancel()

View File

@ -89,18 +89,23 @@ type httpRoutine struct {
*http.Server
}
func (s *httpRoutine) Name() string {
return "http"
}
func (s *httpRoutine) Start() {
if err := s.Server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.log.Error("error stopping server", log.Error(err))
}
}
func (s *httpRoutine) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
func (s *httpRoutine) Stop(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := s.Server.Shutdown(ctx); err != nil {
s.log.Error("error shutting down server", log.Error(err))
} else {
s.log.Info("server stopped")
}
return nil
}

View File

@ -100,12 +100,13 @@ func (s *Scheduler) Start() {
}
}
func (s *Scheduler) Stop() {
func (s *Scheduler) Stop(context.Context) error {
if s.recorder != nil {
go s.recorder.LogStop(s)
}
s.done <- struct{}{}
close(s.done)
return nil
}
func (s *Scheduler) enqueueChangeset() error {

View File

@ -68,6 +68,10 @@ func NewSyncRegistry(ctx context.Context, observationCtx *observation.Context, b
}
}
func (s *SyncRegistry) Name() string {
return "SyncRegistry"
}
func (s *SyncRegistry) Start() {
// Fetch initial list of syncers.
if err := s.syncCodeHosts(s.ctx); err != nil {
@ -88,11 +92,15 @@ func (s *SyncRegistry) Start() {
goroutine.WithInterval(externalServiceSyncerInterval),
)
goroutine.MonitorBackgroundRoutines(s.ctx, externalServiceSyncer)
err := goroutine.MonitorBackgroundRoutines(s.ctx, externalServiceSyncer)
if err != nil {
s.logger.Error("error monitoring background routines", log.Error(err))
}
}
func (s *SyncRegistry) Stop() {
func (s *SyncRegistry) Stop(context.Context) error {
s.cancel()
return nil
}
// EnqueueChangesetSyncs will enqueue the changesets with the supplied ids for high priority syncing.

View File

@ -210,7 +210,12 @@ func RunOutOfBandMigrations(
}
go runner.StartPartial(ids)
defer runner.Stop()
defer func() {
err := runner.Stop(ctx)
if err != nil {
out.WriteLine(output.Linef(output.EmojiFailure, output.StyleFailure, "Failed to stop out of band migrations: %s", err))
}
}()
defer func() {
if err == nil {
out.WriteLine(output.Line(output.EmojiSuccess, output.StyleSuccess, "Out of band migrations complete"))

View File

@ -84,7 +84,7 @@ type Dumper interface {
// Any extra endpoints supplied will be registered via their own declared path.
func NewServerRoutine(ready <-chan struct{}, extra ...Endpoint) goroutine.BackgroundRoutine {
if addr == "" {
return goroutine.NoopRoutine()
return goroutine.NoopRoutine("noop server")
}
handler := httpserver.NewHandler(func(router *mux.Router) {

View File

@ -37,6 +37,7 @@ go_test(
deps = [
"//lib/errors",
"@com_github_derision_test_glock//:glock",
"@com_github_stretchr_testify//require",
],
)

View File

@ -35,5 +35,6 @@ var MonitorBackgroundRoutines = background.Monitor
// CombinedRoutine is a list of routines which are started and stopped in unison.
type CombinedRoutine = background.CombinedRoutine
// NoopRoutine does nothing for start or stop.
// NoopRoutine return a background routine that does nothing for start or stop.
// If the name is empty, it will default to "noop".
var NoopRoutine = background.NoopRoutine

View File

@ -10,6 +10,10 @@ type exampleRoutine struct {
done chan struct{}
}
func (m *exampleRoutine) Name() string {
return "exampleRoutine"
}
func (m *exampleRoutine) Start() {
for {
select {
@ -24,8 +28,9 @@ func (m *exampleRoutine) Start() {
}
}
func (m *exampleRoutine) Stop() {
func (m *exampleRoutine) Stop(context.Context) error {
m.done <- struct{}{}
return nil
}
func ExampleBackgroundRoutine() {
@ -35,7 +40,12 @@ func ExampleBackgroundRoutine() {
ctx, cancel := context.WithCancel(context.Background())
go MonitorBackgroundRoutines(ctx, r)
go func() {
err := MonitorBackgroundRoutines(ctx, r)
if err != nil {
fmt.Printf("error: %v\n", err)
}
}()
time.Sleep(500 * time.Millisecond)
cancel()
@ -58,7 +68,12 @@ func ExamplePeriodicGoroutine() {
WithInterval(200*time.Millisecond),
)
go MonitorBackgroundRoutines(ctx, r)
go func() {
err := MonitorBackgroundRoutines(ctx, r)
if err != nil {
fmt.Printf("error: %v\n", err)
}
}()
time.Sleep(500 * time.Millisecond)
cancel()

View File

@ -16,6 +16,9 @@ import (
// github.com/sourcegraph/sourcegraph/internal/goroutine) used for unit
// testing.
type MockBackgroundRoutine struct {
// NameFunc is an instance of a mock function object controlling the
// behavior of the method Name.
NameFunc *BackgroundRoutineNameFunc
// StartFunc is an instance of a mock function object controlling the
// behavior of the method Start.
StartFunc *BackgroundRoutineStartFunc
@ -29,13 +32,18 @@ type MockBackgroundRoutine struct {
// overwritten.
func NewMockBackgroundRoutine() *MockBackgroundRoutine {
return &MockBackgroundRoutine{
NameFunc: &BackgroundRoutineNameFunc{
defaultHook: func() (r0 string) {
return
},
},
StartFunc: &BackgroundRoutineStartFunc{
defaultHook: func() {
return
},
},
StopFunc: &BackgroundRoutineStopFunc{
defaultHook: func() {
defaultHook: func(context.Context) (r0 error) {
return
},
},
@ -47,13 +55,18 @@ func NewMockBackgroundRoutine() *MockBackgroundRoutine {
// overwritten.
func NewStrictMockBackgroundRoutine() *MockBackgroundRoutine {
return &MockBackgroundRoutine{
NameFunc: &BackgroundRoutineNameFunc{
defaultHook: func() string {
panic("unexpected invocation of MockBackgroundRoutine.Name")
},
},
StartFunc: &BackgroundRoutineStartFunc{
defaultHook: func() {
panic("unexpected invocation of MockBackgroundRoutine.Start")
},
},
StopFunc: &BackgroundRoutineStopFunc{
defaultHook: func() {
defaultHook: func(context.Context) error {
panic("unexpected invocation of MockBackgroundRoutine.Stop")
},
},
@ -65,6 +78,9 @@ func NewStrictMockBackgroundRoutine() *MockBackgroundRoutine {
// implementation, unless overwritten.
func NewMockBackgroundRoutineFrom(i BackgroundRoutine) *MockBackgroundRoutine {
return &MockBackgroundRoutine{
NameFunc: &BackgroundRoutineNameFunc{
defaultHook: i.Name,
},
StartFunc: &BackgroundRoutineStartFunc{
defaultHook: i.Start,
},
@ -74,6 +90,105 @@ func NewMockBackgroundRoutineFrom(i BackgroundRoutine) *MockBackgroundRoutine {
}
}
// BackgroundRoutineNameFunc describes the behavior when the Name method of
// the parent MockBackgroundRoutine instance is invoked.
type BackgroundRoutineNameFunc struct {
defaultHook func() string
hooks []func() string
history []BackgroundRoutineNameFuncCall
mutex sync.Mutex
}
// Name delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockBackgroundRoutine) Name() string {
r0 := m.NameFunc.nextHook()()
m.NameFunc.appendCall(BackgroundRoutineNameFuncCall{r0})
return r0
}
// SetDefaultHook sets function that is called when the Name method of the
// parent MockBackgroundRoutine instance is invoked and the hook queue is
// empty.
func (f *BackgroundRoutineNameFunc) SetDefaultHook(hook func() string) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// Name method of the parent MockBackgroundRoutine instance invokes the hook
// at the front of the queue and discards it. After the queue is empty, the
// default hook function is invoked for any future action.
func (f *BackgroundRoutineNameFunc) PushHook(hook func() string) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *BackgroundRoutineNameFunc) SetDefaultReturn(r0 string) {
f.SetDefaultHook(func() string {
return r0
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *BackgroundRoutineNameFunc) PushReturn(r0 string) {
f.PushHook(func() string {
return r0
})
}
func (f *BackgroundRoutineNameFunc) nextHook() func() string {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *BackgroundRoutineNameFunc) appendCall(r0 BackgroundRoutineNameFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of BackgroundRoutineNameFuncCall objects
// describing the invocations of this function.
func (f *BackgroundRoutineNameFunc) History() []BackgroundRoutineNameFuncCall {
f.mutex.Lock()
history := make([]BackgroundRoutineNameFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// BackgroundRoutineNameFuncCall is an object that describes an invocation
// of method Name on an instance of MockBackgroundRoutine.
type BackgroundRoutineNameFuncCall struct {
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 string
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c BackgroundRoutineNameFuncCall) Args() []interface{} {
return []interface{}{}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c BackgroundRoutineNameFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}
// BackgroundRoutineStartFunc describes the behavior when the Start method
// of the parent MockBackgroundRoutine instance is invoked.
type BackgroundRoutineStartFunc struct {
@ -172,24 +287,24 @@ func (c BackgroundRoutineStartFuncCall) Results() []interface{} {
// BackgroundRoutineStopFunc describes the behavior when the Stop method of
// the parent MockBackgroundRoutine instance is invoked.
type BackgroundRoutineStopFunc struct {
defaultHook func()
hooks []func()
defaultHook func(context.Context) error
hooks []func(context.Context) error
history []BackgroundRoutineStopFuncCall
mutex sync.Mutex
}
// Stop delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockBackgroundRoutine) Stop() {
m.StopFunc.nextHook()()
m.StopFunc.appendCall(BackgroundRoutineStopFuncCall{})
return
func (m *MockBackgroundRoutine) Stop(v0 context.Context) error {
r0 := m.StopFunc.nextHook()(v0)
m.StopFunc.appendCall(BackgroundRoutineStopFuncCall{v0, r0})
return r0
}
// SetDefaultHook sets function that is called when the Stop method of the
// parent MockBackgroundRoutine instance is invoked and the hook queue is
// empty.
func (f *BackgroundRoutineStopFunc) SetDefaultHook(hook func()) {
func (f *BackgroundRoutineStopFunc) SetDefaultHook(hook func(context.Context) error) {
f.defaultHook = hook
}
@ -197,7 +312,7 @@ func (f *BackgroundRoutineStopFunc) SetDefaultHook(hook func()) {
// Stop method of the parent MockBackgroundRoutine instance invokes the hook
// at the front of the queue and discards it. After the queue is empty, the
// default hook function is invoked for any future action.
func (f *BackgroundRoutineStopFunc) PushHook(hook func()) {
func (f *BackgroundRoutineStopFunc) PushHook(hook func(context.Context) error) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
@ -205,20 +320,20 @@ func (f *BackgroundRoutineStopFunc) PushHook(hook func()) {
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *BackgroundRoutineStopFunc) SetDefaultReturn() {
f.SetDefaultHook(func() {
return
func (f *BackgroundRoutineStopFunc) SetDefaultReturn(r0 error) {
f.SetDefaultHook(func(context.Context) error {
return r0
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *BackgroundRoutineStopFunc) PushReturn() {
f.PushHook(func() {
return
func (f *BackgroundRoutineStopFunc) PushReturn(r0 error) {
f.PushHook(func(context.Context) error {
return r0
})
}
func (f *BackgroundRoutineStopFunc) nextHook() func() {
func (f *BackgroundRoutineStopFunc) nextHook() func(context.Context) error {
f.mutex.Lock()
defer f.mutex.Unlock()
@ -250,18 +365,25 @@ func (f *BackgroundRoutineStopFunc) History() []BackgroundRoutineStopFuncCall {
// BackgroundRoutineStopFuncCall is an object that describes an invocation
// of method Stop on an instance of MockBackgroundRoutine.
type BackgroundRoutineStopFuncCall struct{}
type BackgroundRoutineStopFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c BackgroundRoutineStopFuncCall) Args() []interface{} {
return []interface{}{}
return []interface{}{c.Arg0}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c BackgroundRoutineStopFuncCall) Results() []interface{} {
return []interface{}{}
return []interface{}{c.Result0}
}
// MockErrorHandler is a mock implementation of the ErrorHandler interface

View File

@ -181,12 +181,13 @@ func (r *PeriodicGoroutine) Start() {
// Stop will cancel the context passed to the handler function to stop the current
// iteration of work, then break the loop in the Start method so that no new work
// is accepted. This method blocks until Start has returned.
func (r *PeriodicGoroutine) Stop() {
func (r *PeriodicGoroutine) Stop(context.Context) error {
if r.recorder != nil {
go r.recorder.LogStop(r)
}
r.cancel()
<-r.finished
return nil
}
func (r *PeriodicGoroutine) runHandlerPool() {

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/derision-test/glock"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
@ -42,7 +43,8 @@ func TestPeriodicGoroutine(t *testing.T) {
<-called
clock.BlockingAdvance(time.Second)
<-called
goroutine.Stop()
err := goroutine.Stop(context.Background())
require.NoError(t, err)
if calls := len(handler.HandleFunc.History()); calls != 4 {
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls)
@ -80,7 +82,8 @@ func TestPeriodicGoroutineReinvoke(t *testing.T) {
witnessHandler()
clock.BlockingAdvance(time.Second)
witnessHandler()
goroutine.Stop()
err := goroutine.Stop(context.Background())
require.NoError(t, err)
if calls := len(handler.HandleFunc.History()); calls != 4*maxConsecutiveReinvocations {
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4*maxConsecutiveReinvocations, calls)
@ -120,7 +123,8 @@ func TestPeriodicGoroutineWithDynamicInterval(t *testing.T) {
<-called
clock.BlockingAdvance(3 * time.Second)
<-called
goroutine.Stop()
err := goroutine.Stop(context.Background())
require.NoError(t, err)
if calls := len(handler.HandleFunc.History()); calls != 4 {
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls)
@ -158,7 +162,8 @@ func TestPeriodicGoroutineWithInitialDelay(t *testing.T) {
<-called
clock.BlockingAdvance(time.Second)
<-called
goroutine.Stop()
err := goroutine.Stop(context.Background())
require.NoError(t, err)
if calls := len(handler.HandleFunc.History()); calls != 3 {
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 3, calls)
@ -199,7 +204,8 @@ func TestPeriodicGoroutineConcurrency(t *testing.T) {
<-called
}
goroutine.Stop()
err := goroutine.Stop(context.Background())
require.NoError(t, err)
if calls := len(handler.HandleFunc.History()); calls != 3*concurrency {
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 3*concurrency, calls)
@ -260,7 +266,8 @@ func TestPeriodicGoroutineWithDynamicConcurrency(t *testing.T) {
<-exit // wait for blocked handler to exit
}
goroutine.Stop()
err := goroutine.Stop(context.Background())
require.NoError(t, err)
// N.B.: no need for assertions here as getting through the test at all to this
// point without some permanent blockage shows that each of the pool sizes behave
@ -297,7 +304,8 @@ func TestPeriodicGoroutineError(t *testing.T) {
<-called
clock.BlockingAdvance(time.Second)
<-called
goroutine.Stop()
err := goroutine.Stop(context.Background())
require.NoError(t, err)
if calls := len(handler.HandleFunc.History()); calls != 4 {
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls)
@ -352,7 +360,8 @@ func TestPeriodicGoroutinePanic(t *testing.T) {
case <-time.After(time.Second):
t.Fatal("second call didn't happen within 1s")
}
goroutine.Stop()
err := goroutine.Stop(context.Background())
require.NoError(t, err)
if calls := len(handler.HandleFunc.History()); calls != 2 {
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls)
@ -382,7 +391,8 @@ func TestPeriodicGoroutineContextError(t *testing.T) {
)
go goroutine.Start()
<-called
goroutine.Stop()
err := goroutine.Stop(context.Background())
require.NoError(t, err)
if calls := len(handler.HandleFunc.History()); calls != 1 {
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 1, calls)
@ -417,7 +427,8 @@ func TestPeriodicGoroutineFinalizer(t *testing.T) {
<-called
clock.BlockingAdvance(time.Second)
<-called
goroutine.Stop()
err := goroutine.Stop(context.Background())
require.NoError(t, err)
if calls := len(handler.HandleFunc.History()); calls != 4 {
t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls)

View File

@ -1,13 +1,15 @@
package recorder
import (
"context"
"testing"
"time"
"github.com/sourcegraph/log"
"github.com/stretchr/testify/assert"
"github.com/sourcegraph/sourcegraph/internal/rcache"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/stretchr/testify/assert"
)
// TestLoggerAndReaderHappyPaths tests pretty much everything in the happy path of both the logger and the log reader.
@ -114,8 +116,9 @@ func (r *RoutineMock) Start() {
// Do nothing
}
func (r *RoutineMock) Stop() {
func (r *RoutineMock) Stop(context.Context) error {
// Do nothing
return nil
}
func (r *RoutineMock) Name() string {

View File

@ -1,6 +1,7 @@
package recorder
import (
"context"
"encoding/json"
"time"
@ -12,7 +13,7 @@ import (
type Recordable interface {
Start()
Stop()
Stop(ctx context.Context) error
Name() string
Type() RoutineType
JobName() string

View File

@ -1,6 +1,7 @@
package grpcserver
import (
"context"
"net"
"os"
"sync"
@ -53,6 +54,10 @@ func newServer(logger log.Logger, grpcServer *grpc.Server, makeListener func() (
return s
}
func (s *server) Name() string {
return "gRPC server"
}
func (s *server) Start() {
listener, err := s.makeListener()
if err != nil {
@ -66,7 +71,7 @@ func (s *server) Start() {
}
}
func (s *server) Stop() {
func (s *server) Stop(context.Context) error {
s.once.Do(func() {
s.logger.Info("Shutting down gRPC server")
if s.preShutdownPause > 0 {
@ -90,4 +95,5 @@ func (s *server) Stop() {
s.logger.Warn("gRPC server forcefully stopped")
})
return nil
}

View File

@ -51,6 +51,10 @@ func newServer(httpServer *http.Server, makeListener func() (net.Listener, error
return s
}
func (s *server) Name() string {
return "HTTP server"
}
func (s *server) Start() {
listener, err := s.makeListener()
if err != nil {
@ -64,17 +68,18 @@ func (s *server) Start() {
}
}
func (s *server) Stop() {
func (s *server) Stop(ctx context.Context) error {
s.once.Do(func() {
if s.preShutdownPause > 0 {
time.Sleep(s.preShutdownPause)
}
ctx, cancel := context.WithTimeout(context.Background(), goroutine.GracefulShutdownTimeout)
ctx, cancel := context.WithTimeout(ctx, goroutine.GracefulShutdownTimeout)
defer cancel()
if err := s.server.Shutdown(ctx); err != nil {
log15.Error("Failed to shutdown server", "error", err)
}
})
return nil
}

View File

@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
edb "github.com/sourcegraph/sourcegraph/internal/database"
@ -22,7 +23,7 @@ import (
func Test_MonitorStartsAndStops(t *testing.T) {
logger := logtest.Scoped(t)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
insightsDB := edb.NewInsightsDB(dbtest.NewInsightsDB(logger, t), logger)
repos := dbmocks.NewMockRepoStore()
@ -33,7 +34,8 @@ func Test_MonitorStartsAndStops(t *testing.T) {
CostAnalyzer: priority.NewQueryAnalyzer(),
}
routines := NewBackgroundJobMonitor(ctx, config).Routines()
goroutine.MonitorBackgroundRoutines(ctx, routines...)
err := goroutine.MonitorBackgroundRoutines(ctx, routines...)
assert.EqualError(t, err, "unable to stop routines gracefully: context deadline exceeded")
}
func TestScheduler_InitialBackfill(t *testing.T) {

View File

@ -74,6 +74,7 @@ go_test(
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_sourcegraph_log//:log",
"@com_github_sourcegraph_log//logtest",
"@com_github_stretchr_testify//require",
],
)

View File

@ -193,6 +193,10 @@ func (r *Runner) UpdateDirection(ctx context.Context, ids []int, applyReverse bo
return nil
}
func (r *Runner) Name() string {
return "oobmigrationRunner"
}
// Start runs registered migrators on a loop until they complete. This method will periodically
// re-read from the database in order to refresh its current view of the migrations.
func (r *Runner) Start(currentVersion, firstVersion Version) {
@ -338,9 +342,10 @@ func (r *Runner) ensureProcessorIsRunning(wg *sync.WaitGroup, m map[int]chan Mig
}
// Stop will cancel the context used in Start, then blocks until Start has returned.
func (r *Runner) Stop() {
func (r *Runner) Stop(context.Context) error {
r.cancel()
<-r.finished
return nil
}
type migratorOptions struct {

View File

@ -8,6 +8,7 @@ import (
"github.com/derision-test/glock"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/log"
"github.com/sourcegraph/log/logtest"
@ -36,7 +37,8 @@ func TestRunner(t *testing.T) {
go runner.startInternal(allowAll)
tickN(ticker, 3)
runner.Stop()
err := runner.Stop(context.Background())
require.NoError(t, err)
if callCount := len(migrator.UpFunc.History()); callCount != 3 {
t.Errorf("unexpected number of calls to Up. want=%d have=%d", 3, callCount)
@ -67,7 +69,8 @@ func TestRunnerError(t *testing.T) {
go runner.startInternal(allowAll)
tickN(ticker, 1)
runner.Stop()
err := runner.Stop(context.Background())
require.NoError(t, err)
if calls := store.AddErrorFunc.history; len(calls) != 1 {
t.Fatalf("unexpected number of calls to AddError. want=%d have=%d", 1, len(calls))
@ -124,7 +127,8 @@ func TestRunnerRemovesCompleted(t *testing.T) {
tickN(ticker1, 5)
tickN(ticker2, 5)
tickN(ticker3, 5)
runner.Stop()
err := runner.Stop(context.Background())
require.NoError(t, err)
// not finished
if callCount := len(migrator1.UpFunc.History()); callCount != 5 {
@ -417,7 +421,8 @@ func TestRunnerBoundsFilter(t *testing.T) {
return m.ID != 2
})
tickN(ticker, 64)
runner.Stop()
err := runner.Stop(context.Background())
require.NoError(t, err)
// not called
if callCount := len(migrator2.UpFunc.History()); callCount != 0 {

View File

@ -50,4 +50,4 @@ func (c *MemoryTopicClient) PublishMessage(ctx context.Context, message []byte,
}
func (c *MemoryTopicClient) Ping(context.Context) error { return nil }
func (c *MemoryTopicClient) Stop() {}
func (c *MemoryTopicClient) Stop(context.Context) error { return nil }

View File

@ -25,7 +25,7 @@ type TopicClient interface {
Ping(ctx context.Context) error
// Stop stops the topic publishing channel. The client should not be used after
// calling Stop.
Stop()
Stop(ctx context.Context) error
}
// TopicPublisher is a Pub/Sub publisher bound to a topic.
@ -101,8 +101,9 @@ func (c *topicClient) PublishMessage(ctx context.Context, message []byte, attrib
return nil
}
func (c *topicClient) Stop() {
func (c *topicClient) Stop(context.Context) error {
c.topic.Stop()
return nil
}
// NewNoopTopicClient creates a no-op Pub/Sub client that does nothing on any
@ -118,7 +119,7 @@ func (c *noopTopicClient) Publish(context.Context, ...[]byte) error { return nil
func (c *noopTopicClient) PublishMessage(context.Context, []byte, map[string]string) error {
return nil
}
func (c *noopTopicClient) Stop() {}
func (c *noopTopicClient) Stop(context.Context) error { return nil }
// NewLoggingTopicClient creates a Pub/Sub client that just logs all messages,
// and does nothing otherwise. This is also a useful stub implementation of the

View File

@ -90,7 +90,7 @@ func NewSyncWorker(ctx context.Context, observationCtx *observation.Context, dbH
if opts.CleanupOldJobs {
janitor = newJobCleanerRoutine(ctx, database.NewDBWith(observationCtx.Logger, basestore.NewWithHandle(dbHandle)), opts.CleanupOldJobsInterval)
} else {
janitor = goroutine.NoopRoutine()
janitor = goroutine.NoopRoutine("noop sync worker")
}
return worker, resetter, janitor

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/keegancsmith/sqlf"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/log"
@ -64,9 +65,14 @@ func TestSyncWorkerPlumbing(t *testing.T) {
// finalising the row which means that when running tests in verbose mode we'll
// see "sql: transaction has already been committed or rolled back". These
// errors can be ignored.
defer janitor.Stop()
defer resetter.Stop()
defer worker.Stop()
t.Cleanup(func() {
err := janitor.Stop(ctx)
require.NoError(t, err)
err = resetter.Stop(ctx)
require.NoError(t, err)
err = worker.Stop(ctx)
require.NoError(t, err)
})
var job *repos.SyncJob
select {

View File

@ -12,6 +12,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/keegancsmith/sqlf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
@ -954,12 +955,16 @@ func TestSyncRun(t *testing.T) {
done := make(chan struct{})
go func() {
goroutine.MonitorBackgroundRoutines(ctx, syncer.Routines(ctx, store, repos.RunOptions{
EnqueueInterval: func() time.Duration { return time.Second },
IsDotCom: false,
MinSyncInterval: func() time.Duration { return 1 * time.Millisecond },
DequeueInterval: 1 * time.Millisecond,
})...)
err := goroutine.MonitorBackgroundRoutines(
ctx,
syncer.Routines(ctx, store, repos.RunOptions{
EnqueueInterval: func() time.Duration { return time.Second },
IsDotCom: false,
MinSyncInterval: func() time.Duration { return 1 * time.Millisecond },
DequeueInterval: 1 * time.Millisecond,
})...,
)
assert.EqualError(t, err, "unable to stop routines gracefully: context canceled")
done <- struct{}{}
}()
@ -1109,12 +1114,16 @@ func TestSyncerMultipleServices(t *testing.T) {
done := make(chan struct{})
go func() {
goroutine.MonitorBackgroundRoutines(ctx, syncer.Routines(ctx, store, repos.RunOptions{
EnqueueInterval: func() time.Duration { return time.Second },
IsDotCom: false,
MinSyncInterval: func() time.Duration { return 1 * time.Minute },
DequeueInterval: 1 * time.Millisecond,
})...)
err := goroutine.MonitorBackgroundRoutines(
ctx,
syncer.Routines(ctx, store, repos.RunOptions{
EnqueueInterval: func() time.Duration { return time.Second },
IsDotCom: false,
MinSyncInterval: func() time.Duration { return 1 * time.Minute },
DequeueInterval: 1 * time.Millisecond,
})...,
)
assert.EqualError(t, err, "unable to stop routines gracefully: context canceled")
done <- struct{}{}
}()

View File

@ -46,6 +46,7 @@ go_test(
"@com_github_google_go_cmp//cmp",
"@com_github_sourcegraph_log//:log",
"@com_github_sourcegraph_log//logtest",
"@com_github_stretchr_testify//require",
],
)

View File

@ -34,5 +34,6 @@ go_test(
"@com_github_derision_test_glock//:glock",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_sourcegraph_log//logtest",
"@com_github_stretchr_testify//require",
],
)

View File

@ -2,6 +2,7 @@ package dbworker
import (
"context"
"fmt"
"time"
"github.com/derision-test/glock"
@ -96,6 +97,10 @@ func newResetter[T workerutil.Record](logger log.Logger, store store.Store[T], o
}
}
func (r *Resetter[T]) Name() string {
return fmt.Sprintf("dbworker.Resetter[%s]", r.options.Name)
}
// Start begins periodically calling reset stalled on the underlying store.
func (r *Resetter[T]) Start() {
defer close(r.finished)
@ -132,7 +137,8 @@ loop:
}
// Stop will cause the resetter loop to exit after the current iteration.
func (r *Resetter[T]) Stop() {
func (r *Resetter[T]) Stop(context.Context) error {
r.cancel()
<-r.finished
return nil
}

View File

@ -1,12 +1,14 @@
package dbworker
import (
"context"
"strconv"
"testing"
"time"
"github.com/derision-test/glock"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/log/logtest"
@ -43,7 +45,8 @@ func TestResetter(t *testing.T) {
resetter := newResetter(logger, store.Store[*TestRecord](s), options, clock)
go func() { resetter.Start() }()
clock.BlockingAdvance(time.Second)
resetter.Stop()
err := resetter.Stop(context.Background())
require.NoError(t, err)
if callCount := len(s.ResetStalledFunc.History()); callCount < 1 {
t.Errorf("unexpected reset stalled call count. want>=%d have=%d", 1, callCount)

View File

@ -256,12 +256,13 @@ loop:
// Stop will cause the worker loop to exit after the current iteration. This is done by canceling the
// context passed to the dequeue operations (but not the handler operations). This method blocks until
// all handler goroutines have exited.
func (w *Worker[T]) Stop() {
func (w *Worker[T]) Stop(context.Context) error {
if w.recorder != nil {
go w.recorder.LogStop(w)
}
w.dequeueCancel()
w.Wait()
return nil
}
// Wait blocks until all handler goroutines have exited.

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/derision-test/glock"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/log"
@ -48,10 +49,12 @@ func TestWorkerHandlerSuccess(t *testing.T) {
store.DequeueFunc.SetDefaultReturn(nil, false, nil)
store.MarkCompleteFunc.SetDefaultReturn(true, nil)
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
go func() { worker.Start() }()
dequeueClock.BlockingAdvance(time.Second)
worker.Stop()
err := worker.Stop(ctx)
require.NoError(t, err)
if callCount := len(handler.HandleFunc.History()); callCount != 1 {
t.Errorf("unexpected handle call count. want=%d have=%d", 1, callCount)
@ -85,10 +88,12 @@ func TestWorkerHandlerFailure(t *testing.T) {
store.MarkErroredFunc.SetDefaultReturn(true, nil)
handler.HandleFunc.SetDefaultReturn(errors.Errorf("oops"))
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
go func() { worker.Start() }()
dequeueClock.BlockingAdvance(time.Second)
worker.Stop()
err := worker.Stop(ctx)
require.NoError(t, err)
if callCount := len(handler.HandleFunc.History()); callCount != 1 {
t.Errorf("unexpected handle call count. want=%d have=%d", 1, callCount)
@ -131,10 +136,12 @@ func TestWorkerHandlerNonRetryableFailure(t *testing.T) {
testErr := nonRetryableTestErr{}
handler.HandleFunc.SetDefaultReturn(testErr)
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
go func() { worker.Start() }()
dequeueClock.BlockingAdvance(time.Second)
worker.Stop()
err := worker.Stop(ctx)
require.NoError(t, err)
if callCount := len(handler.HandleFunc.History()); callCount != 1 {
t.Errorf("unexpected handle call count. want=%d have=%d", 1, callCount)
@ -202,12 +209,14 @@ func TestWorkerConcurrent(t *testing.T) {
return nil
})
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
go func() { worker.Start() }()
for range NumTestRecords {
dequeueClock.BlockingAdvance(time.Second)
}
worker.Stop()
err := worker.Stop(ctx)
require.NoError(t, err)
// We never want to see more concurrently running jobs than configured.
// Ideally, we would also want to see that the number is exactly equal,
@ -244,10 +253,12 @@ func TestWorkerBlockingPreDequeueHook(t *testing.T) {
// Block all dequeues
handler.PreDequeueFunc.SetDefaultReturn(false, nil, nil)
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
go func() { worker.Start() }()
dequeueClock.BlockingAdvance(time.Second)
worker.Stop()
err := worker.Stop(ctx)
require.NoError(t, err)
if callCount := len(handler.HandleFunc.History()); callCount != 0 {
t.Errorf("unexpected handle call count. want=%d have=%d", 0, callCount)
@ -278,12 +289,14 @@ func TestWorkerConditionalPreDequeueHook(t *testing.T) {
handler.PreDequeueFunc.PushReturn(true, "B", nil)
handler.PreDequeueFunc.PushReturn(true, "C", nil)
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
go func() { worker.Start() }()
dequeueClock.BlockingAdvance(time.Second)
dequeueClock.BlockingAdvance(time.Second)
dequeueClock.BlockingAdvance(time.Second)
worker.Stop()
err := worker.Stop(ctx)
require.NoError(t, err)
if callCount := len(handler.HandleFunc.History()); callCount != 3 {
t.Errorf("unexpected handle call count. want=%d have=%d", 3, callCount)
@ -358,11 +371,13 @@ func TestWorkerDequeueHeartbeat(t *testing.T) {
return i, nil, nil
})
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
go func() { worker.Start() }()
t.Cleanup(func() {
close(doneHandling)
worker.Stop()
err := worker.Stop(ctx)
require.NoError(t, err)
})
<-dequeued
@ -395,7 +410,8 @@ func TestWorkerNumTotalJobs(t *testing.T) {
store.MarkCompleteFunc.SetDefaultReturn(true, nil)
// Should process 5 then shut down
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
worker.Start()
if callCount := len(store.DequeueFunc.History()); callCount != 5 {
@ -437,7 +453,8 @@ func TestWorkerMaxActiveTime(t *testing.T) {
stopped := make(chan struct{})
go func() {
defer close(stopped)
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock)
worker.Start()
}()
@ -513,12 +530,14 @@ func TestWorkerCancelJobs(t *testing.T) {
clock := glock.NewMockClock()
heartbeatClock := glock.NewMockClock()
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, heartbeatClock, clock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, heartbeatClock, clock)
go func() { worker.Start() }()
t.Cleanup(func() {
// Keep the handler working until context is canceled.
close(doneHandling)
worker.Stop()
err := worker.Stop(ctx)
require.NoError(t, err)
})
// Wait until a job has been dequeued.
@ -590,12 +609,14 @@ func TestWorkerDeadline(t *testing.T) {
})
clock := glock.NewMockClock()
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, clock, clock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, clock, clock)
go func() { worker.Start() }()
t.Cleanup(func() {
// Keep the handler working until context is canceled.
close(doneHandling)
worker.Stop()
err := worker.Stop(ctx)
require.NoError(t, err)
})
// Wait until a job has been dequeued.
@ -644,9 +665,13 @@ func TestWorkerStopDrainsDequeueLoopOnly(t *testing.T) {
})
clock := glock.NewMockClock()
worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, clock, clock)
ctx := context.Background()
worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, clock, clock)
go func() { worker.Start() }()
t.Cleanup(func() { worker.Stop() })
t.Cleanup(func() {
err := worker.Stop(ctx)
require.NoError(t, err)
})
// Wait until a job has been dequeued.
<-dequeued
@ -657,7 +682,8 @@ func TestWorkerStopDrainsDequeueLoopOnly(t *testing.T) {
}()
// Drain dequeue loop and wait for the one active handler to finish.
worker.Stop()
err := worker.Stop(ctx)
require.NoError(t, err)
for _, call := range handler.HandleFunc.History() {
if call.Result0 != nil {

View File

@ -10,6 +10,7 @@ go_library(
],
importpath = "github.com/sourcegraph/sourcegraph/lib/background",
visibility = ["//visibility:public"],
deps = ["//lib/errors"],
)
go_test(
@ -20,7 +21,12 @@ go_test(
"mocks_test.go",
],
embed = [":background"],
deps = ["@com_github_stretchr_testify//assert"],
deps = [
"//lib/errors",
"@com_github_derision_test_go_mockgen_v2//testutil/require",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
go_mockgen(

View File

@ -2,47 +2,55 @@ package background
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// Routine represents a component of a binary that consists of a long
// running process with a graceful shutdown mechanism.
// Routine represents a background process that consists of a long-running
// process with a graceful shutdown mechanism.
type Routine interface {
// Start begins the long-running process. This routine may also implement
// a Stop method that should signal this process the application is going
// to shut down.
// Name returns the human-readable name of the routine.
Name() string
// Start begins the long-running process. This routine may also implement a Stop
// method that should signal this process the application is going to shut down.
Start()
// Stop signals the Start method to stop accepting new work and complete its
// current work. This method can but is not required to block until Start has
// returned.
Stop()
// returned. The method should respect the context deadline passed to it for
// proper graceful shutdown.
Stop(ctx context.Context) error
}
// Monitor will start the given background routines in their own
// goroutine. If the given context is canceled or a signal is received, the Stop
// method of each routine will be called. This method blocks until the Stop methods
// of each routine have returned. Two signals will cause the app to shutdown
// Monitor will start the given background routines in their own goroutine. If
// the given context is canceled or a signal is received, the Stop method of
// each routine will be called. This method blocks until the Stop methods of
// each routine have returned. Two signals will cause the app to shut down
// immediately.
func Monitor(ctx context.Context, routines ...Routine) {
//
// This function is only returned when routines are signaled to stop with
// potential errors from stopping routines.
func Monitor(ctx context.Context, routines ...Routine) error {
signals := make(chan os.Signal, 2)
signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
monitorBackgroundRoutines(ctx, signals, routines...)
return monitorBackgroundRoutines(ctx, signals, routines...)
}
func monitorBackgroundRoutines(ctx context.Context, signals <-chan os.Signal, routines ...Routine) {
func monitorBackgroundRoutines(ctx context.Context, signals <-chan os.Signal, routines ...Routine) error {
wg := &sync.WaitGroup{}
startAll(wg, routines...)
waitForSignal(ctx, signals)
stopAll(wg, routines...)
wg.Wait()
return stopAll(ctx, wg, routines...)
}
// startAll calls each routine's Start method in its own goroutine and registers
// each running goroutine with the given waitgroup.
// each running goroutine with the given waitgroup. It DOES NOT wait for the
// routines to finish starting, so the caller must wait for the waitgroup (if
// desired).
func startAll(wg *sync.WaitGroup, routines ...Routine) {
for _, r := range routines {
t := r
@ -52,12 +60,39 @@ func startAll(wg *sync.WaitGroup, routines ...Routine) {
}
// stopAll calls each routine's Stop method in its own goroutine and registers
// each running goroutine with the given waitgroup.
func stopAll(wg *sync.WaitGroup, routines ...Routine) {
// each running goroutine with the given waitgroup. It waits for all routines to
// stop or the context to be canceled.
func stopAll(ctx context.Context, wg *sync.WaitGroup, routines ...Routine) error {
var stopErrs error
var stopErrsLock sync.Mutex
for _, r := range routines {
t := r
wg.Add(1)
Go(func() { defer wg.Done(); t.Stop() })
Go(func() {
defer wg.Done()
if err := r.Stop(ctx); err != nil {
stopErrsLock.Lock()
stopErrs = errors.Append(stopErrs, errors.Wrapf(err, "stop routine %q", r.Name()))
stopErrsLock.Unlock()
}
})
}
done := make(chan struct{})
go func() {
wg.Wait()
done <- struct{}{}
}()
select {
case <-done:
return stopErrs
case <-ctx.Done():
stopErrsLock.Lock()
defer stopErrsLock.Unlock()
if stopErrs != nil {
return errors.Wrapf(ctx.Err(), "unable to stop routines gracefully with partial errors: %v", stopErrs)
}
return errors.Wrap(ctx.Err(), "unable to stop routines gracefully")
}
}
@ -89,19 +124,32 @@ func exitAfterSignals(signals <-chan os.Signal, numSignals int) {
exiter()
}
// CombinedRoutine is a list of routines which are started and stopped in unison.
// CombinedRoutine is a list of routines which are started and stopped in
// unison.
type CombinedRoutine []Routine
func (r CombinedRoutine) Start() {
func (rs CombinedRoutine) Name() string {
names := make([]string, 0, len(rs))
for _, r := range rs {
names = append(names, r.Name())
}
return fmt.Sprintf("combined%q", names) // [a b c] -> combined["one" "two" "three"]
}
// Start starts all routines, it does not wait for the routines to finish
// starting.
func (rs CombinedRoutine) Start() {
wg := &sync.WaitGroup{}
startAll(wg, r...)
startAll(wg, rs...)
wg.Wait()
}
func (r CombinedRoutine) Stop() {
// Stop attempts to gracefully stopping all routines. It attempts to collect all
// the errors returned from the routines, and respects the context deadline
// passed to it and gives up waiting when context deadline exceeded.
func (rs CombinedRoutine) Stop(ctx context.Context) error {
wg := &sync.WaitGroup{}
stopAll(wg, r...)
wg.Wait()
return stopAll(ctx, wg, rs...)
}
// LIFOStopRoutine is a list of routines which are started in unison, but stopped
@ -112,24 +160,45 @@ func (r CombinedRoutine) Stop() {
// primary service stops for a graceful shutdown.
type LIFOStopRoutine []Routine
func (r LIFOStopRoutine) Name() string { return "lifo" }
func (r LIFOStopRoutine) Start() { CombinedRoutine(r).Start() }
func (r LIFOStopRoutine) Stop() {
func (r LIFOStopRoutine) Stop(ctx context.Context) error {
var stopErr error
for i := len(r) - 1; i >= 0; i -= 1 {
r[i].Stop()
err := r[i].Stop(ctx)
if err != nil {
stopErr = errors.Append(stopErr, errors.Wrapf(err, "stop routine %q", r[i].Name()))
}
}
return stopErr
}
// NoopRoutine does nothing for start or stop.
func NoopRoutine() Routine {
return CallbackRoutine{}
// NoopRoutine return a background routine that does nothing for start or stop.
// If the name is empty, it will default to "noop".
func NoopRoutine(name string) Routine {
if name == "" {
name = "noop"
}
return CallbackRoutine{
NameFunc: func() string { return name },
}
}
// CallbackRoutine calls the StartFunc and StopFunc callbacks to implement a
// Routine. Each callback may be nil.
type CallbackRoutine struct {
NameFunc func() string
StartFunc func()
StopFunc func()
StopFunc func(ctx context.Context) error
}
func (r CallbackRoutine) Name() string {
if r.NameFunc != nil {
return r.NameFunc()
}
return "callback"
}
func (r CallbackRoutine) Start() {
@ -138,8 +207,9 @@ func (r CallbackRoutine) Start() {
}
}
func (r CallbackRoutine) Stop() {
func (r CallbackRoutine) Stop(ctx context.Context) error {
if r.StopFunc != nil {
r.StopFunc()
return r.StopFunc(ctx)
}
return nil
}

View File

@ -5,8 +5,13 @@ import (
"os"
"syscall"
"testing"
"time"
mockrequire "github.com/derision-test/go-mockgen/v2/testutil/require"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// Make the exiter a no-op in tests
@ -23,7 +28,8 @@ func TestMonitorBackgroundRoutinesSignal(t *testing.T) {
go func() {
defer close(unblocked)
monitorBackgroundRoutines(context.Background(), signals, r1, r2, r3)
err := monitorBackgroundRoutines(context.Background(), signals, r1, r2, r3)
require.NoError(t, err)
}()
signals <- syscall.SIGINT
@ -52,7 +58,8 @@ func TestMonitorBackgroundRoutinesContextCancel(t *testing.T) {
go func() {
defer close(unblocked)
monitorBackgroundRoutines(ctx, signals, r1, r2, r3)
err := monitorBackgroundRoutines(ctx, signals, r1, r2, r3)
assert.EqualError(t, err, "unable to stop routines gracefully: context canceled")
}()
cancel()
@ -68,18 +75,94 @@ func TestMonitorBackgroundRoutinesContextCancel(t *testing.T) {
}
}
func TestCombinedRoutine_Name(t *testing.T) {
r1 := NewMockRoutine()
r1.NameFunc.SetDefaultReturn("r1")
r2 := NewMockRoutine()
r2.NameFunc.SetDefaultReturn("r2")
rs := CombinedRoutine{r1, r2}
assert.Equal(t, `combined["r1" "r2"]`, rs.Name())
}
func TestCombinedRoutines(t *testing.T) {
t.Run("successful stop", func(t *testing.T) {
r1 := NewMockRoutine()
r2 := NewMockRoutine()
rs := CombinedRoutine{r1, r2}
rs.Start()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
defer cancel()
err := rs.Stop(ctx)
require.NoError(t, err)
mockrequire.Called(t, r1.StopFunc)
mockrequire.Called(t, r2.StopFunc)
})
t.Run("stop with error", func(t *testing.T) {
r1 := NewMockRoutine()
r2 := NewMockRoutine()
r2.NameFunc.SetDefaultReturn("mock")
r2.StopFunc.SetDefaultReturn(errors.New("stop error"))
rs := CombinedRoutine{r1, r2}
rs.Start()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
defer cancel()
err := rs.Stop(ctx)
assert.EqualError(t, err, `stop routine "mock": stop error`)
mockrequire.Called(t, r1.StopFunc)
mockrequire.Called(t, r2.StopFunc)
})
t.Run("stop with timeout", func(t *testing.T) {
r1 := NewMockRoutine()
r1.NameFunc.SetDefaultReturn("mock")
r1.StopFunc.SetDefaultReturn(errors.New("stop error"))
r2 := NewMockRoutine()
r2.StopFunc.PushHook(func(context.Context) error {
time.Sleep(100 * time.Millisecond)
return nil
})
rs := CombinedRoutine{r1, r2}
rs.Start()
// Context deadline is 50ms, which is half of the sleep time of r2.
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond))
defer cancel()
err := rs.Stop(ctx)
assert.EqualError(t, err, `unable to stop routines gracefully with partial errors: stop routine "mock": stop error: context deadline exceeded`)
// Although the caller doesn't wait, each routine should still be stopped if
// they get the chance. We wait long enough to avoid flakiness.
time.Sleep(100 * time.Millisecond)
mockrequire.Called(t, r1.StopFunc)
mockrequire.Called(t, r2.StopFunc)
})
}
func TestLIFOStopRoutine(t *testing.T) {
// use an unguarded slice because LIFOStopRoutine should only stop in sequence
var stopped []string
r1 := NewMockRoutine()
r1.StopFunc.PushHook(func() { stopped = append(stopped, "r1") })
r1.StopFunc.PushHook(func(context.Context) error {
stopped = append(stopped, "r1")
return nil
})
r2 := NewMockRoutine()
r2.StopFunc.PushHook(func() { stopped = append(stopped, "r2") })
r2.StopFunc.PushHook(func(context.Context) error {
stopped = append(stopped, "r2")
return nil
})
r3 := NewMockRoutine()
r3.StopFunc.PushHook(func() { stopped = append(stopped, "r3") })
r3.StopFunc.PushHook(func(context.Context) error {
stopped = append(stopped, "r3")
return nil
})
r := LIFOStopRoutine{r1, r2, r3}
r.Stop()
err := r.Stop(context.Background())
require.NoError(t, err)
// stops in reverse
assert.Equal(t, []string{"r3", "r2", "r1"}, stopped)
}

View File

@ -6,12 +6,18 @@
package background
import "sync"
import (
"context"
"sync"
)
// MockRoutine is a mock implementation of the Routine interface (from the
// package github.com/sourcegraph/sourcegraph/lib/background) used for unit
// testing.
type MockRoutine struct {
// NameFunc is an instance of a mock function object controlling the
// behavior of the method Name.
NameFunc *RoutineNameFunc
// StartFunc is an instance of a mock function object controlling the
// behavior of the method Start.
StartFunc *RoutineStartFunc
@ -24,13 +30,18 @@ type MockRoutine struct {
// return zero values for all results, unless overwritten.
func NewMockRoutine() *MockRoutine {
return &MockRoutine{
NameFunc: &RoutineNameFunc{
defaultHook: func() (r0 string) {
return
},
},
StartFunc: &RoutineStartFunc{
defaultHook: func() {
return
},
},
StopFunc: &RoutineStopFunc{
defaultHook: func() {
defaultHook: func(context.Context) (r0 error) {
return
},
},
@ -41,13 +52,18 @@ func NewMockRoutine() *MockRoutine {
// methods panic on invocation, unless overwritten.
func NewStrictMockRoutine() *MockRoutine {
return &MockRoutine{
NameFunc: &RoutineNameFunc{
defaultHook: func() string {
panic("unexpected invocation of MockRoutine.Name")
},
},
StartFunc: &RoutineStartFunc{
defaultHook: func() {
panic("unexpected invocation of MockRoutine.Start")
},
},
StopFunc: &RoutineStopFunc{
defaultHook: func() {
defaultHook: func(context.Context) error {
panic("unexpected invocation of MockRoutine.Stop")
},
},
@ -58,6 +74,9 @@ func NewStrictMockRoutine() *MockRoutine {
// methods delegate to the given implementation, unless overwritten.
func NewMockRoutineFrom(i Routine) *MockRoutine {
return &MockRoutine{
NameFunc: &RoutineNameFunc{
defaultHook: i.Name,
},
StartFunc: &RoutineStartFunc{
defaultHook: i.Start,
},
@ -67,6 +86,104 @@ func NewMockRoutineFrom(i Routine) *MockRoutine {
}
}
// RoutineNameFunc describes the behavior when the Name method of the parent
// MockRoutine instance is invoked.
type RoutineNameFunc struct {
defaultHook func() string
hooks []func() string
history []RoutineNameFuncCall
mutex sync.Mutex
}
// Name delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockRoutine) Name() string {
r0 := m.NameFunc.nextHook()()
m.NameFunc.appendCall(RoutineNameFuncCall{r0})
return r0
}
// SetDefaultHook sets function that is called when the Name method of the
// parent MockRoutine instance is invoked and the hook queue is empty.
func (f *RoutineNameFunc) SetDefaultHook(hook func() string) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// Name method of the parent MockRoutine instance invokes the hook at the
// front of the queue and discards it. After the queue is empty, the default
// hook function is invoked for any future action.
func (f *RoutineNameFunc) PushHook(hook func() string) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *RoutineNameFunc) SetDefaultReturn(r0 string) {
f.SetDefaultHook(func() string {
return r0
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *RoutineNameFunc) PushReturn(r0 string) {
f.PushHook(func() string {
return r0
})
}
func (f *RoutineNameFunc) nextHook() func() string {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *RoutineNameFunc) appendCall(r0 RoutineNameFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of RoutineNameFuncCall objects describing the
// invocations of this function.
func (f *RoutineNameFunc) History() []RoutineNameFuncCall {
f.mutex.Lock()
history := make([]RoutineNameFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// RoutineNameFuncCall is an object that describes an invocation of method
// Name on an instance of MockRoutine.
type RoutineNameFuncCall struct {
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 string
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c RoutineNameFuncCall) Args() []interface{} {
return []interface{}{}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c RoutineNameFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}
// RoutineStartFunc describes the behavior when the Start method of the
// parent MockRoutine instance is invoked.
type RoutineStartFunc struct {
@ -164,23 +281,23 @@ func (c RoutineStartFuncCall) Results() []interface{} {
// RoutineStopFunc describes the behavior when the Stop method of the parent
// MockRoutine instance is invoked.
type RoutineStopFunc struct {
defaultHook func()
hooks []func()
defaultHook func(context.Context) error
hooks []func(context.Context) error
history []RoutineStopFuncCall
mutex sync.Mutex
}
// Stop delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockRoutine) Stop() {
m.StopFunc.nextHook()()
m.StopFunc.appendCall(RoutineStopFuncCall{})
return
func (m *MockRoutine) Stop(v0 context.Context) error {
r0 := m.StopFunc.nextHook()(v0)
m.StopFunc.appendCall(RoutineStopFuncCall{v0, r0})
return r0
}
// SetDefaultHook sets function that is called when the Stop method of the
// parent MockRoutine instance is invoked and the hook queue is empty.
func (f *RoutineStopFunc) SetDefaultHook(hook func()) {
func (f *RoutineStopFunc) SetDefaultHook(hook func(context.Context) error) {
f.defaultHook = hook
}
@ -188,7 +305,7 @@ func (f *RoutineStopFunc) SetDefaultHook(hook func()) {
// Stop method of the parent MockRoutine instance invokes the hook at the
// front of the queue and discards it. After the queue is empty, the default
// hook function is invoked for any future action.
func (f *RoutineStopFunc) PushHook(hook func()) {
func (f *RoutineStopFunc) PushHook(hook func(context.Context) error) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
@ -196,20 +313,20 @@ func (f *RoutineStopFunc) PushHook(hook func()) {
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *RoutineStopFunc) SetDefaultReturn() {
f.SetDefaultHook(func() {
return
func (f *RoutineStopFunc) SetDefaultReturn(r0 error) {
f.SetDefaultHook(func(context.Context) error {
return r0
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *RoutineStopFunc) PushReturn() {
f.PushHook(func() {
return
func (f *RoutineStopFunc) PushReturn(r0 error) {
f.PushHook(func(context.Context) error {
return r0
})
}
func (f *RoutineStopFunc) nextHook() func() {
func (f *RoutineStopFunc) nextHook() func(context.Context) error {
f.mutex.Lock()
defer f.mutex.Unlock()
@ -241,16 +358,23 @@ func (f *RoutineStopFunc) History() []RoutineStopFuncCall {
// RoutineStopFuncCall is an object that describes an invocation of method
// Stop on an instance of MockRoutine.
type RoutineStopFuncCall struct{}
type RoutineStopFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c RoutineStopFuncCall) Args() []interface{} {
return []interface{}{}
return []interface{}{c.Arg0}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c RoutineStopFuncCall) Results() []interface{} {
return []interface{}{}
return []interface{}{c.Result0}
}

View File

@ -71,6 +71,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/derision-test/go-mockgen/v2 v2.0.1 h1:2rzcJH6DT2jVfOt2spkmRJ8J6vQr9RQy3pXlQZ2pxNs=
github.com/derision-test/go-mockgen/v2 v2.0.1/go.mod h1:1FvbovTCW7wmMxpBu2LtbhRgbX36MC5LLa6dw1GIhGc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=

View File

@ -123,6 +123,9 @@ func Start[
log.Int("port", ctr.Port),
log.Bool("msp", ctr.MSP),
log.Bool("sentry", sentryEnabled))
background.Monitor(ctx, routine)
err = background.Monitor(ctx, routine)
if err != nil {
startLogger.Error("error stopping service routine", log.Error(err))
}
startLogger.Info("service stopped")
}