conf: only allow updates to store via client (#39105)

conf.client is typically started with a goroutine that polls the frontend API for configuration updates. If an update is detected, conf.store is updated with a cached value of the new configuration. Consumers of configuration get values from conf.store instead of the frontend API

In the frontend itself, however, we start a conf.Server and a conf.client to be backed by a ConfigurationSource, and both poll this ConfigurationSource for updates. Both hold references to the same conf.store, and both write the updates they receive from ConfigurationSource to the same conf.store. The key difference is that conf.client will notify subscribers of conf.Watch, and conf.Server will not. This leads to the following scenario:

1. Edit the site config in the UI
2. GraphQL API calls conf.Server.Write
3. conf.Server has a channel internally that immediately procs its source poller, which updates conf.store with the new configuration
4. conf.client polls ConfigurationSource and gets the new update. However, it then compares it to conf.store, which has the new value from conf.Server, and detects no diff, causing watchers to not get notified.

This fix:

- removes conf.Server's polling routine entirely
- removes conf.Server's access to conf.store entirely
- to preserve the fast-update behaviour (docstrings indicate this is important for tests), conf.client can now have a channel set, sourceUpdates, that also procs its polling if non-nil
- adds some more robust logging about conf.client updates

Co-authored-by: Keegan Carruthers-Smith <keegan.csmith@gmail.com>
This commit is contained in:
Robert Lin 2022-07-20 09:25:52 -07:00 committed by GitHub
parent e382e74e0f
commit 4a51796fc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 479 additions and 143 deletions

View File

@ -140,7 +140,7 @@ func (r *siteConfigurationResolver) EffectiveContents(ctx context.Context) (JSON
if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.db); err != nil {
return "", err
}
siteConfig, err := conf.RedactSecrets(globals.ConfigurationServerFrontendOnly.Raw())
siteConfig, err := conf.RedactSecrets(conf.Raw())
return JSONCString(siteConfig.Site), err
}
@ -174,7 +174,7 @@ func (r *schemaResolver) UpdateSiteConfiguration(ctx context.Context, args *stru
return false, errors.Errorf("site configuration is invalid: %s", strings.Join(problems, ","))
}
prev := globals.ConfigurationServerFrontendOnly.Raw()
prev := conf.Raw()
unredacted, err := conf.UnredactSecrets(args.Input, prev)
if err != nil {
return false, errors.Errorf("error unredacting secrets: %s", err)

View File

@ -11,7 +11,6 @@ import (
"github.com/inconshreveable/log15"
"github.com/sourcegraph/sourcegraph/cmd/frontend/backend"
"github.com/sourcegraph/sourcegraph/cmd/frontend/globals"
"github.com/sourcegraph/sourcegraph/cmd/frontend/internal/app/updatecheck"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/conf"
@ -127,7 +126,7 @@ func init() {
return nil
}
problems, err := conf.Validate(globals.ConfigurationServerFrontendOnly.Raw())
problems, err := conf.Validate(conf.Raw())
if err != nil {
return []*Alert{
{

View File

@ -17,7 +17,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sourcegraph/sourcegraph/cmd/frontend/globals"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
@ -33,7 +32,7 @@ import (
)
func printConfigValidation() {
messages, err := conf.Validate(globals.ConfigurationServerFrontendOnly.Raw())
messages, err := conf.Validate(conf.Raw())
if err != nil {
log.Printf("Warning: Unable to validate Sourcegraph site configuration: %s", err)
return

View File

@ -15,6 +15,7 @@ import (
"github.com/sourcegraph/sourcegraph/cmd/frontend/backend"
"github.com/sourcegraph/sourcegraph/cmd/frontend/globals"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/gitserver"
"github.com/sourcegraph/sourcegraph/internal/jsonc"
@ -143,12 +144,9 @@ func serveExternalServicesList(db database.DB) func(w http.ResponseWriter, r *ht
}
}
func serveConfiguration(w http.ResponseWriter, r *http.Request) error {
raw, err := globals.ConfigurationServerFrontendOnly.Source.Read(r.Context())
if err != nil {
return err
}
err = json.NewEncoder(w).Encode(raw)
func serveConfiguration(w http.ResponseWriter, _ *http.Request) error {
raw := conf.Raw()
err := json.NewEncoder(w).Encode(raw)
if err != nil {
return errors.Wrap(err, "Encode")
}

View File

@ -84,8 +84,6 @@ func main() {
env.Lock()
env.HandleHelpFlag()
conf.Init()
logging.Init()
liblog := log.Init(log.Resource{
@ -94,6 +92,8 @@ func main() {
InstanceID: hostname.Get(),
}, log.NewSentrySinkWithOptions(sentrylib.ClientOptions{SampleRate: 0.2})) // Experimental: DevX is observing how sampling affects the errors signal
defer liblog.Sync()
conf.Init()
go conf.Watch(liblog.Update(conf.GetLogSinks))
tracer.Init(log.Scoped("tracer", "internal tracer package"), conf.DefaultClient())

View File

@ -77,14 +77,14 @@ func Main(enterpriseInit EnterpriseInit) {
env.Lock()
env.HandleHelpFlag()
conf.Init()
liblog := log.Init(log.Resource{
Name: env.MyName,
Version: version.Version(),
InstanceID: hostname.Get(),
}, log.NewSentrySinkWithOptions(sentrylib.ClientOptions{SampleRate: 0.2})) // Experimental: DevX is observing how sampling affects the errors signal
defer liblog.Sync()
conf.Init()
go conf.Watch(liblog.Update(conf.GetLogSinks))
tracer.Init(log.Scoped("tracer", "internal tracer package"), conf.DefaultClient())

View File

@ -228,15 +228,15 @@ func main() {
env.Lock()
env.HandleHelpFlag()
stdlog.SetFlags(0)
conf.Init()
logging.Init()
liblog := log.Init(log.Resource{
Name: env.MyName,
Version: version.Version(),
InstanceID: hostname.Get(),
}, log.NewSentrySinkWithOptions(sentrylib.ClientOptions{SampleRate: 0.2})) // Experimental: DevX is observing how sampling affects the errors signal
defer liblog.Sync()
conf.Init()
go conf.Watch(liblog.Update(conf.GetLogSinks))
tracer.Init(log.Scoped("tracer", "internal tracer package"), conf.DefaultClient())
trace.Init()

View File

@ -39,15 +39,15 @@ type SetupFunc func(observationContext *observation.Context, gitserverClient git
func Main(setup SetupFunc) {
// Initialization
env.HandleHelpFlag()
conf.Init()
logging.Init()
liblog := log.Init(log.Resource{
Name: env.MyName,
Version: version.Version(),
InstanceID: hostname.Get(),
}, log.NewSentrySinkWithOptions(sentrylib.ClientOptions{SampleRate: 0.2})) // Experimental: DevX is observing how sampling affects the errors signal
defer liblog.Sync()
conf.Init()
go conf.Watch(liblog.Update(conf.GetLogSinks))
tracer.Init(log.Scoped("tracer", "internal tracer package"), conf.DefaultClient())
trace.Init()

View File

@ -51,7 +51,6 @@ func main() {
env.Lock()
env.HandleHelpFlag()
conf.Init()
logging.Init()
liblog := log.Init(log.Resource{
Name: env.MyName,
@ -59,6 +58,8 @@ func main() {
InstanceID: hostname.Get(),
})
defer liblog.Sync()
conf.Init()
go conf.Watch(liblog.Update(conf.GetLogSinks))
tracer.Init(log.Scoped("tracer", "internal tracer package"), conf.DefaultClient())
trace.Init()

View File

@ -2,13 +2,14 @@ package conf
import (
"context"
"log"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/api/internalapi"
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
"github.com/sourcegraph/sourcegraph/lib/errors"
@ -20,6 +21,12 @@ type client struct {
passthrough ConfigurationSource
watchersMu sync.Mutex
watchers []chan struct{}
// sourceUpdates receives events that indicate the configuration source has been
// updated. It should prompt the client to update the store, and the received channel
// should be closed when future queries to the client returns the most up to date
// configuration.
sourceUpdates <-chan chan struct{}
}
var _ conftypes.UnifiedQuerier = &client{}
@ -175,7 +182,7 @@ func (c *client) Cached(f func() any) (wrapped func() any) {
}
// notifyWatchers runs all the callbacks registered via client.Watch() whenever
// the configuration has changed.
// the configuration has changed. It does not block on individual sends.
func (c *client) notifyWatchers() {
c.watchersMu.Lock()
defer c.watchersMu.Unlock()
@ -199,8 +206,8 @@ type continuousUpdateOptions struct {
// contact the frontend for configuration) start up before the frontend.
delayBeforeUnreachableLog time.Duration
log func(format string, v ...any) // log.Printf equivalent
sleep func() // sleep between updates
logger log.Logger
sleepBetweenUpdates func() // sleep between updates
}
// continuouslyUpdate runs (*client).fetchAndUpdate in an infinite loop, with error logging and
@ -209,16 +216,16 @@ type continuousUpdateOptions struct {
// The optOnlySetByTests parameter is ONLY customized by tests. All callers in main code should pass
// nil (so that the same defaults are used).
func (c *client) continuouslyUpdate(optOnlySetByTests *continuousUpdateOptions) {
opt := optOnlySetByTests
if opt == nil {
opts := optOnlySetByTests
if opts == nil {
// Apply defaults.
opt = &continuousUpdateOptions{
opts = &continuousUpdateOptions{
// This needs to be long enough to allow the frontend to fully migrate the PostgreSQL
// database in most cases, to avoid log spam when running sourcegraph/server for the
// first time.
delayBeforeUnreachableLog: 15 * time.Second,
log: log.Printf,
sleep: func() {
logger: log.Scoped("conf.client", "configuration client"),
sleepBetweenUpdates: func() {
jitter := time.Duration(rand.Int63n(5 * int64(time.Second)))
time.Sleep(jitter)
},
@ -230,15 +237,43 @@ func (c *client) continuouslyUpdate(optOnlySetByTests *continuousUpdateOptions)
return errors.As(err, &e) && e.Op == "dial"
}
waitForSleep := func() <-chan struct{} {
c := make(chan struct{}, 1)
go func() {
opts.sleepBetweenUpdates()
close(c)
}()
return c
}
// Make an initial fetch an update - this is likely to error, so just discard the
// error on this initial attempt.
_ = c.fetchAndUpdate(opts.logger)
start := time.Now()
for {
err := c.fetchAndUpdate()
logger := opts.logger
// signalDoneReading, if set, indicates that we were prompted to update because
// the source has been updated.
var signalDoneReading chan struct{}
select {
case signalDoneReading = <-c.sourceUpdates:
// Config was changed at source, so let's check now
logger = logger.With(log.String("triggered_by", "sourceUpdates"))
case <-waitForSleep():
// File possibly changed at source, so check now.
logger = logger.With(log.String("triggered_by", "waitForSleep"))
}
logger.Debug("checking for updates")
err := c.fetchAndUpdate(logger)
if err != nil {
// Suppress log messages for errors caused by the frontend being unreachable until we've
// given the frontend enough time to initialize (in case other services start up before
// the frontend), to reduce log spam.
if time.Since(start) > opt.delayBeforeUnreachableLog || !isFrontendUnreachableError(err) {
opt.log("received error during background config update, err: %s", err)
if time.Since(start) > opts.delayBeforeUnreachableLog || !isFrontendUnreachableError(err) {
logger.Error("received error during background config update", log.Error(err))
}
} else {
// We successfully fetched the config, we reset the timer to give
@ -246,13 +281,17 @@ func (c *client) continuouslyUpdate(optOnlySetByTests *continuousUpdateOptions)
start = time.Now()
}
opt.sleep()
// Indicate that we are done reading, if we were prompted to update by the updates
// channel
if signalDoneReading != nil {
close(signalDoneReading)
}
}
}
func (c *client) fetchAndUpdate() error {
ctx := context.Background()
func (c *client) fetchAndUpdate(logger log.Logger) error {
var (
ctx = context.Background()
newConfig conftypes.RawUnified
err error
)
@ -271,7 +310,11 @@ func (c *client) fetchAndUpdate() error {
}
if configChange.Changed {
logger.Info("config changed, notifying watchers",
log.Int("watchers", len(c.watchers)))
c.notifyWatchers()
} else {
logger.Debug("no config changes detected")
}
return nil

View File

@ -1,20 +1,23 @@
package conf
import (
"fmt"
"net"
"net/url"
"runtime"
"strings"
"testing"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/log/logtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/sourcegraph/internal/api/internalapi"
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func TestClient_continuouslyUpdate(t *testing.T) {
func TestClientContinuouslyUpdate(t *testing.T) {
t.Run("suppresses errors due to temporarily unreachable frontend", func(t *testing.T) {
internalapi.MockClientConfiguration = func() (conftypes.RawUnified, error) {
return conftypes.RawUnified{}, &url.Error{
@ -29,27 +32,26 @@ func TestClient_continuouslyUpdate(t *testing.T) {
defer func() { internalapi.MockClientConfiguration = nil }()
var client client
var logMessages []string
logger, exportLogs := logtest.Captured(t)
done := make(chan struct{})
sleeps := 0
const delayBeforeUnreachableLog = 150 * time.Millisecond // assumes first loop iter executes within this time period
go client.continuouslyUpdate(&continuousUpdateOptions{
delayBeforeUnreachableLog: delayBeforeUnreachableLog,
log: func(format string, v ...any) {
logMessages = append(logMessages, fmt.Sprintf(format, v...))
},
sleep: func() {
logger: logger,
sleepBetweenUpdates: func() {
logMessages := exportLogs()
switch sleeps {
case 0:
if len(logMessages) > 0 {
t.Errorf("got log messages (below), want no log before delayBeforeUnreachableLog\n\n%s", strings.Join(logMessages, "\n"))
for _, message := range logMessages {
require.NotEqual(t, message.Level, log.LevelError, "expected no error messages before delayBeforeUnreachableLog")
}
time.Sleep(delayBeforeUnreachableLog)
sleeps++
time.Sleep(delayBeforeUnreachableLog)
case 1:
if len(logMessages) != 1 {
t.Errorf("got %d log messages, want exactly 1 log after delayBeforeUnreachableLog", len(logMessages))
}
require.Len(t, logMessages, 2)
assert.Contains(t, logMessages[0].Message, "checking")
assert.Contains(t, logMessages[1].Message, "received error")
// Exit goroutine after this test is done.
close(done)
@ -59,4 +61,55 @@ func TestClient_continuouslyUpdate(t *testing.T) {
})
<-done
})
t.Run("watchers are called on update", func(t *testing.T) {
updates := make(chan chan struct{})
mockSource := NewMockConfigurationSource()
client := &client{
store: newStore(),
passthrough: mockSource,
sourceUpdates: updates,
}
client.store.initialize()
mockSource.ReadFunc.PushReturn(conftypes.RawUnified{
Site: ``,
}, nil)
mockSource.ReadFunc.PushReturn(conftypes.RawUnified{
Site: `{"log":{}}`,
}, nil)
mockSource.ReadFunc.PushReturn(conftypes.RawUnified{
Site: `{}`,
}, nil)
done := make(chan struct{})
go client.continuouslyUpdate(&continuousUpdateOptions{
delayBeforeUnreachableLog: 0,
logger: logtest.Scoped(t),
// sleepBetweenUpdates never returns - this behaviour is tested above in the
// other test
sleepBetweenUpdates: func() {
<-done
runtime.Goexit()
},
})
called := make(chan string, 1)
client.Watch(func() {
called <- client.Raw().Site
})
assert.Equal(t, ``, <-called) // watch makes initial call with initial conf
update := make(chan struct{})
updates <- update
<-update
assert.Equal(t, `{"log":{}}`, <-called)
update2 := make(chan struct{})
updates <- update2
<-update2
assert.Equal(t, `{}`, <-called)
close(done)
})
}

View File

@ -89,8 +89,7 @@ func getMode() configurationMode {
var configurationServerFrontendOnlyInitialized = make(chan struct{})
func initDefaultClient() *client {
clientStore := defaultStore
defaultClient := &client{store: clientStore}
defaultClient := &client{store: newStore()}
mode := getMode()
// Don't kickoff the background updaters for the client/server
@ -173,6 +172,10 @@ func InitConfigurationServerFrontendOnly(source ConfigurationSource) *Server {
// and instead only relies on the DB.
DefaultClient().passthrough = source
// Notify the default client of updates to the source to ensure updates
// propagate quickly.
DefaultClient().sourceUpdates = server.sourceWrites
go DefaultClient().continuouslyUpdate(nil)
close(configurationServerFrontendOnlyInitialized)

286
internal/conf/mocks_test.go Normal file
View File

@ -0,0 +1,286 @@
// Code generated by go-mockgen 1.3.3; DO NOT EDIT.
//
// This file was generated by running `sg generate` (or `go-mockgen`) at the root of
// this repository. To add additional mocks to this or another package, add a new entry
// to the mockgen.yaml file in the root of this repository.
package conf
import (
"context"
"sync"
conftypes "github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
)
// MockConfigurationSource is a mock implementation of the
// ConfigurationSource interface (from the package
// github.com/sourcegraph/sourcegraph/internal/conf) used for unit testing.
type MockConfigurationSource struct {
// ReadFunc is an instance of a mock function object controlling the
// behavior of the method Read.
ReadFunc *ConfigurationSourceReadFunc
// WriteFunc is an instance of a mock function object controlling the
// behavior of the method Write.
WriteFunc *ConfigurationSourceWriteFunc
}
// NewMockConfigurationSource creates a new mock of the ConfigurationSource
// interface. All methods return zero values for all results, unless
// overwritten.
func NewMockConfigurationSource() *MockConfigurationSource {
return &MockConfigurationSource{
ReadFunc: &ConfigurationSourceReadFunc{
defaultHook: func(context.Context) (r0 conftypes.RawUnified, r1 error) {
return
},
},
WriteFunc: &ConfigurationSourceWriteFunc{
defaultHook: func(context.Context, conftypes.RawUnified) (r0 error) {
return
},
},
}
}
// NewStrictMockConfigurationSource creates a new mock of the
// ConfigurationSource interface. All methods panic on invocation, unless
// overwritten.
func NewStrictMockConfigurationSource() *MockConfigurationSource {
return &MockConfigurationSource{
ReadFunc: &ConfigurationSourceReadFunc{
defaultHook: func(context.Context) (conftypes.RawUnified, error) {
panic("unexpected invocation of MockConfigurationSource.Read")
},
},
WriteFunc: &ConfigurationSourceWriteFunc{
defaultHook: func(context.Context, conftypes.RawUnified) error {
panic("unexpected invocation of MockConfigurationSource.Write")
},
},
}
}
// NewMockConfigurationSourceFrom creates a new mock of the
// MockConfigurationSource interface. All methods delegate to the given
// implementation, unless overwritten.
func NewMockConfigurationSourceFrom(i ConfigurationSource) *MockConfigurationSource {
return &MockConfigurationSource{
ReadFunc: &ConfigurationSourceReadFunc{
defaultHook: i.Read,
},
WriteFunc: &ConfigurationSourceWriteFunc{
defaultHook: i.Write,
},
}
}
// ConfigurationSourceReadFunc describes the behavior when the Read method
// of the parent MockConfigurationSource instance is invoked.
type ConfigurationSourceReadFunc struct {
defaultHook func(context.Context) (conftypes.RawUnified, error)
hooks []func(context.Context) (conftypes.RawUnified, error)
history []ConfigurationSourceReadFuncCall
mutex sync.Mutex
}
// Read delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockConfigurationSource) Read(v0 context.Context) (conftypes.RawUnified, error) {
r0, r1 := m.ReadFunc.nextHook()(v0)
m.ReadFunc.appendCall(ConfigurationSourceReadFuncCall{v0, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the Read method of the
// parent MockConfigurationSource instance is invoked and the hook queue is
// empty.
func (f *ConfigurationSourceReadFunc) SetDefaultHook(hook func(context.Context) (conftypes.RawUnified, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// Read method of the parent MockConfigurationSource instance invokes the
// hook at the front of the queue and discards it. After the queue is empty,
// the default hook function is invoked for any future action.
func (f *ConfigurationSourceReadFunc) PushHook(hook func(context.Context) (conftypes.RawUnified, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *ConfigurationSourceReadFunc) SetDefaultReturn(r0 conftypes.RawUnified, r1 error) {
f.SetDefaultHook(func(context.Context) (conftypes.RawUnified, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *ConfigurationSourceReadFunc) PushReturn(r0 conftypes.RawUnified, r1 error) {
f.PushHook(func(context.Context) (conftypes.RawUnified, error) {
return r0, r1
})
}
func (f *ConfigurationSourceReadFunc) nextHook() func(context.Context) (conftypes.RawUnified, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *ConfigurationSourceReadFunc) appendCall(r0 ConfigurationSourceReadFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of ConfigurationSourceReadFuncCall objects
// describing the invocations of this function.
func (f *ConfigurationSourceReadFunc) History() []ConfigurationSourceReadFuncCall {
f.mutex.Lock()
history := make([]ConfigurationSourceReadFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// ConfigurationSourceReadFuncCall is an object that describes an invocation
// of method Read on an instance of MockConfigurationSource.
type ConfigurationSourceReadFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 conftypes.RawUnified
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c ConfigurationSourceReadFuncCall) Args() []interface{} {
return []interface{}{c.Arg0}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c ConfigurationSourceReadFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// ConfigurationSourceWriteFunc describes the behavior when the Write method
// of the parent MockConfigurationSource instance is invoked.
type ConfigurationSourceWriteFunc struct {
defaultHook func(context.Context, conftypes.RawUnified) error
hooks []func(context.Context, conftypes.RawUnified) error
history []ConfigurationSourceWriteFuncCall
mutex sync.Mutex
}
// Write delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockConfigurationSource) Write(v0 context.Context, v1 conftypes.RawUnified) error {
r0 := m.WriteFunc.nextHook()(v0, v1)
m.WriteFunc.appendCall(ConfigurationSourceWriteFuncCall{v0, v1, r0})
return r0
}
// SetDefaultHook sets function that is called when the Write method of the
// parent MockConfigurationSource instance is invoked and the hook queue is
// empty.
func (f *ConfigurationSourceWriteFunc) SetDefaultHook(hook func(context.Context, conftypes.RawUnified) error) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// Write method of the parent MockConfigurationSource instance invokes the
// hook at the front of the queue and discards it. After the queue is empty,
// the default hook function is invoked for any future action.
func (f *ConfigurationSourceWriteFunc) PushHook(hook func(context.Context, conftypes.RawUnified) error) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *ConfigurationSourceWriteFunc) SetDefaultReturn(r0 error) {
f.SetDefaultHook(func(context.Context, conftypes.RawUnified) error {
return r0
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *ConfigurationSourceWriteFunc) PushReturn(r0 error) {
f.PushHook(func(context.Context, conftypes.RawUnified) error {
return r0
})
}
func (f *ConfigurationSourceWriteFunc) nextHook() func(context.Context, conftypes.RawUnified) error {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *ConfigurationSourceWriteFunc) appendCall(r0 ConfigurationSourceWriteFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of ConfigurationSourceWriteFuncCall objects
// describing the invocations of this function.
func (f *ConfigurationSourceWriteFunc) History() []ConfigurationSourceWriteFuncCall {
f.mutex.Lock()
history := make([]ConfigurationSourceWriteFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// ConfigurationSourceWriteFuncCall is an object that describes an
// invocation of method Write on an instance of MockConfigurationSource.
type ConfigurationSourceWriteFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 conftypes.RawUnified
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c ConfigurationSourceWriteFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c ConfigurationSourceWriteFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}

View File

@ -2,10 +2,7 @@ package conf
import (
"context"
"log"
"math/rand"
"sync"
"time"
"github.com/sourcegraph/jsonx"
@ -23,19 +20,16 @@ type ConfigurationSource interface {
// Server provides access and manages modifications to the site configuration.
type Server struct {
Source ConfigurationSource
store *store
source ConfigurationSource
// sourceWrites signals when our app writes to the configuration source. The
// received channel should be closed when server.Raw() would return the new
// configuration that has been written to disk.
sourceWrites chan chan struct{}
needRestartMu sync.RWMutex
needRestart bool
// fileWrite signals when our app writes to the configuration file. The
// secondary channel is closed when server.Raw() would return the new
// configuration that has been written to disk.
fileWrite chan chan struct{}
once sync.Once
startOnce sync.Once
}
// NewServer returns a new Server instance that mangages the site config file
@ -43,21 +37,13 @@ type Server struct {
//
// The server must be started with Start() before it can handle requests.
func NewServer(source ConfigurationSource) *Server {
fileWrite := make(chan chan struct{}, 1)
return &Server{
Source: source,
store: defaultStore,
fileWrite: fileWrite,
source: source,
sourceWrites: make(chan chan struct{}, 1),
}
}
// Raw returns the raw text of the configuration file.
func (s *Server) Raw() conftypes.RawUnified {
return s.store.Raw()
}
// Write writes the JSON config file to the config file's path. If the JSON configuration is
// invalid, an error is returned.
// Write validates and writes input to the server's source.
func (s *Server) Write(ctx context.Context, input conftypes.RawUnified) error {
// Parse the configuration so that we can diff it (this also validates it
// is proper JSON).
@ -66,7 +52,7 @@ func (s *Server) Write(ctx context.Context, input conftypes.RawUnified) error {
return err
}
err = s.Source.Write(ctx, input)
err = s.source.Write(ctx, input)
if err != nil {
return err
}
@ -75,7 +61,10 @@ func (s *Server) Write(ctx context.Context, input conftypes.RawUnified) error {
// we would return to the caller earlier than server.Raw() would return the
// new configuration.
doneReading := make(chan struct{}, 1)
s.fileWrite <- doneReading
// Notify that we've written an update
s.sourceWrites <- doneReading
// Get notified that the update has been read (it gets closed) - don't write
// until this is done.
<-doneReading
return nil
@ -99,8 +88,9 @@ func (s *Server) Edit(ctx context.Context, computeEdits func(current *Unified, r
// TODO@ggilmore: There is a race condition here (also present in the existing library).
// Current and raw could be inconsistent. Another thing to offload to configStore?
// Snapshot method?
current := s.store.LastValid()
raw := s.store.Raw()
client := DefaultClient()
current := client.store.LastValid()
raw := client.Raw()
// Compute edits.
edits, err := computeEdits(current, raw)
@ -126,66 +116,33 @@ func (s *Server) Edit(ctx context.Context, computeEdits func(current *Unified, r
// Start initializes the server instance.
func (s *Server) Start() {
s.once.Do(func() {
go s.watchSource()
s.startOnce.Do(func() {
// We prepare to watch for config updates in order to mark the config server as
// needing a restart (or not). This must be in a goroutine, since Watch must
// happen after conf initialization (which may have not happened yet)
go func() {
var oldConfig *Unified
Watch(func() {
// Don't indicate restarts if this is the first update (initial configuration
// after service startup).
if oldConfig == nil {
oldConfig = Get()
return
}
// Update global "needs restart" state.
newConfig := Get()
if NeedRestartToApply(oldConfig, newConfig) {
s.markNeedServerRestart()
}
// Update old value
oldConfig = newConfig
})
}()
})
}
// watchSource reloads the configuration from the source at least every five seconds or whenever
// server.Write() is called.
func (s *Server) watchSource() {
ctx := context.Background()
for {
err := s.updateFromSource(ctx)
if err != nil {
log.Printf("failed to read configuration: %s. Fix your Sourcegraph configuration to resolve this error. Visit https://docs.sourcegraph.com/ to learn more.", err)
}
jitter := time.Duration(rand.Int63n(5 * int64(time.Second)))
var signalDoneReading chan struct{}
select {
case signalDoneReading = <-s.fileWrite:
// File was changed on FS, so check now.
case <-time.After(jitter):
// File possibly changed on FS, so check now.
}
if signalDoneReading != nil {
close(signalDoneReading)
}
}
}
func (s *Server) updateFromSource(ctx context.Context) error {
rawConfig, err := s.Source.Read(ctx)
if err != nil {
return errors.Wrap(err, "unable to read configuration")
}
configChange, err := s.store.MaybeUpdate(rawConfig)
if err != nil {
return err
}
// Don't need to restart if the configuration hasn't changed.
if !configChange.Changed {
return nil
}
// Don't restart if the configuration was empty before (this only occurs during initialization).
if configChange.Old == nil {
return nil
}
// Update global "needs restart" state.
if NeedRestartToApply(configChange.Old, configChange.New) {
s.markNeedServerRestart()
}
return nil
}
// NeedServerRestart tells if the server needs to restart for pending configuration
// changes to take effect.
func (s *Server) NeedServerRestart() bool {

View File

@ -27,13 +27,6 @@ type store struct {
once sync.Once
}
// defaultStore is shared between client and server in the same process,
// so we can make sure our writes in backend integration tests are immediately
// effectual. Without a shared store, the client will asynchronously poll
// for updates from the database, and we have no way to know when it's done,
// since the state served from the GraphQL API is the one in the server store.
var defaultStore = newStore()
// newStore returns a new configuration store.
func newStore() *store {
return &store{

View File

@ -208,3 +208,7 @@
path: github.com/sourcegraph/sourcegraph/enterprise/internal/authz/github
interfaces:
- client
- filename: internal/conf/mocks_test.go
path: github.com/sourcegraph/sourcegraph/internal/conf
interfaces:
- ConfigurationSource