diff --git a/cmd/repo-updater/repoupdater/server.go b/cmd/repo-updater/repoupdater/server.go index d741201d226..27492463995 100644 --- a/cmd/repo-updater/repoupdater/server.go +++ b/cmd/repo-updater/repoupdater/server.go @@ -14,6 +14,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/authz" + "github.com/sourcegraph/sourcegraph/internal/batches" livedependencies "github.com/sourcegraph/sourcegraph/internal/codeintel/dependencies/live" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/errcode" @@ -39,11 +40,8 @@ type Server struct { GitserverClient interface { ListCloned(context.Context) ([]string, error) } - ChangesetSyncRegistry interface { - // EnqueueChangesetSyncs will queue the supplied changesets to sync ASAP. - EnqueueChangesetSyncs(ctx context.Context, ids []int64) error - } - RateLimitSyncer interface { + ChangesetSyncRegistry batches.ChangesetSyncRegistry + RateLimitSyncer interface { // SyncRateLimiters should be called when an external service changes so that // our internal rate limiters are kept in sync SyncRateLimiters(ctx context.Context, ids ...int64) error diff --git a/cmd/repo-updater/shared/main.go b/cmd/repo-updater/shared/main.go index d2803ed563e..250e348f2d6 100644 --- a/cmd/repo-updater/shared/main.go +++ b/cmd/repo-updater/shared/main.go @@ -27,6 +27,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/actor" "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/authz" + "github.com/sourcegraph/sourcegraph/internal/batches" livedependencies "github.com/sourcegraph/sourcegraph/internal/codeintel/dependencies/live" "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/conf/conftypes" @@ -174,7 +175,7 @@ func Main(enterpriseInit EnterpriseInit) { Registerer: prometheus.DefaultRegisterer, } - go watchSyncer(ctx, logger, syncer, updateScheduler, server.PermsSyncer) + go watchSyncer(ctx, logger, syncer, updateScheduler, server.PermsSyncer, server.ChangesetSyncRegistry) go func() { err := syncer.Run(ctx, store, repos.RunOptions{ EnqueueInterval: repos.ConfRepoListUpdateInterval, @@ -473,6 +474,7 @@ func watchSyncer( syncer *repos.Syncer, sched *repos.UpdateScheduler, permsSyncer permsSyncer, + changesetSyncer batches.UnarchivedChangesetSyncRegistry, ) { logger.Debug("started new repo syncer updates scheduler relay thread") @@ -491,6 +493,13 @@ func watchSyncer( // modified. permsSyncer.ScheduleRepos(ctx, getPrivateAddedOrModifiedRepos(diff)...) } + + // Similarly, changesetSyncer is only available in enterprise mode. + if changesetSyncer != nil && len(diff.ArchivedChanged) > 0 { + if err := changesetSyncer.EnqueueChangesetSyncsForRepos(ctx, diff.ArchivedChanged.IDs()); err != nil { + logger.Warn("error enqueuing changeset syncs for archived and unarchived repos", log.Error(err)) + } + } } } } diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/changeset_connection.go b/enterprise/cmd/frontend/internal/batches/resolvers/changeset_connection.go index b5796263194..1d139fcd726 100644 --- a/enterprise/cmd/frontend/internal/batches/resolvers/changeset_connection.go +++ b/enterprise/cmd/frontend/internal/batches/resolvers/changeset_connection.go @@ -74,7 +74,7 @@ func (r *changesetsConnectionResolver) TotalCount(ctx context.Context) (int32, e EnforceAuthz: !r.optsSafe, OnlyArchived: r.opts.OnlyArchived, IncludeArchived: r.opts.IncludeArchived, - RepoID: r.opts.RepoID, + RepoIDs: r.opts.RepoIDs, }) return int32(count), err } diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/resolver.go b/enterprise/cmd/frontend/internal/batches/resolvers/resolver.go index 1844f964398..0ac16b8fb05 100644 --- a/enterprise/cmd/frontend/internal/batches/resolvers/resolver.go +++ b/enterprise/cmd/frontend/internal/batches/resolvers/resolver.go @@ -17,6 +17,7 @@ import ( btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types" "github.com/sourcegraph/sourcegraph/enterprise/internal/licensing" "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/deviceid" "github.com/sourcegraph/sourcegraph/internal/encryption" @@ -938,7 +939,7 @@ func listChangesetOptsFromArgs(args *graphqlbackend.ListChangesetsArgs, batchCha if err != nil { return opts, false, errors.Wrap(err, "unmarshalling repo id") } - opts.RepoID = repoID + opts.RepoIDs = []api.RepoID{repoID} } return opts, safe, nil diff --git a/enterprise/cmd/frontend/internal/batches/resolvers/resolver_test.go b/enterprise/cmd/frontend/internal/batches/resolvers/resolver_test.go index 72988ce381c..cc76be4f466 100644 --- a/enterprise/cmd/frontend/internal/batches/resolvers/resolver_test.go +++ b/enterprise/cmd/frontend/internal/batches/resolvers/resolver_test.go @@ -1195,7 +1195,7 @@ func TestListChangesetOptsFromArgs(t *testing.T) { }, wantSafe: true, wantParsed: store.ListChangesetsOpts{ - RepoID: repoID, + RepoIDs: []api.RepoID{repoID}, }, }, // onlyClosable changesets diff --git a/enterprise/cmd/frontend/internal/batches/webhooks/bitbucketcloud.go b/enterprise/cmd/frontend/internal/batches/webhooks/bitbucketcloud.go index d5ad584d285..253f3ad4f84 100644 --- a/enterprise/cmd/frontend/internal/batches/webhooks/bitbucketcloud.go +++ b/enterprise/cmd/frontend/internal/batches/webhooks/bitbucketcloud.go @@ -204,7 +204,7 @@ func bitbucketCloudRepoCommitStatusEventPRs( // Now we can look up the changeset(s). changesets, _, err := bstore.ListChangesets(ctx, store.ListChangesetsOpts{ BitbucketCloudCommit: e.CommitStatus.Commit.Hash, - RepoID: repo.ID, + RepoIDs: []api.RepoID{repo.ID}, }) if err != nil { return nil, errors.Wrapf(err, "listing changesets matched to repo ID=%d", repo.ID) diff --git a/enterprise/internal/batches/background.go b/enterprise/internal/batches/background.go index ead94078941..a7e808d59fa 100644 --- a/enterprise/internal/batches/background.go +++ b/enterprise/internal/batches/background.go @@ -11,6 +11,7 @@ import ( "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store" "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/syncer" "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/batches" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/encryption" "github.com/sourcegraph/sourcegraph/internal/goroutine" @@ -26,10 +27,7 @@ func InitBackgroundJobs( db database.DB, key encryption.Key, cf *httpcli.Factory, -) interface { - // EnqueueChangesetSyncs will queue the supplied changesets to sync ASAP. - EnqueueChangesetSyncs(ctx context.Context, ids []int64) error -} { +) batches.ChangesetSyncRegistry { // We use an internal actor so that we can freely load dependencies from // the database without repository permissions being enforced. // We do check for repository permissions consciously in the Rewirer when diff --git a/enterprise/internal/batches/store/changesets.go b/enterprise/internal/batches/store/changesets.go index b2a22bae2cd..3a9ab8a20e6 100644 --- a/enterprise/internal/batches/store/changesets.go +++ b/enterprise/internal/batches/store/changesets.go @@ -263,7 +263,7 @@ type CountChangesetsOpts struct { PublicationState *btypes.ChangesetPublicationState TextSearch []search.TextSearchTerm EnforceAuthz bool - RepoID api.RepoID + RepoIDs []api.RepoID } // CountChangesets returns the number of changesets in the database. @@ -326,8 +326,8 @@ func countChangesetsQuery(opts *CountChangesetsOpts, authzConds *sqlf.Query) *sq if opts.EnforceAuthz { preds = append(preds, authzConds) } - if opts.RepoID != 0 { - preds = append(preds, sqlf.Sprintf("repo.id = %s", opts.RepoID)) + if len(opts.RepoIDs) > 0 { + preds = append(preds, sqlf.Sprintf("repo.id = ANY (%s)", pq.Array(opts.RepoIDs))) } join := sqlf.Sprintf("") @@ -524,7 +524,7 @@ type ListChangesetsOpts struct { OwnedByBatchChangeID int64 TextSearch []search.TextSearchTerm EnforceAuthz bool - RepoID api.RepoID + RepoIDs []api.RepoID BitbucketCloudCommit string } @@ -612,8 +612,8 @@ func listChangesetsQuery(opts *ListChangesetsOpts, authzConds *sqlf.Query) *sqlf if opts.EnforceAuthz { preds = append(preds, authzConds) } - if opts.RepoID != 0 { - preds = append(preds, sqlf.Sprintf("repo.id = %s", opts.RepoID)) + if len(opts.RepoIDs) > 0 { + preds = append(preds, sqlf.Sprintf("repo.id = ANY (%s)", pq.Array(opts.RepoIDs))) } if len(opts.BitbucketCloudCommit) >= 12 { // Bitbucket Cloud commit hashes in PR objects are generally truncated diff --git a/enterprise/internal/batches/store/changesets_test.go b/enterprise/internal/batches/store/changesets_test.go index 4964dedf79c..78259135bd1 100644 --- a/enterprise/internal/batches/store/changesets_test.go +++ b/enterprise/internal/batches/store/changesets_test.go @@ -11,6 +11,8 @@ import ( "github.com/google/go-cmp/cmp" "github.com/keegancsmith/sqlf" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/sourcegraph/log/logtest" @@ -27,7 +29,6 @@ import ( "github.com/sourcegraph/sourcegraph/internal/extsvc/gitlab" "github.com/sourcegraph/sourcegraph/internal/observation" "github.com/sourcegraph/sourcegraph/internal/types" - "github.com/sourcegraph/sourcegraph/internal/types/typestest" batcheslib "github.com/sourcegraph/sourcegraph/lib/batches" ) @@ -58,11 +59,11 @@ func testStoreChangesets(t *testing.T, ctx context.Context, s *Store, clock ct.C repo := ct.TestRepo(t, es, extsvc.KindGitHub) otherRepo := ct.TestRepo(t, es, extsvc.KindGitHub) gitlabRepo := ct.TestRepo(t, es, extsvc.KindGitLab) + deletedRepo := ct.TestRepo(t, es, extsvc.KindBitbucketCloud) - if err := rs.Create(ctx, repo, otherRepo, gitlabRepo); err != nil { + if err := rs.Create(ctx, repo, otherRepo, gitlabRepo, deletedRepo); err != nil { t.Fatal(err) } - deletedRepo := otherRepo.With(typestest.Opt.RepoDeletedAt(clock.Now())) if err := rs.Delete(ctx, deletedRepo.ID); err != nil { t.Fatal(err) } @@ -694,6 +695,47 @@ func testStoreChangesets(t *testing.T, ctx context.Context, s *Store, clock ct.C } }) + t.Run("RepoIDs", func(t *testing.T) { + // Insert two changesets temporarily that are attached to other repos. + createRepoChangeset := func(repo *types.Repo, baseChangeset *btypes.Changeset) *btypes.Changeset { + t.Helper() + + c := baseChangeset.Clone() + c.RepoID = repo.ID + require.NoError(t, s.CreateChangeset(ctx, c)) + t.Cleanup(func() { s.DeleteChangeset(ctx, c.ID) }) + + return c + } + + otherChangeset := createRepoChangeset(otherRepo, changesets[1]) + gitlabChangeset := createRepoChangeset(gitlabRepo, changesets[1]) + + t.Run("single repo", func(t *testing.T) { + have, _, err := s.ListChangesets(ctx, ListChangesetsOpts{ + RepoIDs: []api.RepoID{repo.ID}, + }) + assert.NoError(t, err) + assert.ElementsMatch(t, changesets, have) + }) + + t.Run("multiple repos", func(t *testing.T) { + have, _, err := s.ListChangesets(ctx, ListChangesetsOpts{ + RepoIDs: []api.RepoID{otherRepo.ID, gitlabRepo.ID}, + }) + assert.NoError(t, err) + assert.ElementsMatch(t, []*btypes.Changeset{otherChangeset, gitlabChangeset}, have) + }) + + t.Run("repo without changesets", func(t *testing.T) { + have, _, err := s.ListChangesets(ctx, ListChangesetsOpts{ + RepoIDs: []api.RepoID{deletedRepo.ID}, + }) + assert.NoError(t, err) + assert.ElementsMatch(t, []*btypes.Changeset{}, have) + }) + }) + statePublished := btypes.ChangesetPublicationStatePublished stateUnpublished := btypes.ChangesetPublicationStateUnpublished stateQueued := btypes.ReconcilerStateQueued diff --git a/enterprise/internal/batches/syncer/mocks_test.go b/enterprise/internal/batches/syncer/mocks_test.go index cdd1faf3e97..40bb88579cb 100644 --- a/enterprise/internal/batches/syncer/mocks_test.go +++ b/enterprise/internal/batches/syncer/mocks_test.go @@ -45,6 +45,9 @@ type MockSyncStore struct { // ListChangesetSyncDataFunc is an instance of a mock function object // controlling the behavior of the method ListChangesetSyncData. ListChangesetSyncDataFunc *SyncStoreListChangesetSyncDataFunc + // ListChangesetsFunc is an instance of a mock function object + // controlling the behavior of the method ListChangesets. + ListChangesetsFunc *SyncStoreListChangesetsFunc // ListCodeHostsFunc is an instance of a mock function object // controlling the behavior of the method ListCodeHosts. ListCodeHostsFunc *SyncStoreListCodeHostsFunc @@ -110,6 +113,11 @@ func NewMockSyncStore() *MockSyncStore { return }, }, + ListChangesetsFunc: &SyncStoreListChangesetsFunc{ + defaultHook: func(context.Context, store.ListChangesetsOpts) (r0 types.Changesets, r1 int64, r2 error) { + return + }, + }, ListCodeHostsFunc: &SyncStoreListCodeHostsFunc{ defaultHook: func(context.Context, store.ListCodeHostsOpts) (r0 []*types.CodeHost, r1 error) { return @@ -187,6 +195,11 @@ func NewStrictMockSyncStore() *MockSyncStore { panic("unexpected invocation of MockSyncStore.ListChangesetSyncData") }, }, + ListChangesetsFunc: &SyncStoreListChangesetsFunc{ + defaultHook: func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error) { + panic("unexpected invocation of MockSyncStore.ListChangesets") + }, + }, ListCodeHostsFunc: &SyncStoreListCodeHostsFunc{ defaultHook: func(context.Context, store.ListCodeHostsOpts) ([]*types.CodeHost, error) { panic("unexpected invocation of MockSyncStore.ListCodeHosts") @@ -248,6 +261,9 @@ func NewMockSyncStoreFrom(i SyncStore) *MockSyncStore { ListChangesetSyncDataFunc: &SyncStoreListChangesetSyncDataFunc{ defaultHook: i.ListChangesetSyncData, }, + ListChangesetsFunc: &SyncStoreListChangesetsFunc{ + defaultHook: i.ListChangesets, + }, ListCodeHostsFunc: &SyncStoreListCodeHostsFunc{ defaultHook: i.ListCodeHosts, }, @@ -1108,6 +1124,117 @@ func (c SyncStoreListChangesetSyncDataFuncCall) Results() []interface{} { return []interface{}{c.Result0, c.Result1} } +// SyncStoreListChangesetsFunc describes the behavior when the +// ListChangesets method of the parent MockSyncStore instance is invoked. +type SyncStoreListChangesetsFunc struct { + defaultHook func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error) + hooks []func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error) + history []SyncStoreListChangesetsFuncCall + mutex sync.Mutex +} + +// ListChangesets delegates to the next hook function in the queue and +// stores the parameter and result values of this invocation. +func (m *MockSyncStore) ListChangesets(v0 context.Context, v1 store.ListChangesetsOpts) (types.Changesets, int64, error) { + r0, r1, r2 := m.ListChangesetsFunc.nextHook()(v0, v1) + m.ListChangesetsFunc.appendCall(SyncStoreListChangesetsFuncCall{v0, v1, r0, r1, r2}) + return r0, r1, r2 +} + +// SetDefaultHook sets function that is called when the ListChangesets +// method of the parent MockSyncStore instance is invoked and the hook queue +// is empty. +func (f *SyncStoreListChangesetsFunc) SetDefaultHook(hook func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error)) { + f.defaultHook = hook +} + +// PushHook adds a function to the end of hook queue. Each invocation of the +// ListChangesets method of the parent MockSyncStore instance invokes the +// hook at the front of the queue and discards it. After the queue is empty, +// the default hook function is invoked for any future action. +func (f *SyncStoreListChangesetsFunc) PushHook(hook func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error)) { + f.mutex.Lock() + f.hooks = append(f.hooks, hook) + f.mutex.Unlock() +} + +// SetDefaultReturn calls SetDefaultHook with a function that returns the +// given values. +func (f *SyncStoreListChangesetsFunc) SetDefaultReturn(r0 types.Changesets, r1 int64, r2 error) { + f.SetDefaultHook(func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error) { + return r0, r1, r2 + }) +} + +// PushReturn calls PushHook with a function that returns the given values. +func (f *SyncStoreListChangesetsFunc) PushReturn(r0 types.Changesets, r1 int64, r2 error) { + f.PushHook(func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error) { + return r0, r1, r2 + }) +} + +func (f *SyncStoreListChangesetsFunc) nextHook() func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + + if len(f.hooks) == 0 { + return f.defaultHook + } + + hook := f.hooks[0] + f.hooks = f.hooks[1:] + return hook +} + +func (f *SyncStoreListChangesetsFunc) appendCall(r0 SyncStoreListChangesetsFuncCall) { + f.mutex.Lock() + f.history = append(f.history, r0) + f.mutex.Unlock() +} + +// History returns a sequence of SyncStoreListChangesetsFuncCall objects +// describing the invocations of this function. +func (f *SyncStoreListChangesetsFunc) History() []SyncStoreListChangesetsFuncCall { + f.mutex.Lock() + history := make([]SyncStoreListChangesetsFuncCall, len(f.history)) + copy(history, f.history) + f.mutex.Unlock() + + return history +} + +// SyncStoreListChangesetsFuncCall is an object that describes an invocation +// of method ListChangesets on an instance of MockSyncStore. +type SyncStoreListChangesetsFuncCall struct { + // Arg0 is the value of the 1st argument passed to this method + // invocation. + Arg0 context.Context + // Arg1 is the value of the 2nd argument passed to this method + // invocation. + Arg1 store.ListChangesetsOpts + // Result0 is the value of the 1st result returned from this method + // invocation. + Result0 types.Changesets + // Result1 is the value of the 2nd result returned from this method + // invocation. + Result1 int64 + // Result2 is the value of the 3rd result returned from this method + // invocation. + Result2 error +} + +// Args returns an interface slice containing the arguments of this +// invocation. +func (c SyncStoreListChangesetsFuncCall) Args() []interface{} { + return []interface{}{c.Arg0, c.Arg1} +} + +// Results returns an interface slice containing the results of this +// invocation. +func (c SyncStoreListChangesetsFuncCall) Results() []interface{} { + return []interface{}{c.Result0, c.Result1, c.Result2} +} + // SyncStoreListCodeHostsFunc describes the behavior when the ListCodeHosts // method of the parent MockSyncStore instance is invoked. type SyncStoreListCodeHostsFunc struct { diff --git a/enterprise/internal/batches/syncer/store.go b/enterprise/internal/batches/syncer/store.go index 20184fb304c..ddb29b618a4 100644 --- a/enterprise/internal/batches/syncer/store.go +++ b/enterprise/internal/batches/syncer/store.go @@ -11,6 +11,7 @@ import ( type SyncStore interface { ListCodeHosts(ctx context.Context, opts store.ListCodeHostsOpts) ([]*btypes.CodeHost, error) + ListChangesets(ctx context.Context, opts store.ListChangesetsOpts) (btypes.Changesets, int64, error) ListChangesetSyncData(context.Context, store.ListChangesetSyncDataOpts) ([]*btypes.ChangesetSyncData, error) GetChangeset(context.Context, store.GetChangesetOpts) (*btypes.Changeset, error) UpdateChangesetCodeHostState(ctx context.Context, cs *btypes.Changeset) error diff --git a/enterprise/internal/batches/syncer/syncer.go b/enterprise/internal/batches/syncer/syncer.go index f24c17aa7fd..e6a1afbfd16 100644 --- a/enterprise/internal/batches/syncer/syncer.go +++ b/enterprise/internal/batches/syncer/syncer.go @@ -14,6 +14,8 @@ import ( "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/state" "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store" btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types" + "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/batches" "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/goroutine" "github.com/sourcegraph/sourcegraph/internal/httpcli" @@ -44,7 +46,10 @@ type SyncRegistry struct { syncers map[string]*changesetSyncer } -var _ goroutine.BackgroundRoutine = &SyncRegistry{} +var ( + _ batches.ChangesetSyncRegistry = &SyncRegistry{} + _ goroutine.BackgroundRoutine = &SyncRegistry{} +) // NewSyncRegistry creates a new sync registry which starts a syncer for each code host and will update them // when external services are changed, added or removed. @@ -101,6 +106,30 @@ func (s *SyncRegistry) EnqueueChangesetSyncs(ctx context.Context, ids []int64) e return nil } +func (s *SyncRegistry) EnqueueChangesetSyncsForRepos(ctx context.Context, repoIDs []api.RepoID) error { + cs, _, err := s.syncStore.ListChangesets(ctx, store.ListChangesetsOpts{ + RepoIDs: repoIDs, + }) + if err != nil { + return errors.Wrapf(err, "listing changesets for repos %v", repoIDs) + } else if len(cs) == 0 { + return nil + } + + ids := make([]int64, len(cs)) + for i, c := range cs { + ids[i] = c.ID + } + + s.logger.Debug( + "enqueuing syncs for changesets on repos", + log.Int("repo count", len(repoIDs)), + log.Int("changeset count", len(ids)), + ) + + return s.EnqueueChangesetSyncs(ctx, ids) +} + // addCodeHostSyncer adds a syncer for the code host associated with the supplied code host if the syncer hasn't // already been added and starts it. func (s *SyncRegistry) addCodeHostSyncer(codeHost *btypes.CodeHost) { diff --git a/enterprise/internal/batches/syncer/syncer_test.go b/enterprise/internal/batches/syncer/syncer_test.go index c78c2fbbc53..069195fb5db 100644 --- a/enterprise/internal/batches/syncer/syncer_test.go +++ b/enterprise/internal/batches/syncer/syncer_test.go @@ -270,6 +270,59 @@ func TestSyncRegistry_EnqueueChangesetSyncs(t *testing.T) { } } +func TestSyncRegistry_EnqueueChangesetSyncsForRepos(t *testing.T) { + ctx := context.Background() + + t.Run("store error", func(t *testing.T) { + bstore := NewMockSyncStore() + want := errors.New("expected") + bstore.ListChangesetsFunc.SetDefaultReturn(nil, 0, want) + + s := &SyncRegistry{ + syncStore: bstore, + } + + err := s.EnqueueChangesetSyncsForRepos(ctx, []api.RepoID{}) + assert.ErrorIs(t, err, want) + }) + + t.Run("no changesets", func(t *testing.T) { + bstore := NewMockSyncStore() + bstore.ListChangesetsFunc.SetDefaultHook(func(ctx context.Context, opts store.ListChangesetsOpts) (btypes.Changesets, int64, error) { + assert.Equal(t, []api.RepoID{1}, opts.RepoIDs) + return []*btypes.Changeset{}, 0, nil + }) + + s := &SyncRegistry{ + syncStore: bstore, + } + + assert.NoError(t, s.EnqueueChangesetSyncsForRepos(ctx, []api.RepoID{1})) + }) + + t.Run("success", func(t *testing.T) { + cs := []*btypes.Changeset{ + {ID: 1}, + {ID: 2}, + } + + bstore := NewMockSyncStore() + bstore.ListChangesetsFunc.SetDefaultHook(func(ctx context.Context, opts store.ListChangesetsOpts) (btypes.Changesets, int64, error) { + assert.Equal(t, []api.RepoID{1}, opts.RepoIDs) + return cs, 0, nil + }) + + s := &SyncRegistry{ + logger: logtest.Scoped(t), + priorityNotify: make(chan []int64, 1), + syncStore: bstore, + } + + assert.NoError(t, s.EnqueueChangesetSyncsForRepos(ctx, []api.RepoID{1})) + assert.ElementsMatch(t, []int64{1, 2}, <-s.priorityNotify) + }) +} + func TestLoadChangesetSource(t *testing.T) { ctx := context.Background() cf := httpcli.NewFactory( diff --git a/internal/batches/syncer.go b/internal/batches/syncer.go new file mode 100644 index 00000000000..0c362f3262a --- /dev/null +++ b/internal/batches/syncer.go @@ -0,0 +1,23 @@ +// Package batches specifies interfaces that are called by Sourcegraph OSS, but +// implemented in enterprise code in enterprise builds. +// +// No actual Batch Changes functionality is provided in this package. +package batches + +import ( + "context" + + "github.com/sourcegraph/sourcegraph/internal/api" +) + +type ChangesetSyncRegistry interface { + UnarchivedChangesetSyncRegistry + // EnqueueChangesetSyncs will queue the supplied changesets to sync ASAP. + EnqueueChangesetSyncs(ctx context.Context, ids []int64) error +} + +type UnarchivedChangesetSyncRegistry interface { + // EnqueueChangesetSyncsForRepos will queue a sync for every changeset in + // every given repo ASAP. + EnqueueChangesetSyncsForRepos(ctx context.Context, repoIDs []api.RepoID) error +} diff --git a/internal/repos/syncer.go b/internal/repos/syncer.go index 58b9706e69b..81be9e588d4 100644 --- a/internal/repos/syncer.go +++ b/internal/repos/syncer.go @@ -210,6 +210,15 @@ type Diff struct { Deleted types.Repos Modified types.Repos Unmodified types.Repos + + // ArchivedChanged contains repositories that have been archived or + // unarchived on the code host between the previous sync and the current one. + // This is required for Batch Changes to migrate changesets on those + // repositories in and out of the read-only state. + // + // This field is always a strict subset of Modified, and is therefore not + // counted in Len() or iterated over in Repos(). + ArchivedChanged types.Repos } // Sort sorts all Diff elements by Repo.IDs. @@ -219,6 +228,7 @@ func (d *Diff) Sort() { d.Deleted, d.Modified, d.Unmodified, + d.ArchivedChanged, } { sort.Sort(ds) } @@ -720,6 +730,8 @@ func (s *Syncer) sync(ctx context.Context, svc *types.ExternalService, sourced * stored = types.Repos{existing} fallthrough case 1: // Existing repo, update. + wasArchived := stored[0].Archived + if !stored[0].Update(sourced) { d.Unmodified = append(d.Unmodified, stored[0]) break @@ -731,6 +743,11 @@ func (s *Syncer) sync(ctx context.Context, svc *types.ExternalService, sourced * *sourced = *stored[0] d.Modified = append(d.Modified, stored[0]) + + if (wasArchived == true && stored[0].Archived == false) || + (wasArchived == false && stored[0].Archived == true) { + d.ArchivedChanged = append(d.ArchivedChanged, stored[0]) + } case 0: // New repo, create. if !svc.IsSiteOwned() { // enforce user and org repo limits siteAdded, err := tx.CountNamespacedRepos(ctx, 0, 0) diff --git a/internal/repos/syncer_test.go b/internal/repos/syncer_test.go index e129475ca84..3ba87af601d 100644 --- a/internal/repos/syncer_test.go +++ b/internal/repos/syncer_test.go @@ -601,7 +601,7 @@ func testSyncerSync(s repos.Store) func(*testing.T) { } var want, have types.Repos - want.Concat(tc.diff.Added, tc.diff.Modified, tc.diff.Unmodified) + want.Concat(tc.diff.Added, tc.diff.Modified, tc.diff.Unmodified, tc.diff.ArchivedChanged) have, _ = st.RepoStore().List(ctx, database.ReposListOptions{}) want = want.With(typestest.Opt.RepoID(0)) @@ -655,45 +655,92 @@ func testSyncRepo(s repos.Store) func(*testing.T) { }) testCases := []struct { - name string - repo api.RepoName - background bool - before, after types.Repos - returned *types.Repo + name string + repo api.RepoName + background bool // whether to run SyncRepo in the background + before types.Repos // the repos to insert into the database before syncing + sourced *types.Repo // the repo that is returned by the fake sourcer + returned *types.Repo // the expected return value from SyncRepo (which changes meaning depending on background) + after types.Repos // the expected database repos after syncing + diff repos.Diff // the expected repos.Diff sent by the syncer }{{ name: "insert", repo: repo.Name, background: true, + sourced: repo.Clone(), returned: repo, after: types.Repos{repo}, + diff: repos.Diff{ + Added: types.Repos{repo}, + }, }, { name: "update", repo: repo.Name, background: true, before: types.Repos{oldRepo}, + sourced: repo.Clone(), returned: oldRepo, after: types.Repos{repo}, + diff: repos.Diff{ + Modified: types.Repos{repo}, + }, }, { name: "blocking update", repo: repo.Name, background: false, before: types.Repos{oldRepo}, + sourced: repo.Clone(), returned: repo, after: types.Repos{repo}, + diff: repos.Diff{ + Modified: types.Repos{repo}, + }, }, { name: "update name", repo: repo.Name, background: true, before: types.Repos{repo.With(typestest.Opt.RepoName("old/name"))}, + sourced: repo.Clone(), returned: repo, after: types.Repos{repo}, + diff: repos.Diff{ + Modified: types.Repos{repo}, + }, + }, { + name: "archived", + repo: repo.Name, + background: true, + before: types.Repos{repo}, + sourced: repo.With(typestest.Opt.RepoArchived(true)), + returned: repo, + after: types.Repos{repo.With(typestest.Opt.RepoArchived(true))}, + diff: repos.Diff{ + Modified: types.Repos{repo.With(typestest.Opt.RepoArchived(true))}, + ArchivedChanged: types.Repos{repo.With(typestest.Opt.RepoArchived(true))}, + }, + }, { + name: "unarchived", + repo: repo.Name, + background: true, + before: types.Repos{repo.With(typestest.Opt.RepoArchived(true))}, + sourced: repo.Clone(), + returned: repo.With(typestest.Opt.RepoArchived(true)), + after: types.Repos{repo}, + diff: repos.Diff{ + Modified: types.Repos{repo}, + ArchivedChanged: types.Repos{repo}, + }, }, { name: "delete conflicting name", repo: repo.Name, background: true, before: types.Repos{repo.With(typestest.Opt.RepoExternalID("old id"))}, + sourced: repo.Clone(), returned: repo.With(typestest.Opt.RepoExternalID("old id")), after: types.Repos{repo}, + diff: repos.Diff{ + Modified: types.Repos{repo}, + }, }, { name: "rename and delete conflicting name", repo: repo.Name, @@ -702,8 +749,12 @@ func testSyncRepo(s repos.Store) func(*testing.T) { repo.With(typestest.Opt.RepoExternalID("old id")), repo.With(typestest.Opt.RepoName("old name")), }, + sourced: repo.Clone(), returned: repo.With(typestest.Opt.RepoExternalID("old id")), after: types.Repos{repo}, + diff: repos.Diff{ + Modified: types.Repos{repo}, + }, }} for _, tc := range testCases { @@ -729,7 +780,7 @@ func testSyncRepo(s repos.Store) func(*testing.T) { Store: s, Synced: make(chan repos.Diff, 1), Sourcer: repos.NewFakeSourcer(nil, - repos.NewFakeSource(servicesPerKind[extsvc.KindGitHub], nil, repo.Clone()), + repos.NewFakeSource(servicesPerKind[extsvc.KindGitHub], nil, tc.sourced), ), } @@ -747,7 +798,9 @@ func testSyncRepo(s repos.Store) func(*testing.T) { t.Errorf("returned mismatch: (-have, +want):\n%s", diff) } - <-syncer.Synced + if diff := cmp.Diff(<-syncer.Synced, tc.diff, opt); diff != "" { + t.Errorf("diff mismatch: (-have, +want):\n%s", diff) + } after, err := s.RepoStore().List(ctx, database.ReposListOptions{}) if err != nil { @@ -1001,6 +1054,9 @@ func testSyncerMultipleServices(store repos.Store) func(t *testing.T) { if len(diff.Unmodified) != 0 { t.Fatalf("Expected 0 Unmodified repos. got %d", len(diff.Added)) } + if len(diff.ArchivedChanged) != 0 { + t.Fatalf("Expected 0 Archived repos. got %d", len(diff.Added)) + } } var jobsCompleted int diff --git a/internal/types/typestest/typestest.go b/internal/types/typestest/typestest.go index fd92756f5f5..0c115b5621e 100644 --- a/internal/types/typestest/typestest.go +++ b/internal/types/typestest/typestest.go @@ -265,6 +265,7 @@ var Opt = struct { RepoDeletedAt func(time.Time) func(*types.Repo) RepoSources func(...string) func(*types.Repo) RepoMetadata func(any) func(*types.Repo) + RepoArchived func(bool) func(*types.Repo) RepoExternalID func(string) func(*types.Repo) }{ ExternalServiceID: func(n int64) func(*types.ExternalService) { @@ -327,6 +328,11 @@ var Opt = struct { r.Metadata = md } }, + RepoArchived: func(b bool) func(*types.Repo) { + return func(r *types.Repo) { + r.Archived = b + } + }, RepoExternalID: func(id string) func(*types.Repo) { return func(r *types.Repo) { r.ExternalRepo.ID = id