diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 28c6abb705e..d818672a11c 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -216,6 +216,7 @@ Dockerfile @sourcegraph/distribution
 # Backend shared packages
 /internal/endpoint/ @keegancsmith @slimsag
 /internal/rcache/ @keegancsmith
+/internal/leader @sourcegraph/cloud
 /internal/redispool/ @keegancsmith
 /internal/store/ @keegancsmith
 /internal/metrics @keegancsmith @slimsag
diff --git a/internal/leader/leader.go b/internal/leader/leader.go
new file mode 100644
index 00000000000..eafb8599343
--- /dev/null
+++ b/internal/leader/leader.go
@@ -0,0 +1,56 @@
+package leader
+
+import (
+	"context"
+	"math/rand"
+	"time"
+
+	"github.com/sourcegraph/sourcegraph/internal/rcache"
+)
+
+const (
+	defaultAcquireInterval = 30 * time.Second
+)
+
+type Options struct {
+	// AcquireInterval defines how frequently we should attempt to acquire
+	// leadership when not the leader.
+	AcquireInterval time.Duration
+	MutexOptions    rcache.MutexOptions
+}
+
+// Do ensures that only one instance of workFn is running globally per key
+// at any point, using a mutex stored in Redis.
+// workFn could lose leadership at any point, so the supplied context must be
+// checked before performing any work that must not run in parallel with
+// another worker. The lock is released when workFn returns.
+func Do(parentCtx context.Context, key string, options Options, workFn func(ctx context.Context)) {
+	if options.AcquireInterval == 0 {
+		options.AcquireInterval = defaultAcquireInterval
+	}
+	for {
+		if parentCtx.Err() != nil {
+			return
+		}
+
+		ctx, cancel, ok := rcache.TryAcquireMutex(parentCtx, key, options.MutexOptions)
+		if !ok {
+			select {
+			case <-parentCtx.Done():
+				return
+			case <-time.After(jitter(options.AcquireInterval)):
+			}
+			continue
+		}
+
+		func() {
+			defer cancel()
+			workFn(ctx)
+		}()
+	}
+}
+
+// jitter returns the base duration increased by a random amount of up to 25%.
+func jitter(base time.Duration) time.Duration {
+	return base + time.Duration(rand.Int63n(int64(base/4)))
+}
diff --git a/internal/leader/leader_test.go b/internal/leader/leader_test.go
new file mode 100644
index 00000000000..9cf2b760374
--- /dev/null
+++ b/internal/leader/leader_test.go
@@ -0,0 +1,66 @@
+package leader
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/sourcegraph/sourcegraph/internal/rcache"
+)
+
+func TestDoWhileLeader(t *testing.T) {
+	rcache.SetupForTest(t)
+
+	key := "test-leader"
+	ctx, cancel := context.WithCancel(context.Background())
+	// In case we don't make it to cancel lower down
+	t.Cleanup(cancel)
+
+	var count int64
+
+	fn := func(ctx context.Context) {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		count++
+		<-ctx.Done()
+	}
+
+	options := Options{
+		AcquireInterval: 50 * time.Millisecond,
+		MutexOptions: rcache.MutexOptions{
+			Tries:      1,
+			RetryDelay: 10 * time.Millisecond,
+		},
+	}
+
+	cancelled := make(chan struct{})
+
+	go func() {
+		Do(ctx, key, options, fn)
+		cancelled <- struct{}{}
+	}()
+	go func() {
+		Do(ctx, key, options, fn)
+		cancelled <- struct{}{}
+	}()
+
+	time.Sleep(500 * time.Millisecond)
+	cancel()
+
+	if count != 1 {
+		t.Fatalf("expected count of 1, got %d", count)
+	}
+
+	// Check that both Do calls exit after cancellation
+	for i := 0; i < 2; i++ {
+		select {
+		case <-time.After(500 * time.Millisecond):
+			t.Fatal("Timeout")
+		case <-cancelled:
+		}
+	}
+
+}
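For reviewers: a minimal usage sketch of the new package (not part of this diff). The `"example-job"` key and the periodic work loop are illustrative; `leader.Do` blocks until the parent context is cancelled, so callers would typically run it in a goroutine.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/sourcegraph/sourcegraph/internal/leader"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// A zero-value Options falls back to the 30s acquire interval and the
	// rcache mutex defaults. Only one instance of the work function runs
	// globally per key at any point.
	leader.Do(ctx, "example-job", leader.Options{}, func(ctx context.Context) {
		for {
			select {
			case <-ctx.Done():
				// Leadership was lost or the parent context was cancelled;
				// stop before doing any more exclusive work.
				return
			case <-time.After(10 * time.Second):
				log.Println("still the leader; doing periodic work")
			}
		}
	})
}
```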
diff --git a/internal/rcache/mutex.go b/internal/rcache/mutex.go
index da2a8240590..4f96699f95d 100644
--- a/internal/rcache/mutex.go
+++ b/internal/rcache/mutex.go
@@ -9,22 +9,27 @@ import (
 	"github.com/go-redsync/redsync"
 )
 
-var (
-	// mutexExpiry is relatively long since we currently are only using
-	// locks for co-ordinating longer running processes. If we want short
-	// lived granular locks, we should switch away from Redis.
-	mutexExpiry = time.Minute
-	// mutexTries is how many tries we have before we give up acquiring a
-	// lock. We make it low since we want to give up quickly + we only
-	// have a single node. So failing to acquire the lock will be
-	// unrelated to failing to reach quoram. var to allow tests to
-	// override.
-	mutexTries = 3
-	// mutexDelay is how long to sleep between attempts to lock. We use
-	// the default delay.
-	mutexDelay = 512 * time.Millisecond
+const (
+	DefaultMutexExpiry = time.Minute
+	// We make it low since we want to give up quickly. Failing to acquire
+	// the lock will be unrelated to failing to reach quorum.
+	DefaultMutexTries = 3
+	DefaultMutexDelay = 512 * time.Millisecond
 )
 
+// MutexOptions holds options passed to TryAcquireMutex. It is safe to
+// pass zero values, in which case defaults will be used instead.
+type MutexOptions struct {
+	// Expiry sets how long a lock should be held. Under normal
+	// operation it will be extended on an interval of (Expiry / 2).
+	Expiry time.Duration
+	// Tries is how many tries we have before we give up acquiring a
+	// lock.
+	Tries int
+	// RetryDelay is how long to sleep between attempts to lock.
+	RetryDelay time.Duration
+}
+
 // TryAcquireMutex tries to Lock a distributed mutex. If the mutex is already
 // locked, it will return `ctx, nil, false`. Otherwise it returns `ctx,
 // release, true`. Release must be called to free the lock.
@@ -38,16 +43,26 @@ var (
 // they key no longer exists in Redis
 // A caller can therefore assume that they are the sole holder of the lock as long as the
 // context has not been cancelled.
-func TryAcquireMutex(ctx context.Context, name string) (context.Context, func(), bool) {
+func TryAcquireMutex(ctx context.Context, name string, options MutexOptions) (context.Context, func(), bool) {
 	// We return a canceled context if we fail, so create the context here
 	ctx, cancel := context.WithCancel(ctx)
 
+	if options.Expiry == 0 {
+		options.Expiry = DefaultMutexExpiry
+	}
+	if options.Tries == 0 {
+		options.Tries = DefaultMutexTries
+	}
+	if options.RetryDelay == 0 {
+		options.RetryDelay = DefaultMutexDelay
+	}
+
 	name = fmt.Sprintf("%s:mutex:%s", globalPrefix, name)
 	mu := redsync.New([]redsync.Pool{pool}).NewMutex(
 		name,
-		redsync.SetExpiry(mutexExpiry),
-		redsync.SetTries(mutexTries),
-		redsync.SetRetryDelay(mutexDelay),
+		redsync.SetExpiry(options.Expiry),
+		redsync.SetTries(options.Tries),
+		redsync.SetRetryDelay(options.RetryDelay),
 	)
 
 	err := mu.Lock()
@@ -57,7 +72,7 @@ func TryAcquireMutex(ctx context.Context, name string) (context.Context, func(),
 	}
 	unlockedC := make(chan struct{})
 	go func() {
-		ticker := time.NewTicker(mutexExpiry / 2)
+		ticker := time.NewTicker(options.Expiry / 2)
 		for {
 			select {
 			case <-ctx.Done():
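To illustrate the new signature, here is a hedged sketch of calling `TryAcquireMutex` directly; the lock name, durations, and `doExclusiveWork` wrapper are made up for the example, and any zero option field falls back to the defaults above.

```go
package example

import (
	"context"
	"log"
	"time"

	"github.com/sourcegraph/sourcegraph/internal/rcache"
)

func doExclusiveWork() {
	// Illustrative values; zero fields fall back to DefaultMutexExpiry,
	// DefaultMutexTries, and DefaultMutexDelay.
	opts := rcache.MutexOptions{
		Expiry: 30 * time.Second, // background extension runs every Expiry/2
		Tries:  1,                // give up immediately if already held
	}

	ctx, release, ok := rcache.TryAcquireMutex(context.Background(), "example-lock", opts)
	if !ok {
		log.Println("lock already held elsewhere")
		return
	}
	defer release()

	// ctx is cancelled if the holder can no longer extend the lock, so check
	// it before each piece of work that must not run concurrently.
	select {
	case <-ctx.Done():
		return
	default:
		log.Println("holding the lock; safe to proceed")
	}
}
```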
diff --git a/internal/rcache/mutex_test.go b/internal/rcache/mutex_test.go
index 7adda9cf96d..5e1e3e2c585 100644
--- a/internal/rcache/mutex_test.go
+++ b/internal/rcache/mutex_test.go
@@ -8,12 +8,17 @@ import (
 func TestTryAcquireMutex(t *testing.T) {
 	SetupForTest(t)
 
-	ctx, release, ok := TryAcquireMutex(context.Background(), "test")
+	options := MutexOptions{
+		// Make mutex fail faster
+		Tries: 1,
+	}
+
+	ctx, release, ok := TryAcquireMutex(context.Background(), "test", options)
 	if !ok {
 		t.Fatalf("expected to acquire mutex")
 	}
 
-	if _, _, ok = TryAcquireMutex(context.Background(), "test"); ok {
+	if _, _, ok = TryAcquireMutex(context.Background(), "test", options); ok {
 		t.Fatalf("expected to fail to acquire mutex")
 	}
 
@@ -24,14 +29,14 @@ func TestTryAcquireMutex(t *testing.T) {
 
 	// Test out if cancelling the parent context allows us to still release
 	ctx, cancel := context.WithCancel(context.Background())
-	_, release, ok = TryAcquireMutex(ctx, "test")
+	_, release, ok = TryAcquireMutex(ctx, "test", options)
 	if !ok {
 		t.Fatalf("expected to acquire mutex")
 	}
 	cancel()
 	release()
 
-	_, release, ok = TryAcquireMutex(context.Background(), "test")
+	_, release, ok = TryAcquireMutex(context.Background(), "test", options)
 	if !ok {
 		t.Fatalf("expected to acquire mutex")
 	}
diff --git a/internal/rcache/rcache.go b/internal/rcache/rcache.go
index 774a738ec8a..3fdd415f46c 100644
--- a/internal/rcache/rcache.go
+++ b/internal/rcache/rcache.go
@@ -195,8 +195,6 @@ func SetupForTest(t TB) {
 	}
 
 	globalPrefix = "__test__" + t.Name()
-	// Make mutex fails faster
-	mutexTries = 1
 
 	c := pool.Get()
 	defer c.Close()
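As a worked example of the retry timing in the new leader package: `jitter` adds a random amount of up to 25% on top of the base interval, so with the default 30s `AcquireInterval`, non-leaders retry every 30–37.5s rather than in lockstep. A standalone sketch (the helper is copied here only so the snippet runs on its own):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitter mirrors the helper in internal/leader: the base duration plus a
// random amount of up to 25%.
func jitter(base time.Duration) time.Duration {
	return base + time.Duration(rand.Int63n(int64(base/4)))
}

func main() {
	// With the default 30s acquire interval, retries land in [30s, 37.5s),
	// which keeps many replicas from hammering Redis at the same moment.
	for i := 0; i < 3; i++ {
		fmt.Println(jitter(30 * time.Second))
	}
}
```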