mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 18:51:59 +00:00
repo-updater: queue changeset syncs when the archived flag changes (#38702)
This commit is contained in:
parent
9ae7fff92b
commit
4e3fdb37d5
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/authz"
|
||||
"github.com/sourcegraph/sourcegraph/internal/batches"
|
||||
livedependencies "github.com/sourcegraph/sourcegraph/internal/codeintel/dependencies/live"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/errcode"
|
||||
@ -39,11 +40,8 @@ type Server struct {
|
||||
GitserverClient interface {
|
||||
ListCloned(context.Context) ([]string, error)
|
||||
}
|
||||
ChangesetSyncRegistry interface {
|
||||
// EnqueueChangesetSyncs will queue the supplied changesets to sync ASAP.
|
||||
EnqueueChangesetSyncs(ctx context.Context, ids []int64) error
|
||||
}
|
||||
RateLimitSyncer interface {
|
||||
ChangesetSyncRegistry batches.ChangesetSyncRegistry
|
||||
RateLimitSyncer interface {
|
||||
// SyncRateLimiters should be called when an external service changes so that
|
||||
// our internal rate limiters are kept in sync
|
||||
SyncRateLimiters(ctx context.Context, ids ...int64) error
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
"github.com/sourcegraph/sourcegraph/internal/actor"
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/authz"
|
||||
"github.com/sourcegraph/sourcegraph/internal/batches"
|
||||
livedependencies "github.com/sourcegraph/sourcegraph/internal/codeintel/dependencies/live"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
|
||||
@ -174,7 +175,7 @@ func Main(enterpriseInit EnterpriseInit) {
|
||||
Registerer: prometheus.DefaultRegisterer,
|
||||
}
|
||||
|
||||
go watchSyncer(ctx, logger, syncer, updateScheduler, server.PermsSyncer)
|
||||
go watchSyncer(ctx, logger, syncer, updateScheduler, server.PermsSyncer, server.ChangesetSyncRegistry)
|
||||
go func() {
|
||||
err := syncer.Run(ctx, store, repos.RunOptions{
|
||||
EnqueueInterval: repos.ConfRepoListUpdateInterval,
|
||||
@ -473,6 +474,7 @@ func watchSyncer(
|
||||
syncer *repos.Syncer,
|
||||
sched *repos.UpdateScheduler,
|
||||
permsSyncer permsSyncer,
|
||||
changesetSyncer batches.UnarchivedChangesetSyncRegistry,
|
||||
) {
|
||||
logger.Debug("started new repo syncer updates scheduler relay thread")
|
||||
|
||||
@ -491,6 +493,13 @@ func watchSyncer(
|
||||
// modified.
|
||||
permsSyncer.ScheduleRepos(ctx, getPrivateAddedOrModifiedRepos(diff)...)
|
||||
}
|
||||
|
||||
// Similarly, changesetSyncer is only available in enterprise mode.
|
||||
if changesetSyncer != nil && len(diff.ArchivedChanged) > 0 {
|
||||
if err := changesetSyncer.EnqueueChangesetSyncsForRepos(ctx, diff.ArchivedChanged.IDs()); err != nil {
|
||||
logger.Warn("error enqueuing changeset syncs for archived and unarchived repos", log.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -74,7 +74,7 @@ func (r *changesetsConnectionResolver) TotalCount(ctx context.Context) (int32, e
|
||||
EnforceAuthz: !r.optsSafe,
|
||||
OnlyArchived: r.opts.OnlyArchived,
|
||||
IncludeArchived: r.opts.IncludeArchived,
|
||||
RepoID: r.opts.RepoID,
|
||||
RepoIDs: r.opts.RepoIDs,
|
||||
})
|
||||
return int32(count), err
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
|
||||
"github.com/sourcegraph/sourcegraph/enterprise/internal/licensing"
|
||||
"github.com/sourcegraph/sourcegraph/internal/actor"
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/deviceid"
|
||||
"github.com/sourcegraph/sourcegraph/internal/encryption"
|
||||
@ -938,7 +939,7 @@ func listChangesetOptsFromArgs(args *graphqlbackend.ListChangesetsArgs, batchCha
|
||||
if err != nil {
|
||||
return opts, false, errors.Wrap(err, "unmarshalling repo id")
|
||||
}
|
||||
opts.RepoID = repoID
|
||||
opts.RepoIDs = []api.RepoID{repoID}
|
||||
}
|
||||
|
||||
return opts, safe, nil
|
||||
|
||||
@ -1195,7 +1195,7 @@ func TestListChangesetOptsFromArgs(t *testing.T) {
|
||||
},
|
||||
wantSafe: true,
|
||||
wantParsed: store.ListChangesetsOpts{
|
||||
RepoID: repoID,
|
||||
RepoIDs: []api.RepoID{repoID},
|
||||
},
|
||||
},
|
||||
// onlyClosable changesets
|
||||
|
||||
@ -204,7 +204,7 @@ func bitbucketCloudRepoCommitStatusEventPRs(
|
||||
// Now we can look up the changeset(s).
|
||||
changesets, _, err := bstore.ListChangesets(ctx, store.ListChangesetsOpts{
|
||||
BitbucketCloudCommit: e.CommitStatus.Commit.Hash,
|
||||
RepoID: repo.ID,
|
||||
RepoIDs: []api.RepoID{repo.ID},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "listing changesets matched to repo ID=%d", repo.ID)
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
|
||||
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/syncer"
|
||||
"github.com/sourcegraph/sourcegraph/internal/actor"
|
||||
"github.com/sourcegraph/sourcegraph/internal/batches"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/encryption"
|
||||
"github.com/sourcegraph/sourcegraph/internal/goroutine"
|
||||
@ -26,10 +27,7 @@ func InitBackgroundJobs(
|
||||
db database.DB,
|
||||
key encryption.Key,
|
||||
cf *httpcli.Factory,
|
||||
) interface {
|
||||
// EnqueueChangesetSyncs will queue the supplied changesets to sync ASAP.
|
||||
EnqueueChangesetSyncs(ctx context.Context, ids []int64) error
|
||||
} {
|
||||
) batches.ChangesetSyncRegistry {
|
||||
// We use an internal actor so that we can freely load dependencies from
|
||||
// the database without repository permissions being enforced.
|
||||
// We do check for repository permissions consciously in the Rewirer when
|
||||
|
||||
@ -263,7 +263,7 @@ type CountChangesetsOpts struct {
|
||||
PublicationState *btypes.ChangesetPublicationState
|
||||
TextSearch []search.TextSearchTerm
|
||||
EnforceAuthz bool
|
||||
RepoID api.RepoID
|
||||
RepoIDs []api.RepoID
|
||||
}
|
||||
|
||||
// CountChangesets returns the number of changesets in the database.
|
||||
@ -326,8 +326,8 @@ func countChangesetsQuery(opts *CountChangesetsOpts, authzConds *sqlf.Query) *sq
|
||||
if opts.EnforceAuthz {
|
||||
preds = append(preds, authzConds)
|
||||
}
|
||||
if opts.RepoID != 0 {
|
||||
preds = append(preds, sqlf.Sprintf("repo.id = %s", opts.RepoID))
|
||||
if len(opts.RepoIDs) > 0 {
|
||||
preds = append(preds, sqlf.Sprintf("repo.id = ANY (%s)", pq.Array(opts.RepoIDs)))
|
||||
}
|
||||
|
||||
join := sqlf.Sprintf("")
|
||||
@ -524,7 +524,7 @@ type ListChangesetsOpts struct {
|
||||
OwnedByBatchChangeID int64
|
||||
TextSearch []search.TextSearchTerm
|
||||
EnforceAuthz bool
|
||||
RepoID api.RepoID
|
||||
RepoIDs []api.RepoID
|
||||
BitbucketCloudCommit string
|
||||
}
|
||||
|
||||
@ -612,8 +612,8 @@ func listChangesetsQuery(opts *ListChangesetsOpts, authzConds *sqlf.Query) *sqlf
|
||||
if opts.EnforceAuthz {
|
||||
preds = append(preds, authzConds)
|
||||
}
|
||||
if opts.RepoID != 0 {
|
||||
preds = append(preds, sqlf.Sprintf("repo.id = %s", opts.RepoID))
|
||||
if len(opts.RepoIDs) > 0 {
|
||||
preds = append(preds, sqlf.Sprintf("repo.id = ANY (%s)", pq.Array(opts.RepoIDs)))
|
||||
}
|
||||
if len(opts.BitbucketCloudCommit) >= 12 {
|
||||
// Bitbucket Cloud commit hashes in PR objects are generally truncated
|
||||
|
||||
@ -11,6 +11,8 @@ import (
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/keegancsmith/sqlf"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/sourcegraph/log/logtest"
|
||||
|
||||
@ -27,7 +29,6 @@ import (
|
||||
"github.com/sourcegraph/sourcegraph/internal/extsvc/gitlab"
|
||||
"github.com/sourcegraph/sourcegraph/internal/observation"
|
||||
"github.com/sourcegraph/sourcegraph/internal/types"
|
||||
"github.com/sourcegraph/sourcegraph/internal/types/typestest"
|
||||
batcheslib "github.com/sourcegraph/sourcegraph/lib/batches"
|
||||
)
|
||||
|
||||
@ -58,11 +59,11 @@ func testStoreChangesets(t *testing.T, ctx context.Context, s *Store, clock ct.C
|
||||
repo := ct.TestRepo(t, es, extsvc.KindGitHub)
|
||||
otherRepo := ct.TestRepo(t, es, extsvc.KindGitHub)
|
||||
gitlabRepo := ct.TestRepo(t, es, extsvc.KindGitLab)
|
||||
deletedRepo := ct.TestRepo(t, es, extsvc.KindBitbucketCloud)
|
||||
|
||||
if err := rs.Create(ctx, repo, otherRepo, gitlabRepo); err != nil {
|
||||
if err := rs.Create(ctx, repo, otherRepo, gitlabRepo, deletedRepo); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
deletedRepo := otherRepo.With(typestest.Opt.RepoDeletedAt(clock.Now()))
|
||||
if err := rs.Delete(ctx, deletedRepo.ID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -694,6 +695,47 @@ func testStoreChangesets(t *testing.T, ctx context.Context, s *Store, clock ct.C
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("RepoIDs", func(t *testing.T) {
|
||||
// Insert two changesets temporarily that are attached to other repos.
|
||||
createRepoChangeset := func(repo *types.Repo, baseChangeset *btypes.Changeset) *btypes.Changeset {
|
||||
t.Helper()
|
||||
|
||||
c := baseChangeset.Clone()
|
||||
c.RepoID = repo.ID
|
||||
require.NoError(t, s.CreateChangeset(ctx, c))
|
||||
t.Cleanup(func() { s.DeleteChangeset(ctx, c.ID) })
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
otherChangeset := createRepoChangeset(otherRepo, changesets[1])
|
||||
gitlabChangeset := createRepoChangeset(gitlabRepo, changesets[1])
|
||||
|
||||
t.Run("single repo", func(t *testing.T) {
|
||||
have, _, err := s.ListChangesets(ctx, ListChangesetsOpts{
|
||||
RepoIDs: []api.RepoID{repo.ID},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.ElementsMatch(t, changesets, have)
|
||||
})
|
||||
|
||||
t.Run("multiple repos", func(t *testing.T) {
|
||||
have, _, err := s.ListChangesets(ctx, ListChangesetsOpts{
|
||||
RepoIDs: []api.RepoID{otherRepo.ID, gitlabRepo.ID},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.ElementsMatch(t, []*btypes.Changeset{otherChangeset, gitlabChangeset}, have)
|
||||
})
|
||||
|
||||
t.Run("repo without changesets", func(t *testing.T) {
|
||||
have, _, err := s.ListChangesets(ctx, ListChangesetsOpts{
|
||||
RepoIDs: []api.RepoID{deletedRepo.ID},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.ElementsMatch(t, []*btypes.Changeset{}, have)
|
||||
})
|
||||
})
|
||||
|
||||
statePublished := btypes.ChangesetPublicationStatePublished
|
||||
stateUnpublished := btypes.ChangesetPublicationStateUnpublished
|
||||
stateQueued := btypes.ReconcilerStateQueued
|
||||
|
||||
@ -45,6 +45,9 @@ type MockSyncStore struct {
|
||||
// ListChangesetSyncDataFunc is an instance of a mock function object
|
||||
// controlling the behavior of the method ListChangesetSyncData.
|
||||
ListChangesetSyncDataFunc *SyncStoreListChangesetSyncDataFunc
|
||||
// ListChangesetsFunc is an instance of a mock function object
|
||||
// controlling the behavior of the method ListChangesets.
|
||||
ListChangesetsFunc *SyncStoreListChangesetsFunc
|
||||
// ListCodeHostsFunc is an instance of a mock function object
|
||||
// controlling the behavior of the method ListCodeHosts.
|
||||
ListCodeHostsFunc *SyncStoreListCodeHostsFunc
|
||||
@ -110,6 +113,11 @@ func NewMockSyncStore() *MockSyncStore {
|
||||
return
|
||||
},
|
||||
},
|
||||
ListChangesetsFunc: &SyncStoreListChangesetsFunc{
|
||||
defaultHook: func(context.Context, store.ListChangesetsOpts) (r0 types.Changesets, r1 int64, r2 error) {
|
||||
return
|
||||
},
|
||||
},
|
||||
ListCodeHostsFunc: &SyncStoreListCodeHostsFunc{
|
||||
defaultHook: func(context.Context, store.ListCodeHostsOpts) (r0 []*types.CodeHost, r1 error) {
|
||||
return
|
||||
@ -187,6 +195,11 @@ func NewStrictMockSyncStore() *MockSyncStore {
|
||||
panic("unexpected invocation of MockSyncStore.ListChangesetSyncData")
|
||||
},
|
||||
},
|
||||
ListChangesetsFunc: &SyncStoreListChangesetsFunc{
|
||||
defaultHook: func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error) {
|
||||
panic("unexpected invocation of MockSyncStore.ListChangesets")
|
||||
},
|
||||
},
|
||||
ListCodeHostsFunc: &SyncStoreListCodeHostsFunc{
|
||||
defaultHook: func(context.Context, store.ListCodeHostsOpts) ([]*types.CodeHost, error) {
|
||||
panic("unexpected invocation of MockSyncStore.ListCodeHosts")
|
||||
@ -248,6 +261,9 @@ func NewMockSyncStoreFrom(i SyncStore) *MockSyncStore {
|
||||
ListChangesetSyncDataFunc: &SyncStoreListChangesetSyncDataFunc{
|
||||
defaultHook: i.ListChangesetSyncData,
|
||||
},
|
||||
ListChangesetsFunc: &SyncStoreListChangesetsFunc{
|
||||
defaultHook: i.ListChangesets,
|
||||
},
|
||||
ListCodeHostsFunc: &SyncStoreListCodeHostsFunc{
|
||||
defaultHook: i.ListCodeHosts,
|
||||
},
|
||||
@ -1108,6 +1124,117 @@ func (c SyncStoreListChangesetSyncDataFuncCall) Results() []interface{} {
|
||||
return []interface{}{c.Result0, c.Result1}
|
||||
}
|
||||
|
||||
// SyncStoreListChangesetsFunc describes the behavior when the
|
||||
// ListChangesets method of the parent MockSyncStore instance is invoked.
|
||||
type SyncStoreListChangesetsFunc struct {
|
||||
defaultHook func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error)
|
||||
hooks []func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error)
|
||||
history []SyncStoreListChangesetsFuncCall
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// ListChangesets delegates to the next hook function in the queue and
|
||||
// stores the parameter and result values of this invocation.
|
||||
func (m *MockSyncStore) ListChangesets(v0 context.Context, v1 store.ListChangesetsOpts) (types.Changesets, int64, error) {
|
||||
r0, r1, r2 := m.ListChangesetsFunc.nextHook()(v0, v1)
|
||||
m.ListChangesetsFunc.appendCall(SyncStoreListChangesetsFuncCall{v0, v1, r0, r1, r2})
|
||||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// SetDefaultHook sets function that is called when the ListChangesets
|
||||
// method of the parent MockSyncStore instance is invoked and the hook queue
|
||||
// is empty.
|
||||
func (f *SyncStoreListChangesetsFunc) SetDefaultHook(hook func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error)) {
|
||||
f.defaultHook = hook
|
||||
}
|
||||
|
||||
// PushHook adds a function to the end of hook queue. Each invocation of the
|
||||
// ListChangesets method of the parent MockSyncStore instance invokes the
|
||||
// hook at the front of the queue and discards it. After the queue is empty,
|
||||
// the default hook function is invoked for any future action.
|
||||
func (f *SyncStoreListChangesetsFunc) PushHook(hook func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error)) {
|
||||
f.mutex.Lock()
|
||||
f.hooks = append(f.hooks, hook)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// SetDefaultReturn calls SetDefaultHook with a function that returns the
|
||||
// given values.
|
||||
func (f *SyncStoreListChangesetsFunc) SetDefaultReturn(r0 types.Changesets, r1 int64, r2 error) {
|
||||
f.SetDefaultHook(func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error) {
|
||||
return r0, r1, r2
|
||||
})
|
||||
}
|
||||
|
||||
// PushReturn calls PushHook with a function that returns the given values.
|
||||
func (f *SyncStoreListChangesetsFunc) PushReturn(r0 types.Changesets, r1 int64, r2 error) {
|
||||
f.PushHook(func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error) {
|
||||
return r0, r1, r2
|
||||
})
|
||||
}
|
||||
|
||||
func (f *SyncStoreListChangesetsFunc) nextHook() func(context.Context, store.ListChangesetsOpts) (types.Changesets, int64, error) {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
if len(f.hooks) == 0 {
|
||||
return f.defaultHook
|
||||
}
|
||||
|
||||
hook := f.hooks[0]
|
||||
f.hooks = f.hooks[1:]
|
||||
return hook
|
||||
}
|
||||
|
||||
func (f *SyncStoreListChangesetsFunc) appendCall(r0 SyncStoreListChangesetsFuncCall) {
|
||||
f.mutex.Lock()
|
||||
f.history = append(f.history, r0)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// History returns a sequence of SyncStoreListChangesetsFuncCall objects
|
||||
// describing the invocations of this function.
|
||||
func (f *SyncStoreListChangesetsFunc) History() []SyncStoreListChangesetsFuncCall {
|
||||
f.mutex.Lock()
|
||||
history := make([]SyncStoreListChangesetsFuncCall, len(f.history))
|
||||
copy(history, f.history)
|
||||
f.mutex.Unlock()
|
||||
|
||||
return history
|
||||
}
|
||||
|
||||
// SyncStoreListChangesetsFuncCall is an object that describes an invocation
|
||||
// of method ListChangesets on an instance of MockSyncStore.
|
||||
type SyncStoreListChangesetsFuncCall struct {
|
||||
// Arg0 is the value of the 1st argument passed to this method
|
||||
// invocation.
|
||||
Arg0 context.Context
|
||||
// Arg1 is the value of the 2nd argument passed to this method
|
||||
// invocation.
|
||||
Arg1 store.ListChangesetsOpts
|
||||
// Result0 is the value of the 1st result returned from this method
|
||||
// invocation.
|
||||
Result0 types.Changesets
|
||||
// Result1 is the value of the 2nd result returned from this method
|
||||
// invocation.
|
||||
Result1 int64
|
||||
// Result2 is the value of the 3rd result returned from this method
|
||||
// invocation.
|
||||
Result2 error
|
||||
}
|
||||
|
||||
// Args returns an interface slice containing the arguments of this
|
||||
// invocation.
|
||||
func (c SyncStoreListChangesetsFuncCall) Args() []interface{} {
|
||||
return []interface{}{c.Arg0, c.Arg1}
|
||||
}
|
||||
|
||||
// Results returns an interface slice containing the results of this
|
||||
// invocation.
|
||||
func (c SyncStoreListChangesetsFuncCall) Results() []interface{} {
|
||||
return []interface{}{c.Result0, c.Result1, c.Result2}
|
||||
}
|
||||
|
||||
// SyncStoreListCodeHostsFunc describes the behavior when the ListCodeHosts
|
||||
// method of the parent MockSyncStore instance is invoked.
|
||||
type SyncStoreListCodeHostsFunc struct {
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
type SyncStore interface {
|
||||
ListCodeHosts(ctx context.Context, opts store.ListCodeHostsOpts) ([]*btypes.CodeHost, error)
|
||||
ListChangesets(ctx context.Context, opts store.ListChangesetsOpts) (btypes.Changesets, int64, error)
|
||||
ListChangesetSyncData(context.Context, store.ListChangesetSyncDataOpts) ([]*btypes.ChangesetSyncData, error)
|
||||
GetChangeset(context.Context, store.GetChangesetOpts) (*btypes.Changeset, error)
|
||||
UpdateChangesetCodeHostState(ctx context.Context, cs *btypes.Changeset) error
|
||||
|
||||
@ -14,6 +14,8 @@ import (
|
||||
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/state"
|
||||
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
|
||||
btypes "github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types"
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/batches"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/goroutine"
|
||||
"github.com/sourcegraph/sourcegraph/internal/httpcli"
|
||||
@ -44,7 +46,10 @@ type SyncRegistry struct {
|
||||
syncers map[string]*changesetSyncer
|
||||
}
|
||||
|
||||
var _ goroutine.BackgroundRoutine = &SyncRegistry{}
|
||||
var (
|
||||
_ batches.ChangesetSyncRegistry = &SyncRegistry{}
|
||||
_ goroutine.BackgroundRoutine = &SyncRegistry{}
|
||||
)
|
||||
|
||||
// NewSyncRegistry creates a new sync registry which starts a syncer for each code host and will update them
|
||||
// when external services are changed, added or removed.
|
||||
@ -101,6 +106,30 @@ func (s *SyncRegistry) EnqueueChangesetSyncs(ctx context.Context, ids []int64) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SyncRegistry) EnqueueChangesetSyncsForRepos(ctx context.Context, repoIDs []api.RepoID) error {
|
||||
cs, _, err := s.syncStore.ListChangesets(ctx, store.ListChangesetsOpts{
|
||||
RepoIDs: repoIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "listing changesets for repos %v", repoIDs)
|
||||
} else if len(cs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ids := make([]int64, len(cs))
|
||||
for i, c := range cs {
|
||||
ids[i] = c.ID
|
||||
}
|
||||
|
||||
s.logger.Debug(
|
||||
"enqueuing syncs for changesets on repos",
|
||||
log.Int("repo count", len(repoIDs)),
|
||||
log.Int("changeset count", len(ids)),
|
||||
)
|
||||
|
||||
return s.EnqueueChangesetSyncs(ctx, ids)
|
||||
}
|
||||
|
||||
// addCodeHostSyncer adds a syncer for the code host associated with the supplied code host if the syncer hasn't
|
||||
// already been added and starts it.
|
||||
func (s *SyncRegistry) addCodeHostSyncer(codeHost *btypes.CodeHost) {
|
||||
|
||||
@ -270,6 +270,59 @@ func TestSyncRegistry_EnqueueChangesetSyncs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncRegistry_EnqueueChangesetSyncsForRepos(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("store error", func(t *testing.T) {
|
||||
bstore := NewMockSyncStore()
|
||||
want := errors.New("expected")
|
||||
bstore.ListChangesetsFunc.SetDefaultReturn(nil, 0, want)
|
||||
|
||||
s := &SyncRegistry{
|
||||
syncStore: bstore,
|
||||
}
|
||||
|
||||
err := s.EnqueueChangesetSyncsForRepos(ctx, []api.RepoID{})
|
||||
assert.ErrorIs(t, err, want)
|
||||
})
|
||||
|
||||
t.Run("no changesets", func(t *testing.T) {
|
||||
bstore := NewMockSyncStore()
|
||||
bstore.ListChangesetsFunc.SetDefaultHook(func(ctx context.Context, opts store.ListChangesetsOpts) (btypes.Changesets, int64, error) {
|
||||
assert.Equal(t, []api.RepoID{1}, opts.RepoIDs)
|
||||
return []*btypes.Changeset{}, 0, nil
|
||||
})
|
||||
|
||||
s := &SyncRegistry{
|
||||
syncStore: bstore,
|
||||
}
|
||||
|
||||
assert.NoError(t, s.EnqueueChangesetSyncsForRepos(ctx, []api.RepoID{1}))
|
||||
})
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
cs := []*btypes.Changeset{
|
||||
{ID: 1},
|
||||
{ID: 2},
|
||||
}
|
||||
|
||||
bstore := NewMockSyncStore()
|
||||
bstore.ListChangesetsFunc.SetDefaultHook(func(ctx context.Context, opts store.ListChangesetsOpts) (btypes.Changesets, int64, error) {
|
||||
assert.Equal(t, []api.RepoID{1}, opts.RepoIDs)
|
||||
return cs, 0, nil
|
||||
})
|
||||
|
||||
s := &SyncRegistry{
|
||||
logger: logtest.Scoped(t),
|
||||
priorityNotify: make(chan []int64, 1),
|
||||
syncStore: bstore,
|
||||
}
|
||||
|
||||
assert.NoError(t, s.EnqueueChangesetSyncsForRepos(ctx, []api.RepoID{1}))
|
||||
assert.ElementsMatch(t, []int64{1, 2}, <-s.priorityNotify)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLoadChangesetSource(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cf := httpcli.NewFactory(
|
||||
|
||||
23
internal/batches/syncer.go
Normal file
23
internal/batches/syncer.go
Normal file
@ -0,0 +1,23 @@
|
||||
// Package batches specifies interfaces that are called by Sourcegraph OSS, but
|
||||
// implemented in enterprise code in enterprise builds.
|
||||
//
|
||||
// No actual Batch Changes functionality is provided in this package.
|
||||
package batches
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
)
|
||||
|
||||
type ChangesetSyncRegistry interface {
|
||||
UnarchivedChangesetSyncRegistry
|
||||
// EnqueueChangesetSyncs will queue the supplied changesets to sync ASAP.
|
||||
EnqueueChangesetSyncs(ctx context.Context, ids []int64) error
|
||||
}
|
||||
|
||||
type UnarchivedChangesetSyncRegistry interface {
|
||||
// EnqueueChangesetSyncsForRepos will queue a sync for every changeset in
|
||||
// every given repo ASAP.
|
||||
EnqueueChangesetSyncsForRepos(ctx context.Context, repoIDs []api.RepoID) error
|
||||
}
|
||||
@ -210,6 +210,15 @@ type Diff struct {
|
||||
Deleted types.Repos
|
||||
Modified types.Repos
|
||||
Unmodified types.Repos
|
||||
|
||||
// ArchivedChanged contains repositories that have been archived or
|
||||
// unarchived on the code host between the previous sync and the current one.
|
||||
// This is required for Batch Changes to migrate changesets on those
|
||||
// repositories in and out of the read-only state.
|
||||
//
|
||||
// This field is always a strict subset of Modified, and is therefore not
|
||||
// counted in Len() or iterated over in Repos().
|
||||
ArchivedChanged types.Repos
|
||||
}
|
||||
|
||||
// Sort sorts all Diff elements by Repo.IDs.
|
||||
@ -219,6 +228,7 @@ func (d *Diff) Sort() {
|
||||
d.Deleted,
|
||||
d.Modified,
|
||||
d.Unmodified,
|
||||
d.ArchivedChanged,
|
||||
} {
|
||||
sort.Sort(ds)
|
||||
}
|
||||
@ -720,6 +730,8 @@ func (s *Syncer) sync(ctx context.Context, svc *types.ExternalService, sourced *
|
||||
stored = types.Repos{existing}
|
||||
fallthrough
|
||||
case 1: // Existing repo, update.
|
||||
wasArchived := stored[0].Archived
|
||||
|
||||
if !stored[0].Update(sourced) {
|
||||
d.Unmodified = append(d.Unmodified, stored[0])
|
||||
break
|
||||
@ -731,6 +743,11 @@ func (s *Syncer) sync(ctx context.Context, svc *types.ExternalService, sourced *
|
||||
|
||||
*sourced = *stored[0]
|
||||
d.Modified = append(d.Modified, stored[0])
|
||||
|
||||
if (wasArchived == true && stored[0].Archived == false) ||
|
||||
(wasArchived == false && stored[0].Archived == true) {
|
||||
d.ArchivedChanged = append(d.ArchivedChanged, stored[0])
|
||||
}
|
||||
case 0: // New repo, create.
|
||||
if !svc.IsSiteOwned() { // enforce user and org repo limits
|
||||
siteAdded, err := tx.CountNamespacedRepos(ctx, 0, 0)
|
||||
|
||||
@ -601,7 +601,7 @@ func testSyncerSync(s repos.Store) func(*testing.T) {
|
||||
}
|
||||
|
||||
var want, have types.Repos
|
||||
want.Concat(tc.diff.Added, tc.diff.Modified, tc.diff.Unmodified)
|
||||
want.Concat(tc.diff.Added, tc.diff.Modified, tc.diff.Unmodified, tc.diff.ArchivedChanged)
|
||||
have, _ = st.RepoStore().List(ctx, database.ReposListOptions{})
|
||||
|
||||
want = want.With(typestest.Opt.RepoID(0))
|
||||
@ -655,45 +655,92 @@ func testSyncRepo(s repos.Store) func(*testing.T) {
|
||||
})
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
repo api.RepoName
|
||||
background bool
|
||||
before, after types.Repos
|
||||
returned *types.Repo
|
||||
name string
|
||||
repo api.RepoName
|
||||
background bool // whether to run SyncRepo in the background
|
||||
before types.Repos // the repos to insert into the database before syncing
|
||||
sourced *types.Repo // the repo that is returned by the fake sourcer
|
||||
returned *types.Repo // the expected return value from SyncRepo (which changes meaning depending on background)
|
||||
after types.Repos // the expected database repos after syncing
|
||||
diff repos.Diff // the expected repos.Diff sent by the syncer
|
||||
}{{
|
||||
name: "insert",
|
||||
repo: repo.Name,
|
||||
background: true,
|
||||
sourced: repo.Clone(),
|
||||
returned: repo,
|
||||
after: types.Repos{repo},
|
||||
diff: repos.Diff{
|
||||
Added: types.Repos{repo},
|
||||
},
|
||||
}, {
|
||||
name: "update",
|
||||
repo: repo.Name,
|
||||
background: true,
|
||||
before: types.Repos{oldRepo},
|
||||
sourced: repo.Clone(),
|
||||
returned: oldRepo,
|
||||
after: types.Repos{repo},
|
||||
diff: repos.Diff{
|
||||
Modified: types.Repos{repo},
|
||||
},
|
||||
}, {
|
||||
name: "blocking update",
|
||||
repo: repo.Name,
|
||||
background: false,
|
||||
before: types.Repos{oldRepo},
|
||||
sourced: repo.Clone(),
|
||||
returned: repo,
|
||||
after: types.Repos{repo},
|
||||
diff: repos.Diff{
|
||||
Modified: types.Repos{repo},
|
||||
},
|
||||
}, {
|
||||
name: "update name",
|
||||
repo: repo.Name,
|
||||
background: true,
|
||||
before: types.Repos{repo.With(typestest.Opt.RepoName("old/name"))},
|
||||
sourced: repo.Clone(),
|
||||
returned: repo,
|
||||
after: types.Repos{repo},
|
||||
diff: repos.Diff{
|
||||
Modified: types.Repos{repo},
|
||||
},
|
||||
}, {
|
||||
name: "archived",
|
||||
repo: repo.Name,
|
||||
background: true,
|
||||
before: types.Repos{repo},
|
||||
sourced: repo.With(typestest.Opt.RepoArchived(true)),
|
||||
returned: repo,
|
||||
after: types.Repos{repo.With(typestest.Opt.RepoArchived(true))},
|
||||
diff: repos.Diff{
|
||||
Modified: types.Repos{repo.With(typestest.Opt.RepoArchived(true))},
|
||||
ArchivedChanged: types.Repos{repo.With(typestest.Opt.RepoArchived(true))},
|
||||
},
|
||||
}, {
|
||||
name: "unarchived",
|
||||
repo: repo.Name,
|
||||
background: true,
|
||||
before: types.Repos{repo.With(typestest.Opt.RepoArchived(true))},
|
||||
sourced: repo.Clone(),
|
||||
returned: repo.With(typestest.Opt.RepoArchived(true)),
|
||||
after: types.Repos{repo},
|
||||
diff: repos.Diff{
|
||||
Modified: types.Repos{repo},
|
||||
ArchivedChanged: types.Repos{repo},
|
||||
},
|
||||
}, {
|
||||
name: "delete conflicting name",
|
||||
repo: repo.Name,
|
||||
background: true,
|
||||
before: types.Repos{repo.With(typestest.Opt.RepoExternalID("old id"))},
|
||||
sourced: repo.Clone(),
|
||||
returned: repo.With(typestest.Opt.RepoExternalID("old id")),
|
||||
after: types.Repos{repo},
|
||||
diff: repos.Diff{
|
||||
Modified: types.Repos{repo},
|
||||
},
|
||||
}, {
|
||||
name: "rename and delete conflicting name",
|
||||
repo: repo.Name,
|
||||
@ -702,8 +749,12 @@ func testSyncRepo(s repos.Store) func(*testing.T) {
|
||||
repo.With(typestest.Opt.RepoExternalID("old id")),
|
||||
repo.With(typestest.Opt.RepoName("old name")),
|
||||
},
|
||||
sourced: repo.Clone(),
|
||||
returned: repo.With(typestest.Opt.RepoExternalID("old id")),
|
||||
after: types.Repos{repo},
|
||||
diff: repos.Diff{
|
||||
Modified: types.Repos{repo},
|
||||
},
|
||||
}}
|
||||
|
||||
for _, tc := range testCases {
|
||||
@ -729,7 +780,7 @@ func testSyncRepo(s repos.Store) func(*testing.T) {
|
||||
Store: s,
|
||||
Synced: make(chan repos.Diff, 1),
|
||||
Sourcer: repos.NewFakeSourcer(nil,
|
||||
repos.NewFakeSource(servicesPerKind[extsvc.KindGitHub], nil, repo.Clone()),
|
||||
repos.NewFakeSource(servicesPerKind[extsvc.KindGitHub], nil, tc.sourced),
|
||||
),
|
||||
}
|
||||
|
||||
@ -747,7 +798,9 @@ func testSyncRepo(s repos.Store) func(*testing.T) {
|
||||
t.Errorf("returned mismatch: (-have, +want):\n%s", diff)
|
||||
}
|
||||
|
||||
<-syncer.Synced
|
||||
if diff := cmp.Diff(<-syncer.Synced, tc.diff, opt); diff != "" {
|
||||
t.Errorf("diff mismatch: (-have, +want):\n%s", diff)
|
||||
}
|
||||
|
||||
after, err := s.RepoStore().List(ctx, database.ReposListOptions{})
|
||||
if err != nil {
|
||||
@ -1001,6 +1054,9 @@ func testSyncerMultipleServices(store repos.Store) func(t *testing.T) {
|
||||
if len(diff.Unmodified) != 0 {
|
||||
t.Fatalf("Expected 0 Unmodified repos. got %d", len(diff.Added))
|
||||
}
|
||||
if len(diff.ArchivedChanged) != 0 {
|
||||
t.Fatalf("Expected 0 Archived repos. got %d", len(diff.Added))
|
||||
}
|
||||
}
|
||||
|
||||
var jobsCompleted int
|
||||
|
||||
@ -265,6 +265,7 @@ var Opt = struct {
|
||||
RepoDeletedAt func(time.Time) func(*types.Repo)
|
||||
RepoSources func(...string) func(*types.Repo)
|
||||
RepoMetadata func(any) func(*types.Repo)
|
||||
RepoArchived func(bool) func(*types.Repo)
|
||||
RepoExternalID func(string) func(*types.Repo)
|
||||
}{
|
||||
ExternalServiceID: func(n int64) func(*types.ExternalService) {
|
||||
@ -327,6 +328,11 @@ var Opt = struct {
|
||||
r.Metadata = md
|
||||
}
|
||||
},
|
||||
RepoArchived: func(b bool) func(*types.Repo) {
|
||||
return func(r *types.Repo) {
|
||||
r.Archived = b
|
||||
}
|
||||
},
|
||||
RepoExternalID: func(id string) func(*types.Repo) {
|
||||
return func(r *types.Repo) {
|
||||
r.ExternalRepo.ID = id
|
||||
|
||||
Loading…
Reference in New Issue
Block a user