mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 19:21:50 +00:00
redispool: use postgres for redispool.Store in App (#47188)
We introduce a new table "redis_key_value" which is written to by KeyValue. The DB backed KeyValue store needs to wait for the database to be ready. However, Store is a package global. As such we use a pattern were we register the DB into redispool once the DB is ready. Before that all the KeyValue operations will return an error. Test Plan: go test and manual testing of Sourcegraph App.
This commit is contained in:
parent
a879b16d03
commit
35497d5527
@ -115,6 +115,11 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
|
||||
}
|
||||
}
|
||||
|
||||
// After our DB, redis is our next most important datastore
|
||||
if err := redispoolRegisterDB(db); err != nil {
|
||||
return errors.Wrap(err, "failed to register postgres backed redis")
|
||||
}
|
||||
|
||||
// override site config first
|
||||
if err := overrideSiteConfig(ctx, logger, db); err != nil {
|
||||
return errors.Wrap(err, "failed to apply site config overrides")
|
||||
@ -380,3 +385,15 @@ func makeRateLimitWatcher() (*graphqlbackend.BasicLimitWatcher, error) {
|
||||
|
||||
return graphqlbackend.NewBasicLimitWatcher(sglog.Scoped("BasicLimitWatcher", "basic rate-limiter"), store), nil
|
||||
}
|
||||
|
||||
// redispoolRegisterDB registers our postgres backed redis. These package
|
||||
// avoid depending on each other, hence the wrapping to get Go to play nice
|
||||
// with the interface definitions.
|
||||
func redispoolRegisterDB(db database.DB) error {
|
||||
kvNoTX := db.RedisKeyValue()
|
||||
return redispool.DBRegisterStore(func(ctx context.Context, f func(redispool.DBStore) error) error {
|
||||
return kvNoTX.WithTransact(ctx, func(tx database.RedisKeyValueStore) error {
|
||||
return f(tx)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@ -6901,6 +6901,9 @@ type MockEnterpriseDB struct {
|
||||
// QueryRowContextFunc is an instance of a mock function object
|
||||
// controlling the behavior of the method QueryRowContext.
|
||||
QueryRowContextFunc *EnterpriseDBQueryRowContextFunc
|
||||
// RedisKeyValueFunc is an instance of a mock function object
|
||||
// controlling the behavior of the method RedisKeyValue.
|
||||
RedisKeyValueFunc *EnterpriseDBRedisKeyValueFunc
|
||||
// RepoKVPsFunc is an instance of a mock function object controlling the
|
||||
// behavior of the method RepoKVPs.
|
||||
RepoKVPsFunc *EnterpriseDBRepoKVPsFunc
|
||||
@ -7115,6 +7118,11 @@ func NewMockEnterpriseDB() *MockEnterpriseDB {
|
||||
return
|
||||
},
|
||||
},
|
||||
RedisKeyValueFunc: &EnterpriseDBRedisKeyValueFunc{
|
||||
defaultHook: func() (r0 database.RedisKeyValueStore) {
|
||||
return
|
||||
},
|
||||
},
|
||||
RepoKVPsFunc: &EnterpriseDBRepoKVPsFunc{
|
||||
defaultHook: func() (r0 database.RepoKVPStore) {
|
||||
return
|
||||
@ -7372,6 +7380,11 @@ func NewStrictMockEnterpriseDB() *MockEnterpriseDB {
|
||||
panic("unexpected invocation of MockEnterpriseDB.QueryRowContext")
|
||||
},
|
||||
},
|
||||
RedisKeyValueFunc: &EnterpriseDBRedisKeyValueFunc{
|
||||
defaultHook: func() database.RedisKeyValueStore {
|
||||
panic("unexpected invocation of MockEnterpriseDB.RedisKeyValue")
|
||||
},
|
||||
},
|
||||
RepoKVPsFunc: &EnterpriseDBRepoKVPsFunc{
|
||||
defaultHook: func() database.RepoKVPStore {
|
||||
panic("unexpected invocation of MockEnterpriseDB.RepoKVPs")
|
||||
@ -7572,6 +7585,9 @@ func NewMockEnterpriseDBFrom(i EnterpriseDB) *MockEnterpriseDB {
|
||||
QueryRowContextFunc: &EnterpriseDBQueryRowContextFunc{
|
||||
defaultHook: i.QueryRowContext,
|
||||
},
|
||||
RedisKeyValueFunc: &EnterpriseDBRedisKeyValueFunc{
|
||||
defaultHook: i.RedisKeyValue,
|
||||
},
|
||||
RepoKVPsFunc: &EnterpriseDBRepoKVPsFunc{
|
||||
defaultHook: i.RepoKVPs,
|
||||
},
|
||||
@ -10593,6 +10609,105 @@ func (c EnterpriseDBQueryRowContextFuncCall) Results() []interface{} {
|
||||
return []interface{}{c.Result0}
|
||||
}
|
||||
|
||||
// EnterpriseDBRedisKeyValueFunc describes the behavior when the
|
||||
// RedisKeyValue method of the parent MockEnterpriseDB instance is invoked.
|
||||
type EnterpriseDBRedisKeyValueFunc struct {
|
||||
defaultHook func() database.RedisKeyValueStore
|
||||
hooks []func() database.RedisKeyValueStore
|
||||
history []EnterpriseDBRedisKeyValueFuncCall
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// RedisKeyValue delegates to the next hook function in the queue and stores
|
||||
// the parameter and result values of this invocation.
|
||||
func (m *MockEnterpriseDB) RedisKeyValue() database.RedisKeyValueStore {
|
||||
r0 := m.RedisKeyValueFunc.nextHook()()
|
||||
m.RedisKeyValueFunc.appendCall(EnterpriseDBRedisKeyValueFuncCall{r0})
|
||||
return r0
|
||||
}
|
||||
|
||||
// SetDefaultHook sets function that is called when the RedisKeyValue method
|
||||
// of the parent MockEnterpriseDB instance is invoked and the hook queue is
|
||||
// empty.
|
||||
func (f *EnterpriseDBRedisKeyValueFunc) SetDefaultHook(hook func() database.RedisKeyValueStore) {
|
||||
f.defaultHook = hook
|
||||
}
|
||||
|
||||
// PushHook adds a function to the end of hook queue. Each invocation of the
|
||||
// RedisKeyValue method of the parent MockEnterpriseDB instance invokes the
|
||||
// hook at the front of the queue and discards it. After the queue is empty,
|
||||
// the default hook function is invoked for any future action.
|
||||
func (f *EnterpriseDBRedisKeyValueFunc) PushHook(hook func() database.RedisKeyValueStore) {
|
||||
f.mutex.Lock()
|
||||
f.hooks = append(f.hooks, hook)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// SetDefaultReturn calls SetDefaultHook with a function that returns the
|
||||
// given values.
|
||||
func (f *EnterpriseDBRedisKeyValueFunc) SetDefaultReturn(r0 database.RedisKeyValueStore) {
|
||||
f.SetDefaultHook(func() database.RedisKeyValueStore {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
// PushReturn calls PushHook with a function that returns the given values.
|
||||
func (f *EnterpriseDBRedisKeyValueFunc) PushReturn(r0 database.RedisKeyValueStore) {
|
||||
f.PushHook(func() database.RedisKeyValueStore {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
func (f *EnterpriseDBRedisKeyValueFunc) nextHook() func() database.RedisKeyValueStore {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
if len(f.hooks) == 0 {
|
||||
return f.defaultHook
|
||||
}
|
||||
|
||||
hook := f.hooks[0]
|
||||
f.hooks = f.hooks[1:]
|
||||
return hook
|
||||
}
|
||||
|
||||
func (f *EnterpriseDBRedisKeyValueFunc) appendCall(r0 EnterpriseDBRedisKeyValueFuncCall) {
|
||||
f.mutex.Lock()
|
||||
f.history = append(f.history, r0)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// History returns a sequence of EnterpriseDBRedisKeyValueFuncCall objects
|
||||
// describing the invocations of this function.
|
||||
func (f *EnterpriseDBRedisKeyValueFunc) History() []EnterpriseDBRedisKeyValueFuncCall {
|
||||
f.mutex.Lock()
|
||||
history := make([]EnterpriseDBRedisKeyValueFuncCall, len(f.history))
|
||||
copy(history, f.history)
|
||||
f.mutex.Unlock()
|
||||
|
||||
return history
|
||||
}
|
||||
|
||||
// EnterpriseDBRedisKeyValueFuncCall is an object that describes an
|
||||
// invocation of method RedisKeyValue on an instance of MockEnterpriseDB.
|
||||
type EnterpriseDBRedisKeyValueFuncCall struct {
|
||||
// Result0 is the value of the 1st result returned from this method
|
||||
// invocation.
|
||||
Result0 database.RedisKeyValueStore
|
||||
}
|
||||
|
||||
// Args returns an interface slice containing the arguments of this
|
||||
// invocation.
|
||||
func (c EnterpriseDBRedisKeyValueFuncCall) Args() []interface{} {
|
||||
return []interface{}{}
|
||||
}
|
||||
|
||||
// Results returns an interface slice containing the results of this
|
||||
// invocation.
|
||||
func (c EnterpriseDBRedisKeyValueFuncCall) Results() []interface{} {
|
||||
return []interface{}{c.Result0}
|
||||
}
|
||||
|
||||
// EnterpriseDBRepoKVPsFunc describes the behavior when the RepoKVPs method
|
||||
// of the parent MockEnterpriseDB instance is invoked.
|
||||
type EnterpriseDBRepoKVPsFunc struct {
|
||||
|
||||
@ -41,6 +41,7 @@ type DB interface {
|
||||
Permissions() PermissionStore
|
||||
PermissionSyncJobs() PermissionSyncJobStore
|
||||
Phabricator() PhabricatorStore
|
||||
RedisKeyValue() RedisKeyValueStore
|
||||
Repos() RepoStore
|
||||
RepoKVPs() RepoKVPStore
|
||||
RolePermissions() RolePermissionStore
|
||||
@ -200,6 +201,10 @@ func (d *db) Phabricator() PhabricatorStore {
|
||||
return PhabricatorWith(d.Store)
|
||||
}
|
||||
|
||||
func (d *db) RedisKeyValue() RedisKeyValueStore {
|
||||
return &redisKeyValueStore{d.Store}
|
||||
}
|
||||
|
||||
func (d *db) Repos() RepoStore {
|
||||
return ReposWith(d.logger, d.Store)
|
||||
}
|
||||
|
||||
@ -3828,6 +3828,9 @@ type MockDB struct {
|
||||
// QueryRowContextFunc is an instance of a mock function object
|
||||
// controlling the behavior of the method QueryRowContext.
|
||||
QueryRowContextFunc *DBQueryRowContextFunc
|
||||
// RedisKeyValueFunc is an instance of a mock function object
|
||||
// controlling the behavior of the method RedisKeyValue.
|
||||
RedisKeyValueFunc *DBRedisKeyValueFunc
|
||||
// RepoKVPsFunc is an instance of a mock function object controlling the
|
||||
// behavior of the method RepoKVPs.
|
||||
RepoKVPsFunc *DBRepoKVPsFunc
|
||||
@ -4029,6 +4032,11 @@ func NewMockDB() *MockDB {
|
||||
return
|
||||
},
|
||||
},
|
||||
RedisKeyValueFunc: &DBRedisKeyValueFunc{
|
||||
defaultHook: func() (r0 RedisKeyValueStore) {
|
||||
return
|
||||
},
|
||||
},
|
||||
RepoKVPsFunc: &DBRepoKVPsFunc{
|
||||
defaultHook: func() (r0 RepoKVPStore) {
|
||||
return
|
||||
@ -4271,6 +4279,11 @@ func NewStrictMockDB() *MockDB {
|
||||
panic("unexpected invocation of MockDB.QueryRowContext")
|
||||
},
|
||||
},
|
||||
RedisKeyValueFunc: &DBRedisKeyValueFunc{
|
||||
defaultHook: func() RedisKeyValueStore {
|
||||
panic("unexpected invocation of MockDB.RedisKeyValue")
|
||||
},
|
||||
},
|
||||
RepoKVPsFunc: &DBRepoKVPsFunc{
|
||||
defaultHook: func() RepoKVPStore {
|
||||
panic("unexpected invocation of MockDB.RepoKVPs")
|
||||
@ -4459,6 +4472,9 @@ func NewMockDBFrom(i DB) *MockDB {
|
||||
QueryRowContextFunc: &DBQueryRowContextFunc{
|
||||
defaultHook: i.QueryRowContext,
|
||||
},
|
||||
RedisKeyValueFunc: &DBRedisKeyValueFunc{
|
||||
defaultHook: i.RedisKeyValue,
|
||||
},
|
||||
RepoKVPsFunc: &DBRepoKVPsFunc{
|
||||
defaultHook: i.RepoKVPs,
|
||||
},
|
||||
@ -7248,6 +7264,104 @@ func (c DBQueryRowContextFuncCall) Results() []interface{} {
|
||||
return []interface{}{c.Result0}
|
||||
}
|
||||
|
||||
// DBRedisKeyValueFunc describes the behavior when the RedisKeyValue method
|
||||
// of the parent MockDB instance is invoked.
|
||||
type DBRedisKeyValueFunc struct {
|
||||
defaultHook func() RedisKeyValueStore
|
||||
hooks []func() RedisKeyValueStore
|
||||
history []DBRedisKeyValueFuncCall
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// RedisKeyValue delegates to the next hook function in the queue and stores
|
||||
// the parameter and result values of this invocation.
|
||||
func (m *MockDB) RedisKeyValue() RedisKeyValueStore {
|
||||
r0 := m.RedisKeyValueFunc.nextHook()()
|
||||
m.RedisKeyValueFunc.appendCall(DBRedisKeyValueFuncCall{r0})
|
||||
return r0
|
||||
}
|
||||
|
||||
// SetDefaultHook sets function that is called when the RedisKeyValue method
|
||||
// of the parent MockDB instance is invoked and the hook queue is empty.
|
||||
func (f *DBRedisKeyValueFunc) SetDefaultHook(hook func() RedisKeyValueStore) {
|
||||
f.defaultHook = hook
|
||||
}
|
||||
|
||||
// PushHook adds a function to the end of hook queue. Each invocation of the
|
||||
// RedisKeyValue method of the parent MockDB instance invokes the hook at
|
||||
// the front of the queue and discards it. After the queue is empty, the
|
||||
// default hook function is invoked for any future action.
|
||||
func (f *DBRedisKeyValueFunc) PushHook(hook func() RedisKeyValueStore) {
|
||||
f.mutex.Lock()
|
||||
f.hooks = append(f.hooks, hook)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// SetDefaultReturn calls SetDefaultHook with a function that returns the
|
||||
// given values.
|
||||
func (f *DBRedisKeyValueFunc) SetDefaultReturn(r0 RedisKeyValueStore) {
|
||||
f.SetDefaultHook(func() RedisKeyValueStore {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
// PushReturn calls PushHook with a function that returns the given values.
|
||||
func (f *DBRedisKeyValueFunc) PushReturn(r0 RedisKeyValueStore) {
|
||||
f.PushHook(func() RedisKeyValueStore {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
func (f *DBRedisKeyValueFunc) nextHook() func() RedisKeyValueStore {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
if len(f.hooks) == 0 {
|
||||
return f.defaultHook
|
||||
}
|
||||
|
||||
hook := f.hooks[0]
|
||||
f.hooks = f.hooks[1:]
|
||||
return hook
|
||||
}
|
||||
|
||||
func (f *DBRedisKeyValueFunc) appendCall(r0 DBRedisKeyValueFuncCall) {
|
||||
f.mutex.Lock()
|
||||
f.history = append(f.history, r0)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// History returns a sequence of DBRedisKeyValueFuncCall objects describing
|
||||
// the invocations of this function.
|
||||
func (f *DBRedisKeyValueFunc) History() []DBRedisKeyValueFuncCall {
|
||||
f.mutex.Lock()
|
||||
history := make([]DBRedisKeyValueFuncCall, len(f.history))
|
||||
copy(history, f.history)
|
||||
f.mutex.Unlock()
|
||||
|
||||
return history
|
||||
}
|
||||
|
||||
// DBRedisKeyValueFuncCall is an object that describes an invocation of
|
||||
// method RedisKeyValue on an instance of MockDB.
|
||||
type DBRedisKeyValueFuncCall struct {
|
||||
// Result0 is the value of the 1st result returned from this method
|
||||
// invocation.
|
||||
Result0 RedisKeyValueStore
|
||||
}
|
||||
|
||||
// Args returns an interface slice containing the arguments of this
|
||||
// invocation.
|
||||
func (c DBRedisKeyValueFuncCall) Args() []interface{} {
|
||||
return []interface{}{}
|
||||
}
|
||||
|
||||
// Results returns an interface slice containing the results of this
|
||||
// invocation.
|
||||
func (c DBRedisKeyValueFuncCall) Results() []interface{} {
|
||||
return []interface{}{c.Result0}
|
||||
}
|
||||
|
||||
// DBRepoKVPsFunc describes the behavior when the RepoKVPs method of the
|
||||
// parent MockDB instance is invoked.
|
||||
type DBRepoKVPsFunc struct {
|
||||
|
||||
81
internal/database/redis_key_value.go
Normal file
81
internal/database/redis_key_value.go
Normal file
@ -0,0 +1,81 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/keegancsmith/sqlf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
)
|
||||
|
||||
// RedisKeyValueStore is a store that exists to satisfy the interface
|
||||
// redispool.DBStore. This is the interface that is needed to replace redis
|
||||
// with postgres.
|
||||
//
|
||||
// We do not directly implement the interface since that introduces
|
||||
// complications around dependency graphs.
|
||||
type RedisKeyValueStore interface {
|
||||
basestore.ShareableStore
|
||||
WithTransact(context.Context, func(RedisKeyValueStore) error) error
|
||||
Get(ctx context.Context, namespace, key string) (value []byte, ok bool, err error)
|
||||
Set(ctx context.Context, namespace, key string, value []byte) (err error)
|
||||
Delete(ctx context.Context, namespace, key string) (err error)
|
||||
}
|
||||
|
||||
type redisKeyValueStore struct {
|
||||
*basestore.Store
|
||||
}
|
||||
|
||||
var _ RedisKeyValueStore = (*redisKeyValueStore)(nil)
|
||||
|
||||
func (f *redisKeyValueStore) WithTransact(ctx context.Context, fn func(RedisKeyValueStore) error) error {
|
||||
return f.Store.WithTransact(ctx, func(tx *basestore.Store) error {
|
||||
return fn(&redisKeyValueStore{Store: tx})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *redisKeyValueStore) Get(ctx context.Context, namespace, key string) ([]byte, bool, error) {
|
||||
// redispool will often follow up a Get with a Set (eg for implementing
|
||||
// redis INCR). As such we need to lock the row with FOR UPDATE.
|
||||
q := sqlf.Sprintf(`
|
||||
SELECT value FROM redis_key_value
|
||||
WHERE namespace = %s AND key = %s
|
||||
FOR UPDATE
|
||||
`, namespace, key)
|
||||
row := s.QueryRow(ctx, q)
|
||||
|
||||
var value []byte
|
||||
err := row.Scan(&value)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, false, nil
|
||||
} else if err != nil {
|
||||
return nil, false, err
|
||||
} else {
|
||||
return value, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *redisKeyValueStore) Set(ctx context.Context, namespace, key string, value []byte) error {
|
||||
// value schema does not allow null, nor do we need to preserve nil. So
|
||||
// convert to empty string for robustness. This invariant is documented in
|
||||
// redispool.DBStore and enforced by tests.
|
||||
if value == nil {
|
||||
value = []byte{}
|
||||
}
|
||||
|
||||
q := sqlf.Sprintf(`
|
||||
INSERT INTO redis_key_value (namespace, key, value)
|
||||
VALUES (%s, %s, %s)
|
||||
ON CONFLICT (namespace, key) DO UPDATE SET value = EXCLUDED.value
|
||||
`, namespace, key, value)
|
||||
return s.Exec(ctx, q)
|
||||
}
|
||||
|
||||
func (s *redisKeyValueStore) Delete(ctx context.Context, namespace, key string) error {
|
||||
q := sqlf.Sprintf(`
|
||||
DELETE FROM redis_key_value
|
||||
WHERE namespace = %s AND key = %s
|
||||
`, namespace, key)
|
||||
return s.Exec(ctx, q)
|
||||
}
|
||||
64
internal/database/redis_key_value_test.go
Normal file
64
internal/database/redis_key_value_test.go
Normal file
@ -0,0 +1,64 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/sourcegraph/log/logtest"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRedisKeyValue(t *testing.T) {
|
||||
require := require.New(t)
|
||||
logger := logtest.Scoped(t)
|
||||
db := NewDB(logger, dbtest.NewDB(logger, t))
|
||||
ctx := context.Background()
|
||||
kv := db.RedisKeyValue()
|
||||
|
||||
// Two basic helpers to reduce the noise of get testing
|
||||
requireMissing := func(namespace, key string) {
|
||||
t.Helper()
|
||||
_, ok, err := kv.Get(ctx, namespace, key)
|
||||
require.NoError(err)
|
||||
require.False(ok)
|
||||
}
|
||||
requireValue := func(namespace, key, value string) {
|
||||
want := []byte(value)
|
||||
t.Helper()
|
||||
v, ok, err := kv.Get(ctx, namespace, key)
|
||||
require.NoError(err)
|
||||
require.True(ok)
|
||||
require.Equal(want, v)
|
||||
}
|
||||
|
||||
// Basic testing. We heavily rely on the integration test in redispool to
|
||||
// properly exercise the store.
|
||||
|
||||
// get on missing, set, then get works
|
||||
requireMissing("namespace", "key")
|
||||
require.NoError(kv.Set(ctx, "namespace", "key", []byte("value")))
|
||||
requireValue("namespace", "key", "value")
|
||||
|
||||
// set on existing key updates it
|
||||
require.NoError(kv.Set(ctx, "namespace", "key", []byte("horsegraph")))
|
||||
requireValue("namespace", "key", "horsegraph")
|
||||
|
||||
// delete makes the following get missing
|
||||
require.NoError(kv.Delete(ctx, "namespace", "key"))
|
||||
requireMissing("namespace", "key")
|
||||
|
||||
// deleting a key that doesn't exist doesn't fail
|
||||
require.NoError(kv.Delete(ctx, "namespace", "missing"))
|
||||
|
||||
// test binary data
|
||||
binary := string([]byte{0, 1, 0}) // use string to ensure we don't mutate in Set.
|
||||
require.NoError(kv.Set(ctx, "namespace", "binary", []byte(binary)))
|
||||
requireValue("namespace", "binary", binary)
|
||||
|
||||
// nil should be treated like an empty slice
|
||||
require.NoError(kv.Set(ctx, "namespace", "nil", nil))
|
||||
require.NoError(kv.Set(ctx, "namespace", "empty", []byte{}))
|
||||
requireValue("namespace", "nil", "")
|
||||
requireValue("namespace", "empty", "")
|
||||
}
|
||||
@ -18118,6 +18118,65 @@
|
||||
"Constraints": null,
|
||||
"Triggers": []
|
||||
},
|
||||
{
|
||||
"Name": "redis_key_value",
|
||||
"Comment": "",
|
||||
"Columns": [
|
||||
{
|
||||
"Name": "key",
|
||||
"Index": 2,
|
||||
"TypeName": "text",
|
||||
"IsNullable": false,
|
||||
"Default": "",
|
||||
"CharacterMaximumLength": 0,
|
||||
"IsIdentity": false,
|
||||
"IdentityGeneration": "",
|
||||
"IsGenerated": "NEVER",
|
||||
"GenerationExpression": "",
|
||||
"Comment": ""
|
||||
},
|
||||
{
|
||||
"Name": "namespace",
|
||||
"Index": 1,
|
||||
"TypeName": "text",
|
||||
"IsNullable": false,
|
||||
"Default": "",
|
||||
"CharacterMaximumLength": 0,
|
||||
"IsIdentity": false,
|
||||
"IdentityGeneration": "",
|
||||
"IsGenerated": "NEVER",
|
||||
"GenerationExpression": "",
|
||||
"Comment": ""
|
||||
},
|
||||
{
|
||||
"Name": "value",
|
||||
"Index": 3,
|
||||
"TypeName": "bytea",
|
||||
"IsNullable": false,
|
||||
"Default": "",
|
||||
"CharacterMaximumLength": 0,
|
||||
"IsIdentity": false,
|
||||
"IdentityGeneration": "",
|
||||
"IsGenerated": "NEVER",
|
||||
"GenerationExpression": "",
|
||||
"Comment": ""
|
||||
}
|
||||
],
|
||||
"Indexes": [
|
||||
{
|
||||
"Name": "redis_key_value_pkey",
|
||||
"IsPrimaryKey": true,
|
||||
"IsUnique": true,
|
||||
"IsExclusion": false,
|
||||
"IsDeferrable": false,
|
||||
"IndexDefinition": "CREATE UNIQUE INDEX redis_key_value_pkey ON redis_key_value USING btree (namespace, key) INCLUDE (value)",
|
||||
"ConstraintType": "p",
|
||||
"ConstraintDefinition": "PRIMARY KEY (namespace, key) INCLUDE (value)"
|
||||
}
|
||||
],
|
||||
"Constraints": null,
|
||||
"Triggers": []
|
||||
},
|
||||
{
|
||||
"Name": "registry_extension_releases",
|
||||
"Comment": "",
|
||||
|
||||
@ -2793,6 +2793,18 @@ Referenced by:
|
||||
|
||||
```
|
||||
|
||||
# Table "public.redis_key_value"
|
||||
```
|
||||
Column | Type | Collation | Nullable | Default
|
||||
-----------+-------+-----------+----------+---------
|
||||
namespace | text | | not null |
|
||||
key | text | | not null |
|
||||
value | bytea | | not null |
|
||||
Indexes:
|
||||
"redis_key_value_pkey" PRIMARY KEY, btree (namespace, key) INCLUDE (value)
|
||||
|
||||
```
|
||||
|
||||
# Table "public.registry_extension_releases"
|
||||
```
|
||||
Column | Type | Collation | Nullable | Default
|
||||
|
||||
105
internal/redispool/db.go
Normal file
105
internal/redispool/db.go
Normal file
@ -0,0 +1,105 @@
|
||||
package redispool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
)
|
||||
|
||||
// DBStore is the methods needed by DBKeyValue to implement the core of
|
||||
// KeyValue. See database.RedisKeyValueStore for the implementation of this
|
||||
// interface.
|
||||
//
|
||||
// We do not directly import that interface since that introduces
|
||||
// complications around dependency graphs.
|
||||
//
|
||||
// Note: DBKeyValue uses a coarse global mutex for all transactions on-top of
|
||||
// whatever transaction DBStoreTransact provides. The intention of these
|
||||
// interfaces is to be used in a single process application (like Sourcegraph
|
||||
// App). We would need to change the design of NaiveKeyValueStore to allow for
|
||||
// retries to smoothly avoid global mutexes.
|
||||
type DBStore interface {
|
||||
// Get returns the value for (namespace, key). ok is false if the
|
||||
// (namespace, key) has not been set.
|
||||
//
|
||||
// Note: We recommend using "SELECT ... FOR UPDATE" since this call is
|
||||
// often followed by Set in the same transaction.
|
||||
Get(ctx context.Context, namespace, key string) (value []byte, ok bool, err error)
|
||||
// Set will upsert value for (namespace, key). If value is nil it should
|
||||
// be persisted as an empty byte slice.
|
||||
Set(ctx context.Context, namespace, key string, value []byte) (err error)
|
||||
// Delete will remove (namespace, key). If (namespace, key) is not in the
|
||||
// store, the delete is a noop.
|
||||
Delete(ctx context.Context, namespace, key string) (err error)
|
||||
}
|
||||
|
||||
// DBStoreTransact is a function which is like the WithTransact which will run
|
||||
// f inside of a transaction. f is a function which will read/update a
|
||||
// DBStore.
|
||||
type DBStoreTransact func(ctx context.Context, f func(DBStore) error) error
|
||||
|
||||
var dbStoreTransact atomic.Value
|
||||
|
||||
// DBRegisterStore registers our database with the redispool package. Until
|
||||
// this is called all KeyValue operations against a DB backed KeyValue will
|
||||
// fail with an error. As such this function should be called early on (as
|
||||
// soon as we have a useable DB connection).
|
||||
//
|
||||
// An error will be returned if this function is called more than once.
|
||||
func DBRegisterStore(transact DBStoreTransact) error {
|
||||
ok := dbStoreTransact.CompareAndSwap(nil, transact)
|
||||
if !ok {
|
||||
return errors.New("redispool.DBRegisterStore has already been called")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// dbMu protects _all_ possible interactions with the database in DBKeyValue.
|
||||
// This is to avoid concurrent get/sets on the same key resulting in one of
|
||||
// the sets failing due to serializability.
|
||||
var dbMu sync.Mutex
|
||||
|
||||
// DBKeyValue returns a KeyValue with namespace. Namespaces allow us to have
|
||||
// distinct KeyValue stores, but still use the same underlying DBStore
|
||||
// storage.
|
||||
//
|
||||
// Note: This is designed for use in a single process application like
|
||||
// Sourcegraph App. All transactions are additionally protected by a global
|
||||
// mutex to avoid the need to handle database serializability errors.
|
||||
func DBKeyValue(namespace string) KeyValue {
|
||||
store := func(ctx context.Context, key string, f NaiveUpdater) error {
|
||||
dbMu.Lock()
|
||||
defer dbMu.Unlock()
|
||||
|
||||
transact := dbStoreTransact.Load()
|
||||
if transact == nil {
|
||||
return errors.New("redispool.DBRegisterStore has not been called")
|
||||
}
|
||||
|
||||
return transact.(DBStoreTransact)(ctx, func(store DBStore) error {
|
||||
beforeStr, found, err := store.Get(ctx, namespace, key)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "redispool.DBKeyValue failed to get %q in namespace %q", key, namespace)
|
||||
}
|
||||
|
||||
before := NaiveValue(beforeStr)
|
||||
after, remove := f(before, found)
|
||||
if remove {
|
||||
if found {
|
||||
if err := store.Delete(ctx, namespace, key); err != nil {
|
||||
return errors.Wrapf(err, "redispool.DBKeyValue failed to delete %q in namespace %q", key, namespace)
|
||||
}
|
||||
}
|
||||
} else if before != after {
|
||||
if err := store.Set(ctx, namespace, key, []byte(after)); err != nil {
|
||||
return errors.Wrapf(err, "redispool.DBKeyValue failed to set %q in namespace %q", key, namespace)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return FromNaiveKeyValueStore(store)
|
||||
}
|
||||
@ -2,6 +2,7 @@ package redispool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
@ -105,11 +106,25 @@ type redisKeyValue struct {
|
||||
prefix string
|
||||
}
|
||||
|
||||
// MemoryKeyValue is the special URI which is recognized by NewKeyValue to
|
||||
// create an in memory key value.
|
||||
const MemoryKeyValueURI = "redis+memory:memory"
|
||||
|
||||
const dbKeyValueURIScheme = "redis+postgres"
|
||||
|
||||
// DBKeyValueURI returns a URI to connect to the DB backed redis with the
|
||||
// specified namespace.
|
||||
func DBKeyValueURI(namespace string) string {
|
||||
return dbKeyValueURIScheme + ":" + namespace
|
||||
}
|
||||
|
||||
// NewKeyValue returns a KeyValue for addr. addr is treated as follows:
|
||||
//
|
||||
// 1. if addr == MemoryKeyValueURI we use a KeyValue that lives
|
||||
// in memory of the current process.
|
||||
// 2. otherwise treat as a redis address.
|
||||
// 2. if addr was created by DBKeyValueURI we use a KeyValue that is backed
|
||||
// by postgres.
|
||||
// 3. otherwise treat as a redis address.
|
||||
//
|
||||
// poolOpts is a required argument which sets defaults in the case we connect
|
||||
// to redis. If used we only override TestOnBorrow and Dial.
|
||||
@ -117,6 +132,11 @@ func NewKeyValue(addr string, poolOpts *redis.Pool) KeyValue {
|
||||
if addr == MemoryKeyValueURI {
|
||||
return MemoryKeyValue()
|
||||
}
|
||||
|
||||
if schema, namespace, ok := strings.Cut(addr, ":"); ok && schema == dbKeyValueURIScheme {
|
||||
return DBKeyValue(namespace)
|
||||
}
|
||||
|
||||
poolOpts.TestOnBorrow = func(c redis.Conn, t time.Time) error {
|
||||
_, err := c.Do("PING")
|
||||
return err
|
||||
|
||||
@ -5,10 +5,6 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// MemoryKeyValue is the special URI which is recognized by NewKeyValue to
|
||||
// create an in memory key value.
|
||||
const MemoryKeyValueURI = "keyvalue:memory"
|
||||
|
||||
// MemoryKeyValue returns an in memory KeyValue.
|
||||
func MemoryKeyValue() KeyValue {
|
||||
var mu sync.Mutex
|
||||
|
||||
@ -1,11 +1,68 @@
|
||||
package redispool_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/sourcegraph/log/logtest"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
|
||||
"github.com/sourcegraph/sourcegraph/internal/redispool"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
)
|
||||
|
||||
func TestInMemoryKeyValue(t *testing.T) {
|
||||
testKeyValue(t, redispool.MemoryKeyValue())
|
||||
}
|
||||
|
||||
func TestDBKeyValue(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping DB test since -short is specified")
|
||||
}
|
||||
t.Parallel()
|
||||
|
||||
require := require{TB: t}
|
||||
db := redispool.DBKeyValue("test")
|
||||
|
||||
require.Equal(db.Get("db_test"), errors.New("redispool.DBRegisterStore has not been called"))
|
||||
|
||||
// Now register and check if db starts to work
|
||||
if err := redispool.DBRegisterStore(dbStoreTransact(t)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Run("integration", func(t *testing.T) {
|
||||
testKeyValue(t, db)
|
||||
})
|
||||
|
||||
// Ensure we can't register twice
|
||||
if err := redispool.DBRegisterStore(dbStoreTransact(t)); err == nil {
|
||||
t.Fatal("expected second call to DBRegisterStore to fail")
|
||||
}
|
||||
if err := redispool.DBRegisterStore(nil); err == nil {
|
||||
t.Fatal("expected third call to DBRegisterStore to fail")
|
||||
}
|
||||
// Ensure we are still working
|
||||
require.Equal(db.Get("db_test"), redis.ErrNil)
|
||||
|
||||
// Check that namespacing works. Intentionally use same namespace as db
|
||||
// for db1.
|
||||
db1 := redispool.DBKeyValue("test")
|
||||
db2 := redispool.DBKeyValue("test2")
|
||||
require.Works(db1.Set("db_test", "1"))
|
||||
require.Works(db2.Set("db_test", "2"))
|
||||
require.Equal(db1.Get("db_test"), "1")
|
||||
require.Equal(db2.Get("db_test"), "2")
|
||||
}
|
||||
|
||||
func dbStoreTransact(t *testing.T) redispool.DBStoreTransact {
|
||||
logger := logtest.Scoped(t)
|
||||
kvNoTX := database.NewDB(logger, dbtest.NewDB(logger, t)).RedisKeyValue()
|
||||
|
||||
return func(ctx context.Context, f func(redispool.DBStore) error) error {
|
||||
return kvNoTX.WithTransact(ctx, func(tx database.RedisKeyValueStore) error {
|
||||
return f(tx)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,7 +58,7 @@ var addresses = func() struct {
|
||||
for _, addr := range []string{
|
||||
env.Get("REDIS_STORE_ENDPOINT", "", "redis used for persistent stores (eg HTTP sessions). Default redis-store:6379"),
|
||||
fallback,
|
||||
maybe(deploy.IsDeployTypeSingleProgram(deployType), MemoryKeyValueURI),
|
||||
maybe(deploy.IsDeployTypeSingleProgram(deployType), DBKeyValueURI("store")),
|
||||
"redis-store:6379",
|
||||
} {
|
||||
if addr != "" {
|
||||
|
||||
@ -1,6 +1,12 @@
|
||||
package redispool
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/sourcegraph/log/logtest"
|
||||
)
|
||||
|
||||
func TestSchemeMatcher(t *testing.T) {
|
||||
tests := []struct {
|
||||
@ -20,3 +26,9 @@ func TestSchemeMatcher(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
flag.Parse()
|
||||
logtest.Init(m)
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
3
migrations/frontend/1675257827_redis_key_value/down.sql
Normal file
3
migrations/frontend/1675257827_redis_key_value/down.sql
Normal file
@ -0,0 +1,3 @@
|
||||
-- Undo the changes made in the up migration
|
||||
|
||||
DROP TABLE IF EXISTS redis_key_value;
|
||||
@ -0,0 +1,2 @@
|
||||
name: redis_key_value
|
||||
parents: [1675155867]
|
||||
8
migrations/frontend/1675257827_redis_key_value/up.sql
Normal file
8
migrations/frontend/1675257827_redis_key_value/up.sql
Normal file
@ -0,0 +1,8 @@
|
||||
-- Perform migration here.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS redis_key_value (
|
||||
namespace TEXT NOT NULL,
|
||||
key TEXT NOT NULL,
|
||||
value BYTEA NOT NULL,
|
||||
PRIMARY KEY (namespace, key) INCLUDE (value)
|
||||
);
|
||||
@ -3497,6 +3497,12 @@ CREATE VIEW reconciler_changesets AS
|
||||
LEFT JOIN orgs namespace_org ON ((batch_changes.namespace_org_id = namespace_org.id)))
|
||||
WHERE ((c.batch_change_ids ? (batch_changes.id)::text) AND (namespace_user.deleted_at IS NULL) AND (namespace_org.deleted_at IS NULL)))));
|
||||
|
||||
CREATE TABLE redis_key_value (
|
||||
namespace text NOT NULL,
|
||||
key text NOT NULL,
|
||||
value bytea NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE registry_extension_releases (
|
||||
id bigint NOT NULL,
|
||||
registry_extension_id integer NOT NULL,
|
||||
@ -4551,6 +4557,9 @@ ALTER TABLE ONLY product_licenses
|
||||
ALTER TABLE ONLY product_subscriptions
|
||||
ADD CONSTRAINT product_subscriptions_pkey PRIMARY KEY (id);
|
||||
|
||||
ALTER TABLE ONLY redis_key_value
|
||||
ADD CONSTRAINT redis_key_value_pkey PRIMARY KEY (namespace, key) INCLUDE (value);
|
||||
|
||||
ALTER TABLE ONLY registry_extension_releases
|
||||
ADD CONSTRAINT registry_extension_releases_pkey PRIMARY KEY (id);
|
||||
|
||||
|
||||
@ -827,7 +827,7 @@ commands:
|
||||
sourcegraph:
|
||||
description: Single program (Go static binary) distribution
|
||||
cmd: |
|
||||
unset SRC_GIT_SERVERS INDEXED_SEARCH_SERVERS
|
||||
unset SRC_GIT_SERVERS INDEXED_SEARCH_SERVERS REDIS_ENDPOINT
|
||||
|
||||
# TODO: This should be fixed
|
||||
export SOURCEGRAPH_LICENSE_GENERATION_KEY=$(cat ../dev-private/enterprise/dev/test-license-generation-key.pem)
|
||||
@ -1131,8 +1131,6 @@ commandsets:
|
||||
requiresDevPrivate: true
|
||||
checks:
|
||||
- docker
|
||||
# TODO: App's dependency on Redis will be removed.
|
||||
- redis
|
||||
- postgres
|
||||
- git
|
||||
commands:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user