batches: Refactor changeset syncer (#23654)

This PR refactors the changeset syncer to move it a step closer to running outside of repo-updater.
- Removes the hook-based syncer startup/shutdown in favor of periodically polling the database for the list of code hosts
- Removes global initialization of metrics
- Makes SyncRegistry implement the goroutine.BackgroundRoutine interface (see the sketch after this list)
- Migrates the syncer tests to go-mockgen; it's simple to use and made it easier for me to fix the tests
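
For context, goroutine.BackgroundRoutine is Sourcegraph's interface for long-running background processes: Start runs the routine until Stop requests shutdown. The sketch below only illustrates that shape and the polling pattern SyncRegistry adopts in the diff further down; the local type names and the one-minute ticker are made up for the example.

package example

import (
	"context"
	"log"
	"time"
)

// backgroundRoutine mirrors the shape of goroutine.BackgroundRoutine.
type backgroundRoutine interface {
	Start()
	Stop()
}

// pollingRegistry is a stand-in for SyncRegistry: instead of reacting to
// external-service hooks, it re-reads the list of code hosts on a timer.
type pollingRegistry struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newPollingRegistry(ctx context.Context) *pollingRegistry {
	ctx, cancel := context.WithCancel(ctx)
	return &pollingRegistry{ctx: ctx, cancel: cancel}
}

// Start blocks and periodically refreshes the set of syncers until Stop cancels the context.
func (r *pollingRegistry) Start() {
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-r.ctx.Done():
			return
		case <-ticker.C:
			log.Println("refreshing code hosts from the database")
		}
	}
}

func (r *pollingRegistry) Stop() { r.cancel() }

// Compile-time check, same pattern as `var _ goroutine.BackgroundRoutine = &SyncRegistry{}` in the diff.
var _ backgroundRoutine = &pollingRegistry{}

Because the registry is now just another BackgroundRoutine, InitBackgroundJobs can append it to the existing routines and hand everything to goroutine.MonitorBackgroundRoutines, as the second hunk below shows.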
Erik Seliger 2021-08-06 12:01:13 +02:00 committed by GitHub
parent 420f538102
commit 6778dd414c
7 changed files with 1790 additions and 269 deletions


@@ -52,9 +52,6 @@ type Server struct {
ChangesetSyncRegistry interface {
// EnqueueChangesetSyncs will queue the supplied changesets to sync ASAP.
EnqueueChangesetSyncs(ctx context.Context, ids []int64) error
// HandleExternalServiceSync should be called when an external service changes so that
// the registry can start or stop the syncer associated with the service
HandleExternalServiceSync(es api.ExternalService)
}
RateLimitSyncer interface {
// SyncRateLimiters should be called when an external service changes so that
@@ -245,9 +242,6 @@ func (s *Server) handleExternalServiceSync(w http.ResponseWriter, r *http.Reques
log15.Warn("Handling rate limiter sync", "err", err)
}
}
if s.ChangesetSyncRegistry != nil {
s.ChangesetSyncRegistry.HandleExternalServiceSync(req.ExternalService)
}
log15.Info("server.external-service-sync", "synced", req.ExternalService.Kind)
respond(w, http.StatusOK, &protocol.ExternalServiceSyncResult{


@@ -11,7 +11,6 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/syncer"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/encryption"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
@@ -30,18 +29,7 @@ func InitBackgroundJobs(
) interface {
// EnqueueChangesetSyncs will queue the supplied changesets to sync ASAP.
EnqueueChangesetSyncs(ctx context.Context, ids []int64) error
// HandleExternalServiceSync should be called when an external service changes so that
// the registry can start or stop the syncer associated with the service
HandleExternalServiceSync(es api.ExternalService)
} {
observationContext := &observation.Context{
Logger: log15.Root(),
Tracer: &trace.Tracer{Tracer: opentracing.GlobalTracer()},
Registerer: prometheus.DefaultRegisterer,
}
cstore := store.New(db, observationContext, key)
// We use an internal actor so that we can freely load dependencies from
// the database without repository permissions being enforced.
// We do check for repository permissions consciously in the Rewirer when
@@ -49,9 +37,20 @@ func InitBackgroundJobs(
// host, we manually check for BatchChangesCredentials.
ctx = actor.WithInternalActor(ctx)
syncRegistry := syncer.NewSyncRegistry(ctx, cstore, cf)
observationContext := &observation.Context{
Logger: log15.Root(),
Tracer: &trace.Tracer{Tracer: opentracing.GlobalTracer()},
Registerer: prometheus.DefaultRegisterer,
}
bstore := store.New(db, observationContext, key)
go goroutine.MonitorBackgroundRoutines(ctx, background.Routines(ctx, cstore, cf, observationContext)...)
syncRegistry := syncer.NewSyncRegistry(ctx, bstore, cf, observationContext)
routines := background.Routines(ctx, bstore, cf, observationContext)
routines = append(routines, syncRegistry)
go goroutine.MonitorBackgroundRoutines(ctx, routines...)
return syncRegistry
}


@@ -0,0 +1,3 @@
package syncer
//go:generate ../../../../dev/mockgen.sh github.com/sourcegraph/sourcegraph/enterprise/internal/batches/syncer -i SyncStore -o mock_iface_test.go
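
The suppressed diff that follows is presumably the generated mock_iface_test.go produced by this directive. go-mockgen emits one <Method>Func field per SyncStore method, which tests stub via SetDefaultReturn or SetDefaultHook, as the rewritten tests near the end of this diff do. A minimal usage sketch, assuming the syncer package's existing imports and the generated MockSyncStore (the changeset data is made up):

// newStoreWithOneChangeset stubs just the methods a hypothetical test needs
// and returns the mock ready to be passed as a SyncStore.
func newStoreWithOneChangeset() *MockSyncStore {
	s := NewMockSyncStore()
	s.ClockFunc.SetDefaultReturn(timeutil.Now)
	s.ListChangesetSyncDataFunc.SetDefaultReturn([]*btypes.ChangesetSyncData{
		{ChangesetID: 1},
	}, nil)
	return s
}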

File diff suppressed because it is too large.


@@ -0,0 +1,27 @@
package syncer
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
)
type SyncStore interface {
ListCodeHosts(ctx context.Context, opts store.ListCodeHostsOpts) ([]*btypes.CodeHost, error)
ListChangesetSyncData(context.Context, store.ListChangesetSyncDataOpts) ([]*btypes.ChangesetSyncData, error)
GetChangeset(context.Context, store.GetChangesetOpts) (*btypes.Changeset, error)
UpdateChangesetCodeHostState(ctx context.Context, cs *btypes.Changeset) error
UpsertChangesetEvents(ctx context.Context, cs ...*btypes.ChangesetEvent) error
GetSiteCredential(ctx context.Context, opts store.GetSiteCredentialOpts) (*btypes.SiteCredential, error)
Transact(context.Context) (*store.Store, error)
Repos() *database.RepoStore
ExternalServices() *database.ExternalServiceStore
Clock() func() time.Time
DB() dbutil.DB
GetExternalServiceIDs(ctx context.Context, opts store.GetExternalServiceIDsOpts) ([]int64, error)
UserCredentials() *database.UserCredentialsStore
}


@@ -9,25 +9,29 @@ import (
"github.com/cockroachdb/errors"
"github.com/inconshreveable/log15"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/sources"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/state"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/httpcli"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/types"
)
// externalServiceSyncerInterval is the time in between synchronizations with the
// database to start/stop syncers as needed.
const externalServiceSyncerInterval = 1 * time.Minute
// SyncRegistry manages a changesetSyncer per code host
type SyncRegistry struct {
ctx context.Context
cancel context.CancelFunc
syncStore SyncStore
httpFactory *httpcli.Factory
metrics *syncerMetrics
// Used to receive high priority sync requests
priorityNotify chan []int64
@@ -37,45 +41,64 @@ type SyncRegistry struct {
syncers map[string]*changesetSyncer
}
type SyncStore interface {
ListCodeHosts(ctx context.Context, opts store.ListCodeHostsOpts) ([]*btypes.CodeHost, error)
ListChangesetSyncData(context.Context, store.ListChangesetSyncDataOpts) ([]*btypes.ChangesetSyncData, error)
GetChangeset(context.Context, store.GetChangesetOpts) (*btypes.Changeset, error)
UpdateChangesetCodeHostState(ctx context.Context, cs *btypes.Changeset) error
UpsertChangesetEvents(ctx context.Context, cs ...*btypes.ChangesetEvent) error
GetSiteCredential(ctx context.Context, opts store.GetSiteCredentialOpts) (*btypes.SiteCredential, error)
Transact(context.Context) (*store.Store, error)
Repos() *database.RepoStore
ExternalServices() *database.ExternalServiceStore
Clock() func() time.Time
DB() dbutil.DB
GetExternalServiceIDs(ctx context.Context, opts store.GetExternalServiceIDsOpts) ([]int64, error)
UserCredentials() *database.UserCredentialsStore
}
var _ goroutine.BackgroundRoutine = &SyncRegistry{}
// NewSyncRegistry creates a new sync registry which starts a syncer for each code host and will update them
// when external services are changed, added or removed.
func NewSyncRegistry(ctx context.Context, cstore SyncStore, cf *httpcli.Factory) *SyncRegistry {
r := &SyncRegistry{
func NewSyncRegistry(ctx context.Context, bstore SyncStore, cf *httpcli.Factory, observationContext *observation.Context) *SyncRegistry {
ctx, cancel := context.WithCancel(ctx)
return &SyncRegistry{
ctx: ctx,
syncStore: cstore,
cancel: cancel,
syncStore: bstore,
httpFactory: cf,
priorityNotify: make(chan []int64, 500),
syncers: make(map[string]*changesetSyncer),
metrics: makeMetrics(observationContext),
}
}
if err := r.syncCodeHosts(ctx); err != nil {
func (s *SyncRegistry) Start() {
// Fetch initial list of syncers.
if err := s.syncCodeHosts(s.ctx); err != nil {
log15.Error("Fetching initial list of code hosts", "err", err)
}
go r.handlePriorityItems()
goroutine.Go(func() {
s.handlePriorityItems()
})
return r
externalServiceSyncer := goroutine.NewPeriodicGoroutine(
s.ctx,
externalServiceSyncerInterval,
goroutine.NewHandlerWithErrorMessage("Batch Changes syncer external service sync", func(ctx context.Context) error {
return s.syncCodeHosts(ctx)
}),
)
goroutine.MonitorBackgroundRoutines(s.ctx, externalServiceSyncer)
}
// Add adds a syncer for the code host associated with the supplied code host if the syncer hasn't
func (s *SyncRegistry) Stop() {
s.cancel()
}
// EnqueueChangesetSyncs will enqueue the changesets with the supplied ids for high priority syncing.
// An error indicates that no changesets have been enqueued.
func (s *SyncRegistry) EnqueueChangesetSyncs(ctx context.Context, ids []int64) error {
// The channel below is buffered so we'll usually send without blocking.
// It is important not to block here as this method is called from the UI
select {
case s.priorityNotify <- ids:
default:
return errors.New("high priority sync capacity reached")
}
return nil
}
// addCodeHostSyncer adds a syncer for the code host associated with the supplied code host if the syncer hasn't
// already been added and starts it.
func (s *SyncRegistry) Add(codeHost *btypes.CodeHost) {
func (s *SyncRegistry) addCodeHostSyncer(codeHost *btypes.CodeHost) {
// This should never happen since the store does the filtering for us, but let's be super duper extra cautious.
if !codeHost.IsSupported() {
log15.Info("Code host not support by batch changes", "type", codeHost.ExternalServiceType, "url", codeHost.ExternalServiceID)
@@ -101,33 +124,14 @@ func (s *SyncRegistry) Add(codeHost *btypes.CodeHost) {
codeHostURL: syncerKey,
cancel: cancel,
priorityNotify: make(chan []int64, 500),
metrics: s.metrics,
}
s.syncers[syncerKey] = syncer
go syncer.Run(ctx)
}
// EnqueueChangesetSyncs will enqueue the changesets with the supplied ids for high priority syncing.
// An error indicates that no changesets have been enqueued.
func (s *SyncRegistry) EnqueueChangesetSyncs(ctx context.Context, ids []int64) error {
// The channel below is buffered so we'll usually send without blocking.
// It is important not to block here as this method is called from the UI
select {
case s.priorityNotify <- ids:
default:
return errors.New("high priority sync capacity reached")
}
return nil
}
// HandleExternalServiceSync handles changes to external services.
func (s *SyncRegistry) HandleExternalServiceSync(es api.ExternalService) {
if btypes.IsKindSupported(es.Kind) {
if err := s.syncCodeHosts(s.ctx); err != nil {
log15.Error("Syncing on change of code hosts", "err", err)
}
}
goroutine.Go(func() {
syncer.Run(ctx)
})
}
// handlePriorityItems fetches changesets in the priority queue from the database and passes them
@@ -189,7 +193,7 @@ func (s *SyncRegistry) syncCodeHosts(ctx context.Context) error {
// Add and start syncers
for _, host := range codeHosts {
codeHostsByExternalServiceID[host.ExternalServiceID] = host
s.Add(host)
s.addCodeHostSyncer(host)
}
s.mu.Lock()
@@ -214,6 +218,8 @@ type changesetSyncer struct {
syncStore SyncStore
httpFactory *httpcli.Factory
metrics *syncerMetrics
codeHostURL string
// scheduleInterval determines how often a new schedule will be computed.
@@ -230,42 +236,52 @@ type changesetSyncer struct {
cancel context.CancelFunc
}
var syncerMetrics = struct {
type syncerMetrics struct {
syncs *prometheus.CounterVec
priorityQueued *prometheus.CounterVec
syncDuration *prometheus.HistogramVec
computeScheduleDuration *prometheus.HistogramVec
scheduleSize *prometheus.GaugeVec
behindSchedule *prometheus.GaugeVec
}{}
}
func init() {
syncerMetrics.syncs = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "src_repoupdater_changeset_syncer_syncs",
Help: "Total number of changeset syncs",
}, []string{"codehost", "success"})
syncerMetrics.priorityQueued = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "src_repoupdater_changeset_syncer_priority_queued",
Help: "Total number of priority items added to queue",
}, []string{"codehost"})
syncerMetrics.syncDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "src_repoupdater_changeset_syncer_sync_duration_seconds",
Help: "Time spent syncing changesets",
Buckets: []float64{1, 2, 5, 10, 30, 60, 120},
}, []string{"codehost", "success"})
syncerMetrics.computeScheduleDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "src_repoupdater_changeset_syncer_compute_schedule_duration_seconds",
Help: "Time spent computing changeset schedule",
Buckets: []float64{1, 2, 5, 10, 30, 60, 120},
}, []string{"codehost", "success"})
syncerMetrics.scheduleSize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "src_repoupdater_changeset_syncer_schedule_size",
Help: "The number of changesets scheduled to sync",
}, []string{"codehost"})
syncerMetrics.behindSchedule = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "src_repoupdater_changeset_syncer_behind_schedule",
Help: "The number of changesets behind schedule",
}, []string{"codehost"})
func makeMetrics(observationContext *observation.Context) *syncerMetrics {
metrics := &syncerMetrics{
syncs: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "src_repoupdater_changeset_syncer_syncs",
Help: "Total number of changeset syncs",
}, []string{"codehost", "success"}),
priorityQueued: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "src_repoupdater_changeset_syncer_priority_queued",
Help: "Total number of priority items added to queue",
}, []string{"codehost"}),
syncDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "src_repoupdater_changeset_syncer_sync_duration_seconds",
Help: "Time spent syncing changesets",
Buckets: []float64{1, 2, 5, 10, 30, 60, 120},
}, []string{"codehost", "success"}),
computeScheduleDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "src_repoupdater_changeset_syncer_compute_schedule_duration_seconds",
Help: "Time spent computing changeset schedule",
Buckets: []float64{1, 2, 5, 10, 30, 60, 120},
}, []string{"codehost", "success"}),
scheduleSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "src_repoupdater_changeset_syncer_schedule_size",
Help: "The number of changesets scheduled to sync",
}, []string{"codehost"}),
behindSchedule: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "src_repoupdater_changeset_syncer_behind_schedule",
Help: "The number of changesets behind schedule",
}, []string{"codehost"}),
}
observationContext.Registerer.MustRegister(metrics.syncs)
observationContext.Registerer.MustRegister(metrics.priorityQueued)
observationContext.Registerer.MustRegister(metrics.syncDuration)
observationContext.Registerer.MustRegister(metrics.computeScheduleDuration)
observationContext.Registerer.MustRegister(metrics.scheduleSize)
observationContext.Registerer.MustRegister(metrics.behindSchedule)
return metrics
}
// Run will start the process of changeset syncing. It is long running
@@ -330,12 +346,12 @@ func (s *changesetSyncer) Run(ctx context.Context) {
start := s.syncStore.Clock()()
schedule, err := s.computeSchedule(ctx)
labelValues := []string{s.codeHostURL, strconv.FormatBool(err == nil)}
syncerMetrics.computeScheduleDuration.WithLabelValues(labelValues...).Observe(s.syncStore.Clock()().Sub(start).Seconds())
s.metrics.computeScheduleDuration.WithLabelValues(labelValues...).Observe(s.syncStore.Clock()().Sub(start).Seconds())
if err != nil {
log15.Error("Computing queue", "err", err)
continue
}
syncerMetrics.scheduleSize.WithLabelValues(s.codeHostURL).Set(float64(len(schedule)))
s.metrics.scheduleSize.WithLabelValues(s.codeHostURL).Set(float64(len(schedule)))
s.queue.Upsert(schedule...)
var behindSchedule int
now := s.syncStore.Clock()()
@@ -344,13 +360,13 @@ func (s *changesetSyncer) Run(ctx context.Context) {
behindSchedule++
}
}
syncerMetrics.behindSchedule.WithLabelValues(s.codeHostURL).Set(float64(behindSchedule))
s.metrics.behindSchedule.WithLabelValues(s.codeHostURL).Set(float64(behindSchedule))
case <-timerChan:
start := s.syncStore.Clock()()
err := s.syncFunc(ctx, next.changesetID)
labelValues := []string{s.codeHostURL, strconv.FormatBool(err == nil)}
syncerMetrics.syncDuration.WithLabelValues(labelValues...).Observe(s.syncStore.Clock()().Sub(start).Seconds())
syncerMetrics.syncs.WithLabelValues(labelValues...).Add(1)
s.metrics.syncDuration.WithLabelValues(labelValues...).Observe(s.syncStore.Clock()().Sub(start).Seconds())
s.metrics.syncs.WithLabelValues(labelValues...).Add(1)
if err != nil {
log15.Error("Syncing changeset", "err", err)
@@ -359,7 +375,7 @@ func (s *changesetSyncer) Run(ctx context.Context) {
// Remove item now that it has been processed
s.queue.Remove(next.changesetID)
syncerMetrics.scheduleSize.WithLabelValues(s.codeHostURL).Dec()
s.metrics.scheduleSize.WithLabelValues(s.codeHostURL).Dec()
case ids := <-s.priorityNotify:
if timer != nil {
timer.Stop()
@@ -377,9 +393,9 @@ func (s *changesetSyncer) Run(ctx context.Context) {
}
item.priority = priorityHigh
s.queue.Upsert(item)
syncerMetrics.scheduleSize.WithLabelValues(s.codeHostURL).Inc()
s.metrics.scheduleSize.WithLabelValues(s.codeHostURL).Inc()
}
syncerMetrics.priorityQueued.WithLabelValues(s.codeHostURL).Add(float64(len(ids)))
s.metrics.priorityQueued.WithLabelValues(s.codeHostURL).Add(float64(len(ids)))
}
}
}


@@ -12,33 +12,37 @@ import (
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/dbtesting"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/extsvc"
"github.com/sourcegraph/sourcegraph/internal/extsvc/auth"
"github.com/sourcegraph/sourcegraph/internal/httpcli"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
"github.com/sourcegraph/sourcegraph/internal/types"
)
func newTestStore() *MockSyncStore {
s := NewMockSyncStore()
s.ClockFunc.SetDefaultReturn(timeutil.Now)
return s
}
func TestSyncerRun(t *testing.T) {
t.Parallel()
t.Run("Sync due", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
now := time.Now()
syncStore := MockSyncStore{
listChangesetSyncData: func(ctx context.Context, opts store.ListChangesetSyncDataOpts) ([]*btypes.ChangesetSyncData, error) {
return []*btypes.ChangesetSyncData{
{
ChangesetID: 1,
UpdatedAt: now.Add(-2 * maxSyncDelay),
LatestEvent: now.Add(-2 * maxSyncDelay),
ExternalUpdatedAt: now.Add(-2 * maxSyncDelay),
},
}, nil
syncStore := newTestStore()
syncStore.ListChangesetSyncDataFunc.SetDefaultReturn([]*btypes.ChangesetSyncData{
{
ChangesetID: 1,
UpdatedAt: now.Add(-2 * maxSyncDelay),
LatestEvent: now.Add(-2 * maxSyncDelay),
ExternalUpdatedAt: now.Add(-2 * maxSyncDelay),
},
}
}, nil)
syncFunc := func(ctx context.Context, ids int64) error {
cancel()
return nil
@@ -47,6 +51,7 @@ func TestSyncerRun(t *testing.T) {
syncStore: syncStore,
scheduleInterval: 10 * time.Minute,
syncFunc: syncFunc,
metrics: makeMetrics(&observation.TestContext),
}
go syncer.Run(ctx)
select {
@@ -61,34 +66,31 @@ func TestSyncerRun(t *testing.T) {
defer cancel()
now := time.Now()
updateCalled := false
syncStore := MockSyncStore{
getChangeset: func(context.Context, store.GetChangesetOpts) (*btypes.Changeset, error) {
// Return ErrNoResults, which is the result you get when the changeset preconditions aren't met anymore.
// The sync data checks for the reconciler state and if it changed since the sync data was loaded,
// we don't get back the changeset here and skip it.
//
// If we don't return ErrNoResults, the rest of the test will fail, because not all
// methods of sync store are mocked.
return nil, store.ErrNoResults
syncStore := newTestStore()
// Return ErrNoResults, which is the result you get when the changeset preconditions aren't met anymore.
// The sync data checks for the reconciler state and if it changed since the sync data was loaded,
// we don't get back the changeset here and skip it.
//
// If we don't return ErrNoResults, the rest of the test will fail, because not all
// methods of sync store are mocked.
syncStore.GetChangesetFunc.SetDefaultReturn(nil, store.ErrNoResults)
syncStore.UpdateChangesetCodeHostStateFunc.SetDefaultHook(func(context.Context, *btypes.Changeset) error {
updateCalled = true
return nil
})
syncStore.ListChangesetSyncDataFunc.SetDefaultReturn([]*btypes.ChangesetSyncData{
{
ChangesetID: 1,
UpdatedAt: now.Add(-2 * maxSyncDelay),
LatestEvent: now.Add(-2 * maxSyncDelay),
ExternalUpdatedAt: now.Add(-2 * maxSyncDelay),
},
updateChangesetCodeHostState: func(context.Context, *btypes.Changeset) error {
updateCalled = true
return nil
},
listChangesetSyncData: func(ctx context.Context, opts store.ListChangesetSyncDataOpts) ([]*btypes.ChangesetSyncData, error) {
return []*btypes.ChangesetSyncData{
{
ChangesetID: 1,
UpdatedAt: now.Add(-2 * maxSyncDelay),
LatestEvent: now.Add(-2 * maxSyncDelay),
ExternalUpdatedAt: now.Add(-2 * maxSyncDelay),
},
}, nil
},
}
}, nil)
syncer := &changesetSyncer{
syncStore: syncStore,
scheduleInterval: 10 * time.Minute,
metrics: makeMetrics(&observation.TestContext),
}
syncer.Run(ctx)
if updateCalled {
@@ -100,18 +102,16 @@ func TestSyncerRun(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
now := time.Now()
syncStore := MockSyncStore{
listChangesetSyncData: func(ctx context.Context, opts store.ListChangesetSyncDataOpts) ([]*btypes.ChangesetSyncData, error) {
return []*btypes.ChangesetSyncData{
{
ChangesetID: 1,
UpdatedAt: now,
LatestEvent: now,
ExternalUpdatedAt: now,
},
}, nil
syncStore := newTestStore()
syncStore.ListChangesetSyncDataFunc.SetDefaultReturn([]*btypes.ChangesetSyncData{
{
ChangesetID: 1,
UpdatedAt: now,
LatestEvent: now,
ExternalUpdatedAt: now,
},
}
}, nil)
var syncCalled bool
syncFunc := func(ctx context.Context, ids int64) error {
syncCalled = true
@@ -121,6 +121,7 @@ func TestSyncerRun(t *testing.T) {
syncStore: syncStore,
scheduleInterval: 10 * time.Minute,
syncFunc: syncFunc,
metrics: makeMetrics(&observation.TestContext),
}
syncer.Run(ctx)
if syncCalled {
@@ -131,20 +132,17 @@ func TestSyncerRun(t *testing.T) {
t.Run("Priority added", func(t *testing.T) {
// Empty schedule but then we add an item
ctx, cancel := context.WithCancel(context.Background())
syncStore := MockSyncStore{
listChangesetSyncData: func(ctx context.Context, opts store.ListChangesetSyncDataOpts) ([]*btypes.ChangesetSyncData, error) {
return []*btypes.ChangesetSyncData{}, nil
},
}
syncFunc := func(ctx context.Context, ids int64) error {
cancel()
return nil
}
syncer := &changesetSyncer{
syncStore: syncStore,
syncStore: newTestStore(),
scheduleInterval: 10 * time.Minute,
syncFunc: syncFunc,
priorityNotify: make(chan []int64, 1),
metrics: makeMetrics(&observation.TestContext),
}
syncer.priorityNotify <- []int64{1}
go syncer.Run(ctx)
@@ -168,24 +166,27 @@ func TestSyncRegistry(t *testing.T) {
codeHosts := []*btypes.CodeHost{{ExternalServiceID: externalServiceID, ExternalServiceType: extsvc.TypeGitHub}}
syncStore := MockSyncStore{
listChangesetSyncData: func(ctx context.Context, opts store.ListChangesetSyncDataOpts) (data []*btypes.ChangesetSyncData, err error) {
return []*btypes.ChangesetSyncData{
{
ChangesetID: 1,
UpdatedAt: now,
RepoExternalServiceID: externalServiceID,
},
}, nil
syncStore := newTestStore()
syncStore.ListChangesetSyncDataFunc.SetDefaultReturn([]*btypes.ChangesetSyncData{
{
ChangesetID: 1,
UpdatedAt: now,
RepoExternalServiceID: externalServiceID,
},
listCodeHosts: func(c context.Context, lcho store.ListCodeHostsOpts) ([]*btypes.CodeHost, error) {
return codeHosts, nil
},
}
}, nil)
syncStore.ListCodeHostsFunc.SetDefaultHook(func(c context.Context, lcho store.ListCodeHostsOpts) ([]*btypes.CodeHost, error) {
return codeHosts, nil
})
r := NewSyncRegistry(ctx, syncStore, nil)
r := NewSyncRegistry(ctx, syncStore, nil, &observation.TestContext)
go r.Start()
t.Cleanup(r.Stop)
r.syncCodeHosts(ctx)
assertSyncerCount := func(t *testing.T, want int) {
t.Helper()
assertSyncerCount := func(want int) {
r.mu.Lock()
if len(r.syncers) != want {
t.Fatalf("Expected %d syncer, got %d", want, len(r.syncers))
@@ -193,30 +194,22 @@ func TestSyncRegistry(t *testing.T) {
r.mu.Unlock()
}
assertSyncerCount(1)
assertSyncerCount(t, 1)
// Adding it again should have no effect
r.Add(&btypes.CodeHost{ExternalServiceID: "https://example.com/", ExternalServiceType: extsvc.TypeGitHub})
assertSyncerCount(1)
r.addCodeHostSyncer(&btypes.CodeHost{ExternalServiceID: externalServiceID, ExternalServiceType: extsvc.TypeGitHub})
assertSyncerCount(t, 1)
// Simulate a service being removed
oldCodeHosts := codeHosts
codeHosts = []*btypes.CodeHost{}
r.HandleExternalServiceSync(api.ExternalService{
ID: 1,
Kind: extsvc.KindGitHub,
Config: `{"url": "https://example.com/"}`,
DeletedAt: now,
})
assertSyncerCount(0)
r.syncCodeHosts(ctx)
assertSyncerCount(t, 0)
codeHosts = oldCodeHosts
// And added again
r.HandleExternalServiceSync(api.ExternalService{
ID: 1,
Kind: extsvc.KindGitHub,
})
assertSyncerCount(1)
r.syncCodeHosts(ctx)
assertSyncerCount(t, 1)
syncChan := make(chan int64, 1)
@@ -230,6 +223,7 @@ func TestSyncRegistry(t *testing.T) {
return nil
},
priorityNotify: make(chan []int64, 1),
metrics: makeMetrics(&observation.TestContext),
}
go syncer.Run(ctx)
@@ -297,17 +291,17 @@ func TestLoadChangesetSource(t *testing.T) {
t.Cleanup(func() {
database.Mocks.ExternalServices.List = nil
})
hasCredential := false
syncStore := &MockSyncStore{
getSiteCredential: func(ctx context.Context, opts store.GetSiteCredentialOpts) (*btypes.SiteCredential, error) {
if hasCredential {
cred := &btypes.SiteCredential{}
cred.SetAuthenticator(ctx, &auth.OAuthBearerToken{Token: "456"})
return cred, nil
}
return nil, store.ErrNoResults
},
}
syncStore := newTestStore()
syncStore.GetSiteCredentialFunc.SetDefaultHook(func(ctx context.Context, opts store.GetSiteCredentialOpts) (*btypes.SiteCredential, error) {
if hasCredential {
cred := &btypes.SiteCredential{}
cred.SetAuthenticator(ctx, &auth.OAuthBearerToken{Token: "456"})
return cred, nil
}
return nil, store.ErrNoResults
})
// If no site-credential exists, the token from the external service should be used.
src, err := loadChangesetSource(ctx, cf, syncStore, repo)
@@ -332,70 +326,3 @@ func TestLoadChangesetSource(t *testing.T) {
t.Fatalf("invalid token used, want=%q have=%q", want, have)
}
}
type MockSyncStore struct {
listCodeHosts func(context.Context, store.ListCodeHostsOpts) ([]*btypes.CodeHost, error)
listChangesetSyncData func(context.Context, store.ListChangesetSyncDataOpts) ([]*btypes.ChangesetSyncData, error)
getChangeset func(context.Context, store.GetChangesetOpts) (*btypes.Changeset, error)
updateChangesetCodeHostState func(context.Context, *btypes.Changeset) error
upsertChangesetEvents func(context.Context, ...*btypes.ChangesetEvent) error
getSiteCredential func(ctx context.Context, opts store.GetSiteCredentialOpts) (*btypes.SiteCredential, error)
getExternalServiceIDs func(ctx context.Context, opts store.GetExternalServiceIDsOpts) ([]int64, error)
transact func(context.Context) (*store.Store, error)
}
func (m MockSyncStore) ListChangesetSyncData(ctx context.Context, opts store.ListChangesetSyncDataOpts) ([]*btypes.ChangesetSyncData, error) {
return m.listChangesetSyncData(ctx, opts)
}
func (m MockSyncStore) GetChangeset(ctx context.Context, opts store.GetChangesetOpts) (*btypes.Changeset, error) {
return m.getChangeset(ctx, opts)
}
func (m MockSyncStore) UpdateChangesetCodeHostState(ctx context.Context, c *btypes.Changeset) error {
return m.updateChangesetCodeHostState(ctx, c)
}
func (m MockSyncStore) UpsertChangesetEvents(ctx context.Context, cs ...*btypes.ChangesetEvent) error {
return m.upsertChangesetEvents(ctx, cs...)
}
func (m MockSyncStore) GetSiteCredential(ctx context.Context, opts store.GetSiteCredentialOpts) (*btypes.SiteCredential, error) {
return m.getSiteCredential(ctx, opts)
}
func (m MockSyncStore) GetExternalServiceIDs(ctx context.Context, opts store.GetExternalServiceIDsOpts) ([]int64, error) {
return m.getExternalServiceIDs(ctx, opts)
}
func (m MockSyncStore) Transact(ctx context.Context) (*store.Store, error) {
return m.transact(ctx)
}
func (m MockSyncStore) Repos() *database.RepoStore {
// Return a RepoStore with a nil DB, so tests will fail when a mock is missing.
return database.Repos(&dbtesting.MockDB{})
}
func (m MockSyncStore) ExternalServices() *database.ExternalServiceStore {
// Return a ExternalServiceStore with a nil DB, so tests will fail when a mock is missing.
return database.ExternalServices(&dbtesting.MockDB{})
}
func (m MockSyncStore) UserCredentials() *database.UserCredentialsStore {
// Return a UserCredentialsStore with a nil DB, so tests will fail when a mock is missing.
return database.UserCredentials(&dbtesting.MockDB{}, nil)
}
func (m MockSyncStore) DB() dbutil.DB {
// Return a nil DB, so tests will fail when a mock is missing.
return nil
}
func (m MockSyncStore) Clock() func() time.Time {
return timeutil.Now
}
func (m MockSyncStore) ListCodeHosts(ctx context.Context, opts store.ListCodeHostsOpts) ([]*btypes.CodeHost, error) {
return m.listCodeHosts(ctx, opts)
}