Mirror of https://github.com/sourcegraph/sourcegraph.git (synced 2026-02-06 17:31:43 +00:00)
leader: Add a leader election package (#12289)
* rcache: Add options param to TryAcquireMutex

* leader: Add a leader package

  A small helper package that wraps our Redis-based mutex to allow only a
  single instance of a function to run concurrently.

* Update CODEOWNERS

* Update internal/rcache/mutex_test.go

* No need for release func

  The outer leader loop can release when the worker function returns.

* Add jitter and respect ctx during sleep

* Make worker fn the last parameter

  This makes it easier to use, especially when fn is anonymous.

* Improve test

Co-authored-by: Asdine El Hrychy <asdine.elhrychy@gmail.com>
parent f3d88f690a
commit 5cf616b29a
.github/CODEOWNERS (vendored, 1 line changed)
@@ -216,6 +216,7 @@ Dockerfile @sourcegraph/distribution
 # Backend shared packages
 /internal/endpoint/ @keegancsmith @slimsag
 /internal/rcache/ @keegancsmith
+/internal/leader @sourcegraph/cloud
 /internal/redispool/ @keegancsmith
 /internal/store/ @keegancsmith
 /internal/metrics @keegancsmith @slimsag
internal/leader/leader.go (new file, 56 lines)
@@ -0,0 +1,56 @@
package leader

import (
	"context"
	"math/rand"
	"time"

	"github.com/sourcegraph/sourcegraph/internal/rcache"
)

const (
	defaultAcquireInterval = 30 * time.Second
)

type Options struct {
	// AcquireInterval defines how frequently we should attempt to acquire
	// leadership when not the leader.
	AcquireInterval time.Duration
	MutexOptions    rcache.MutexOptions
}

// Do will ensure that only one instance of workFn is running globally per key at any point using a mutex
// stored in Redis.
// workFn could lose leadership at any point so it is important that the supplied context is checked before performing
// any work that should not run in parallel with another worker.
// The mutex is released automatically when workFn returns.
func Do(parentCtx context.Context, key string, options Options, workFn func(ctx context.Context)) {
	if options.AcquireInterval == 0 {
		options.AcquireInterval = defaultAcquireInterval
	}
	for {
		if parentCtx.Err() != nil {
			return
		}

		ctx, cancel, ok := rcache.TryAcquireMutex(parentCtx, key, options.MutexOptions)
		if !ok {
			select {
			case <-parentCtx.Done():
				return
			case <-time.After(jitter(options.AcquireInterval)):
			}
			continue
		}

		func() {
			defer cancel()
			workFn(ctx)
		}()
	}
}

// jitter returns the base duration increased by a random amount of up to 25%
func jitter(base time.Duration) time.Duration {
	return base + time.Duration(rand.Int63n(int64(base/4)))
}
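The doc comment on Do above describes the contract: workFn runs only while this process holds the Redis mutex for the given key, and its context is cancelled if leadership is lost. A minimal, hypothetical caller might look like the sketch below; the key name and the periodic work are illustrative, not part of this commit.

package main

import (
	"context"
	"log"
	"time"

	"github.com/sourcegraph/sourcegraph/internal/leader"
)

func main() {
	ctx := context.Background()

	// Do blocks until ctx is cancelled. workFn only runs while this process
	// holds the mutex for "example-cleanup" (a hypothetical key); zero-value
	// Options fall back to the package defaults (30s acquire interval).
	leader.Do(ctx, "example-cleanup", leader.Options{}, func(ctx context.Context) {
		for {
			select {
			case <-ctx.Done():
				// Leadership lost or parent context cancelled: stop any work
				// that must not run concurrently with another leader.
				return
			case <-time.After(time.Minute):
				log.Println("running periodic cleanup as the current leader")
			}
		}
	})
}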
internal/leader/leader_test.go (new file, 66 lines)
@@ -0,0 +1,66 @@
package leader

import (
	"context"
	"testing"
	"time"

	"github.com/sourcegraph/sourcegraph/internal/rcache"
)

func TestDoWhileLeader(t *testing.T) {
	rcache.SetupForTest(t)

	key := "test-leader"
	ctx, cancel := context.WithCancel(context.Background())
	// In case we don't make it to cancel lower down
	t.Cleanup(cancel)

	var count int64

	fn := func(ctx context.Context) {
		select {
		case <-ctx.Done():
			return
		default:
		}
		count++
		<-ctx.Done()
	}

	options := Options{
		AcquireInterval: 50 * time.Millisecond,
		MutexOptions: rcache.MutexOptions{
			Tries:      1,
			RetryDelay: 10 * time.Millisecond,
		},
	}

	cancelled := make(chan struct{})

	go func() {
		Do(ctx, key, options, fn)
		cancelled <- struct{}{}
	}()
	go func() {
		Do(ctx, key, options, fn)
		cancelled <- struct{}{}
	}()

	time.Sleep(500 * time.Millisecond)
	cancel()

	if count != 1 {
		t.Fatalf("Count > 1: %d", count)
	}

	// Check that Do exits after cancelled
	for i := 0; i < 2; i++ {
		select {
		case <-time.After(500 * time.Millisecond):
			t.Fatal("Timeout")
		case <-cancelled:
		}
	}

}
internal/rcache/mutex.go

@@ -9,22 +9,27 @@ import (
 	"github.com/go-redsync/redsync"
 )
 
-var (
-	// mutexExpiry is relatively long since we currently are only using
-	// locks for co-ordinating longer running processes. If we want short
-	// lived granular locks, we should switch away from Redis.
-	mutexExpiry = time.Minute
-	// mutexTries is how many tries we have before we give up acquiring a
-	// lock. We make it low since we want to give up quickly + we only
-	// have a single node. So failing to acquire the lock will be
-	// unrelated to failing to reach quoram. var to allow tests to
-	// override.
-	mutexTries = 3
-	// mutexDelay is how long to sleep between attempts to lock. We use
-	// the default delay.
-	mutexDelay = 512 * time.Millisecond
+const (
+	DefaultMutexExpiry = time.Minute
+	// We make it low since we want to give up quickly. Failing to acquire the lock will be
+	// unrelated to failing to reach quorum.
+	DefaultMutexTries = 3
+	DefaultMutexDelay = 512 * time.Millisecond
 )
 
+// MutexOptions hold options passed to TryAcquireMutex. It is safe to
+// pass zero values in which case defaults will be used instead.
+type MutexOptions struct {
+	// Expiry sets how long a lock should be held. Under normal
+	// operation it will be extended on an interval of (Expiry / 2)
+	Expiry time.Duration
+	// Tries is how many tries we have before we give up acquiring a
+	// lock.
+	Tries int
+	// RetryDelay is how long to sleep between attempts to lock
+	RetryDelay time.Duration
+}
+
 // TryAcquireMutex tries to Lock a distributed mutex. If the mutex is already
 // locked, it will return `ctx, nil, false`. Otherwise it returns `ctx,
 // release, true`. Release must be called to free the lock.

@@ -38,16 +43,26 @@ var (
 // they key no longer exists in Redis
 // A caller can therefore assume that they are the sole holder of the lock as long as the
 // context has not been cancelled.
-func TryAcquireMutex(ctx context.Context, name string) (context.Context, func(), bool) {
+func TryAcquireMutex(ctx context.Context, name string, options MutexOptions) (context.Context, func(), bool) {
 	// We return a canceled context if we fail, so create the context here
 	ctx, cancel := context.WithCancel(ctx)
 
+	if options.Expiry == 0 {
+		options.Expiry = DefaultMutexExpiry
+	}
+	if options.Tries == 0 {
+		options.Tries = DefaultMutexTries
+	}
+	if options.RetryDelay == 0 {
+		options.RetryDelay = DefaultMutexDelay
+	}
+
 	name = fmt.Sprintf("%s:mutex:%s", globalPrefix, name)
 	mu := redsync.New([]redsync.Pool{pool}).NewMutex(
 		name,
-		redsync.SetExpiry(mutexExpiry),
-		redsync.SetTries(mutexTries),
-		redsync.SetRetryDelay(mutexDelay),
+		redsync.SetExpiry(options.Expiry),
+		redsync.SetTries(options.Tries),
+		redsync.SetRetryDelay(options.RetryDelay),
 	)
 
 	err := mu.Lock()

@@ -57,7 +72,7 @@ func TryAcquireMutex(ctx context.Context, name string) (context.Context, func(), bool) {
 	}
 	unlockedC := make(chan struct{})
 	go func() {
-		ticker := time.NewTicker(mutexExpiry / 2)
+		ticker := time.NewTicker(options.Expiry / 2)
 		for {
 			select {
 			case <-ctx.Done():
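As the doc comment above explains, TryAcquireMutex hands back a derived context that is cancelled if the lock can no longer be held, plus a release func that must always be called by a successful acquirer. A hypothetical direct caller (outside the new leader helper) might use it roughly as sketched below; the package name, lock name, and logging are illustrative only.

package example // hypothetical caller; not part of this commit

import (
	"context"
	"log"

	"github.com/sourcegraph/sourcegraph/internal/rcache"
)

func runExclusive(parent context.Context) {
	// Zero-value MutexOptions falls back to the package defaults
	// (1m expiry, 3 tries, 512ms retry delay).
	ctx, release, ok := rcache.TryAcquireMutex(parent, "example-job", rcache.MutexOptions{})
	if !ok {
		log.Println("another instance holds the lock; skipping")
		return
	}
	defer release() // must always be called to free the lock

	select {
	case <-ctx.Done():
		// Lock extension failed or parent was cancelled: we may no longer
		// be the sole holder, so stop before doing exclusive work.
		return
	default:
		log.Println("holding the lock; doing exclusive work")
	}
}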
internal/rcache/mutex_test.go

@@ -8,12 +8,17 @@ import (
 func TestTryAcquireMutex(t *testing.T) {
 	SetupForTest(t)
 
-	ctx, release, ok := TryAcquireMutex(context.Background(), "test")
+	options := MutexOptions{
+		// Make mutex fail faster
+		Tries: 1,
+	}
+
+	ctx, release, ok := TryAcquireMutex(context.Background(), "test", options)
 	if !ok {
 		t.Fatalf("expected to acquire mutex")
 	}
 
-	if _, _, ok = TryAcquireMutex(context.Background(), "test"); ok {
+	if _, _, ok = TryAcquireMutex(context.Background(), "test", options); ok {
 		t.Fatalf("expected to fail to acquire mutex")
 	}
 

@@ -24,14 +29,14 @@ func TestTryAcquireMutex(t *testing.T) {
 
 	// Test out if cancelling the parent context allows us to still release
 	ctx, cancel := context.WithCancel(context.Background())
-	_, release, ok = TryAcquireMutex(ctx, "test")
+	_, release, ok = TryAcquireMutex(ctx, "test", options)
 	if !ok {
 		t.Fatalf("expected to acquire mutex")
 	}
 	cancel()
 	release()
 
-	_, release, ok = TryAcquireMutex(context.Background(), "test")
+	_, release, ok = TryAcquireMutex(context.Background(), "test", options)
 	if !ok {
 		t.Fatalf("expected to acquire mutex")
 	}
@@ -195,8 +195,6 @@ func SetupForTest(t TB) {
 	}
 
 	globalPrefix = "__test__" + t.Name()
-	// Make mutex fails faster
-	mutexTries = 1
 	c := pool.Get()
 	defer c.Close()
 