pings, telemetry-gateway: stop pubsub client before stopping server (#60451)

Possible alleviation to recent errors like https://sourcegraph.slack.com/archives/C06CCJR4K9R/p1707434020627279 and https://sourcegraph.slack.com/archives/C06CCJR4K9R/p1707772710344069 - it's possible that server stops before the pub/sub client, cancelling the context of ongoing requests and causing them to be interrupted. This could possibly be fixed by ensuring the pub/sub client clears its backlog before we stop the server.

## Test plan

Unit tests on the new LIFOStopRoutine:

```
go test -race -count=100 -run ^TestLIFOStopRoutine$ github.com/sourcegraph/sourcegraph/lib/background
```
This commit is contained in:
Robert Lin 2024-02-13 23:33:31 -08:00 committed by GitHub
parent 46e107a3fb
commit 9ef8f45559
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 43 additions and 7 deletions

View File

@ -35,7 +35,7 @@ func (s Service) Initialize(
logger log.Logger,
contract runtime.Contract,
config Config,
) (background.CombinedRoutine, error) {
) (background.Routine, error) {
logger.Info("starting service")
if !config.StatelessMode {

View File

@ -28,7 +28,7 @@ var _ runtime.Service[Config] = (*Service)(nil)
func (Service) Name() string { return "pings" }
func (Service) Version() string { return version.Version() }
func (Service) Initialize(ctx context.Context, logger log.Logger, contract runtime.Contract, config Config) (background.CombinedRoutine, error) {
func (Service) Initialize(ctx context.Context, logger log.Logger, contract runtime.Contract, config Config) (background.Routine, error) {
pubsubClient, err := pubsub.NewTopicClient(config.PubSub.ProjectID, config.PubSub.TopicID)
if err != nil {
return nil, errors.Errorf("create Pub/Sub client: %v", err)
@ -48,7 +48,7 @@ func (Service) Initialize(ctx context.Context, logger log.Logger, contract runti
pubsubClient: pubsubClient,
})
return background.CombinedRoutine{
return background.LIFOStopRoutine{
httpserver.NewFromAddr(
fmt.Sprintf(":%d", contract.Port),
&http.Server{

View File

@ -33,7 +33,7 @@ var _ runtime.Service[Config] = (*Service)(nil)
func (Service) Name() string { return "telemetry-gateway" }
func (Service) Version() string { return version.Version() }
func (Service) Initialize(ctx context.Context, logger log.Logger, contract runtime.Contract, config Config) (background.CombinedRoutine, error) {
func (Service) Initialize(ctx context.Context, logger log.Logger, contract runtime.Contract, config Config) (background.Routine, error) {
// We use Sourcegraph tracing code, so explicitly configure a trace policy
policy.SetTracePolicy(policy.TraceAll)
@ -77,7 +77,7 @@ func (Service) Initialize(ctx context.Context, logger log.Logger, contract runti
diagnosticsServer.Handle(grpcUI.Path, grpcUI.Handler)
}
return background.CombinedRoutine{
return background.LIFOStopRoutine{
httpserver.NewFromAddr(
listenAddr,
&http.Server{

View File

@ -19,4 +19,5 @@ go_test(
"mocks_test.go",
],
embed = [":background"],
deps = ["@com_github_stretchr_testify//assert"],
)

View File

@ -104,6 +104,22 @@ func (r CombinedRoutine) Stop() {
wg.Wait()
}
// LIFOStopRoutine is a list of routines which are started in unison, but stopped
// sequentially last-in-first-out (the last Routine is stopped, and once it
// successfully stops, the next routine is stopped).
//
// This is useful for services where subprocessors should be stopped before the
// primary service stops for a graceful shutdown.
type LIFOStopRoutine []Routine
func (r LIFOStopRoutine) Start() { CombinedRoutine(r).Start() }
func (r LIFOStopRoutine) Stop() {
for i := len(r) - 1; i >= 0; i -= 1 {
r[i].Stop()
}
}
// NoopRoutine does nothing for start or stop.
func NoopRoutine() Routine {
return CallbackRoutine{}

View File

@ -5,6 +5,8 @@ import (
"os"
"syscall"
"testing"
"github.com/stretchr/testify/assert"
)
// Make the exiter a no-op in tests
@ -65,3 +67,19 @@ func TestMonitorBackgroundRoutinesContextCancel(t *testing.T) {
}
}
}
func TestLIFOStopRoutine(t *testing.T) {
// use an unguarded slice because LIFOStopRoutine should only stop in sequence
var stopped []string
r1 := NewMockRoutine()
r1.StopFunc.PushHook(func() { stopped = append(stopped, "r1") })
r2 := NewMockRoutine()
r2.StopFunc.PushHook(func() { stopped = append(stopped, "r2") })
r3 := NewMockRoutine()
r3.StopFunc.PushHook(func() { stopped = append(stopped, "r3") })
r := LIFOStopRoutine{r1, r2, r3}
r.Stop()
// stops in reverse
assert.Equal(t, []string{"r3", "r2", "r1"}, stopped)
}

View File

@ -19,13 +19,14 @@ type ServiceMetadata interface {
type Service[ConfigT any] interface {
ServiceMetadata
// Initialize should use given configuration to build a combined background
// routine that implements starting and stopping the service.
// routine (such as background.CombinedRoutine or background.LIFOStopRoutine)
// that implements starting and stopping the service.
Initialize(
ctx context.Context,
logger log.Logger,
contract Contract,
config ConfigT,
) (background.CombinedRoutine, error)
) (background.Routine, error)
}
// Start handles the entire lifecycle of the program running Service, and should