diff --git a/cmd/cody-gateway/internal/actor/source.go b/cmd/cody-gateway/internal/actor/source.go index fb5e3a075fa..1ee7afae5ad 100644 --- a/cmd/cody-gateway/internal/actor/source.go +++ b/cmd/cody-gateway/internal/actor/source.go @@ -201,6 +201,10 @@ type redisLockedBackgroundRoutine struct { routine goroutine.BackgroundRoutine } +func (s *redisLockedBackgroundRoutine) Name() string { + return s.routine.Name() +} + func (s *redisLockedBackgroundRoutine) Start() { s.logger.Info("Starting background sync routine") @@ -220,10 +224,10 @@ func (s *redisLockedBackgroundRoutine) Start() { s.routine.Start() } -func (s *redisLockedBackgroundRoutine) Stop() { +func (s *redisLockedBackgroundRoutine) Stop(ctx context.Context) error { start := time.Now() s.logger.Info("Stopping background sync routine") - s.routine.Stop() + stopErr := s.routine.Stop(ctx) // If we have the lock, release it and let somebody else work if expire := s.rmux.Until(); !expire.IsZero() && expire.After(time.Now()) { @@ -247,6 +251,7 @@ func (s *redisLockedBackgroundRoutine) Stop() { s.logger.Info("Background sync successfully stopped", log.Duration("elapsed", time.Since(start))) + return stopErr } // sourcesSyncHandler is a handler for NewPeriodicGoroutine diff --git a/cmd/cody-gateway/internal/actor/source_test.go b/cmd/cody-gateway/internal/actor/source_test.go index c9500d39ab6..3d1df1c8b74 100644 --- a/cmd/cody-gateway/internal/actor/source_test.go +++ b/cmd/cody-gateway/internal/actor/source_test.go @@ -74,7 +74,8 @@ func TestSourcesWorkers(t *testing.T) { w := NewSources(s1).Worker(observation.NewContext(logger), sourceWorkerMutex1, time.Millisecond) go func() { <-stop1 - w.Stop() + err := w.Stop(context.Background()) + require.NoError(t, err) }() w.Start() }) @@ -89,7 +90,8 @@ func TestSourcesWorkers(t *testing.T) { w := NewSources(s2).Worker(observation.NewContext(logger), sourceWorkerMutex, time.Millisecond) go func() { <-stop2 - w.Stop() + err := w.Stop(context.Background()) + require.NoError(t, err) }() w.Start() }) diff --git a/cmd/cody-gateway/internal/events/buffered.go b/cmd/cody-gateway/internal/events/buffered.go index 534c0abdfe7..bbded038972 100644 --- a/cmd/cody-gateway/internal/events/buffered.go +++ b/cmd/cody-gateway/internal/events/buffered.go @@ -132,7 +132,11 @@ func (l *BufferedLogger) LogEvent(spanCtx context.Context, event Event) error { } } -// Start begins working by procssing the logger's buffer, blocking until stop +func (l *BufferedLogger) Name() string { + return "BufferedLogger" +} + +// Start begins working by processing the logger's buffer, blocking until stop // is called and the backlog is cleared. func (l *BufferedLogger) Start() { var wg sync.WaitGroup @@ -157,7 +161,7 @@ func (l *BufferedLogger) Start() { } // Stop stops buffered logger's background processing job and flushes its buffer. -func (l *BufferedLogger) Stop() { +func (l *BufferedLogger) Stop(context.Context) error { l.bufferClosed.Store(true) close(l.bufferC) l.log.Info("buffer closed - waiting for events to flush") @@ -175,4 +179,5 @@ func (l *BufferedLogger) Stop() { l.log.Error("failed to shut down within shutdown deadline", log.Error(errors.Newf("unflushed events: %d", len(l.bufferC)))) // real error for Sentry } + return nil } diff --git a/cmd/cody-gateway/internal/events/buffered_test.go b/cmd/cody-gateway/internal/events/buffered_test.go index ff76bc9230b..754d56c8dc2 100644 --- a/cmd/cody-gateway/internal/events/buffered_test.go +++ b/cmd/cody-gateway/internal/events/buffered_test.go @@ -43,7 +43,8 @@ func TestBufferedLogger(t *testing.T) { // Stop the worker and wait for it to finish so that events flush before // making any assertions - b.Stop() + err := b.Stop(ctx) + require.NoError(t, err) wg.Wait() autogold.Expect([]string{"bar", "baz", "foo"}).Equal(t, asSortedIdentifiers(handler.ReceivedEvents)) @@ -100,7 +101,8 @@ func TestBufferedLogger(t *testing.T) { // Indicate close and stop the worker so that the buffer can flush close(blockEventSubmissionC) - b.Stop() + err = b.Stop(ctx) + require.NoError(t, err) wg.Wait() // All backlogged events get submitted. Note the "buffer-full" event is @@ -133,7 +135,8 @@ func TestBufferedLogger(t *testing.T) { assert.NoError(t, b.LogEvent(ctx, events.Event{Identifier: "foo"})) // Stop the worker and wait for it to finish - b.Stop() + err := b.Stop(ctx) + require.NoError(t, err) wg.Wait() // Submit an additional event - this should immediately attempt to diff --git a/cmd/cody-gateway/shared/main.go b/cmd/cody-gateway/shared/main.go index df1a7f0d60f..cb1df77c03e 100644 --- a/cmd/cody-gateway/shared/main.go +++ b/cmd/cody-gateway/shared/main.go @@ -235,9 +235,7 @@ func Main(ctx context.Context, obctx *observation.Context, ready service.ReadyFu backgroundRoutines = append(backgroundRoutines, w) } // Block until done - goroutine.MonitorBackgroundRoutines(ctx, backgroundRoutines...) - - return nil + return goroutine.MonitorBackgroundRoutines(ctx, backgroundRoutines...) } func newRedisStore(store redispool.KeyValue) limiter.RedisStore { diff --git a/cmd/embeddings/shared/main.go b/cmd/embeddings/shared/main.go index c8bf33af325..9983417e61e 100644 --- a/cmd/embeddings/shared/main.go +++ b/cmd/embeddings/shared/main.go @@ -91,9 +91,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic // Mark health server as ready and go! ready() - goroutine.MonitorBackgroundRoutines(ctx, server) - - return nil + return goroutine.MonitorBackgroundRoutines(ctx, server) } func NewHandler( diff --git a/cmd/executor/internal/run/run.go b/cmd/executor/internal/run/run.go index 8eb40054e52..86a77a8416a 100644 --- a/cmd/executor/internal/run/run.go +++ b/cmd/executor/internal/run/run.go @@ -127,8 +127,7 @@ func StandaloneRun(ctx context.Context, runner util.CmdRunner, logger log.Logger cancel() }() - goroutine.MonitorBackgroundRoutines(ctx, routines...) - return nil + return goroutine.MonitorBackgroundRoutines(ctx, routines...) } func mustRegisterVMCountMetric(observationCtx *observation.Context, runner util.CmdRunner, logger log.Logger, prefix string) { diff --git a/cmd/frontend/internal/cli/autoupgrade_servers.go b/cmd/frontend/internal/cli/autoupgrade_servers.go index cb53c5c95a7..af62d5ebb8b 100644 --- a/cmd/frontend/internal/cli/autoupgrade_servers.go +++ b/cmd/frontend/internal/cli/autoupgrade_servers.go @@ -9,6 +9,7 @@ import ( gcontext "github.com/gorilla/context" "github.com/gorilla/mux" + "github.com/sourcegraph/log" "github.com/sourcegraph/sourcegraph/cmd/frontend/internal/app/assetsutil" "github.com/sourcegraph/sourcegraph/cmd/frontend/internal/httpapi" @@ -70,7 +71,12 @@ func serveInternalServer(obsvCtx *observation.Context) (context.CancelFunc, erro confServer.Start() }) - return confServer.Stop, nil + return func() { + err := confServer.Stop(context.Background()) + if err != nil { + obsvCtx.Logger.Error("failed to stop conf server", log.Error(err)) + } + }, nil } func serveExternalServer(obsvCtx *observation.Context, sqlDB *sql.DB, db database.DB) (context.CancelFunc, error) { @@ -100,5 +106,10 @@ func serveExternalServer(obsvCtx *observation.Context, sqlDB *sql.DB, db databas progressServer.Start() }) - return progressServer.Stop, nil + return func() { + err := progressServer.Stop(context.Background()) + if err != nil { + obsvCtx.Logger.Error("failed to stop progress server", log.Error(err)) + } + }, nil } diff --git a/cmd/frontend/internal/cli/serve_cmd.go b/cmd/frontend/internal/cli/serve_cmd.go index 2dbcc20cfcf..19cf3d958d7 100644 --- a/cmd/frontend/internal/cli/serve_cmd.go +++ b/cmd/frontend/internal/cli/serve_cmd.go @@ -321,8 +321,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic logger.Info(fmt.Sprintf("✱ Sourcegraph is ready at: %s", globals.ExternalURL())) ready() - goroutine.MonitorBackgroundRoutines(context.Background(), routines...) - return nil + return goroutine.MonitorBackgroundRoutines(context.Background(), routines...) } func makeExternalAPI(db database.DB, logger sglog.Logger, schema *graphql.Schema, enterprise enterprise.Services, rateLimiter graphqlbackend.LimitWatcher) (goroutine.BackgroundRoutine, error) { diff --git a/cmd/frontend/internal/httpapi/releasecache/http.go b/cmd/frontend/internal/httpapi/releasecache/http.go index 8ffc3b942dc..93740d5cfd9 100644 --- a/cmd/frontend/internal/httpapi/releasecache/http.go +++ b/cmd/frontend/internal/httpapi/releasecache/http.go @@ -58,7 +58,10 @@ func NewHandler(logger log.Logger) http.Handler { // If we already have an updater goroutine running, we need to stop it. if handler.updater != nil { - handler.updater.Stop() + err := handler.updater.Stop(ctx) + if err != nil { + logger.Error("failed to stop updater routine", log.Error(err)) + } handler.updater = nil } @@ -78,7 +81,12 @@ func NewHandler(logger log.Logger) http.Handler { goroutine.WithDescription("caches src-cli versions polled periodically"), goroutine.WithInterval(config.interval), ) - go goroutine.MonitorBackgroundRoutines(ctx, handler.updater) + go func() { + err := goroutine.MonitorBackgroundRoutines(ctx, handler.updater) + if err != nil { + logger.Error("error monitoring updater routine", log.Error(err)) + } + }() handler.rc = rc handler.webhookSecret = config.webhookSecret @@ -145,7 +153,7 @@ func (h *handler) handleWebhook(w http.ResponseWriter, r *http.Request) { type signatureValidator func(signature string, payload []byte, secret []byte) error func (h *handler) doHandleWebhook(w http.ResponseWriter, r *http.Request, signatureValidator signatureValidator) { - defer r.Body.Close() + defer func() { _ = r.Body.Close() }() payload, err := io.ReadAll(r.Body) if err != nil { h.logger.Warn("error reading payload", log.Error(err)) diff --git a/cmd/gitserver/shared/shared.go b/cmd/gitserver/shared/shared.go index bcdc5464a9d..cd78a00515d 100644 --- a/cmd/gitserver/shared/shared.go +++ b/cmd/gitserver/shared/shared.go @@ -227,7 +227,10 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic ready() // Launch all routines! - goroutine.MonitorBackgroundRoutines(ctx, routines...) + err = goroutine.MonitorBackgroundRoutines(ctx, routines...) + if err != nil { + logger.Error("error monitoring background routines", log.Error(err)) + } // The most important thing this does is kill all our clones. If we just // shutdown they will be orphaned and continue running. diff --git a/cmd/msp-example/internal/example/example.go b/cmd/msp-example/internal/example/example.go index 3c800caa8df..aedc7a55159 100644 --- a/cmd/msp-example/internal/example/example.go +++ b/cmd/msp-example/internal/example/example.go @@ -166,18 +166,21 @@ type httpRoutine struct { *http.Server } +func (s *httpRoutine) Name() string { return "http" } + func (s *httpRoutine) Start() { if err := s.Server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { s.log.Error("error stopping server", log.Error(err)) } } -func (s *httpRoutine) Stop() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +func (s *httpRoutine) Stop(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := s.Server.Shutdown(ctx); err != nil { - s.log.Error("error shutting down server", log.Error(err)) - } else { - s.log.Info("server stopped") + return errors.Wrap(err, "shutdown") } + + s.log.Info("server stopped") + return nil } diff --git a/cmd/precise-code-intel-worker/shared/shared.go b/cmd/precise-code-intel-worker/shared/shared.go index 1eb285b9cab..13cb9cf9ae0 100644 --- a/cmd/precise-code-intel-worker/shared/shared.go +++ b/cmd/precise-code-intel-worker/shared/shared.go @@ -94,9 +94,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic }) // Go! - goroutine.MonitorBackgroundRoutines(ctx, append(worker, server)...) - - return nil + return goroutine.MonitorBackgroundRoutines(ctx, append(worker, server)...) } func mustInitializeDB(observationCtx *observation.Context) *sql.DB { diff --git a/cmd/repo-updater/internal/scheduler/scheduler.go b/cmd/repo-updater/internal/scheduler/scheduler.go index 352313c9dc0..217661ac560 100644 --- a/cmd/repo-updater/internal/scheduler/scheduler.go +++ b/cmd/repo-updater/internal/scheduler/scheduler.go @@ -93,6 +93,10 @@ func NewUpdateScheduler(logger log.Logger, db database.DB, gitserverClient gitse } } +func (s *UpdateScheduler) Name() string { + return "UpdateScheduler" +} + func (s *UpdateScheduler) Start() { // Make sure the update scheduler acts as an internal actor, so it can see all // repos. @@ -103,10 +107,11 @@ func (s *UpdateScheduler) Start() { go s.runScheduleLoop(ctx) } -func (s *UpdateScheduler) Stop() { +func (s *UpdateScheduler) Stop(context.Context) error { if s.cancelCtx != nil { s.cancelCtx() } + return nil } // runScheduleLoop starts the loop that schedules updates by enqueuing them into the updateQueue. diff --git a/cmd/repo-updater/shared/main.go b/cmd/repo-updater/shared/main.go index b9a5051193e..e0ed2c432e5 100644 --- a/cmd/repo-updater/shared/main.go +++ b/cmd/repo-updater/shared/main.go @@ -180,9 +180,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic // after being unblocked. ready() - goroutine.MonitorBackgroundRoutines(ctx, routines...) - - return nil + return goroutine.MonitorBackgroundRoutines(ctx, routines...) } func getDB(observationCtx *observation.Context) (database.DB, error) { diff --git a/cmd/symbols/shared/main.go b/cmd/symbols/shared/main.go index 421a0df9335..dbea6a8c15c 100644 --- a/cmd/symbols/shared/main.go +++ b/cmd/symbols/shared/main.go @@ -109,9 +109,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic // Mark health server as ready and go! ready() - goroutine.MonitorBackgroundRoutines(ctx, routines...) - - return nil + return goroutine.MonitorBackgroundRoutines(ctx, routines...) } func mustInitializeFrontendDB(observationCtx *observation.Context) *sql.DB { diff --git a/cmd/syntactic-code-intel-worker/shared/shared.go b/cmd/syntactic-code-intel-worker/shared/shared.go index 92ec1532211..becce32e5d5 100644 --- a/cmd/syntactic-code-intel-worker/shared/shared.go +++ b/cmd/syntactic-code-intel-worker/shared/shared.go @@ -3,7 +3,6 @@ package shared import ( "context" "database/sql" - "net/http" "time" @@ -50,9 +49,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic }) // Go! - goroutine.MonitorBackgroundRoutines(ctx, server, indexingWorker) - - return nil + return goroutine.MonitorBackgroundRoutines(ctx, server, indexingWorker) } func initDB(observationCtx *observation.Context, name string) *sql.DB { diff --git a/cmd/telemetry-gateway/service/service.go b/cmd/telemetry-gateway/service/service.go index 95987d7f997..2b2ac702437 100644 --- a/cmd/telemetry-gateway/service/service.go +++ b/cmd/telemetry-gateway/service/service.go @@ -130,9 +130,9 @@ func (Service) Initialize(ctx context.Context, logger log.Logger, contract runti ), background.CallbackRoutine{ // No Start - serving is handled by httpserver - StopFunc: func() { + StopFunc: func(ctx context.Context) error { grpcServer.GracefulStop() - eventsTopic.Stop() + return eventsTopic.Stop(ctx) }, }, }, nil diff --git a/cmd/worker/internal/authz/perms_syncer_worker_test.go b/cmd/worker/internal/authz/perms_syncer_worker_test.go index 00210bc89c1..66c02be2e28 100644 --- a/cmd/worker/internal/authz/perms_syncer_worker_test.go +++ b/cmd/worker/internal/authz/perms_syncer_worker_test.go @@ -105,7 +105,10 @@ func TestPermsSyncerWorker_RepoSyncJobs(t *testing.T) { workerStore := makeStore(observationCtx, db.Handle(), syncTypeRepo) worker := MakeTestWorker(ctx, observationCtx, workerStore, dummySyncer, syncTypeRepo, syncJobsStore) go worker.Start() - t.Cleanup(worker.Stop) + t.Cleanup(func() { + err := worker.Stop(ctx) + require.NoError(t, err) + }) // Adding repo perms sync jobs. err = syncJobsStore.CreateRepoSyncJob(ctx, api.RepoID(1), database.PermissionSyncJobOpts{Reason: database.ReasonManualRepoSync, Priority: database.MediumPriorityPermissionsSync, TriggeredByUserID: user1.ID}) @@ -248,7 +251,10 @@ func TestPermsSyncerWorker_UserSyncJobs(t *testing.T) { workerStore := makeStore(observationCtx, db.Handle(), syncTypeUser) worker := MakeTestWorker(ctx, observationCtx, workerStore, dummySyncer, syncTypeUser, syncJobsStore) go worker.Start() - t.Cleanup(worker.Stop) + t.Cleanup(func() { + err := worker.Stop(ctx) + require.NoError(t, err) + }) // Adding user perms sync jobs. err = syncJobsStore.CreateUserSyncJob(ctx, user1.ID, diff --git a/cmd/worker/internal/codygateway/usageworker.go b/cmd/worker/internal/codygateway/usageworker.go index 6854eac8ab5..ee9148a4ffa 100644 --- a/cmd/worker/internal/codygateway/usageworker.go +++ b/cmd/worker/internal/codygateway/usageworker.go @@ -44,6 +44,10 @@ type usageRoutine struct { cancel context.CancelFunc } +func (j *usageRoutine) Name() string { + return "CodyGatewayUsageWorker" +} + func (j *usageRoutine) Start() { var ctx context.Context ctx, j.cancel = context.WithCancel(context.Background()) @@ -100,8 +104,9 @@ func (j *usageRoutine) Start() { }) } -func (j *usageRoutine) Stop() { +func (j *usageRoutine) Stop(context.Context) error { if j.cancel != nil { j.cancel() } + return nil } diff --git a/cmd/worker/internal/licensecheck/check.go b/cmd/worker/internal/licensecheck/check.go index bc6d510f079..eaeda2da307 100644 --- a/cmd/worker/internal/licensecheck/check.go +++ b/cmd/worker/internal/licensecheck/check.go @@ -199,6 +199,11 @@ func StartLicenseCheck(originalCtx context.Context, logger log.Logger, db databa goroutine.WithInterval(licensing.LicenseCheckInterval), goroutine.WithInitialDelay(initialWaitInterval), ) - go goroutine.MonitorBackgroundRoutines(ctxWithCancel, routine) + go func() { + err := goroutine.MonitorBackgroundRoutines(ctxWithCancel, routine) + if err != nil { + logger.Error("error monitoring background routines", log.Error(err)) + } + }() }) } diff --git a/cmd/worker/internal/licensecheck/worker.go b/cmd/worker/internal/licensecheck/worker.go index 7a85a844562..fb1d904e9de 100644 --- a/cmd/worker/internal/licensecheck/worker.go +++ b/cmd/worker/internal/licensecheck/worker.go @@ -6,14 +6,13 @@ import ( "github.com/sourcegraph/log" "github.com/sourcegraph/sourcegraph/cmd/worker/job" + workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/dotcom" "github.com/sourcegraph/sourcegraph/internal/env" "github.com/sourcegraph/sourcegraph/internal/goroutine" "github.com/sourcegraph/sourcegraph/internal/licensing" "github.com/sourcegraph/sourcegraph/internal/observation" - - workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db" ) type licenseWorker struct{} @@ -51,6 +50,10 @@ type licenseChecksWrapper struct { db database.DB } +func (l *licenseChecksWrapper) Name() string { + return "LicenseChecks" +} + func (l *licenseChecksWrapper) Start() { goroutine.Go(func() { licensing.StartMaxUserCount(l.logger, &usersStore{ @@ -62,9 +65,7 @@ func (l *licenseChecksWrapper) Start() { } } -func (l *licenseChecksWrapper) Stop() { - // no-op -} +func (l *licenseChecksWrapper) Stop(context.Context) error { return nil } type usersStore struct { db database.DB diff --git a/cmd/worker/internal/search/exhaustive_search_test.go b/cmd/worker/internal/search/exhaustive_search_test.go index 8584ce38e15..3fa92ca654a 100644 --- a/cmd/worker/internal/search/exhaustive_search_test.go +++ b/cmd/worker/internal/search/exhaustive_search_test.go @@ -102,7 +102,10 @@ func TestExhaustiveSearch(t *testing.T) { require.NoError(err) for _, routine := range routines { go routine.Start() - defer routine.Stop() + defer func() { + err := routine.Stop(context.Background()) + require.NoError(err) + }() } require.Eventually(func() bool { return !searchJob.hasWork(workerCtx) diff --git a/cmd/worker/shared/main.go b/cmd/worker/shared/main.go index 8bc48d42cf9..9bf5f3faa68 100644 --- a/cmd/worker/shared/main.go +++ b/cmd/worker/shared/main.go @@ -205,8 +205,7 @@ func Start(ctx context.Context, observationCtx *observation.Context, ready servi allRoutines = append(allRoutines, r.Routine) } - goroutine.MonitorBackgroundRoutines(ctx, allRoutines...) - return nil + return goroutine.MonitorBackgroundRoutines(ctx, allRoutines...) } // loadConfigs calls Load on the configs of each of the jobs registered in this binary. diff --git a/dev/build-tracker/BUILD.bazel b/dev/build-tracker/BUILD.bazel index 7115858989a..6bf58d3adb9 100644 --- a/dev/build-tracker/BUILD.bazel +++ b/dev/build-tracker/BUILD.bazel @@ -63,6 +63,7 @@ go_test( "@com_github_buildkite_go_buildkite_v3//buildkite", "@com_github_gorilla_mux//:mux", "@com_github_sourcegraph_log//logtest", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_google_cloud_go_bigquery//:bigquery", ], diff --git a/dev/build-tracker/main.go b/dev/build-tracker/main.go index aa92698e7d2..e0db66162a6 100644 --- a/dev/build-tracker/main.go +++ b/dev/build-tracker/main.go @@ -69,7 +69,7 @@ func NewServer(addr string, logger log.Logger, c config.Config, bqWriter BigQuer bkClient: buildkite.NewClient(bk.Client()), } - // Register routes the the server will be responding too + // Register routes the server will be responding too r := mux.NewRouter() r.Path("/buildkite").HandlerFunc(server.handleEvent).Methods(http.MethodPost) r.Path("/-/healthz").HandlerFunc(server.handleHealthz).Methods(http.MethodGet) @@ -85,20 +85,23 @@ func NewServer(addr string, logger log.Logger, c config.Config, bqWriter BigQuer return server } +func (s *Server) Name() string { return "build-tracker" } + func (s *Server) Start() { if err := s.http.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { s.logger.Error("error stopping server", log.Error(err)) } } -func (s *Server) Stop() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +func (s *Server) Stop(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := s.http.Shutdown(ctx); err != nil { - s.logger.Error("error shutting down server", log.Error(err)) - } else { - s.logger.Info("server stopped") + return errors.Wrap(err, "shutdown server") } + + s.logger.Info("server stopped") + return nil } func (s *Server) handleGetBuild(w http.ResponseWriter, req *http.Request) { @@ -116,7 +119,6 @@ func (s *Server) handleGetBuild(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusUnauthorized) return } - } vars := mux.Vars(req) diff --git a/dev/build-tracker/server_test.go b/dev/build-tracker/server_test.go index cba313316a8..05094974e52 100644 --- a/dev/build-tracker/server_test.go +++ b/dev/build-tracker/server_test.go @@ -13,6 +13,7 @@ import ( "github.com/buildkite/go-buildkite/v3/buildkite" "github.com/gorilla/mux" "github.com/sourcegraph/log/logtest" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/sourcegraph/sourcegraph/dev/build-tracker/build" @@ -145,7 +146,13 @@ func TestOldBuildsGetDeleted(t *testing.T) { server.store.Set(b) ctx, cancel := context.WithCancel(context.Background()) - go goroutine.MonitorBackgroundRoutines(ctx, deleteOldBuilds(logger, server.store, 10*time.Millisecond, 24*time.Hour)) + go func() { + err := goroutine.MonitorBackgroundRoutines( + ctx, + deleteOldBuilds(logger, server.store, 10*time.Millisecond, 24*time.Hour), + ) + assert.EqualError(t, err, "unable to stop routines gracefully: context canceled") + }() time.Sleep(20 * time.Millisecond) cancel() @@ -167,7 +174,13 @@ func TestOldBuildsGetDeleted(t *testing.T) { server.store.Set(b) ctx, cancel := context.WithCancel(context.Background()) - go goroutine.MonitorBackgroundRoutines(ctx, deleteOldBuilds(logger, server.store, 10*time.Millisecond, 24*time.Hour)) + go func() { + err := goroutine.MonitorBackgroundRoutines( + ctx, + deleteOldBuilds(logger, server.store, 10*time.Millisecond, 24*time.Hour), + ) + assert.EqualError(t, err, "unable to stop routines gracefully: context canceled") + }() time.Sleep(20 * time.Millisecond) cancel() diff --git a/dev/linearhooks/internal/service/service.go b/dev/linearhooks/internal/service/service.go index 437e74dc0da..32727986796 100644 --- a/dev/linearhooks/internal/service/service.go +++ b/dev/linearhooks/internal/service/service.go @@ -89,18 +89,23 @@ type httpRoutine struct { *http.Server } +func (s *httpRoutine) Name() string { + return "http" +} + func (s *httpRoutine) Start() { if err := s.Server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { s.log.Error("error stopping server", log.Error(err)) } } -func (s *httpRoutine) Stop() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +func (s *httpRoutine) Stop(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := s.Server.Shutdown(ctx); err != nil { s.log.Error("error shutting down server", log.Error(err)) } else { s.log.Info("server stopped") } + return nil } diff --git a/internal/batches/scheduler/scheduler.go b/internal/batches/scheduler/scheduler.go index 9d23b97c932..0c9d87c4256 100644 --- a/internal/batches/scheduler/scheduler.go +++ b/internal/batches/scheduler/scheduler.go @@ -100,12 +100,13 @@ func (s *Scheduler) Start() { } } -func (s *Scheduler) Stop() { +func (s *Scheduler) Stop(context.Context) error { if s.recorder != nil { go s.recorder.LogStop(s) } s.done <- struct{}{} close(s.done) + return nil } func (s *Scheduler) enqueueChangeset() error { diff --git a/internal/batches/syncer/syncer.go b/internal/batches/syncer/syncer.go index 5e40d44d1aa..3ddef84e7b0 100644 --- a/internal/batches/syncer/syncer.go +++ b/internal/batches/syncer/syncer.go @@ -68,6 +68,10 @@ func NewSyncRegistry(ctx context.Context, observationCtx *observation.Context, b } } +func (s *SyncRegistry) Name() string { + return "SyncRegistry" +} + func (s *SyncRegistry) Start() { // Fetch initial list of syncers. if err := s.syncCodeHosts(s.ctx); err != nil { @@ -88,11 +92,15 @@ func (s *SyncRegistry) Start() { goroutine.WithInterval(externalServiceSyncerInterval), ) - goroutine.MonitorBackgroundRoutines(s.ctx, externalServiceSyncer) + err := goroutine.MonitorBackgroundRoutines(s.ctx, externalServiceSyncer) + if err != nil { + s.logger.Error("error monitoring background routines", log.Error(err)) + } } -func (s *SyncRegistry) Stop() { +func (s *SyncRegistry) Stop(context.Context) error { s.cancel() + return nil } // EnqueueChangesetSyncs will enqueue the changesets with the supplied ids for high priority syncing. diff --git a/internal/database/migration/multiversion/run.go b/internal/database/migration/multiversion/run.go index abb3b9f0ce6..b0d8ee87b0c 100644 --- a/internal/database/migration/multiversion/run.go +++ b/internal/database/migration/multiversion/run.go @@ -210,7 +210,12 @@ func RunOutOfBandMigrations( } go runner.StartPartial(ids) - defer runner.Stop() + defer func() { + err := runner.Stop(ctx) + if err != nil { + out.WriteLine(output.Linef(output.EmojiFailure, output.StyleFailure, "Failed to stop out of band migrations: %s", err)) + } + }() defer func() { if err == nil { out.WriteLine(output.Line(output.EmojiSuccess, output.StyleSuccess, "Out of band migrations complete")) diff --git a/internal/debugserver/debug.go b/internal/debugserver/debug.go index 32c4495d6e2..19407925ab0 100644 --- a/internal/debugserver/debug.go +++ b/internal/debugserver/debug.go @@ -84,7 +84,7 @@ type Dumper interface { // Any extra endpoints supplied will be registered via their own declared path. func NewServerRoutine(ready <-chan struct{}, extra ...Endpoint) goroutine.BackgroundRoutine { if addr == "" { - return goroutine.NoopRoutine() + return goroutine.NoopRoutine("noop server") } handler := httpserver.NewHandler(func(router *mux.Router) { diff --git a/internal/goroutine/BUILD.bazel b/internal/goroutine/BUILD.bazel index 7ca6653c91e..ec79c4bc4f8 100644 --- a/internal/goroutine/BUILD.bazel +++ b/internal/goroutine/BUILD.bazel @@ -37,6 +37,7 @@ go_test( deps = [ "//lib/errors", "@com_github_derision_test_glock//:glock", + "@com_github_stretchr_testify//require", ], ) diff --git a/internal/goroutine/background.go b/internal/goroutine/background.go index 9b30669e6c0..f1d04e10909 100644 --- a/internal/goroutine/background.go +++ b/internal/goroutine/background.go @@ -35,5 +35,6 @@ var MonitorBackgroundRoutines = background.Monitor // CombinedRoutine is a list of routines which are started and stopped in unison. type CombinedRoutine = background.CombinedRoutine -// NoopRoutine does nothing for start or stop. +// NoopRoutine return a background routine that does nothing for start or stop. +// If the name is empty, it will default to "noop". var NoopRoutine = background.NoopRoutine diff --git a/internal/goroutine/example_test.go b/internal/goroutine/example_test.go index bfe8d63f967..410967a5741 100644 --- a/internal/goroutine/example_test.go +++ b/internal/goroutine/example_test.go @@ -10,6 +10,10 @@ type exampleRoutine struct { done chan struct{} } +func (m *exampleRoutine) Name() string { + return "exampleRoutine" +} + func (m *exampleRoutine) Start() { for { select { @@ -24,8 +28,9 @@ func (m *exampleRoutine) Start() { } } -func (m *exampleRoutine) Stop() { +func (m *exampleRoutine) Stop(context.Context) error { m.done <- struct{}{} + return nil } func ExampleBackgroundRoutine() { @@ -35,7 +40,12 @@ func ExampleBackgroundRoutine() { ctx, cancel := context.WithCancel(context.Background()) - go MonitorBackgroundRoutines(ctx, r) + go func() { + err := MonitorBackgroundRoutines(ctx, r) + if err != nil { + fmt.Printf("error: %v\n", err) + } + }() time.Sleep(500 * time.Millisecond) cancel() @@ -58,7 +68,12 @@ func ExamplePeriodicGoroutine() { WithInterval(200*time.Millisecond), ) - go MonitorBackgroundRoutines(ctx, r) + go func() { + err := MonitorBackgroundRoutines(ctx, r) + if err != nil { + fmt.Printf("error: %v\n", err) + } + }() time.Sleep(500 * time.Millisecond) cancel() diff --git a/internal/goroutine/mocks_test.go b/internal/goroutine/mocks_test.go index 659b5529e99..8c55ff503a4 100644 --- a/internal/goroutine/mocks_test.go +++ b/internal/goroutine/mocks_test.go @@ -16,6 +16,9 @@ import ( // github.com/sourcegraph/sourcegraph/internal/goroutine) used for unit // testing. type MockBackgroundRoutine struct { + // NameFunc is an instance of a mock function object controlling the + // behavior of the method Name. + NameFunc *BackgroundRoutineNameFunc // StartFunc is an instance of a mock function object controlling the // behavior of the method Start. StartFunc *BackgroundRoutineStartFunc @@ -29,13 +32,18 @@ type MockBackgroundRoutine struct { // overwritten. func NewMockBackgroundRoutine() *MockBackgroundRoutine { return &MockBackgroundRoutine{ + NameFunc: &BackgroundRoutineNameFunc{ + defaultHook: func() (r0 string) { + return + }, + }, StartFunc: &BackgroundRoutineStartFunc{ defaultHook: func() { return }, }, StopFunc: &BackgroundRoutineStopFunc{ - defaultHook: func() { + defaultHook: func(context.Context) (r0 error) { return }, }, @@ -47,13 +55,18 @@ func NewMockBackgroundRoutine() *MockBackgroundRoutine { // overwritten. func NewStrictMockBackgroundRoutine() *MockBackgroundRoutine { return &MockBackgroundRoutine{ + NameFunc: &BackgroundRoutineNameFunc{ + defaultHook: func() string { + panic("unexpected invocation of MockBackgroundRoutine.Name") + }, + }, StartFunc: &BackgroundRoutineStartFunc{ defaultHook: func() { panic("unexpected invocation of MockBackgroundRoutine.Start") }, }, StopFunc: &BackgroundRoutineStopFunc{ - defaultHook: func() { + defaultHook: func(context.Context) error { panic("unexpected invocation of MockBackgroundRoutine.Stop") }, }, @@ -65,6 +78,9 @@ func NewStrictMockBackgroundRoutine() *MockBackgroundRoutine { // implementation, unless overwritten. func NewMockBackgroundRoutineFrom(i BackgroundRoutine) *MockBackgroundRoutine { return &MockBackgroundRoutine{ + NameFunc: &BackgroundRoutineNameFunc{ + defaultHook: i.Name, + }, StartFunc: &BackgroundRoutineStartFunc{ defaultHook: i.Start, }, @@ -74,6 +90,105 @@ func NewMockBackgroundRoutineFrom(i BackgroundRoutine) *MockBackgroundRoutine { } } +// BackgroundRoutineNameFunc describes the behavior when the Name method of +// the parent MockBackgroundRoutine instance is invoked. +type BackgroundRoutineNameFunc struct { + defaultHook func() string + hooks []func() string + history []BackgroundRoutineNameFuncCall + mutex sync.Mutex +} + +// Name delegates to the next hook function in the queue and stores the +// parameter and result values of this invocation. +func (m *MockBackgroundRoutine) Name() string { + r0 := m.NameFunc.nextHook()() + m.NameFunc.appendCall(BackgroundRoutineNameFuncCall{r0}) + return r0 +} + +// SetDefaultHook sets function that is called when the Name method of the +// parent MockBackgroundRoutine instance is invoked and the hook queue is +// empty. +func (f *BackgroundRoutineNameFunc) SetDefaultHook(hook func() string) { + f.defaultHook = hook +} + +// PushHook adds a function to the end of hook queue. Each invocation of the +// Name method of the parent MockBackgroundRoutine instance invokes the hook +// at the front of the queue and discards it. After the queue is empty, the +// default hook function is invoked for any future action. +func (f *BackgroundRoutineNameFunc) PushHook(hook func() string) { + f.mutex.Lock() + f.hooks = append(f.hooks, hook) + f.mutex.Unlock() +} + +// SetDefaultReturn calls SetDefaultHook with a function that returns the +// given values. +func (f *BackgroundRoutineNameFunc) SetDefaultReturn(r0 string) { + f.SetDefaultHook(func() string { + return r0 + }) +} + +// PushReturn calls PushHook with a function that returns the given values. +func (f *BackgroundRoutineNameFunc) PushReturn(r0 string) { + f.PushHook(func() string { + return r0 + }) +} + +func (f *BackgroundRoutineNameFunc) nextHook() func() string { + f.mutex.Lock() + defer f.mutex.Unlock() + + if len(f.hooks) == 0 { + return f.defaultHook + } + + hook := f.hooks[0] + f.hooks = f.hooks[1:] + return hook +} + +func (f *BackgroundRoutineNameFunc) appendCall(r0 BackgroundRoutineNameFuncCall) { + f.mutex.Lock() + f.history = append(f.history, r0) + f.mutex.Unlock() +} + +// History returns a sequence of BackgroundRoutineNameFuncCall objects +// describing the invocations of this function. +func (f *BackgroundRoutineNameFunc) History() []BackgroundRoutineNameFuncCall { + f.mutex.Lock() + history := make([]BackgroundRoutineNameFuncCall, len(f.history)) + copy(history, f.history) + f.mutex.Unlock() + + return history +} + +// BackgroundRoutineNameFuncCall is an object that describes an invocation +// of method Name on an instance of MockBackgroundRoutine. +type BackgroundRoutineNameFuncCall struct { + // Result0 is the value of the 1st result returned from this method + // invocation. + Result0 string +} + +// Args returns an interface slice containing the arguments of this +// invocation. +func (c BackgroundRoutineNameFuncCall) Args() []interface{} { + return []interface{}{} +} + +// Results returns an interface slice containing the results of this +// invocation. +func (c BackgroundRoutineNameFuncCall) Results() []interface{} { + return []interface{}{c.Result0} +} + // BackgroundRoutineStartFunc describes the behavior when the Start method // of the parent MockBackgroundRoutine instance is invoked. type BackgroundRoutineStartFunc struct { @@ -172,24 +287,24 @@ func (c BackgroundRoutineStartFuncCall) Results() []interface{} { // BackgroundRoutineStopFunc describes the behavior when the Stop method of // the parent MockBackgroundRoutine instance is invoked. type BackgroundRoutineStopFunc struct { - defaultHook func() - hooks []func() + defaultHook func(context.Context) error + hooks []func(context.Context) error history []BackgroundRoutineStopFuncCall mutex sync.Mutex } // Stop delegates to the next hook function in the queue and stores the // parameter and result values of this invocation. -func (m *MockBackgroundRoutine) Stop() { - m.StopFunc.nextHook()() - m.StopFunc.appendCall(BackgroundRoutineStopFuncCall{}) - return +func (m *MockBackgroundRoutine) Stop(v0 context.Context) error { + r0 := m.StopFunc.nextHook()(v0) + m.StopFunc.appendCall(BackgroundRoutineStopFuncCall{v0, r0}) + return r0 } // SetDefaultHook sets function that is called when the Stop method of the // parent MockBackgroundRoutine instance is invoked and the hook queue is // empty. -func (f *BackgroundRoutineStopFunc) SetDefaultHook(hook func()) { +func (f *BackgroundRoutineStopFunc) SetDefaultHook(hook func(context.Context) error) { f.defaultHook = hook } @@ -197,7 +312,7 @@ func (f *BackgroundRoutineStopFunc) SetDefaultHook(hook func()) { // Stop method of the parent MockBackgroundRoutine instance invokes the hook // at the front of the queue and discards it. After the queue is empty, the // default hook function is invoked for any future action. -func (f *BackgroundRoutineStopFunc) PushHook(hook func()) { +func (f *BackgroundRoutineStopFunc) PushHook(hook func(context.Context) error) { f.mutex.Lock() f.hooks = append(f.hooks, hook) f.mutex.Unlock() @@ -205,20 +320,20 @@ func (f *BackgroundRoutineStopFunc) PushHook(hook func()) { // SetDefaultReturn calls SetDefaultHook with a function that returns the // given values. -func (f *BackgroundRoutineStopFunc) SetDefaultReturn() { - f.SetDefaultHook(func() { - return +func (f *BackgroundRoutineStopFunc) SetDefaultReturn(r0 error) { + f.SetDefaultHook(func(context.Context) error { + return r0 }) } // PushReturn calls PushHook with a function that returns the given values. -func (f *BackgroundRoutineStopFunc) PushReturn() { - f.PushHook(func() { - return +func (f *BackgroundRoutineStopFunc) PushReturn(r0 error) { + f.PushHook(func(context.Context) error { + return r0 }) } -func (f *BackgroundRoutineStopFunc) nextHook() func() { +func (f *BackgroundRoutineStopFunc) nextHook() func(context.Context) error { f.mutex.Lock() defer f.mutex.Unlock() @@ -250,18 +365,25 @@ func (f *BackgroundRoutineStopFunc) History() []BackgroundRoutineStopFuncCall { // BackgroundRoutineStopFuncCall is an object that describes an invocation // of method Stop on an instance of MockBackgroundRoutine. -type BackgroundRoutineStopFuncCall struct{} +type BackgroundRoutineStopFuncCall struct { + // Arg0 is the value of the 1st argument passed to this method + // invocation. + Arg0 context.Context + // Result0 is the value of the 1st result returned from this method + // invocation. + Result0 error +} // Args returns an interface slice containing the arguments of this // invocation. func (c BackgroundRoutineStopFuncCall) Args() []interface{} { - return []interface{}{} + return []interface{}{c.Arg0} } // Results returns an interface slice containing the results of this // invocation. func (c BackgroundRoutineStopFuncCall) Results() []interface{} { - return []interface{}{} + return []interface{}{c.Result0} } // MockErrorHandler is a mock implementation of the ErrorHandler interface diff --git a/internal/goroutine/periodic.go b/internal/goroutine/periodic.go index 57fce1d7292..5292e772c5c 100644 --- a/internal/goroutine/periodic.go +++ b/internal/goroutine/periodic.go @@ -181,12 +181,13 @@ func (r *PeriodicGoroutine) Start() { // Stop will cancel the context passed to the handler function to stop the current // iteration of work, then break the loop in the Start method so that no new work // is accepted. This method blocks until Start has returned. -func (r *PeriodicGoroutine) Stop() { +func (r *PeriodicGoroutine) Stop(context.Context) error { if r.recorder != nil { go r.recorder.LogStop(r) } r.cancel() <-r.finished + return nil } func (r *PeriodicGoroutine) runHandlerPool() { diff --git a/internal/goroutine/periodic_test.go b/internal/goroutine/periodic_test.go index 53400a28ab7..eadd885ff9d 100644 --- a/internal/goroutine/periodic_test.go +++ b/internal/goroutine/periodic_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/derision-test/glock" + "github.com/stretchr/testify/require" "github.com/sourcegraph/sourcegraph/lib/errors" ) @@ -42,7 +43,8 @@ func TestPeriodicGoroutine(t *testing.T) { <-called clock.BlockingAdvance(time.Second) <-called - goroutine.Stop() + err := goroutine.Stop(context.Background()) + require.NoError(t, err) if calls := len(handler.HandleFunc.History()); calls != 4 { t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls) @@ -80,7 +82,8 @@ func TestPeriodicGoroutineReinvoke(t *testing.T) { witnessHandler() clock.BlockingAdvance(time.Second) witnessHandler() - goroutine.Stop() + err := goroutine.Stop(context.Background()) + require.NoError(t, err) if calls := len(handler.HandleFunc.History()); calls != 4*maxConsecutiveReinvocations { t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4*maxConsecutiveReinvocations, calls) @@ -120,7 +123,8 @@ func TestPeriodicGoroutineWithDynamicInterval(t *testing.T) { <-called clock.BlockingAdvance(3 * time.Second) <-called - goroutine.Stop() + err := goroutine.Stop(context.Background()) + require.NoError(t, err) if calls := len(handler.HandleFunc.History()); calls != 4 { t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls) @@ -158,7 +162,8 @@ func TestPeriodicGoroutineWithInitialDelay(t *testing.T) { <-called clock.BlockingAdvance(time.Second) <-called - goroutine.Stop() + err := goroutine.Stop(context.Background()) + require.NoError(t, err) if calls := len(handler.HandleFunc.History()); calls != 3 { t.Errorf("unexpected number of handler invocations. want=%d have=%d", 3, calls) @@ -199,7 +204,8 @@ func TestPeriodicGoroutineConcurrency(t *testing.T) { <-called } - goroutine.Stop() + err := goroutine.Stop(context.Background()) + require.NoError(t, err) if calls := len(handler.HandleFunc.History()); calls != 3*concurrency { t.Errorf("unexpected number of handler invocations. want=%d have=%d", 3*concurrency, calls) @@ -260,7 +266,8 @@ func TestPeriodicGoroutineWithDynamicConcurrency(t *testing.T) { <-exit // wait for blocked handler to exit } - goroutine.Stop() + err := goroutine.Stop(context.Background()) + require.NoError(t, err) // N.B.: no need for assertions here as getting through the test at all to this // point without some permanent blockage shows that each of the pool sizes behave @@ -297,7 +304,8 @@ func TestPeriodicGoroutineError(t *testing.T) { <-called clock.BlockingAdvance(time.Second) <-called - goroutine.Stop() + err := goroutine.Stop(context.Background()) + require.NoError(t, err) if calls := len(handler.HandleFunc.History()); calls != 4 { t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls) @@ -352,7 +360,8 @@ func TestPeriodicGoroutinePanic(t *testing.T) { case <-time.After(time.Second): t.Fatal("second call didn't happen within 1s") } - goroutine.Stop() + err := goroutine.Stop(context.Background()) + require.NoError(t, err) if calls := len(handler.HandleFunc.History()); calls != 2 { t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls) @@ -382,7 +391,8 @@ func TestPeriodicGoroutineContextError(t *testing.T) { ) go goroutine.Start() <-called - goroutine.Stop() + err := goroutine.Stop(context.Background()) + require.NoError(t, err) if calls := len(handler.HandleFunc.History()); calls != 1 { t.Errorf("unexpected number of handler invocations. want=%d have=%d", 1, calls) @@ -417,7 +427,8 @@ func TestPeriodicGoroutineFinalizer(t *testing.T) { <-called clock.BlockingAdvance(time.Second) <-called - goroutine.Stop() + err := goroutine.Stop(context.Background()) + require.NoError(t, err) if calls := len(handler.HandleFunc.History()); calls != 4 { t.Errorf("unexpected number of handler invocations. want=%d have=%d", 4, calls) diff --git a/internal/goroutine/recorder/common_test.go b/internal/goroutine/recorder/common_test.go index 6298aaab4dc..76ffbba56d3 100644 --- a/internal/goroutine/recorder/common_test.go +++ b/internal/goroutine/recorder/common_test.go @@ -1,13 +1,15 @@ package recorder import ( + "context" "testing" "time" "github.com/sourcegraph/log" + "github.com/stretchr/testify/assert" + "github.com/sourcegraph/sourcegraph/internal/rcache" "github.com/sourcegraph/sourcegraph/lib/errors" - "github.com/stretchr/testify/assert" ) // TestLoggerAndReaderHappyPaths tests pretty much everything in the happy path of both the logger and the log reader. @@ -114,8 +116,9 @@ func (r *RoutineMock) Start() { // Do nothing } -func (r *RoutineMock) Stop() { +func (r *RoutineMock) Stop(context.Context) error { // Do nothing + return nil } func (r *RoutineMock) Name() string { diff --git a/internal/goroutine/recorder/recorder.go b/internal/goroutine/recorder/recorder.go index 6640a0dc6c8..0a25d2faf8f 100644 --- a/internal/goroutine/recorder/recorder.go +++ b/internal/goroutine/recorder/recorder.go @@ -1,6 +1,7 @@ package recorder import ( + "context" "encoding/json" "time" @@ -12,7 +13,7 @@ import ( type Recordable interface { Start() - Stop() + Stop(ctx context.Context) error Name() string Type() RoutineType JobName() string diff --git a/internal/grpc/grpcserver/grpcserver.go b/internal/grpc/grpcserver/grpcserver.go index 6db7bc35b39..ca7036108be 100644 --- a/internal/grpc/grpcserver/grpcserver.go +++ b/internal/grpc/grpcserver/grpcserver.go @@ -1,6 +1,7 @@ package grpcserver import ( + "context" "net" "os" "sync" @@ -53,6 +54,10 @@ func newServer(logger log.Logger, grpcServer *grpc.Server, makeListener func() ( return s } +func (s *server) Name() string { + return "gRPC server" +} + func (s *server) Start() { listener, err := s.makeListener() if err != nil { @@ -66,7 +71,7 @@ func (s *server) Start() { } } -func (s *server) Stop() { +func (s *server) Stop(context.Context) error { s.once.Do(func() { s.logger.Info("Shutting down gRPC server") if s.preShutdownPause > 0 { @@ -90,4 +95,5 @@ func (s *server) Stop() { s.logger.Warn("gRPC server forcefully stopped") }) + return nil } diff --git a/internal/httpserver/server.go b/internal/httpserver/server.go index 298ded869e4..5a914e5fc0c 100644 --- a/internal/httpserver/server.go +++ b/internal/httpserver/server.go @@ -51,6 +51,10 @@ func newServer(httpServer *http.Server, makeListener func() (net.Listener, error return s } +func (s *server) Name() string { + return "HTTP server" +} + func (s *server) Start() { listener, err := s.makeListener() if err != nil { @@ -64,17 +68,18 @@ func (s *server) Start() { } } -func (s *server) Stop() { +func (s *server) Stop(ctx context.Context) error { s.once.Do(func() { if s.preShutdownPause > 0 { time.Sleep(s.preShutdownPause) } - ctx, cancel := context.WithTimeout(context.Background(), goroutine.GracefulShutdownTimeout) + ctx, cancel := context.WithTimeout(ctx, goroutine.GracefulShutdownTimeout) defer cancel() if err := s.server.Shutdown(ctx); err != nil { log15.Error("Failed to shutdown server", "error", err) } }) + return nil } diff --git a/internal/insights/scheduler/scheduler_test.go b/internal/insights/scheduler/scheduler_test.go index f8bcb879ec0..0ce722b10bd 100644 --- a/internal/insights/scheduler/scheduler_test.go +++ b/internal/insights/scheduler/scheduler_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" edb "github.com/sourcegraph/sourcegraph/internal/database" @@ -22,7 +23,7 @@ import ( func Test_MonitorStartsAndStops(t *testing.T) { logger := logtest.Scoped(t) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() insightsDB := edb.NewInsightsDB(dbtest.NewInsightsDB(logger, t), logger) repos := dbmocks.NewMockRepoStore() @@ -33,7 +34,8 @@ func Test_MonitorStartsAndStops(t *testing.T) { CostAnalyzer: priority.NewQueryAnalyzer(), } routines := NewBackgroundJobMonitor(ctx, config).Routines() - goroutine.MonitorBackgroundRoutines(ctx, routines...) + err := goroutine.MonitorBackgroundRoutines(ctx, routines...) + assert.EqualError(t, err, "unable to stop routines gracefully: context deadline exceeded") } func TestScheduler_InitialBackfill(t *testing.T) { diff --git a/internal/oobmigration/BUILD.bazel b/internal/oobmigration/BUILD.bazel index aa8a7769123..5902a09441f 100644 --- a/internal/oobmigration/BUILD.bazel +++ b/internal/oobmigration/BUILD.bazel @@ -74,6 +74,7 @@ go_test( "@com_github_keegancsmith_sqlf//:sqlf", "@com_github_sourcegraph_log//:log", "@com_github_sourcegraph_log//logtest", + "@com_github_stretchr_testify//require", ], ) diff --git a/internal/oobmigration/runner.go b/internal/oobmigration/runner.go index 427be667068..97f3834b31d 100644 --- a/internal/oobmigration/runner.go +++ b/internal/oobmigration/runner.go @@ -193,6 +193,10 @@ func (r *Runner) UpdateDirection(ctx context.Context, ids []int, applyReverse bo return nil } +func (r *Runner) Name() string { + return "oobmigrationRunner" +} + // Start runs registered migrators on a loop until they complete. This method will periodically // re-read from the database in order to refresh its current view of the migrations. func (r *Runner) Start(currentVersion, firstVersion Version) { @@ -338,9 +342,10 @@ func (r *Runner) ensureProcessorIsRunning(wg *sync.WaitGroup, m map[int]chan Mig } // Stop will cancel the context used in Start, then blocks until Start has returned. -func (r *Runner) Stop() { +func (r *Runner) Stop(context.Context) error { r.cancel() <-r.finished + return nil } type migratorOptions struct { diff --git a/internal/oobmigration/runner_test.go b/internal/oobmigration/runner_test.go index 1f40c08adf7..3f5b8d37292 100644 --- a/internal/oobmigration/runner_test.go +++ b/internal/oobmigration/runner_test.go @@ -8,6 +8,7 @@ import ( "github.com/derision-test/glock" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" "github.com/sourcegraph/log" "github.com/sourcegraph/log/logtest" @@ -36,7 +37,8 @@ func TestRunner(t *testing.T) { go runner.startInternal(allowAll) tickN(ticker, 3) - runner.Stop() + err := runner.Stop(context.Background()) + require.NoError(t, err) if callCount := len(migrator.UpFunc.History()); callCount != 3 { t.Errorf("unexpected number of calls to Up. want=%d have=%d", 3, callCount) @@ -67,7 +69,8 @@ func TestRunnerError(t *testing.T) { go runner.startInternal(allowAll) tickN(ticker, 1) - runner.Stop() + err := runner.Stop(context.Background()) + require.NoError(t, err) if calls := store.AddErrorFunc.history; len(calls) != 1 { t.Fatalf("unexpected number of calls to AddError. want=%d have=%d", 1, len(calls)) @@ -124,7 +127,8 @@ func TestRunnerRemovesCompleted(t *testing.T) { tickN(ticker1, 5) tickN(ticker2, 5) tickN(ticker3, 5) - runner.Stop() + err := runner.Stop(context.Background()) + require.NoError(t, err) // not finished if callCount := len(migrator1.UpFunc.History()); callCount != 5 { @@ -417,7 +421,8 @@ func TestRunnerBoundsFilter(t *testing.T) { return m.ID != 2 }) tickN(ticker, 64) - runner.Stop() + err := runner.Stop(context.Background()) + require.NoError(t, err) // not called if callCount := len(migrator2.UpFunc.History()); callCount != 0 { diff --git a/internal/pubsub/pubsubtest/pubsubtest.go b/internal/pubsub/pubsubtest/pubsubtest.go index f0db7784f86..427a60ce818 100644 --- a/internal/pubsub/pubsubtest/pubsubtest.go +++ b/internal/pubsub/pubsubtest/pubsubtest.go @@ -50,4 +50,4 @@ func (c *MemoryTopicClient) PublishMessage(ctx context.Context, message []byte, } func (c *MemoryTopicClient) Ping(context.Context) error { return nil } -func (c *MemoryTopicClient) Stop() {} +func (c *MemoryTopicClient) Stop(context.Context) error { return nil } diff --git a/internal/pubsub/topic.go b/internal/pubsub/topic.go index bfaffebe30c..562bec3e5ec 100644 --- a/internal/pubsub/topic.go +++ b/internal/pubsub/topic.go @@ -25,7 +25,7 @@ type TopicClient interface { Ping(ctx context.Context) error // Stop stops the topic publishing channel. The client should not be used after // calling Stop. - Stop() + Stop(ctx context.Context) error } // TopicPublisher is a Pub/Sub publisher bound to a topic. @@ -101,8 +101,9 @@ func (c *topicClient) PublishMessage(ctx context.Context, message []byte, attrib return nil } -func (c *topicClient) Stop() { +func (c *topicClient) Stop(context.Context) error { c.topic.Stop() + return nil } // NewNoopTopicClient creates a no-op Pub/Sub client that does nothing on any @@ -118,7 +119,7 @@ func (c *noopTopicClient) Publish(context.Context, ...[]byte) error { return nil func (c *noopTopicClient) PublishMessage(context.Context, []byte, map[string]string) error { return nil } -func (c *noopTopicClient) Stop() {} +func (c *noopTopicClient) Stop(context.Context) error { return nil } // NewLoggingTopicClient creates a Pub/Sub client that just logs all messages, // and does nothing otherwise. This is also a useful stub implementation of the diff --git a/internal/repos/sync_worker.go b/internal/repos/sync_worker.go index 5f62d083eaa..3f937489a9c 100644 --- a/internal/repos/sync_worker.go +++ b/internal/repos/sync_worker.go @@ -90,7 +90,7 @@ func NewSyncWorker(ctx context.Context, observationCtx *observation.Context, dbH if opts.CleanupOldJobs { janitor = newJobCleanerRoutine(ctx, database.NewDBWith(observationCtx.Logger, basestore.NewWithHandle(dbHandle)), opts.CleanupOldJobsInterval) } else { - janitor = goroutine.NoopRoutine() + janitor = goroutine.NoopRoutine("noop sync worker") } return worker, resetter, janitor diff --git a/internal/repos/sync_worker_test.go b/internal/repos/sync_worker_test.go index 6e615a850f0..31681fa2fc2 100644 --- a/internal/repos/sync_worker_test.go +++ b/internal/repos/sync_worker_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/keegancsmith/sqlf" + "github.com/stretchr/testify/require" "github.com/sourcegraph/log" @@ -64,9 +65,14 @@ func TestSyncWorkerPlumbing(t *testing.T) { // finalising the row which means that when running tests in verbose mode we'll // see "sql: transaction has already been committed or rolled back". These // errors can be ignored. - defer janitor.Stop() - defer resetter.Stop() - defer worker.Stop() + t.Cleanup(func() { + err := janitor.Stop(ctx) + require.NoError(t, err) + err = resetter.Stop(ctx) + require.NoError(t, err) + err = worker.Stop(ctx) + require.NoError(t, err) + }) var job *repos.SyncJob select { diff --git a/internal/repos/syncer_test.go b/internal/repos/syncer_test.go index 4b905cff18d..3aa8a2e6098 100644 --- a/internal/repos/syncer_test.go +++ b/internal/repos/syncer_test.go @@ -12,6 +12,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/keegancsmith/sqlf" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/time/rate" @@ -954,12 +955,16 @@ func TestSyncRun(t *testing.T) { done := make(chan struct{}) go func() { - goroutine.MonitorBackgroundRoutines(ctx, syncer.Routines(ctx, store, repos.RunOptions{ - EnqueueInterval: func() time.Duration { return time.Second }, - IsDotCom: false, - MinSyncInterval: func() time.Duration { return 1 * time.Millisecond }, - DequeueInterval: 1 * time.Millisecond, - })...) + err := goroutine.MonitorBackgroundRoutines( + ctx, + syncer.Routines(ctx, store, repos.RunOptions{ + EnqueueInterval: func() time.Duration { return time.Second }, + IsDotCom: false, + MinSyncInterval: func() time.Duration { return 1 * time.Millisecond }, + DequeueInterval: 1 * time.Millisecond, + })..., + ) + assert.EqualError(t, err, "unable to stop routines gracefully: context canceled") done <- struct{}{} }() @@ -1109,12 +1114,16 @@ func TestSyncerMultipleServices(t *testing.T) { done := make(chan struct{}) go func() { - goroutine.MonitorBackgroundRoutines(ctx, syncer.Routines(ctx, store, repos.RunOptions{ - EnqueueInterval: func() time.Duration { return time.Second }, - IsDotCom: false, - MinSyncInterval: func() time.Duration { return 1 * time.Minute }, - DequeueInterval: 1 * time.Millisecond, - })...) + err := goroutine.MonitorBackgroundRoutines( + ctx, + syncer.Routines(ctx, store, repos.RunOptions{ + EnqueueInterval: func() time.Duration { return time.Second }, + IsDotCom: false, + MinSyncInterval: func() time.Duration { return 1 * time.Minute }, + DequeueInterval: 1 * time.Millisecond, + })..., + ) + assert.EqualError(t, err, "unable to stop routines gracefully: context canceled") done <- struct{}{} }() diff --git a/internal/workerutil/BUILD.bazel b/internal/workerutil/BUILD.bazel index 2965111b1c6..d69b39ddc2c 100644 --- a/internal/workerutil/BUILD.bazel +++ b/internal/workerutil/BUILD.bazel @@ -46,6 +46,7 @@ go_test( "@com_github_google_go_cmp//cmp", "@com_github_sourcegraph_log//:log", "@com_github_sourcegraph_log//logtest", + "@com_github_stretchr_testify//require", ], ) diff --git a/internal/workerutil/dbworker/BUILD.bazel b/internal/workerutil/dbworker/BUILD.bazel index 914060499bc..7601407df65 100644 --- a/internal/workerutil/dbworker/BUILD.bazel +++ b/internal/workerutil/dbworker/BUILD.bazel @@ -34,5 +34,6 @@ go_test( "@com_github_derision_test_glock//:glock", "@com_github_prometheus_client_golang//prometheus", "@com_github_sourcegraph_log//logtest", + "@com_github_stretchr_testify//require", ], ) diff --git a/internal/workerutil/dbworker/resetter.go b/internal/workerutil/dbworker/resetter.go index 10c33d63938..12657ebfb84 100644 --- a/internal/workerutil/dbworker/resetter.go +++ b/internal/workerutil/dbworker/resetter.go @@ -2,6 +2,7 @@ package dbworker import ( "context" + "fmt" "time" "github.com/derision-test/glock" @@ -96,6 +97,10 @@ func newResetter[T workerutil.Record](logger log.Logger, store store.Store[T], o } } +func (r *Resetter[T]) Name() string { + return fmt.Sprintf("dbworker.Resetter[%s]", r.options.Name) +} + // Start begins periodically calling reset stalled on the underlying store. func (r *Resetter[T]) Start() { defer close(r.finished) @@ -132,7 +137,8 @@ loop: } // Stop will cause the resetter loop to exit after the current iteration. -func (r *Resetter[T]) Stop() { +func (r *Resetter[T]) Stop(context.Context) error { r.cancel() <-r.finished + return nil } diff --git a/internal/workerutil/dbworker/resetter_test.go b/internal/workerutil/dbworker/resetter_test.go index 099186cca0e..182836123e6 100644 --- a/internal/workerutil/dbworker/resetter_test.go +++ b/internal/workerutil/dbworker/resetter_test.go @@ -1,12 +1,14 @@ package dbworker import ( + "context" "strconv" "testing" "time" "github.com/derision-test/glock" "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" "github.com/sourcegraph/log/logtest" @@ -43,7 +45,8 @@ func TestResetter(t *testing.T) { resetter := newResetter(logger, store.Store[*TestRecord](s), options, clock) go func() { resetter.Start() }() clock.BlockingAdvance(time.Second) - resetter.Stop() + err := resetter.Stop(context.Background()) + require.NoError(t, err) if callCount := len(s.ResetStalledFunc.History()); callCount < 1 { t.Errorf("unexpected reset stalled call count. want>=%d have=%d", 1, callCount) diff --git a/internal/workerutil/worker.go b/internal/workerutil/worker.go index 71ae2fca18f..078268e69ea 100644 --- a/internal/workerutil/worker.go +++ b/internal/workerutil/worker.go @@ -256,12 +256,13 @@ loop: // Stop will cause the worker loop to exit after the current iteration. This is done by canceling the // context passed to the dequeue operations (but not the handler operations). This method blocks until // all handler goroutines have exited. -func (w *Worker[T]) Stop() { +func (w *Worker[T]) Stop(context.Context) error { if w.recorder != nil { go w.recorder.LogStop(w) } w.dequeueCancel() w.Wait() + return nil } // Wait blocks until all handler goroutines have exited. diff --git a/internal/workerutil/worker_test.go b/internal/workerutil/worker_test.go index 7cf8d0db341..fe7a00f6722 100644 --- a/internal/workerutil/worker_test.go +++ b/internal/workerutil/worker_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/derision-test/glock" + "github.com/stretchr/testify/require" "github.com/sourcegraph/log" @@ -48,10 +49,12 @@ func TestWorkerHandlerSuccess(t *testing.T) { store.DequeueFunc.SetDefaultReturn(nil, false, nil) store.MarkCompleteFunc.SetDefaultReturn(true, nil) - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) go func() { worker.Start() }() dequeueClock.BlockingAdvance(time.Second) - worker.Stop() + err := worker.Stop(ctx) + require.NoError(t, err) if callCount := len(handler.HandleFunc.History()); callCount != 1 { t.Errorf("unexpected handle call count. want=%d have=%d", 1, callCount) @@ -85,10 +88,12 @@ func TestWorkerHandlerFailure(t *testing.T) { store.MarkErroredFunc.SetDefaultReturn(true, nil) handler.HandleFunc.SetDefaultReturn(errors.Errorf("oops")) - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) go func() { worker.Start() }() dequeueClock.BlockingAdvance(time.Second) - worker.Stop() + err := worker.Stop(ctx) + require.NoError(t, err) if callCount := len(handler.HandleFunc.History()); callCount != 1 { t.Errorf("unexpected handle call count. want=%d have=%d", 1, callCount) @@ -131,10 +136,12 @@ func TestWorkerHandlerNonRetryableFailure(t *testing.T) { testErr := nonRetryableTestErr{} handler.HandleFunc.SetDefaultReturn(testErr) - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) go func() { worker.Start() }() dequeueClock.BlockingAdvance(time.Second) - worker.Stop() + err := worker.Stop(ctx) + require.NoError(t, err) if callCount := len(handler.HandleFunc.History()); callCount != 1 { t.Errorf("unexpected handle call count. want=%d have=%d", 1, callCount) @@ -202,12 +209,14 @@ func TestWorkerConcurrent(t *testing.T) { return nil }) - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) go func() { worker.Start() }() for range NumTestRecords { dequeueClock.BlockingAdvance(time.Second) } - worker.Stop() + err := worker.Stop(ctx) + require.NoError(t, err) // We never want to see more concurrently running jobs than configured. // Ideally, we would also want to see that the number is exactly equal, @@ -244,10 +253,12 @@ func TestWorkerBlockingPreDequeueHook(t *testing.T) { // Block all dequeues handler.PreDequeueFunc.SetDefaultReturn(false, nil, nil) - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) go func() { worker.Start() }() dequeueClock.BlockingAdvance(time.Second) - worker.Stop() + err := worker.Stop(ctx) + require.NoError(t, err) if callCount := len(handler.HandleFunc.History()); callCount != 0 { t.Errorf("unexpected handle call count. want=%d have=%d", 0, callCount) @@ -278,12 +289,14 @@ func TestWorkerConditionalPreDequeueHook(t *testing.T) { handler.PreDequeueFunc.PushReturn(true, "B", nil) handler.PreDequeueFunc.PushReturn(true, "C", nil) - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) go func() { worker.Start() }() dequeueClock.BlockingAdvance(time.Second) dequeueClock.BlockingAdvance(time.Second) dequeueClock.BlockingAdvance(time.Second) - worker.Stop() + err := worker.Stop(ctx) + require.NoError(t, err) if callCount := len(handler.HandleFunc.History()); callCount != 3 { t.Errorf("unexpected handle call count. want=%d have=%d", 3, callCount) @@ -358,11 +371,13 @@ func TestWorkerDequeueHeartbeat(t *testing.T) { return i, nil, nil }) - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) go func() { worker.Start() }() t.Cleanup(func() { close(doneHandling) - worker.Stop() + err := worker.Stop(ctx) + require.NoError(t, err) }) <-dequeued @@ -395,7 +410,8 @@ func TestWorkerNumTotalJobs(t *testing.T) { store.MarkCompleteFunc.SetDefaultReturn(true, nil) // Should process 5 then shut down - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) worker.Start() if callCount := len(store.DequeueFunc.History()); callCount != 5 { @@ -437,7 +453,8 @@ func TestWorkerMaxActiveTime(t *testing.T) { stopped := make(chan struct{}) go func() { defer close(stopped) - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, dequeueClock, heartbeatClock, shutdownClock) worker.Start() }() @@ -513,12 +530,14 @@ func TestWorkerCancelJobs(t *testing.T) { clock := glock.NewMockClock() heartbeatClock := glock.NewMockClock() - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, heartbeatClock, clock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, heartbeatClock, clock) go func() { worker.Start() }() t.Cleanup(func() { // Keep the handler working until context is canceled. close(doneHandling) - worker.Stop() + err := worker.Stop(ctx) + require.NoError(t, err) }) // Wait until a job has been dequeued. @@ -590,12 +609,14 @@ func TestWorkerDeadline(t *testing.T) { }) clock := glock.NewMockClock() - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, clock, clock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, clock, clock) go func() { worker.Start() }() t.Cleanup(func() { // Keep the handler working until context is canceled. close(doneHandling) - worker.Stop() + err := worker.Stop(ctx) + require.NoError(t, err) }) // Wait until a job has been dequeued. @@ -644,9 +665,13 @@ func TestWorkerStopDrainsDequeueLoopOnly(t *testing.T) { }) clock := glock.NewMockClock() - worker := newWorker(context.Background(), Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, clock, clock) + ctx := context.Background() + worker := newWorker(ctx, Store[*TestRecord](store), Handler[*TestRecord](handler), options, clock, clock, clock) go func() { worker.Start() }() - t.Cleanup(func() { worker.Stop() }) + t.Cleanup(func() { + err := worker.Stop(ctx) + require.NoError(t, err) + }) // Wait until a job has been dequeued. <-dequeued @@ -657,7 +682,8 @@ func TestWorkerStopDrainsDequeueLoopOnly(t *testing.T) { }() // Drain dequeue loop and wait for the one active handler to finish. - worker.Stop() + err := worker.Stop(ctx) + require.NoError(t, err) for _, call := range handler.HandleFunc.History() { if call.Result0 != nil { diff --git a/lib/background/BUILD.bazel b/lib/background/BUILD.bazel index 8f599a2ae11..82a68f11d03 100644 --- a/lib/background/BUILD.bazel +++ b/lib/background/BUILD.bazel @@ -10,6 +10,7 @@ go_library( ], importpath = "github.com/sourcegraph/sourcegraph/lib/background", visibility = ["//visibility:public"], + deps = ["//lib/errors"], ) go_test( @@ -20,7 +21,12 @@ go_test( "mocks_test.go", ], embed = [":background"], - deps = ["@com_github_stretchr_testify//assert"], + deps = [ + "//lib/errors", + "@com_github_derision_test_go_mockgen_v2//testutil/require", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], ) go_mockgen( diff --git a/lib/background/background.go b/lib/background/background.go index 70199fbb518..66726c8a8b4 100644 --- a/lib/background/background.go +++ b/lib/background/background.go @@ -2,47 +2,55 @@ package background import ( "context" + "fmt" "os" "os/signal" "sync" "syscall" + + "github.com/sourcegraph/sourcegraph/lib/errors" ) -// Routine represents a component of a binary that consists of a long -// running process with a graceful shutdown mechanism. +// Routine represents a background process that consists of a long-running +// process with a graceful shutdown mechanism. type Routine interface { - // Start begins the long-running process. This routine may also implement - // a Stop method that should signal this process the application is going - // to shut down. + // Name returns the human-readable name of the routine. + Name() string + // Start begins the long-running process. This routine may also implement a Stop + // method that should signal this process the application is going to shut down. Start() - // Stop signals the Start method to stop accepting new work and complete its // current work. This method can but is not required to block until Start has - // returned. - Stop() + // returned. The method should respect the context deadline passed to it for + // proper graceful shutdown. + Stop(ctx context.Context) error } -// Monitor will start the given background routines in their own -// goroutine. If the given context is canceled or a signal is received, the Stop -// method of each routine will be called. This method blocks until the Stop methods -// of each routine have returned. Two signals will cause the app to shutdown +// Monitor will start the given background routines in their own goroutine. If +// the given context is canceled or a signal is received, the Stop method of +// each routine will be called. This method blocks until the Stop methods of +// each routine have returned. Two signals will cause the app to shut down // immediately. -func Monitor(ctx context.Context, routines ...Routine) { +// +// This function is only returned when routines are signaled to stop with +// potential errors from stopping routines. +func Monitor(ctx context.Context, routines ...Routine) error { signals := make(chan os.Signal, 2) signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) - monitorBackgroundRoutines(ctx, signals, routines...) + return monitorBackgroundRoutines(ctx, signals, routines...) } -func monitorBackgroundRoutines(ctx context.Context, signals <-chan os.Signal, routines ...Routine) { +func monitorBackgroundRoutines(ctx context.Context, signals <-chan os.Signal, routines ...Routine) error { wg := &sync.WaitGroup{} startAll(wg, routines...) waitForSignal(ctx, signals) - stopAll(wg, routines...) - wg.Wait() + return stopAll(ctx, wg, routines...) } // startAll calls each routine's Start method in its own goroutine and registers -// each running goroutine with the given waitgroup. +// each running goroutine with the given waitgroup. It DOES NOT wait for the +// routines to finish starting, so the caller must wait for the waitgroup (if +// desired). func startAll(wg *sync.WaitGroup, routines ...Routine) { for _, r := range routines { t := r @@ -52,12 +60,39 @@ func startAll(wg *sync.WaitGroup, routines ...Routine) { } // stopAll calls each routine's Stop method in its own goroutine and registers -// each running goroutine with the given waitgroup. -func stopAll(wg *sync.WaitGroup, routines ...Routine) { +// each running goroutine with the given waitgroup. It waits for all routines to +// stop or the context to be canceled. +func stopAll(ctx context.Context, wg *sync.WaitGroup, routines ...Routine) error { + var stopErrs error + var stopErrsLock sync.Mutex for _, r := range routines { - t := r wg.Add(1) - Go(func() { defer wg.Done(); t.Stop() }) + Go(func() { + defer wg.Done() + if err := r.Stop(ctx); err != nil { + stopErrsLock.Lock() + stopErrs = errors.Append(stopErrs, errors.Wrapf(err, "stop routine %q", r.Name())) + stopErrsLock.Unlock() + } + }) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + done <- struct{}{} + }() + + select { + case <-done: + return stopErrs + case <-ctx.Done(): + stopErrsLock.Lock() + defer stopErrsLock.Unlock() + if stopErrs != nil { + return errors.Wrapf(ctx.Err(), "unable to stop routines gracefully with partial errors: %v", stopErrs) + } + return errors.Wrap(ctx.Err(), "unable to stop routines gracefully") } } @@ -89,19 +124,32 @@ func exitAfterSignals(signals <-chan os.Signal, numSignals int) { exiter() } -// CombinedRoutine is a list of routines which are started and stopped in unison. +// CombinedRoutine is a list of routines which are started and stopped in +// unison. type CombinedRoutine []Routine -func (r CombinedRoutine) Start() { +func (rs CombinedRoutine) Name() string { + names := make([]string, 0, len(rs)) + for _, r := range rs { + names = append(names, r.Name()) + } + return fmt.Sprintf("combined%q", names) // [a b c] -> combined["one" "two" "three"] +} + +// Start starts all routines, it does not wait for the routines to finish +// starting. +func (rs CombinedRoutine) Start() { wg := &sync.WaitGroup{} - startAll(wg, r...) + startAll(wg, rs...) wg.Wait() } -func (r CombinedRoutine) Stop() { +// Stop attempts to gracefully stopping all routines. It attempts to collect all +// the errors returned from the routines, and respects the context deadline +// passed to it and gives up waiting when context deadline exceeded. +func (rs CombinedRoutine) Stop(ctx context.Context) error { wg := &sync.WaitGroup{} - stopAll(wg, r...) - wg.Wait() + return stopAll(ctx, wg, rs...) } // LIFOStopRoutine is a list of routines which are started in unison, but stopped @@ -112,24 +160,45 @@ func (r CombinedRoutine) Stop() { // primary service stops for a graceful shutdown. type LIFOStopRoutine []Routine +func (r LIFOStopRoutine) Name() string { return "lifo" } + func (r LIFOStopRoutine) Start() { CombinedRoutine(r).Start() } -func (r LIFOStopRoutine) Stop() { +func (r LIFOStopRoutine) Stop(ctx context.Context) error { + var stopErr error for i := len(r) - 1; i >= 0; i -= 1 { - r[i].Stop() + err := r[i].Stop(ctx) + if err != nil { + stopErr = errors.Append(stopErr, errors.Wrapf(err, "stop routine %q", r[i].Name())) + } } + return stopErr } -// NoopRoutine does nothing for start or stop. -func NoopRoutine() Routine { - return CallbackRoutine{} +// NoopRoutine return a background routine that does nothing for start or stop. +// If the name is empty, it will default to "noop". +func NoopRoutine(name string) Routine { + if name == "" { + name = "noop" + } + return CallbackRoutine{ + NameFunc: func() string { return name }, + } } // CallbackRoutine calls the StartFunc and StopFunc callbacks to implement a // Routine. Each callback may be nil. type CallbackRoutine struct { + NameFunc func() string StartFunc func() - StopFunc func() + StopFunc func(ctx context.Context) error +} + +func (r CallbackRoutine) Name() string { + if r.NameFunc != nil { + return r.NameFunc() + } + return "callback" } func (r CallbackRoutine) Start() { @@ -138,8 +207,9 @@ func (r CallbackRoutine) Start() { } } -func (r CallbackRoutine) Stop() { +func (r CallbackRoutine) Stop(ctx context.Context) error { if r.StopFunc != nil { - r.StopFunc() + return r.StopFunc(ctx) } + return nil } diff --git a/lib/background/background_test.go b/lib/background/background_test.go index ee6fe5899bb..91155139423 100644 --- a/lib/background/background_test.go +++ b/lib/background/background_test.go @@ -5,8 +5,13 @@ import ( "os" "syscall" "testing" + "time" + mockrequire "github.com/derision-test/go-mockgen/v2/testutil/require" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/sourcegraph/sourcegraph/lib/errors" ) // Make the exiter a no-op in tests @@ -23,7 +28,8 @@ func TestMonitorBackgroundRoutinesSignal(t *testing.T) { go func() { defer close(unblocked) - monitorBackgroundRoutines(context.Background(), signals, r1, r2, r3) + err := monitorBackgroundRoutines(context.Background(), signals, r1, r2, r3) + require.NoError(t, err) }() signals <- syscall.SIGINT @@ -52,7 +58,8 @@ func TestMonitorBackgroundRoutinesContextCancel(t *testing.T) { go func() { defer close(unblocked) - monitorBackgroundRoutines(ctx, signals, r1, r2, r3) + err := monitorBackgroundRoutines(ctx, signals, r1, r2, r3) + assert.EqualError(t, err, "unable to stop routines gracefully: context canceled") }() cancel() @@ -68,18 +75,94 @@ func TestMonitorBackgroundRoutinesContextCancel(t *testing.T) { } } +func TestCombinedRoutine_Name(t *testing.T) { + r1 := NewMockRoutine() + r1.NameFunc.SetDefaultReturn("r1") + r2 := NewMockRoutine() + r2.NameFunc.SetDefaultReturn("r2") + rs := CombinedRoutine{r1, r2} + assert.Equal(t, `combined["r1" "r2"]`, rs.Name()) +} + +func TestCombinedRoutines(t *testing.T) { + t.Run("successful stop", func(t *testing.T) { + r1 := NewMockRoutine() + r2 := NewMockRoutine() + rs := CombinedRoutine{r1, r2} + rs.Start() + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second)) + defer cancel() + err := rs.Stop(ctx) + require.NoError(t, err) + mockrequire.Called(t, r1.StopFunc) + mockrequire.Called(t, r2.StopFunc) + }) + + t.Run("stop with error", func(t *testing.T) { + r1 := NewMockRoutine() + r2 := NewMockRoutine() + r2.NameFunc.SetDefaultReturn("mock") + r2.StopFunc.SetDefaultReturn(errors.New("stop error")) + rs := CombinedRoutine{r1, r2} + rs.Start() + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second)) + defer cancel() + err := rs.Stop(ctx) + assert.EqualError(t, err, `stop routine "mock": stop error`) + mockrequire.Called(t, r1.StopFunc) + mockrequire.Called(t, r2.StopFunc) + }) + + t.Run("stop with timeout", func(t *testing.T) { + r1 := NewMockRoutine() + r1.NameFunc.SetDefaultReturn("mock") + r1.StopFunc.SetDefaultReturn(errors.New("stop error")) + r2 := NewMockRoutine() + r2.StopFunc.PushHook(func(context.Context) error { + time.Sleep(100 * time.Millisecond) + return nil + }) + rs := CombinedRoutine{r1, r2} + rs.Start() + + // Context deadline is 50ms, which is half of the sleep time of r2. + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond)) + defer cancel() + err := rs.Stop(ctx) + assert.EqualError(t, err, `unable to stop routines gracefully with partial errors: stop routine "mock": stop error: context deadline exceeded`) + + // Although the caller doesn't wait, each routine should still be stopped if + // they get the chance. We wait long enough to avoid flakiness. + time.Sleep(100 * time.Millisecond) + mockrequire.Called(t, r1.StopFunc) + mockrequire.Called(t, r2.StopFunc) + }) +} + func TestLIFOStopRoutine(t *testing.T) { // use an unguarded slice because LIFOStopRoutine should only stop in sequence var stopped []string r1 := NewMockRoutine() - r1.StopFunc.PushHook(func() { stopped = append(stopped, "r1") }) + r1.StopFunc.PushHook(func(context.Context) error { + stopped = append(stopped, "r1") + return nil + }) r2 := NewMockRoutine() - r2.StopFunc.PushHook(func() { stopped = append(stopped, "r2") }) + r2.StopFunc.PushHook(func(context.Context) error { + stopped = append(stopped, "r2") + return nil + }) r3 := NewMockRoutine() - r3.StopFunc.PushHook(func() { stopped = append(stopped, "r3") }) + r3.StopFunc.PushHook(func(context.Context) error { + stopped = append(stopped, "r3") + return nil + }) r := LIFOStopRoutine{r1, r2, r3} - r.Stop() + err := r.Stop(context.Background()) + require.NoError(t, err) // stops in reverse assert.Equal(t, []string{"r3", "r2", "r1"}, stopped) } diff --git a/lib/background/mocks_test.go b/lib/background/mocks_test.go index 7d364ae518e..2c01b3ad132 100644 --- a/lib/background/mocks_test.go +++ b/lib/background/mocks_test.go @@ -6,12 +6,18 @@ package background -import "sync" +import ( + "context" + "sync" +) // MockRoutine is a mock implementation of the Routine interface (from the // package github.com/sourcegraph/sourcegraph/lib/background) used for unit // testing. type MockRoutine struct { + // NameFunc is an instance of a mock function object controlling the + // behavior of the method Name. + NameFunc *RoutineNameFunc // StartFunc is an instance of a mock function object controlling the // behavior of the method Start. StartFunc *RoutineStartFunc @@ -24,13 +30,18 @@ type MockRoutine struct { // return zero values for all results, unless overwritten. func NewMockRoutine() *MockRoutine { return &MockRoutine{ + NameFunc: &RoutineNameFunc{ + defaultHook: func() (r0 string) { + return + }, + }, StartFunc: &RoutineStartFunc{ defaultHook: func() { return }, }, StopFunc: &RoutineStopFunc{ - defaultHook: func() { + defaultHook: func(context.Context) (r0 error) { return }, }, @@ -41,13 +52,18 @@ func NewMockRoutine() *MockRoutine { // methods panic on invocation, unless overwritten. func NewStrictMockRoutine() *MockRoutine { return &MockRoutine{ + NameFunc: &RoutineNameFunc{ + defaultHook: func() string { + panic("unexpected invocation of MockRoutine.Name") + }, + }, StartFunc: &RoutineStartFunc{ defaultHook: func() { panic("unexpected invocation of MockRoutine.Start") }, }, StopFunc: &RoutineStopFunc{ - defaultHook: func() { + defaultHook: func(context.Context) error { panic("unexpected invocation of MockRoutine.Stop") }, }, @@ -58,6 +74,9 @@ func NewStrictMockRoutine() *MockRoutine { // methods delegate to the given implementation, unless overwritten. func NewMockRoutineFrom(i Routine) *MockRoutine { return &MockRoutine{ + NameFunc: &RoutineNameFunc{ + defaultHook: i.Name, + }, StartFunc: &RoutineStartFunc{ defaultHook: i.Start, }, @@ -67,6 +86,104 @@ func NewMockRoutineFrom(i Routine) *MockRoutine { } } +// RoutineNameFunc describes the behavior when the Name method of the parent +// MockRoutine instance is invoked. +type RoutineNameFunc struct { + defaultHook func() string + hooks []func() string + history []RoutineNameFuncCall + mutex sync.Mutex +} + +// Name delegates to the next hook function in the queue and stores the +// parameter and result values of this invocation. +func (m *MockRoutine) Name() string { + r0 := m.NameFunc.nextHook()() + m.NameFunc.appendCall(RoutineNameFuncCall{r0}) + return r0 +} + +// SetDefaultHook sets function that is called when the Name method of the +// parent MockRoutine instance is invoked and the hook queue is empty. +func (f *RoutineNameFunc) SetDefaultHook(hook func() string) { + f.defaultHook = hook +} + +// PushHook adds a function to the end of hook queue. Each invocation of the +// Name method of the parent MockRoutine instance invokes the hook at the +// front of the queue and discards it. After the queue is empty, the default +// hook function is invoked for any future action. +func (f *RoutineNameFunc) PushHook(hook func() string) { + f.mutex.Lock() + f.hooks = append(f.hooks, hook) + f.mutex.Unlock() +} + +// SetDefaultReturn calls SetDefaultHook with a function that returns the +// given values. +func (f *RoutineNameFunc) SetDefaultReturn(r0 string) { + f.SetDefaultHook(func() string { + return r0 + }) +} + +// PushReturn calls PushHook with a function that returns the given values. +func (f *RoutineNameFunc) PushReturn(r0 string) { + f.PushHook(func() string { + return r0 + }) +} + +func (f *RoutineNameFunc) nextHook() func() string { + f.mutex.Lock() + defer f.mutex.Unlock() + + if len(f.hooks) == 0 { + return f.defaultHook + } + + hook := f.hooks[0] + f.hooks = f.hooks[1:] + return hook +} + +func (f *RoutineNameFunc) appendCall(r0 RoutineNameFuncCall) { + f.mutex.Lock() + f.history = append(f.history, r0) + f.mutex.Unlock() +} + +// History returns a sequence of RoutineNameFuncCall objects describing the +// invocations of this function. +func (f *RoutineNameFunc) History() []RoutineNameFuncCall { + f.mutex.Lock() + history := make([]RoutineNameFuncCall, len(f.history)) + copy(history, f.history) + f.mutex.Unlock() + + return history +} + +// RoutineNameFuncCall is an object that describes an invocation of method +// Name on an instance of MockRoutine. +type RoutineNameFuncCall struct { + // Result0 is the value of the 1st result returned from this method + // invocation. + Result0 string +} + +// Args returns an interface slice containing the arguments of this +// invocation. +func (c RoutineNameFuncCall) Args() []interface{} { + return []interface{}{} +} + +// Results returns an interface slice containing the results of this +// invocation. +func (c RoutineNameFuncCall) Results() []interface{} { + return []interface{}{c.Result0} +} + // RoutineStartFunc describes the behavior when the Start method of the // parent MockRoutine instance is invoked. type RoutineStartFunc struct { @@ -164,23 +281,23 @@ func (c RoutineStartFuncCall) Results() []interface{} { // RoutineStopFunc describes the behavior when the Stop method of the parent // MockRoutine instance is invoked. type RoutineStopFunc struct { - defaultHook func() - hooks []func() + defaultHook func(context.Context) error + hooks []func(context.Context) error history []RoutineStopFuncCall mutex sync.Mutex } // Stop delegates to the next hook function in the queue and stores the // parameter and result values of this invocation. -func (m *MockRoutine) Stop() { - m.StopFunc.nextHook()() - m.StopFunc.appendCall(RoutineStopFuncCall{}) - return +func (m *MockRoutine) Stop(v0 context.Context) error { + r0 := m.StopFunc.nextHook()(v0) + m.StopFunc.appendCall(RoutineStopFuncCall{v0, r0}) + return r0 } // SetDefaultHook sets function that is called when the Stop method of the // parent MockRoutine instance is invoked and the hook queue is empty. -func (f *RoutineStopFunc) SetDefaultHook(hook func()) { +func (f *RoutineStopFunc) SetDefaultHook(hook func(context.Context) error) { f.defaultHook = hook } @@ -188,7 +305,7 @@ func (f *RoutineStopFunc) SetDefaultHook(hook func()) { // Stop method of the parent MockRoutine instance invokes the hook at the // front of the queue and discards it. After the queue is empty, the default // hook function is invoked for any future action. -func (f *RoutineStopFunc) PushHook(hook func()) { +func (f *RoutineStopFunc) PushHook(hook func(context.Context) error) { f.mutex.Lock() f.hooks = append(f.hooks, hook) f.mutex.Unlock() @@ -196,20 +313,20 @@ func (f *RoutineStopFunc) PushHook(hook func()) { // SetDefaultReturn calls SetDefaultHook with a function that returns the // given values. -func (f *RoutineStopFunc) SetDefaultReturn() { - f.SetDefaultHook(func() { - return +func (f *RoutineStopFunc) SetDefaultReturn(r0 error) { + f.SetDefaultHook(func(context.Context) error { + return r0 }) } // PushReturn calls PushHook with a function that returns the given values. -func (f *RoutineStopFunc) PushReturn() { - f.PushHook(func() { - return +func (f *RoutineStopFunc) PushReturn(r0 error) { + f.PushHook(func(context.Context) error { + return r0 }) } -func (f *RoutineStopFunc) nextHook() func() { +func (f *RoutineStopFunc) nextHook() func(context.Context) error { f.mutex.Lock() defer f.mutex.Unlock() @@ -241,16 +358,23 @@ func (f *RoutineStopFunc) History() []RoutineStopFuncCall { // RoutineStopFuncCall is an object that describes an invocation of method // Stop on an instance of MockRoutine. -type RoutineStopFuncCall struct{} +type RoutineStopFuncCall struct { + // Arg0 is the value of the 1st argument passed to this method + // invocation. + Arg0 context.Context + // Result0 is the value of the 1st result returned from this method + // invocation. + Result0 error +} // Args returns an interface slice containing the arguments of this // invocation. func (c RoutineStopFuncCall) Args() []interface{} { - return []interface{}{} + return []interface{}{c.Arg0} } // Results returns an interface slice containing the results of this // invocation. func (c RoutineStopFuncCall) Results() []interface{} { - return []interface{}{} + return []interface{}{c.Result0} } diff --git a/lib/managedservicesplatform/go.sum b/lib/managedservicesplatform/go.sum index d3f3f58b168..7e40bb8b71a 100644 --- a/lib/managedservicesplatform/go.sum +++ b/lib/managedservicesplatform/go.sum @@ -71,6 +71,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/derision-test/go-mockgen/v2 v2.0.1 h1:2rzcJH6DT2jVfOt2spkmRJ8J6vQr9RQy3pXlQZ2pxNs= +github.com/derision-test/go-mockgen/v2 v2.0.1/go.mod h1:1FvbovTCW7wmMxpBu2LtbhRgbX36MC5LLa6dw1GIhGc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/lib/managedservicesplatform/runtime/service.go b/lib/managedservicesplatform/runtime/service.go index 6b257abee26..a5782be2f89 100644 --- a/lib/managedservicesplatform/runtime/service.go +++ b/lib/managedservicesplatform/runtime/service.go @@ -123,6 +123,9 @@ func Start[ log.Int("port", ctr.Port), log.Bool("msp", ctr.MSP), log.Bool("sentry", sentryEnabled)) - background.Monitor(ctx, routine) + err = background.Monitor(ctx, routine) + if err != nil { + startLogger.Error("error stopping service routine", log.Error(err)) + } startLogger.Info("service stopped") }