diff --git a/cmd/frontend/graphqlbackend/site.go b/cmd/frontend/graphqlbackend/site.go index dfad192b9e1..17e2e003bda 100644 --- a/cmd/frontend/graphqlbackend/site.go +++ b/cmd/frontend/graphqlbackend/site.go @@ -140,7 +140,7 @@ func (r *siteConfigurationResolver) EffectiveContents(ctx context.Context) (JSON if err := backend.CheckCurrentUserIsSiteAdmin(ctx, r.db); err != nil { return "", err } - siteConfig, err := conf.RedactSecrets(globals.ConfigurationServerFrontendOnly.Raw()) + siteConfig, err := conf.RedactSecrets(conf.Raw()) return JSONCString(siteConfig.Site), err } @@ -174,7 +174,7 @@ func (r *schemaResolver) UpdateSiteConfiguration(ctx context.Context, args *stru return false, errors.Errorf("site configuration is invalid: %s", strings.Join(problems, ",")) } - prev := globals.ConfigurationServerFrontendOnly.Raw() + prev := conf.Raw() unredacted, err := conf.UnredactSecrets(args.Input, prev) if err != nil { return false, errors.Errorf("error unredacting secrets: %s", err) diff --git a/cmd/frontend/graphqlbackend/site_alerts.go b/cmd/frontend/graphqlbackend/site_alerts.go index 39872e15219..ad728eee2a5 100644 --- a/cmd/frontend/graphqlbackend/site_alerts.go +++ b/cmd/frontend/graphqlbackend/site_alerts.go @@ -11,7 +11,6 @@ import ( "github.com/inconshreveable/log15" "github.com/sourcegraph/sourcegraph/cmd/frontend/backend" - "github.com/sourcegraph/sourcegraph/cmd/frontend/globals" "github.com/sourcegraph/sourcegraph/cmd/frontend/internal/app/updatecheck" "github.com/sourcegraph/sourcegraph/internal/actor" "github.com/sourcegraph/sourcegraph/internal/conf" @@ -127,7 +126,7 @@ func init() { return nil } - problems, err := conf.Validate(globals.ConfigurationServerFrontendOnly.Raw()) + problems, err := conf.Validate(conf.Raw()) if err != nil { return []*Alert{ { diff --git a/cmd/frontend/internal/cli/config.go b/cmd/frontend/internal/cli/config.go index 652ba0feb42..75304a5e76f 100644 --- a/cmd/frontend/internal/cli/config.go +++ b/cmd/frontend/internal/cli/config.go @@ -17,7 +17,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/sourcegraph/sourcegraph/cmd/frontend/globals" "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/conf/conftypes" @@ -33,7 +32,7 @@ import ( ) func printConfigValidation() { - messages, err := conf.Validate(globals.ConfigurationServerFrontendOnly.Raw()) + messages, err := conf.Validate(conf.Raw()) if err != nil { log.Printf("Warning: Unable to validate Sourcegraph site configuration: %s", err) return diff --git a/cmd/frontend/internal/httpapi/internal.go b/cmd/frontend/internal/httpapi/internal.go index 60be675ef5b..181103dfdef 100644 --- a/cmd/frontend/internal/httpapi/internal.go +++ b/cmd/frontend/internal/httpapi/internal.go @@ -15,6 +15,7 @@ import ( "github.com/sourcegraph/sourcegraph/cmd/frontend/backend" "github.com/sourcegraph/sourcegraph/cmd/frontend/globals" "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/gitserver" "github.com/sourcegraph/sourcegraph/internal/jsonc" @@ -143,12 +144,9 @@ func serveExternalServicesList(db database.DB) func(w http.ResponseWriter, r *ht } } -func serveConfiguration(w http.ResponseWriter, r *http.Request) error { - raw, err := globals.ConfigurationServerFrontendOnly.Source.Read(r.Context()) - if err != nil { - return err - } - err = json.NewEncoder(w).Encode(raw) +func serveConfiguration(w http.ResponseWriter, _ *http.Request) error { + raw := conf.Raw() + err := json.NewEncoder(w).Encode(raw) if err != nil { return errors.Wrap(err, "Encode") } diff --git a/cmd/gitserver/main.go b/cmd/gitserver/main.go index ab98ba9f2be..a86b29e4e19 100644 --- a/cmd/gitserver/main.go +++ b/cmd/gitserver/main.go @@ -84,8 +84,6 @@ func main() { env.Lock() env.HandleHelpFlag() - - conf.Init() logging.Init() liblog := log.Init(log.Resource{ @@ -94,6 +92,8 @@ func main() { InstanceID: hostname.Get(), }, log.NewSentrySinkWithOptions(sentrylib.ClientOptions{SampleRate: 0.2})) // Experimental: DevX is observing how sampling affects the errors signal defer liblog.Sync() + + conf.Init() go conf.Watch(liblog.Update(conf.GetLogSinks)) tracer.Init(log.Scoped("tracer", "internal tracer package"), conf.DefaultClient()) diff --git a/cmd/repo-updater/shared/main.go b/cmd/repo-updater/shared/main.go index 88cfdd14566..4eb50281701 100644 --- a/cmd/repo-updater/shared/main.go +++ b/cmd/repo-updater/shared/main.go @@ -77,14 +77,14 @@ func Main(enterpriseInit EnterpriseInit) { env.Lock() env.HandleHelpFlag() - conf.Init() liblog := log.Init(log.Resource{ Name: env.MyName, Version: version.Version(), InstanceID: hostname.Get(), }, log.NewSentrySinkWithOptions(sentrylib.ClientOptions{SampleRate: 0.2})) // Experimental: DevX is observing how sampling affects the errors signal - defer liblog.Sync() + + conf.Init() go conf.Watch(liblog.Update(conf.GetLogSinks)) tracer.Init(log.Scoped("tracer", "internal tracer package"), conf.DefaultClient()) diff --git a/cmd/searcher/main.go b/cmd/searcher/main.go index 3507ce26ea7..25b86522730 100644 --- a/cmd/searcher/main.go +++ b/cmd/searcher/main.go @@ -228,15 +228,15 @@ func main() { env.Lock() env.HandleHelpFlag() stdlog.SetFlags(0) - conf.Init() logging.Init() liblog := log.Init(log.Resource{ Name: env.MyName, Version: version.Version(), InstanceID: hostname.Get(), }, log.NewSentrySinkWithOptions(sentrylib.ClientOptions{SampleRate: 0.2})) // Experimental: DevX is observing how sampling affects the errors signal - defer liblog.Sync() + + conf.Init() go conf.Watch(liblog.Update(conf.GetLogSinks)) tracer.Init(log.Scoped("tracer", "internal tracer package"), conf.DefaultClient()) trace.Init() diff --git a/cmd/symbols/shared/main.go b/cmd/symbols/shared/main.go index 4d496fd60b0..322b3855faf 100644 --- a/cmd/symbols/shared/main.go +++ b/cmd/symbols/shared/main.go @@ -39,15 +39,15 @@ type SetupFunc func(observationContext *observation.Context, gitserverClient git func Main(setup SetupFunc) { // Initialization env.HandleHelpFlag() - conf.Init() logging.Init() liblog := log.Init(log.Resource{ Name: env.MyName, Version: version.Version(), InstanceID: hostname.Get(), }, log.NewSentrySinkWithOptions(sentrylib.ClientOptions{SampleRate: 0.2})) // Experimental: DevX is observing how sampling affects the errors signal - defer liblog.Sync() + + conf.Init() go conf.Watch(liblog.Update(conf.GetLogSinks)) tracer.Init(log.Scoped("tracer", "internal tracer package"), conf.DefaultClient()) trace.Init() diff --git a/enterprise/cmd/precise-code-intel-worker/main.go b/enterprise/cmd/precise-code-intel-worker/main.go index 09567515d8a..b9981e05711 100644 --- a/enterprise/cmd/precise-code-intel-worker/main.go +++ b/enterprise/cmd/precise-code-intel-worker/main.go @@ -51,7 +51,6 @@ func main() { env.Lock() env.HandleHelpFlag() - conf.Init() logging.Init() liblog := log.Init(log.Resource{ Name: env.MyName, @@ -59,6 +58,8 @@ func main() { InstanceID: hostname.Get(), }) defer liblog.Sync() + + conf.Init() go conf.Watch(liblog.Update(conf.GetLogSinks)) tracer.Init(log.Scoped("tracer", "internal tracer package"), conf.DefaultClient()) trace.Init() diff --git a/internal/conf/client.go b/internal/conf/client.go index 2b53e034bc4..07dade733e4 100644 --- a/internal/conf/client.go +++ b/internal/conf/client.go @@ -2,13 +2,14 @@ package conf import ( "context" - "log" "math/rand" "net" "sync" "sync/atomic" "time" + "github.com/sourcegraph/log" + "github.com/sourcegraph/sourcegraph/internal/api/internalapi" "github.com/sourcegraph/sourcegraph/internal/conf/conftypes" "github.com/sourcegraph/sourcegraph/lib/errors" @@ -20,6 +21,12 @@ type client struct { passthrough ConfigurationSource watchersMu sync.Mutex watchers []chan struct{} + + // sourceUpdates receives events that indicate the configuration source has been + // updated. It should prompt the client to update the store, and the received channel + // should be closed when future queries to the client returns the most up to date + // configuration. + sourceUpdates <-chan chan struct{} } var _ conftypes.UnifiedQuerier = &client{} @@ -175,7 +182,7 @@ func (c *client) Cached(f func() any) (wrapped func() any) { } // notifyWatchers runs all the callbacks registered via client.Watch() whenever -// the configuration has changed. +// the configuration has changed. It does not block on individual sends. func (c *client) notifyWatchers() { c.watchersMu.Lock() defer c.watchersMu.Unlock() @@ -199,8 +206,8 @@ type continuousUpdateOptions struct { // contact the frontend for configuration) start up before the frontend. delayBeforeUnreachableLog time.Duration - log func(format string, v ...any) // log.Printf equivalent - sleep func() // sleep between updates + logger log.Logger + sleepBetweenUpdates func() // sleep between updates } // continuouslyUpdate runs (*client).fetchAndUpdate in an infinite loop, with error logging and @@ -209,16 +216,16 @@ type continuousUpdateOptions struct { // The optOnlySetByTests parameter is ONLY customized by tests. All callers in main code should pass // nil (so that the same defaults are used). func (c *client) continuouslyUpdate(optOnlySetByTests *continuousUpdateOptions) { - opt := optOnlySetByTests - if opt == nil { + opts := optOnlySetByTests + if opts == nil { // Apply defaults. - opt = &continuousUpdateOptions{ + opts = &continuousUpdateOptions{ // This needs to be long enough to allow the frontend to fully migrate the PostgreSQL // database in most cases, to avoid log spam when running sourcegraph/server for the // first time. delayBeforeUnreachableLog: 15 * time.Second, - log: log.Printf, - sleep: func() { + logger: log.Scoped("conf.client", "configuration client"), + sleepBetweenUpdates: func() { jitter := time.Duration(rand.Int63n(5 * int64(time.Second))) time.Sleep(jitter) }, @@ -230,15 +237,43 @@ func (c *client) continuouslyUpdate(optOnlySetByTests *continuousUpdateOptions) return errors.As(err, &e) && e.Op == "dial" } + waitForSleep := func() <-chan struct{} { + c := make(chan struct{}, 1) + go func() { + opts.sleepBetweenUpdates() + close(c) + }() + return c + } + + // Make an initial fetch an update - this is likely to error, so just discard the + // error on this initial attempt. + _ = c.fetchAndUpdate(opts.logger) + start := time.Now() for { - err := c.fetchAndUpdate() + logger := opts.logger + + // signalDoneReading, if set, indicates that we were prompted to update because + // the source has been updated. + var signalDoneReading chan struct{} + select { + case signalDoneReading = <-c.sourceUpdates: + // Config was changed at source, so let's check now + logger = logger.With(log.String("triggered_by", "sourceUpdates")) + case <-waitForSleep(): + // File possibly changed at source, so check now. + logger = logger.With(log.String("triggered_by", "waitForSleep")) + } + + logger.Debug("checking for updates") + err := c.fetchAndUpdate(logger) if err != nil { // Suppress log messages for errors caused by the frontend being unreachable until we've // given the frontend enough time to initialize (in case other services start up before // the frontend), to reduce log spam. - if time.Since(start) > opt.delayBeforeUnreachableLog || !isFrontendUnreachableError(err) { - opt.log("received error during background config update, err: %s", err) + if time.Since(start) > opts.delayBeforeUnreachableLog || !isFrontendUnreachableError(err) { + logger.Error("received error during background config update", log.Error(err)) } } else { // We successfully fetched the config, we reset the timer to give @@ -246,13 +281,17 @@ func (c *client) continuouslyUpdate(optOnlySetByTests *continuousUpdateOptions) start = time.Now() } - opt.sleep() + // Indicate that we are done reading, if we were prompted to update by the updates + // channel + if signalDoneReading != nil { + close(signalDoneReading) + } } } -func (c *client) fetchAndUpdate() error { - ctx := context.Background() +func (c *client) fetchAndUpdate(logger log.Logger) error { var ( + ctx = context.Background() newConfig conftypes.RawUnified err error ) @@ -271,7 +310,11 @@ func (c *client) fetchAndUpdate() error { } if configChange.Changed { + logger.Info("config changed, notifying watchers", + log.Int("watchers", len(c.watchers))) c.notifyWatchers() + } else { + logger.Debug("no config changes detected") } return nil diff --git a/internal/conf/client_test.go b/internal/conf/client_test.go index cd44beba93f..73d24e5e5c7 100644 --- a/internal/conf/client_test.go +++ b/internal/conf/client_test.go @@ -1,20 +1,23 @@ package conf import ( - "fmt" "net" "net/url" "runtime" - "strings" "testing" "time" + "github.com/sourcegraph/log" + "github.com/sourcegraph/log/logtest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/sourcegraph/sourcegraph/internal/api/internalapi" "github.com/sourcegraph/sourcegraph/internal/conf/conftypes" "github.com/sourcegraph/sourcegraph/lib/errors" ) -func TestClient_continuouslyUpdate(t *testing.T) { +func TestClientContinuouslyUpdate(t *testing.T) { t.Run("suppresses errors due to temporarily unreachable frontend", func(t *testing.T) { internalapi.MockClientConfiguration = func() (conftypes.RawUnified, error) { return conftypes.RawUnified{}, &url.Error{ @@ -29,27 +32,26 @@ func TestClient_continuouslyUpdate(t *testing.T) { defer func() { internalapi.MockClientConfiguration = nil }() var client client - var logMessages []string + logger, exportLogs := logtest.Captured(t) done := make(chan struct{}) sleeps := 0 const delayBeforeUnreachableLog = 150 * time.Millisecond // assumes first loop iter executes within this time period go client.continuouslyUpdate(&continuousUpdateOptions{ delayBeforeUnreachableLog: delayBeforeUnreachableLog, - log: func(format string, v ...any) { - logMessages = append(logMessages, fmt.Sprintf(format, v...)) - }, - sleep: func() { + logger: logger, + sleepBetweenUpdates: func() { + logMessages := exportLogs() switch sleeps { case 0: - if len(logMessages) > 0 { - t.Errorf("got log messages (below), want no log before delayBeforeUnreachableLog\n\n%s", strings.Join(logMessages, "\n")) + for _, message := range logMessages { + require.NotEqual(t, message.Level, log.LevelError, "expected no error messages before delayBeforeUnreachableLog") } - time.Sleep(delayBeforeUnreachableLog) sleeps++ + time.Sleep(delayBeforeUnreachableLog) case 1: - if len(logMessages) != 1 { - t.Errorf("got %d log messages, want exactly 1 log after delayBeforeUnreachableLog", len(logMessages)) - } + require.Len(t, logMessages, 2) + assert.Contains(t, logMessages[0].Message, "checking") + assert.Contains(t, logMessages[1].Message, "received error") // Exit goroutine after this test is done. close(done) @@ -59,4 +61,55 @@ func TestClient_continuouslyUpdate(t *testing.T) { }) <-done }) + + t.Run("watchers are called on update", func(t *testing.T) { + updates := make(chan chan struct{}) + mockSource := NewMockConfigurationSource() + client := &client{ + store: newStore(), + passthrough: mockSource, + sourceUpdates: updates, + } + client.store.initialize() + + mockSource.ReadFunc.PushReturn(conftypes.RawUnified{ + Site: ``, + }, nil) + mockSource.ReadFunc.PushReturn(conftypes.RawUnified{ + Site: `{"log":{}}`, + }, nil) + mockSource.ReadFunc.PushReturn(conftypes.RawUnified{ + Site: `{}`, + }, nil) + + done := make(chan struct{}) + go client.continuouslyUpdate(&continuousUpdateOptions{ + delayBeforeUnreachableLog: 0, + logger: logtest.Scoped(t), + // sleepBetweenUpdates never returns - this behaviour is tested above in the + // other test + sleepBetweenUpdates: func() { + <-done + runtime.Goexit() + }, + }) + + called := make(chan string, 1) + client.Watch(func() { + called <- client.Raw().Site + }) + assert.Equal(t, ``, <-called) // watch makes initial call with initial conf + + update := make(chan struct{}) + updates <- update + <-update + assert.Equal(t, `{"log":{}}`, <-called) + + update2 := make(chan struct{}) + updates <- update2 + <-update2 + assert.Equal(t, `{}`, <-called) + + close(done) + }) } diff --git a/internal/conf/conf.go b/internal/conf/conf.go index a419dc4d2cd..a401a69a2b5 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -89,8 +89,7 @@ func getMode() configurationMode { var configurationServerFrontendOnlyInitialized = make(chan struct{}) func initDefaultClient() *client { - clientStore := defaultStore - defaultClient := &client{store: clientStore} + defaultClient := &client{store: newStore()} mode := getMode() // Don't kickoff the background updaters for the client/server @@ -173,6 +172,10 @@ func InitConfigurationServerFrontendOnly(source ConfigurationSource) *Server { // and instead only relies on the DB. DefaultClient().passthrough = source + // Notify the default client of updates to the source to ensure updates + // propagate quickly. + DefaultClient().sourceUpdates = server.sourceWrites + go DefaultClient().continuouslyUpdate(nil) close(configurationServerFrontendOnlyInitialized) diff --git a/internal/conf/mocks_test.go b/internal/conf/mocks_test.go new file mode 100644 index 00000000000..0c45077cba5 --- /dev/null +++ b/internal/conf/mocks_test.go @@ -0,0 +1,286 @@ +// Code generated by go-mockgen 1.3.3; DO NOT EDIT. +// +// This file was generated by running `sg generate` (or `go-mockgen`) at the root of +// this repository. To add additional mocks to this or another package, add a new entry +// to the mockgen.yaml file in the root of this repository. + +package conf + +import ( + "context" + "sync" + + conftypes "github.com/sourcegraph/sourcegraph/internal/conf/conftypes" +) + +// MockConfigurationSource is a mock implementation of the +// ConfigurationSource interface (from the package +// github.com/sourcegraph/sourcegraph/internal/conf) used for unit testing. +type MockConfigurationSource struct { + // ReadFunc is an instance of a mock function object controlling the + // behavior of the method Read. + ReadFunc *ConfigurationSourceReadFunc + // WriteFunc is an instance of a mock function object controlling the + // behavior of the method Write. + WriteFunc *ConfigurationSourceWriteFunc +} + +// NewMockConfigurationSource creates a new mock of the ConfigurationSource +// interface. All methods return zero values for all results, unless +// overwritten. +func NewMockConfigurationSource() *MockConfigurationSource { + return &MockConfigurationSource{ + ReadFunc: &ConfigurationSourceReadFunc{ + defaultHook: func(context.Context) (r0 conftypes.RawUnified, r1 error) { + return + }, + }, + WriteFunc: &ConfigurationSourceWriteFunc{ + defaultHook: func(context.Context, conftypes.RawUnified) (r0 error) { + return + }, + }, + } +} + +// NewStrictMockConfigurationSource creates a new mock of the +// ConfigurationSource interface. All methods panic on invocation, unless +// overwritten. +func NewStrictMockConfigurationSource() *MockConfigurationSource { + return &MockConfigurationSource{ + ReadFunc: &ConfigurationSourceReadFunc{ + defaultHook: func(context.Context) (conftypes.RawUnified, error) { + panic("unexpected invocation of MockConfigurationSource.Read") + }, + }, + WriteFunc: &ConfigurationSourceWriteFunc{ + defaultHook: func(context.Context, conftypes.RawUnified) error { + panic("unexpected invocation of MockConfigurationSource.Write") + }, + }, + } +} + +// NewMockConfigurationSourceFrom creates a new mock of the +// MockConfigurationSource interface. All methods delegate to the given +// implementation, unless overwritten. +func NewMockConfigurationSourceFrom(i ConfigurationSource) *MockConfigurationSource { + return &MockConfigurationSource{ + ReadFunc: &ConfigurationSourceReadFunc{ + defaultHook: i.Read, + }, + WriteFunc: &ConfigurationSourceWriteFunc{ + defaultHook: i.Write, + }, + } +} + +// ConfigurationSourceReadFunc describes the behavior when the Read method +// of the parent MockConfigurationSource instance is invoked. +type ConfigurationSourceReadFunc struct { + defaultHook func(context.Context) (conftypes.RawUnified, error) + hooks []func(context.Context) (conftypes.RawUnified, error) + history []ConfigurationSourceReadFuncCall + mutex sync.Mutex +} + +// Read delegates to the next hook function in the queue and stores the +// parameter and result values of this invocation. +func (m *MockConfigurationSource) Read(v0 context.Context) (conftypes.RawUnified, error) { + r0, r1 := m.ReadFunc.nextHook()(v0) + m.ReadFunc.appendCall(ConfigurationSourceReadFuncCall{v0, r0, r1}) + return r0, r1 +} + +// SetDefaultHook sets function that is called when the Read method of the +// parent MockConfigurationSource instance is invoked and the hook queue is +// empty. +func (f *ConfigurationSourceReadFunc) SetDefaultHook(hook func(context.Context) (conftypes.RawUnified, error)) { + f.defaultHook = hook +} + +// PushHook adds a function to the end of hook queue. Each invocation of the +// Read method of the parent MockConfigurationSource instance invokes the +// hook at the front of the queue and discards it. After the queue is empty, +// the default hook function is invoked for any future action. +func (f *ConfigurationSourceReadFunc) PushHook(hook func(context.Context) (conftypes.RawUnified, error)) { + f.mutex.Lock() + f.hooks = append(f.hooks, hook) + f.mutex.Unlock() +} + +// SetDefaultReturn calls SetDefaultHook with a function that returns the +// given values. +func (f *ConfigurationSourceReadFunc) SetDefaultReturn(r0 conftypes.RawUnified, r1 error) { + f.SetDefaultHook(func(context.Context) (conftypes.RawUnified, error) { + return r0, r1 + }) +} + +// PushReturn calls PushHook with a function that returns the given values. +func (f *ConfigurationSourceReadFunc) PushReturn(r0 conftypes.RawUnified, r1 error) { + f.PushHook(func(context.Context) (conftypes.RawUnified, error) { + return r0, r1 + }) +} + +func (f *ConfigurationSourceReadFunc) nextHook() func(context.Context) (conftypes.RawUnified, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + + if len(f.hooks) == 0 { + return f.defaultHook + } + + hook := f.hooks[0] + f.hooks = f.hooks[1:] + return hook +} + +func (f *ConfigurationSourceReadFunc) appendCall(r0 ConfigurationSourceReadFuncCall) { + f.mutex.Lock() + f.history = append(f.history, r0) + f.mutex.Unlock() +} + +// History returns a sequence of ConfigurationSourceReadFuncCall objects +// describing the invocations of this function. +func (f *ConfigurationSourceReadFunc) History() []ConfigurationSourceReadFuncCall { + f.mutex.Lock() + history := make([]ConfigurationSourceReadFuncCall, len(f.history)) + copy(history, f.history) + f.mutex.Unlock() + + return history +} + +// ConfigurationSourceReadFuncCall is an object that describes an invocation +// of method Read on an instance of MockConfigurationSource. +type ConfigurationSourceReadFuncCall struct { + // Arg0 is the value of the 1st argument passed to this method + // invocation. + Arg0 context.Context + // Result0 is the value of the 1st result returned from this method + // invocation. + Result0 conftypes.RawUnified + // Result1 is the value of the 2nd result returned from this method + // invocation. + Result1 error +} + +// Args returns an interface slice containing the arguments of this +// invocation. +func (c ConfigurationSourceReadFuncCall) Args() []interface{} { + return []interface{}{c.Arg0} +} + +// Results returns an interface slice containing the results of this +// invocation. +func (c ConfigurationSourceReadFuncCall) Results() []interface{} { + return []interface{}{c.Result0, c.Result1} +} + +// ConfigurationSourceWriteFunc describes the behavior when the Write method +// of the parent MockConfigurationSource instance is invoked. +type ConfigurationSourceWriteFunc struct { + defaultHook func(context.Context, conftypes.RawUnified) error + hooks []func(context.Context, conftypes.RawUnified) error + history []ConfigurationSourceWriteFuncCall + mutex sync.Mutex +} + +// Write delegates to the next hook function in the queue and stores the +// parameter and result values of this invocation. +func (m *MockConfigurationSource) Write(v0 context.Context, v1 conftypes.RawUnified) error { + r0 := m.WriteFunc.nextHook()(v0, v1) + m.WriteFunc.appendCall(ConfigurationSourceWriteFuncCall{v0, v1, r0}) + return r0 +} + +// SetDefaultHook sets function that is called when the Write method of the +// parent MockConfigurationSource instance is invoked and the hook queue is +// empty. +func (f *ConfigurationSourceWriteFunc) SetDefaultHook(hook func(context.Context, conftypes.RawUnified) error) { + f.defaultHook = hook +} + +// PushHook adds a function to the end of hook queue. Each invocation of the +// Write method of the parent MockConfigurationSource instance invokes the +// hook at the front of the queue and discards it. After the queue is empty, +// the default hook function is invoked for any future action. +func (f *ConfigurationSourceWriteFunc) PushHook(hook func(context.Context, conftypes.RawUnified) error) { + f.mutex.Lock() + f.hooks = append(f.hooks, hook) + f.mutex.Unlock() +} + +// SetDefaultReturn calls SetDefaultHook with a function that returns the +// given values. +func (f *ConfigurationSourceWriteFunc) SetDefaultReturn(r0 error) { + f.SetDefaultHook(func(context.Context, conftypes.RawUnified) error { + return r0 + }) +} + +// PushReturn calls PushHook with a function that returns the given values. +func (f *ConfigurationSourceWriteFunc) PushReturn(r0 error) { + f.PushHook(func(context.Context, conftypes.RawUnified) error { + return r0 + }) +} + +func (f *ConfigurationSourceWriteFunc) nextHook() func(context.Context, conftypes.RawUnified) error { + f.mutex.Lock() + defer f.mutex.Unlock() + + if len(f.hooks) == 0 { + return f.defaultHook + } + + hook := f.hooks[0] + f.hooks = f.hooks[1:] + return hook +} + +func (f *ConfigurationSourceWriteFunc) appendCall(r0 ConfigurationSourceWriteFuncCall) { + f.mutex.Lock() + f.history = append(f.history, r0) + f.mutex.Unlock() +} + +// History returns a sequence of ConfigurationSourceWriteFuncCall objects +// describing the invocations of this function. +func (f *ConfigurationSourceWriteFunc) History() []ConfigurationSourceWriteFuncCall { + f.mutex.Lock() + history := make([]ConfigurationSourceWriteFuncCall, len(f.history)) + copy(history, f.history) + f.mutex.Unlock() + + return history +} + +// ConfigurationSourceWriteFuncCall is an object that describes an +// invocation of method Write on an instance of MockConfigurationSource. +type ConfigurationSourceWriteFuncCall struct { + // Arg0 is the value of the 1st argument passed to this method + // invocation. + Arg0 context.Context + // Arg1 is the value of the 2nd argument passed to this method + // invocation. + Arg1 conftypes.RawUnified + // Result0 is the value of the 1st result returned from this method + // invocation. + Result0 error +} + +// Args returns an interface slice containing the arguments of this +// invocation. +func (c ConfigurationSourceWriteFuncCall) Args() []interface{} { + return []interface{}{c.Arg0, c.Arg1} +} + +// Results returns an interface slice containing the results of this +// invocation. +func (c ConfigurationSourceWriteFuncCall) Results() []interface{} { + return []interface{}{c.Result0} +} diff --git a/internal/conf/server.go b/internal/conf/server.go index 8bf3d986f93..f35ab00b94d 100644 --- a/internal/conf/server.go +++ b/internal/conf/server.go @@ -2,10 +2,7 @@ package conf import ( "context" - "log" - "math/rand" "sync" - "time" "github.com/sourcegraph/jsonx" @@ -23,19 +20,16 @@ type ConfigurationSource interface { // Server provides access and manages modifications to the site configuration. type Server struct { - Source ConfigurationSource - - store *store + source ConfigurationSource + // sourceWrites signals when our app writes to the configuration source. The + // received channel should be closed when server.Raw() would return the new + // configuration that has been written to disk. + sourceWrites chan chan struct{} needRestartMu sync.RWMutex needRestart bool - // fileWrite signals when our app writes to the configuration file. The - // secondary channel is closed when server.Raw() would return the new - // configuration that has been written to disk. - fileWrite chan chan struct{} - - once sync.Once + startOnce sync.Once } // NewServer returns a new Server instance that mangages the site config file @@ -43,21 +37,13 @@ type Server struct { // // The server must be started with Start() before it can handle requests. func NewServer(source ConfigurationSource) *Server { - fileWrite := make(chan chan struct{}, 1) return &Server{ - Source: source, - store: defaultStore, - fileWrite: fileWrite, + source: source, + sourceWrites: make(chan chan struct{}, 1), } } -// Raw returns the raw text of the configuration file. -func (s *Server) Raw() conftypes.RawUnified { - return s.store.Raw() -} - -// Write writes the JSON config file to the config file's path. If the JSON configuration is -// invalid, an error is returned. +// Write validates and writes input to the server's source. func (s *Server) Write(ctx context.Context, input conftypes.RawUnified) error { // Parse the configuration so that we can diff it (this also validates it // is proper JSON). @@ -66,7 +52,7 @@ func (s *Server) Write(ctx context.Context, input conftypes.RawUnified) error { return err } - err = s.Source.Write(ctx, input) + err = s.source.Write(ctx, input) if err != nil { return err } @@ -75,7 +61,10 @@ func (s *Server) Write(ctx context.Context, input conftypes.RawUnified) error { // we would return to the caller earlier than server.Raw() would return the // new configuration. doneReading := make(chan struct{}, 1) - s.fileWrite <- doneReading + // Notify that we've written an update + s.sourceWrites <- doneReading + // Get notified that the update has been read (it gets closed) - don't write + // until this is done. <-doneReading return nil @@ -99,8 +88,9 @@ func (s *Server) Edit(ctx context.Context, computeEdits func(current *Unified, r // TODO@ggilmore: There is a race condition here (also present in the existing library). // Current and raw could be inconsistent. Another thing to offload to configStore? // Snapshot method? - current := s.store.LastValid() - raw := s.store.Raw() + client := DefaultClient() + current := client.store.LastValid() + raw := client.Raw() // Compute edits. edits, err := computeEdits(current, raw) @@ -126,66 +116,33 @@ func (s *Server) Edit(ctx context.Context, computeEdits func(current *Unified, r // Start initializes the server instance. func (s *Server) Start() { - s.once.Do(func() { - go s.watchSource() + s.startOnce.Do(func() { + // We prepare to watch for config updates in order to mark the config server as + // needing a restart (or not). This must be in a goroutine, since Watch must + // happen after conf initialization (which may have not happened yet) + go func() { + var oldConfig *Unified + Watch(func() { + // Don't indicate restarts if this is the first update (initial configuration + // after service startup). + if oldConfig == nil { + oldConfig = Get() + return + } + + // Update global "needs restart" state. + newConfig := Get() + if NeedRestartToApply(oldConfig, newConfig) { + s.markNeedServerRestart() + } + + // Update old value + oldConfig = newConfig + }) + }() }) } -// watchSource reloads the configuration from the source at least every five seconds or whenever -// server.Write() is called. -func (s *Server) watchSource() { - ctx := context.Background() - for { - err := s.updateFromSource(ctx) - if err != nil { - log.Printf("failed to read configuration: %s. Fix your Sourcegraph configuration to resolve this error. Visit https://docs.sourcegraph.com/ to learn more.", err) - } - - jitter := time.Duration(rand.Int63n(5 * int64(time.Second))) - - var signalDoneReading chan struct{} - select { - case signalDoneReading = <-s.fileWrite: - // File was changed on FS, so check now. - case <-time.After(jitter): - // File possibly changed on FS, so check now. - } - - if signalDoneReading != nil { - close(signalDoneReading) - } - } -} - -func (s *Server) updateFromSource(ctx context.Context) error { - rawConfig, err := s.Source.Read(ctx) - if err != nil { - return errors.Wrap(err, "unable to read configuration") - } - - configChange, err := s.store.MaybeUpdate(rawConfig) - if err != nil { - return err - } - - // Don't need to restart if the configuration hasn't changed. - if !configChange.Changed { - return nil - } - - // Don't restart if the configuration was empty before (this only occurs during initialization). - if configChange.Old == nil { - return nil - } - - // Update global "needs restart" state. - if NeedRestartToApply(configChange.Old, configChange.New) { - s.markNeedServerRestart() - } - - return nil -} - // NeedServerRestart tells if the server needs to restart for pending configuration // changes to take effect. func (s *Server) NeedServerRestart() bool { diff --git a/internal/conf/store.go b/internal/conf/store.go index 34fff7019cb..8e1247c1857 100644 --- a/internal/conf/store.go +++ b/internal/conf/store.go @@ -27,13 +27,6 @@ type store struct { once sync.Once } -// defaultStore is shared between client and server in the same process, -// so we can make sure our writes in backend integration tests are immediately -// effectual. Without a shared store, the client will asynchronously poll -// for updates from the database, and we have no way to know when it's done, -// since the state served from the GraphQL API is the one in the server store. -var defaultStore = newStore() - // newStore returns a new configuration store. func newStore() *store { return &store{ diff --git a/mockgen.test.yaml b/mockgen.test.yaml index 5e523eca5c7..b2110e9fe0c 100644 --- a/mockgen.test.yaml +++ b/mockgen.test.yaml @@ -208,3 +208,7 @@ path: github.com/sourcegraph/sourcegraph/enterprise/internal/authz/github interfaces: - client +- filename: internal/conf/mocks_test.go + path: github.com/sourcegraph/sourcegraph/internal/conf + interfaces: + - ConfigurationSource