[codenav]: refactor of auto-indexing background jobs (#40771)

This commit is contained in:
Cesar Jimenez 2022-08-25 13:22:58 -04:00 committed by GitHub
parent aacc491007
commit 02fb2e85cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 890 additions and 1085 deletions

View File

@ -12,7 +12,8 @@ import (
workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/background/scheduler"
policies "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
"github.com/sourcegraph/sourcegraph/internal/codeintel/policies"
policiesEnterprise "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/env"
@ -51,11 +52,6 @@ func (j *autoindexingScheduler) Routines(ctx context.Context, logger log.Logger)
}
databaseDB := database.NewDB(logger, db)
dbStore, err := codeintel.InitDBStore()
if err != nil {
return nil, err
}
lsifStore, err := codeintel.InitLSIFStore()
if err != nil {
return nil, err
@ -67,13 +63,15 @@ func (j *autoindexingScheduler) Routines(ctx context.Context, logger log.Logger)
return nil, err
}
repoUpdater := codeintel.InitRepoUpdaterClient()
policyMatcher := policies.NewMatcher(gitserverClient, policies.IndexingExtractor, false, true)
policyMatcher := policiesEnterprise.NewMatcher(gitserverClient, policiesEnterprise.IndexingExtractor, false, true)
// Initialize services
uploadSvc := uploads.GetService(databaseDB, database.NewDBWith(logger, lsifStore), gitserverClient)
autoindexingSvc := autoindexing.GetService(databaseDB, uploadSvc, gitserverClient, repoUpdater)
policySvc := policies.GetService(databaseDB, uploadSvc, gitserverClient)
// Initialize services
return []goroutine.BackgroundRoutine{
scheduler.NewScheduler(autoindexingSvc, dbStore, policyMatcher, observationContext),
scheduler.NewScheduler(autoindexingSvc, policySvc, uploadSvc, policyMatcher, observationContext),
}, nil
}

View File

@ -27,7 +27,6 @@ type DBStore interface {
InsertCloneableDependencyRepo(ctx context.Context, dependency precise.Package) (bool, error)
InsertDependencyIndexingJob(ctx context.Context, uploadID int, externalServiceKind string, syncTime time.Time) (int, error)
GetConfigurationPolicies(ctx context.Context, opts dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error)
SelectRepositoriesForIndexScan(ctx context.Context, table, column string, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int) ([]int, error)
}
type DBStoreShim struct {

View File

@ -53,10 +53,6 @@ type MockDBStore struct {
// ReferencesForUploadFunc is an instance of a mock function object
// controlling the behavior of the method ReferencesForUpload.
ReferencesForUploadFunc *DBStoreReferencesForUploadFunc
// SelectRepositoriesForIndexScanFunc is an instance of a mock function
// object controlling the behavior of the method
// SelectRepositoriesForIndexScan.
SelectRepositoriesForIndexScanFunc *DBStoreSelectRepositoriesForIndexScanFunc
// WithFunc is an instance of a mock function object controlling the
// behavior of the method With.
WithFunc *DBStoreWithFunc
@ -96,11 +92,6 @@ func NewMockDBStore() *MockDBStore {
return
},
},
SelectRepositoriesForIndexScanFunc: &DBStoreSelectRepositoriesForIndexScanFunc{
defaultHook: func(context.Context, string, string, time.Duration, bool, *int, int) (r0 []int, r1 error) {
return
},
},
WithFunc: &DBStoreWithFunc{
defaultHook: func(basestore.ShareableStore) (r0 DBStore) {
return
@ -143,11 +134,6 @@ func NewStrictMockDBStore() *MockDBStore {
panic("unexpected invocation of MockDBStore.ReferencesForUpload")
},
},
SelectRepositoriesForIndexScanFunc: &DBStoreSelectRepositoriesForIndexScanFunc{
defaultHook: func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error) {
panic("unexpected invocation of MockDBStore.SelectRepositoriesForIndexScan")
},
},
WithFunc: &DBStoreWithFunc{
defaultHook: func(basestore.ShareableStore) DBStore {
panic("unexpected invocation of MockDBStore.With")
@ -178,9 +164,6 @@ func NewMockDBStoreFrom(i DBStore) *MockDBStore {
ReferencesForUploadFunc: &DBStoreReferencesForUploadFunc{
defaultHook: i.ReferencesForUpload,
},
SelectRepositoriesForIndexScanFunc: &DBStoreSelectRepositoriesForIndexScanFunc{
defaultHook: i.SelectRepositoriesForIndexScan,
},
WithFunc: &DBStoreWithFunc{
defaultHook: i.With,
},
@ -859,133 +842,6 @@ func (c DBStoreReferencesForUploadFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// DBStoreSelectRepositoriesForIndexScanFunc describes the behavior when the
// SelectRepositoriesForIndexScan method of the parent MockDBStore instance
// is invoked.
type DBStoreSelectRepositoriesForIndexScanFunc struct {
// defaultHook is the hook handed out by nextHook once the hooks queue is
// empty.
defaultHook func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error)
// hooks is a first-in, first-out queue of one-shot hooks; PushHook appends
// to it and nextHook removes from the front.
hooks []func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error)
// history holds one entry per recorded invocation, in the order appendCall
// received them.
history []DBStoreSelectRepositoriesForIndexScanFuncCall
// mutex is held while hooks or history are read or modified.
mutex sync.Mutex
}
// SelectRepositoriesForIndexScan takes the next hook registered for this
// method, calls it with the given arguments, records the arguments together
// with the hook's results in the call history, and returns those results.
func (m *MockDBStore) SelectRepositoriesForIndexScan(v0 context.Context, v1 string, v2 string, v3 time.Duration, v4 bool, v5 *int, v6 int) ([]int, error) {
	hook := m.SelectRepositoriesForIndexScanFunc.nextHook()
	r0, r1 := hook(v0, v1, v2, v3, v4, v5, v6)

	call := DBStoreSelectRepositoriesForIndexScanFuncCall{
		Arg0:    v0,
		Arg1:    v1,
		Arg2:    v2,
		Arg3:    v3,
		Arg4:    v4,
		Arg5:    v5,
		Arg6:    v6,
		Result0: r0,
		Result1: r1,
	}
	m.SelectRepositoriesForIndexScanFunc.appendCall(call)

	return r0, r1
}
// SetDefaultHook sets the function that is called when the
// SelectRepositoriesForIndexScan method of the parent MockDBStore instance
// is invoked and the hook queue is empty.
//
// Unlike PushHook, this assignment is made without acquiring f.mutex.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) SetDefaultHook(hook func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error)) {
f.defaultHook = hook
}
// PushHook enqueues a one-shot hook. Every call to the
// SelectRepositoriesForIndexScan method of the parent MockDBStore instance
// consumes the hook at the head of the queue; once the queue has drained,
// the default hook serves all further calls.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) PushHook(hook func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error)) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.hooks = append(f.hooks, hook)
}
// SetDefaultReturn installs, via SetDefaultHook, a default hook that ignores
// its arguments and always yields r0 and r1.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) SetDefaultReturn(r0 []int, r1 error) {
	fixed := func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error) {
		return r0, r1
	}
	f.SetDefaultHook(fixed)
}
// PushReturn enqueues, via PushHook, a one-shot hook that ignores its
// arguments and yields r0 and r1.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) PushReturn(r0 []int, r1 error) {
	fixed := func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error) {
		return r0, r1
	}
	f.PushHook(fixed)
}
// nextHook removes and returns the hook at the head of the queue. When the
// queue is empty it returns the default hook and leaves the queue untouched.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) nextHook() func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.hooks) > 0 {
		head, rest := f.hooks[0], f.hooks[1:]
		f.hooks = rest
		return head
	}

	return f.defaultHook
}
// appendCall adds r0 to the end of the recorded call history while holding
// the mutex.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) appendCall(r0 DBStoreSelectRepositoriesForIndexScanFuncCall) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.history = append(f.history, r0)
}
// History returns a copy of the recorded
// DBStoreSelectRepositoriesForIndexScanFuncCall values, one per invocation
// of this function. The copy is taken while holding the mutex.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) History() []DBStoreSelectRepositoriesForIndexScanFuncCall {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	snapshot := make([]DBStoreSelectRepositoriesForIndexScanFuncCall, len(f.history))
	copy(snapshot, f.history)
	return snapshot
}
// DBStoreSelectRepositoriesForIndexScanFuncCall is an object that describes
// an invocation of method SelectRepositoriesForIndexScan on an instance of
// MockDBStore. The parenthesised names below are the parameter names from
// the DBStore interface method signature.
type DBStoreSelectRepositoriesForIndexScanFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation (ctx).
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation (table).
Arg1 string
// Arg2 is the value of the 3rd argument passed to this method
// invocation (column).
Arg2 string
// Arg3 is the value of the 4th argument passed to this method
// invocation (processDelay).
Arg3 time.Duration
// Arg4 is the value of the 5th argument passed to this method
// invocation (allowGlobalPolicies).
Arg4 bool
// Arg5 is the value of the 6th argument passed to this method
// invocation (repositoryMatchLimit).
Arg5 *int
// Arg6 is the value of the 7th argument passed to this method
// invocation (limit).
Arg6 int
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []int
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns the seven recorded arguments of this invocation, in
// positional order, as an interface slice.
func (c DBStoreSelectRepositoriesForIndexScanFuncCall) Args() []interface{} {
	args := []interface{}{
		c.Arg0,
		c.Arg1,
		c.Arg2,
		c.Arg3,
		c.Arg4,
		c.Arg5,
		c.Arg6,
	}
	return args
}
// Results returns the two recorded results of this invocation, in
// positional order, as an interface slice.
func (c DBStoreSelectRepositoriesForIndexScanFuncCall) Results() []interface{} {
	results := []interface{}{
		c.Result0,
		c.Result1,
	}
	return results
}
// DBStoreWithFunc describes the behavior when the With method of the parent
// MockDBStore instance is invoked.
type DBStoreWithFunc struct {

View File

@ -6,18 +6,16 @@ import (
policies "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
"github.com/sourcegraph/sourcegraph/internal/codeintel/policies/shared"
"github.com/sourcegraph/sourcegraph/internal/codeintel/stores/dbstore"
)
// DBStore lists the store methods consumed through this interface: paged
// retrieval of configuration policies and selection of repositories for an
// index scan.
type DBStore interface {
// GetConfigurationPolicies returns a slice of policies, an int, and an
// error for the given options.
// NOTE(review): the int is presumably the total match count — confirm.
GetConfigurationPolicies(ctx context.Context, opts dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error)
// SelectRepositoriesForIndexScan returns a slice of ints.
// NOTE(review): presumably repository IDs — confirm against the store.
SelectRepositoriesForIndexScan(ctx context.Context, table, column string, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int) ([]int, error)
}
// PolicyMatcher maps commits of a repository to the configuration policies
// that match them. Both methods return a map from string keys to policy
// matches.
// NOTE(review): the map keys are presumably commit identifiers — confirm.
type PolicyMatcher interface {
// CommitsDescribedByPolicy takes policies expressed as
// dbstore.ConfigurationPolicy values.
CommitsDescribedByPolicy(ctx context.Context, repositoryID int, policies []dbstore.ConfigurationPolicy, now time.Time, filterCommits ...string) (map[string][]policies.PolicyMatch, error)
// CommitsDescribedByPolicyInternal takes policies expressed as
// shared.ConfigurationPolicy values.
CommitsDescribedByPolicyInternal(ctx context.Context, repositoryID int, policies []shared.ConfigurationPolicy, now time.Time, filterCommits ...string) (map[string][]policies.PolicyMatch, error)
}
// PolicyService exposes retrieval of configuration policies using the
// shared policy types.
type PolicyService interface {
// GetConfigurationPolicies returns a slice of policies, an int, and an
// error for the given options.
// NOTE(review): the int is presumably the total match count — confirm.
GetConfigurationPolicies(ctx context.Context, opts shared.GetConfigurationPoliciesOptions) ([]shared.ConfigurationPolicy, int, error)
}
// UploadService exposes the selection of repositories for an index scan.
type UploadService interface {
// GetRepositoriesForIndexScan returns a slice of ints. Its parameters
// include an explicit now timestamp.
// NOTE(review): the result is presumably a list of repository IDs —
// confirm against the upload service implementation.
GetRepositoriesForIndexScan(ctx context.Context, table, column string, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) (_ []int, err error)
}

View File

@ -1,114 +0,0 @@
package scheduler
import (
"context"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/codeintel/stores/dbstore"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// autoIndexingEnabled reports whether auto-indexing is enabled. It refers to
// conf.CodeIntelAutoIndexingEnabled through a package-level variable so that
// tests can replace it with a stub.
var autoIndexingEnabled = conf.CodeIntelAutoIndexingEnabled
// handle performs one scheduling pass. It returns nil immediately when
// auto-indexing is disabled. Otherwise it selects a batch of repositories
// from the store and runs handleRepository on each one with a shared
// timestamp. A failing repository does not stop the pass: every
// per-repository error is collected and the combination is returned.
func (s *scheduler) handle(ctx context.Context) (err error) {
	if enabled := autoIndexingEnabled(); !enabled {
		return nil
	}

	// A configured value of -1 leaves the limit unset, so nil is passed on.
	var repositoryMatchLimit *int
	if limit := conf.CodeIntelAutoIndexingPolicyRepositoryMatchLimit(); limit != -1 {
		repositoryMatchLimit = &limit
	}

	// Choose the repositories for this invocation of the periodic goroutine:
	// those never scanned, or scanned least recently. Working through the
	// backlog this way reaches every repository eventually, however long the
	// backlog is.
	repositoryIDs, err := s.dbStore.SelectRepositoriesForIndexScan(
		ctx,
		"lsif_last_index_scan",
		"last_index_scan_at",
		ConfigInst.RepositoryProcessDelay,
		conf.CodeIntelAutoIndexingAllowGlobalPolicies(),
		repositoryMatchLimit,
		ConfigInst.RepositoryBatchSize,
	)
	if err != nil {
		return errors.Wrap(err, "dbstore.SelectRepositoriesForIndexScan")
	}

	// Nothing is due for a scan.
	if len(repositoryIDs) == 0 {
		return nil
	}

	now := timeutil.Now()
	for _, id := range repositoryIDs {
		repositoryErr := s.handleRepository(ctx, id, now)

		switch {
		case repositoryErr == nil:
			// Nothing to record for this repository.
		case err == nil:
			err = repositoryErr
		default:
			err = errors.Append(err, repositoryErr)
		}
	}

	return err
}
// handleError logs err at error level under the message "Failed to schedule
// index jobs". It does not return or otherwise propagate the error.
func (s *scheduler) handleError(err error) {
s.logger.Error("Failed to schedule index jobs", log.Error(err))
}
// handleRepository queues index jobs for one repository. It pages through
// the repository's indexing configuration policies, ConfigInst.PolicyBatchSize
// at a time. For each page it asks the policy matcher which commits the
// policies describe, then calls QueueIndexes for every commit that has at
// least one match.
//
// A QueueIndexes error of type gitdomain.RevisionNotFoundError is skipped.
// Any other error from the store, the matcher, or QueueIndexes is wrapped
// and returned immediately. Paging stops when a page is empty or the running
// offset reaches the total count reported by the store.
func (s *scheduler) handleRepository(
	ctx context.Context,
	repositoryID int,
	now time.Time,
) error {
	for offset := 0; ; {
		// Fetch the next page of indexing policies that apply to this repository.
		opts := dbstore.GetConfigurationPoliciesOptions{
			RepositoryID: repositoryID,
			ForIndexing:  true,
			Limit:        ConfigInst.PolicyBatchSize,
			Offset:       offset,
		}
		page, totalCount, err := s.dbStore.GetConfigurationPolicies(ctx, opts)
		if err != nil {
			return errors.Wrap(err, "dbstore.GetConfigurationPolicies")
		}
		offset += len(page)

		// Resolve which commits in the repository this page of policies describes.
		commitMap, err := s.policyMatcher.CommitsDescribedByPolicy(ctx, repositoryID, page, now)
		if err != nil {
			return errors.Wrap(err, "policies.CommitsDescribedByPolicy")
		}

		for commit, policyMatches := range commitMap {
			if len(policyMatches) == 0 {
				continue
			}

			// Try to queue an index for the commit; a missing revision is skipped.
			_, err := s.autoindexingSvc.QueueIndexes(ctx, repositoryID, commit, "", false, false)
			if err == nil || errors.HasType(err, &gitdomain.RevisionNotFoundError{}) {
				continue
			}
			return errors.Wrap(err, "indexEnqueuer.QueueIndexes")
		}

		if len(page) == 0 || offset >= totalCount {
			return nil
		}
	}
}

View File

@ -11,7 +11,8 @@ import (
func NewScheduler(
autoindexingSvc *autoindexing.Service,
dbStore DBStore,
policySvc PolicyService,
uploadSvc UploadService,
policyMatcher PolicyMatcher,
observationContext *observation.Context,
) goroutine.BackgroundRoutine {
@ -30,7 +31,8 @@ func NewScheduler(
return goroutine.NewPeriodicGoroutineWithMetrics(context.Background(), ConfigInst.Interval, &scheduler{
autoindexingSvc: autoindexingSvc,
dbStore: dbStore,
policySvc: policySvc,
uploadSvc: uploadSvc,
policyMatcher: policyMatcher,
}, handleIndexScheduler)
}

View File

@ -12,327 +12,26 @@ import (
"time"
enterprise "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
dbstore "github.com/sourcegraph/sourcegraph/internal/codeintel/stores/dbstore"
shared "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/shared"
)
// MockDBStore is a mock implementation of the DBStore interface (from the
// package
// github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/background/scheduler)
// used for unit testing. Each field holds the hooks and call history for
// one interface method.
type MockDBStore struct {
// GetConfigurationPoliciesFunc is an instance of a mock function object
// controlling the behavior of the method GetConfigurationPolicies.
GetConfigurationPoliciesFunc *DBStoreGetConfigurationPoliciesFunc
// SelectRepositoriesForIndexScanFunc is an instance of a mock function
// object controlling the behavior of the method
// SelectRepositoriesForIndexScan.
SelectRepositoriesForIndexScanFunc *DBStoreSelectRepositoriesForIndexScanFunc
}
// NewMockDBStore builds a MockDBStore whose methods, until overridden,
// return the zero value for every result.
func NewMockDBStore() *MockDBStore {
	getPolicies := &DBStoreGetConfigurationPoliciesFunc{
		defaultHook: func(context.Context, dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error) {
			return nil, 0, nil
		},
	}
	selectRepositories := &DBStoreSelectRepositoriesForIndexScanFunc{
		defaultHook: func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error) {
			return nil, nil
		},
	}

	return &MockDBStore{
		GetConfigurationPoliciesFunc:       getPolicies,
		SelectRepositoriesForIndexScanFunc: selectRepositories,
	}
}
// NewStrictMockDBStore builds a MockDBStore whose methods, until
// overridden, panic when invoked.
func NewStrictMockDBStore() *MockDBStore {
	getPolicies := &DBStoreGetConfigurationPoliciesFunc{
		defaultHook: func(context.Context, dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error) {
			panic("unexpected invocation of MockDBStore.GetConfigurationPolicies")
		},
	}
	selectRepositories := &DBStoreSelectRepositoriesForIndexScanFunc{
		defaultHook: func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error) {
			panic("unexpected invocation of MockDBStore.SelectRepositoriesForIndexScan")
		},
	}

	return &MockDBStore{
		GetConfigurationPoliciesFunc:       getPolicies,
		SelectRepositoriesForIndexScanFunc: selectRepositories,
	}
}
// NewMockDBStoreFrom builds a MockDBStore around an existing DBStore. Until
// overridden, every method forwards to the matching method of i.
func NewMockDBStoreFrom(i DBStore) *MockDBStore {
	mock := &MockDBStore{}
	mock.GetConfigurationPoliciesFunc = &DBStoreGetConfigurationPoliciesFunc{
		defaultHook: i.GetConfigurationPolicies,
	}
	mock.SelectRepositoriesForIndexScanFunc = &DBStoreSelectRepositoriesForIndexScanFunc{
		defaultHook: i.SelectRepositoriesForIndexScan,
	}
	return mock
}
// DBStoreGetConfigurationPoliciesFunc describes the behavior when the
// GetConfigurationPolicies method of the parent MockDBStore instance is
// invoked.
type DBStoreGetConfigurationPoliciesFunc struct {
// defaultHook is the hook handed out by nextHook once the hooks queue is
// empty.
defaultHook func(context.Context, dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error)
// hooks is a first-in, first-out queue of one-shot hooks; PushHook appends
// to it and nextHook removes from the front.
hooks []func(context.Context, dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error)
// history holds one entry per recorded invocation, in the order appendCall
// received them.
history []DBStoreGetConfigurationPoliciesFuncCall
// mutex is held while hooks or history are read or modified.
mutex sync.Mutex
}
// GetConfigurationPolicies takes the next hook registered for this method,
// calls it with the given arguments, records the arguments together with
// the hook's results in the call history, and returns those results.
func (m *MockDBStore) GetConfigurationPolicies(v0 context.Context, v1 dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error) {
	hook := m.GetConfigurationPoliciesFunc.nextHook()
	r0, r1, r2 := hook(v0, v1)

	call := DBStoreGetConfigurationPoliciesFuncCall{
		Arg0:    v0,
		Arg1:    v1,
		Result0: r0,
		Result1: r1,
		Result2: r2,
	}
	m.GetConfigurationPoliciesFunc.appendCall(call)

	return r0, r1, r2
}
// SetDefaultHook sets the function that is called when the
// GetConfigurationPolicies method of the parent MockDBStore instance is
// invoked and the hook queue is empty.
//
// Unlike PushHook, this assignment is made without acquiring f.mutex.
func (f *DBStoreGetConfigurationPoliciesFunc) SetDefaultHook(hook func(context.Context, dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error)) {
f.defaultHook = hook
}
// PushHook enqueues a one-shot hook. Every call to the
// GetConfigurationPolicies method of the parent MockDBStore instance
// consumes the hook at the head of the queue; once the queue has drained,
// the default hook serves all further calls.
func (f *DBStoreGetConfigurationPoliciesFunc) PushHook(hook func(context.Context, dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error)) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.hooks = append(f.hooks, hook)
}
// SetDefaultReturn installs, via SetDefaultHook, a default hook that ignores
// its arguments and always yields r0, r1 and r2.
func (f *DBStoreGetConfigurationPoliciesFunc) SetDefaultReturn(r0 []dbstore.ConfigurationPolicy, r1 int, r2 error) {
	fixed := func(context.Context, dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error) {
		return r0, r1, r2
	}
	f.SetDefaultHook(fixed)
}
// PushReturn enqueues, via PushHook, a one-shot hook that ignores its
// arguments and yields r0, r1 and r2.
func (f *DBStoreGetConfigurationPoliciesFunc) PushReturn(r0 []dbstore.ConfigurationPolicy, r1 int, r2 error) {
	fixed := func(context.Context, dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error) {
		return r0, r1, r2
	}
	f.PushHook(fixed)
}
// nextHook removes and returns the hook at the head of the queue. When the
// queue is empty it returns the default hook and leaves the queue untouched.
func (f *DBStoreGetConfigurationPoliciesFunc) nextHook() func(context.Context, dbstore.GetConfigurationPoliciesOptions) ([]dbstore.ConfigurationPolicy, int, error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.hooks) > 0 {
		head, rest := f.hooks[0], f.hooks[1:]
		f.hooks = rest
		return head
	}

	return f.defaultHook
}
// appendCall adds r0 to the end of the recorded call history while holding
// the mutex.
func (f *DBStoreGetConfigurationPoliciesFunc) appendCall(r0 DBStoreGetConfigurationPoliciesFuncCall) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.history = append(f.history, r0)
}
// History returns a copy of the recorded
// DBStoreGetConfigurationPoliciesFuncCall values, one per invocation of
// this function. The copy is taken while holding the mutex.
func (f *DBStoreGetConfigurationPoliciesFunc) History() []DBStoreGetConfigurationPoliciesFuncCall {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	snapshot := make([]DBStoreGetConfigurationPoliciesFuncCall, len(f.history))
	copy(snapshot, f.history)
	return snapshot
}
// DBStoreGetConfigurationPoliciesFuncCall is an object that describes an
// invocation of method GetConfigurationPolicies on an instance of
// MockDBStore. The parenthesised names below are the parameter names from
// the DBStore interface method signature.
type DBStoreGetConfigurationPoliciesFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation (ctx).
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation (opts).
Arg1 dbstore.GetConfigurationPoliciesOptions
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []dbstore.ConfigurationPolicy
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 int
// Result2 is the value of the 3rd result returned from this method
// invocation.
Result2 error
}
// Args returns the two recorded arguments of this invocation, in positional
// order, as an interface slice.
func (c DBStoreGetConfigurationPoliciesFuncCall) Args() []interface{} {
	args := []interface{}{
		c.Arg0,
		c.Arg1,
	}
	return args
}
// Results returns the three recorded results of this invocation, in
// positional order, as an interface slice.
func (c DBStoreGetConfigurationPoliciesFuncCall) Results() []interface{} {
	results := []interface{}{
		c.Result0,
		c.Result1,
		c.Result2,
	}
	return results
}
// DBStoreSelectRepositoriesForIndexScanFunc describes the behavior when the
// SelectRepositoriesForIndexScan method of the parent MockDBStore instance
// is invoked.
type DBStoreSelectRepositoriesForIndexScanFunc struct {
// defaultHook is the hook handed out by nextHook once the hooks queue is
// empty.
defaultHook func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error)
// hooks is a first-in, first-out queue of one-shot hooks; PushHook appends
// to it and nextHook removes from the front.
hooks []func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error)
// history holds one entry per recorded invocation, in the order appendCall
// received them.
history []DBStoreSelectRepositoriesForIndexScanFuncCall
// mutex is held while hooks or history are read or modified.
mutex sync.Mutex
}
// SelectRepositoriesForIndexScan takes the next hook registered for this
// method, calls it with the given arguments, records the arguments together
// with the hook's results in the call history, and returns those results.
func (m *MockDBStore) SelectRepositoriesForIndexScan(v0 context.Context, v1 string, v2 string, v3 time.Duration, v4 bool, v5 *int, v6 int) ([]int, error) {
	hook := m.SelectRepositoriesForIndexScanFunc.nextHook()
	r0, r1 := hook(v0, v1, v2, v3, v4, v5, v6)

	call := DBStoreSelectRepositoriesForIndexScanFuncCall{
		Arg0:    v0,
		Arg1:    v1,
		Arg2:    v2,
		Arg3:    v3,
		Arg4:    v4,
		Arg5:    v5,
		Arg6:    v6,
		Result0: r0,
		Result1: r1,
	}
	m.SelectRepositoriesForIndexScanFunc.appendCall(call)

	return r0, r1
}
// SetDefaultHook sets the function that is called when the
// SelectRepositoriesForIndexScan method of the parent MockDBStore instance
// is invoked and the hook queue is empty.
//
// Unlike PushHook, this assignment is made without acquiring f.mutex.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) SetDefaultHook(hook func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error)) {
f.defaultHook = hook
}
// PushHook enqueues a one-shot hook. Every call to the
// SelectRepositoriesForIndexScan method of the parent MockDBStore instance
// consumes the hook at the head of the queue; once the queue has drained,
// the default hook serves all further calls.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) PushHook(hook func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error)) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.hooks = append(f.hooks, hook)
}
// SetDefaultReturn installs, via SetDefaultHook, a default hook that ignores
// its arguments and always yields r0 and r1.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) SetDefaultReturn(r0 []int, r1 error) {
	fixed := func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error) {
		return r0, r1
	}
	f.SetDefaultHook(fixed)
}
// PushReturn enqueues, via PushHook, a one-shot hook that ignores its
// arguments and yields r0 and r1.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) PushReturn(r0 []int, r1 error) {
	fixed := func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error) {
		return r0, r1
	}
	f.PushHook(fixed)
}
// nextHook removes and returns the hook at the head of the queue. When the
// queue is empty it returns the default hook and leaves the queue untouched.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) nextHook() func(context.Context, string, string, time.Duration, bool, *int, int) ([]int, error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.hooks) > 0 {
		head, rest := f.hooks[0], f.hooks[1:]
		f.hooks = rest
		return head
	}

	return f.defaultHook
}
// appendCall adds r0 to the end of the recorded call history while holding
// the mutex.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) appendCall(r0 DBStoreSelectRepositoriesForIndexScanFuncCall) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.history = append(f.history, r0)
}
// History returns a copy of the recorded
// DBStoreSelectRepositoriesForIndexScanFuncCall values, one per invocation
// of this function. The copy is taken while holding the mutex.
func (f *DBStoreSelectRepositoriesForIndexScanFunc) History() []DBStoreSelectRepositoriesForIndexScanFuncCall {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	snapshot := make([]DBStoreSelectRepositoriesForIndexScanFuncCall, len(f.history))
	copy(snapshot, f.history)
	return snapshot
}
// DBStoreSelectRepositoriesForIndexScanFuncCall is an object that describes
// an invocation of method SelectRepositoriesForIndexScan on an instance of
// MockDBStore. The parenthesised names below are the parameter names from
// the DBStore interface method signature.
type DBStoreSelectRepositoriesForIndexScanFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation (ctx).
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation (table).
Arg1 string
// Arg2 is the value of the 3rd argument passed to this method
// invocation (column).
Arg2 string
// Arg3 is the value of the 4th argument passed to this method
// invocation (processDelay).
Arg3 time.Duration
// Arg4 is the value of the 5th argument passed to this method
// invocation (allowGlobalPolicies).
Arg4 bool
// Arg5 is the value of the 6th argument passed to this method
// invocation (repositoryMatchLimit).
Arg5 *int
// Arg6 is the value of the 7th argument passed to this method
// invocation (limit).
Arg6 int
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []int
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns the seven recorded arguments of this invocation, in
// positional order, as an interface slice.
func (c DBStoreSelectRepositoriesForIndexScanFuncCall) Args() []interface{} {
	args := []interface{}{
		c.Arg0,
		c.Arg1,
		c.Arg2,
		c.Arg3,
		c.Arg4,
		c.Arg5,
		c.Arg6,
	}
	return args
}
// Results returns the two recorded results of this invocation, in
// positional order, as an interface slice.
func (c DBStoreSelectRepositoriesForIndexScanFuncCall) Results() []interface{} {
	results := []interface{}{
		c.Result0,
		c.Result1,
	}
	return results
}
// MockPolicyMatcher is a mock implementation of the PolicyMatcher interface
// (from the package
// github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/background/scheduler)
// used for unit testing. Each field holds the mock function object for one
// interface method.
type MockPolicyMatcher struct {
// CommitsDescribedByPolicyFunc is an instance of a mock function object
// controlling the behavior of the method CommitsDescribedByPolicy.
CommitsDescribedByPolicyFunc *PolicyMatcherCommitsDescribedByPolicyFunc
// CommitsDescribedByPolicyInternalFunc is an instance of a mock
// function object controlling the behavior of the method
// CommitsDescribedByPolicyInternal.
CommitsDescribedByPolicyInternalFunc *PolicyMatcherCommitsDescribedByPolicyInternalFunc
}
// NewMockPolicyMatcher creates a new mock of the PolicyMatcher interface.
// All methods return zero values for all results, unless overwritten.
func NewMockPolicyMatcher() *MockPolicyMatcher {
return &MockPolicyMatcher{
CommitsDescribedByPolicyFunc: &PolicyMatcherCommitsDescribedByPolicyFunc{
defaultHook: func(context.Context, int, []dbstore.ConfigurationPolicy, time.Time, ...string) (r0 map[string][]enterprise.PolicyMatch, r1 error) {
CommitsDescribedByPolicyInternalFunc: &PolicyMatcherCommitsDescribedByPolicyInternalFunc{
defaultHook: func(context.Context, int, []shared.ConfigurationPolicy, time.Time, ...string) (r0 map[string][]enterprise.PolicyMatch, r1 error) {
return
},
},
@ -343,9 +42,9 @@ func NewMockPolicyMatcher() *MockPolicyMatcher {
// interface. All methods panic on invocation, unless overwritten.
func NewStrictMockPolicyMatcher() *MockPolicyMatcher {
return &MockPolicyMatcher{
CommitsDescribedByPolicyFunc: &PolicyMatcherCommitsDescribedByPolicyFunc{
defaultHook: func(context.Context, int, []dbstore.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error) {
panic("unexpected invocation of MockPolicyMatcher.CommitsDescribedByPolicy")
CommitsDescribedByPolicyInternalFunc: &PolicyMatcherCommitsDescribedByPolicyInternalFunc{
defaultHook: func(context.Context, int, []shared.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error) {
panic("unexpected invocation of MockPolicyMatcher.CommitsDescribedByPolicyInternal")
},
},
}
@ -356,43 +55,43 @@ func NewStrictMockPolicyMatcher() *MockPolicyMatcher {
// overwritten.
func NewMockPolicyMatcherFrom(i PolicyMatcher) *MockPolicyMatcher {
return &MockPolicyMatcher{
CommitsDescribedByPolicyFunc: &PolicyMatcherCommitsDescribedByPolicyFunc{
defaultHook: i.CommitsDescribedByPolicy,
CommitsDescribedByPolicyInternalFunc: &PolicyMatcherCommitsDescribedByPolicyInternalFunc{
defaultHook: i.CommitsDescribedByPolicyInternal,
},
}
}
// PolicyMatcherCommitsDescribedByPolicyFunc describes the behavior when the
// CommitsDescribedByPolicy method of the parent MockPolicyMatcher instance
// is invoked.
type PolicyMatcherCommitsDescribedByPolicyFunc struct {
defaultHook func(context.Context, int, []dbstore.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error)
hooks []func(context.Context, int, []dbstore.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error)
history []PolicyMatcherCommitsDescribedByPolicyFuncCall
// PolicyMatcherCommitsDescribedByPolicyInternalFunc describes the behavior
// when the CommitsDescribedByPolicyInternal method of the parent
// MockPolicyMatcher instance is invoked.
type PolicyMatcherCommitsDescribedByPolicyInternalFunc struct {
defaultHook func(context.Context, int, []shared.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error)
hooks []func(context.Context, int, []shared.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error)
history []PolicyMatcherCommitsDescribedByPolicyInternalFuncCall
mutex sync.Mutex
}
// CommitsDescribedByPolicy delegates to the next hook function in the queue
// and stores the parameter and result values of this invocation.
func (m *MockPolicyMatcher) CommitsDescribedByPolicy(v0 context.Context, v1 int, v2 []dbstore.ConfigurationPolicy, v3 time.Time, v4 ...string) (map[string][]enterprise.PolicyMatch, error) {
r0, r1 := m.CommitsDescribedByPolicyFunc.nextHook()(v0, v1, v2, v3, v4...)
m.CommitsDescribedByPolicyFunc.appendCall(PolicyMatcherCommitsDescribedByPolicyFuncCall{v0, v1, v2, v3, v4, r0, r1})
// CommitsDescribedByPolicyInternal delegates to the next hook function in
// the queue and stores the parameter and result values of this invocation.
func (m *MockPolicyMatcher) CommitsDescribedByPolicyInternal(v0 context.Context, v1 int, v2 []shared.ConfigurationPolicy, v3 time.Time, v4 ...string) (map[string][]enterprise.PolicyMatch, error) {
r0, r1 := m.CommitsDescribedByPolicyInternalFunc.nextHook()(v0, v1, v2, v3, v4...)
m.CommitsDescribedByPolicyInternalFunc.appendCall(PolicyMatcherCommitsDescribedByPolicyInternalFuncCall{v0, v1, v2, v3, v4, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the
// CommitsDescribedByPolicy method of the parent MockPolicyMatcher instance
// is invoked and the hook queue is empty.
func (f *PolicyMatcherCommitsDescribedByPolicyFunc) SetDefaultHook(hook func(context.Context, int, []dbstore.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error)) {
// CommitsDescribedByPolicyInternal method of the parent MockPolicyMatcher
// instance is invoked and the hook queue is empty.
func (f *PolicyMatcherCommitsDescribedByPolicyInternalFunc) SetDefaultHook(hook func(context.Context, int, []shared.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// CommitsDescribedByPolicy method of the parent MockPolicyMatcher instance
// invokes the hook at the front of the queue and discards it. After the
// queue is empty, the default hook function is invoked for any future
// action.
func (f *PolicyMatcherCommitsDescribedByPolicyFunc) PushHook(hook func(context.Context, int, []dbstore.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error)) {
// CommitsDescribedByPolicyInternal method of the parent MockPolicyMatcher
// instance invokes the hook at the front of the queue and discards it.
// After the queue is empty, the default hook function is invoked for any
// future action.
func (f *PolicyMatcherCommitsDescribedByPolicyInternalFunc) PushHook(hook func(context.Context, int, []shared.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
@ -400,20 +99,20 @@ func (f *PolicyMatcherCommitsDescribedByPolicyFunc) PushHook(hook func(context.C
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *PolicyMatcherCommitsDescribedByPolicyFunc) SetDefaultReturn(r0 map[string][]enterprise.PolicyMatch, r1 error) {
f.SetDefaultHook(func(context.Context, int, []dbstore.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error) {
func (f *PolicyMatcherCommitsDescribedByPolicyInternalFunc) SetDefaultReturn(r0 map[string][]enterprise.PolicyMatch, r1 error) {
f.SetDefaultHook(func(context.Context, int, []shared.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *PolicyMatcherCommitsDescribedByPolicyFunc) PushReturn(r0 map[string][]enterprise.PolicyMatch, r1 error) {
f.PushHook(func(context.Context, int, []dbstore.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error) {
func (f *PolicyMatcherCommitsDescribedByPolicyInternalFunc) PushReturn(r0 map[string][]enterprise.PolicyMatch, r1 error) {
f.PushHook(func(context.Context, int, []shared.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error) {
return r0, r1
})
}
func (f *PolicyMatcherCommitsDescribedByPolicyFunc) nextHook() func(context.Context, int, []dbstore.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error) {
func (f *PolicyMatcherCommitsDescribedByPolicyInternalFunc) nextHook() func(context.Context, int, []shared.ConfigurationPolicy, time.Time, ...string) (map[string][]enterprise.PolicyMatch, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
@ -426,28 +125,28 @@ func (f *PolicyMatcherCommitsDescribedByPolicyFunc) nextHook() func(context.Cont
return hook
}
func (f *PolicyMatcherCommitsDescribedByPolicyFunc) appendCall(r0 PolicyMatcherCommitsDescribedByPolicyFuncCall) {
func (f *PolicyMatcherCommitsDescribedByPolicyInternalFunc) appendCall(r0 PolicyMatcherCommitsDescribedByPolicyInternalFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of
// PolicyMatcherCommitsDescribedByPolicyFuncCall objects describing the
// invocations of this function.
func (f *PolicyMatcherCommitsDescribedByPolicyFunc) History() []PolicyMatcherCommitsDescribedByPolicyFuncCall {
// PolicyMatcherCommitsDescribedByPolicyInternalFuncCall objects describing
// the invocations of this function.
func (f *PolicyMatcherCommitsDescribedByPolicyInternalFunc) History() []PolicyMatcherCommitsDescribedByPolicyInternalFuncCall {
f.mutex.Lock()
history := make([]PolicyMatcherCommitsDescribedByPolicyFuncCall, len(f.history))
history := make([]PolicyMatcherCommitsDescribedByPolicyInternalFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// PolicyMatcherCommitsDescribedByPolicyFuncCall is an object that describes
// an invocation of method CommitsDescribedByPolicy on an instance of
// MockPolicyMatcher.
type PolicyMatcherCommitsDescribedByPolicyFuncCall struct {
// PolicyMatcherCommitsDescribedByPolicyInternalFuncCall is an object that
// describes an invocation of method CommitsDescribedByPolicyInternal on an
// instance of MockPolicyMatcher.
type PolicyMatcherCommitsDescribedByPolicyInternalFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
@ -456,7 +155,7 @@ type PolicyMatcherCommitsDescribedByPolicyFuncCall struct {
Arg1 int
// Arg2 is the value of the 3rd argument passed to this method
// invocation.
Arg2 []dbstore.ConfigurationPolicy
Arg2 []shared.ConfigurationPolicy
// Arg3 is the value of the 4th argument passed to this method
// invocation.
Arg3 time.Time
@ -475,7 +174,7 @@ type PolicyMatcherCommitsDescribedByPolicyFuncCall struct {
// invocation. The variadic slice argument is flattened in this array such
// that one positional argument and three variadic arguments would result in
// a slice of four, not two.
func (c PolicyMatcherCommitsDescribedByPolicyFuncCall) Args() []interface{} {
func (c PolicyMatcherCommitsDescribedByPolicyInternalFuncCall) Args() []interface{} {
trailing := []interface{}{}
for _, val := range c.Arg4 {
trailing = append(trailing, val)
@ -486,6 +185,6 @@ func (c PolicyMatcherCommitsDescribedByPolicyFuncCall) Args() []interface{} {
// Results returns an interface slice containing the results of this
// invocation.
func (c PolicyMatcherCommitsDescribedByPolicyFuncCall) Results() []interface{} {
func (c PolicyMatcherCommitsDescribedByPolicyInternalFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}

View File

@ -2,26 +2,125 @@ package scheduler
import (
"context"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing"
"github.com/sourcegraph/sourcegraph/internal/codeintel/policies/shared"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
type scheduler struct {
autoindexingSvc *autoindexing.Service
dbStore DBStore
policySvc PolicyService
uploadSvc UploadService
policyMatcher PolicyMatcher
logger log.Logger
}
var _ goroutine.Handler = &scheduler{}
var _ goroutine.ErrorHandler = &scheduler{}
var (
_ goroutine.Handler = &scheduler{}
_ goroutine.ErrorHandler = &scheduler{}
)
func (r *scheduler) Handle(ctx context.Context) error {
return r.handle(ctx)
// For mocking in tests
var autoIndexingEnabled = conf.CodeIntelAutoIndexingEnabled
// Handle runs a single scheduling pass. It does nothing when auto-indexing is
// disabled. Otherwise it asks the upload service for the next batch of
// repositories due for an index scan and processes each one in turn. A failure
// in one repository does not stop the others: all per-repository errors are
// collected and returned together.
func (s *scheduler) Handle(ctx context.Context) error {
	if !autoIndexingEnabled() {
		return nil
	}

	// A configured value of -1 leaves the limit pointer nil (presumably "no limit"
	// to the callee — confirm against GetRepositoriesForIndexScan).
	var repositoryMatchLimit *int
	if configured := conf.CodeIntelAutoIndexingPolicyRepositoryMatchLimit(); configured != -1 {
		repositoryMatchLimit = &configured
	}

	// Fetch this invocation's batch. The batch is meant to hold repositories that
	// were never scanned or were scanned least recently, so that the whole backlog
	// is eventually covered even when a full sweep takes a long time.
	repositoryIDs, err := s.uploadSvc.GetRepositoriesForIndexScan(
		ctx,
		"lsif_last_index_scan",
		"last_index_scan_at",
		ConfigInst.RepositoryProcessDelay,
		conf.CodeIntelAutoIndexingAllowGlobalPolicies(),
		repositoryMatchLimit,
		ConfigInst.RepositoryBatchSize,
		time.Now(),
	)
	if err != nil {
		return errors.Wrap(err, "uploadSvc.GetRepositoriesForIndexScan")
	}
	if len(repositoryIDs) == 0 {
		// Nothing is due: every repository was updated recently enough.
		return nil
	}

	// One timestamp is shared by every repository handled in this pass.
	now := timeutil.Now()

	var combined error
	for _, id := range repositoryIDs {
		repositoryErr := s.handleRepository(ctx, id, now)
		if repositoryErr == nil {
			continue
		}

		if combined == nil {
			combined = repositoryErr
			continue
		}
		combined = errors.Append(combined, repositoryErr)
	}

	return combined
}
func (r *scheduler) HandleError(err error) {
// handleRepository queues index jobs for the commits of the given repository
// that are matched by its indexing configuration policies.
//
// Policies are read in pages of ConfigInst.PolicyBatchSize. For each page, the
// policy matcher resolves the commits described by those policies as of now,
// and an index is queued for every commit with at least one policy match.
// Commits that fail with a gitdomain.RevisionNotFoundError are skipped; any
// other error aborts the repository and is returned, wrapped with the name of
// the call that failed.
func (s *scheduler) handleRepository(ctx context.Context, repositoryID int, now time.Time) error {
	offset := 0

	for {
		// Retrieve the next page of configuration policies that affect indexing for this repository.
		policies, totalCount, err := s.policySvc.GetConfigurationPolicies(ctx, shared.GetConfigurationPoliciesOptions{
			RepositoryID: repositoryID,
			ForIndexing:  true,
			Limit:        ConfigInst.PolicyBatchSize,
			Offset:       offset,
		})
		if err != nil {
			return errors.Wrap(err, "policySvc.GetConfigurationPolicies")
		}
		offset += len(policies)

		// Get the set of commits within this repository that match an indexing policy.
		commitMap, err := s.policyMatcher.CommitsDescribedByPolicyInternal(ctx, repositoryID, policies, now)
		if err != nil {
			return errors.Wrap(err, "policyMatcher.CommitsDescribedByPolicyInternal")
		}

		for commit, policyMatches := range commitMap {
			if len(policyMatches) == 0 {
				continue
			}

			// Attempt to queue an index if one does not exist for each of the matching commits.
			if _, err := s.autoindexingSvc.QueueIndexes(ctx, repositoryID, commit, "", false, false); err != nil {
				// The commit could not be resolved; move on to the next one
				// instead of failing the whole repository.
				if errors.HasType(err, &gitdomain.RevisionNotFoundError{}) {
					continue
				}

				return errors.Wrap(err, "autoindexingSvc.QueueIndexes")
			}
		}

		// Stop once a page comes back empty (guards against looping forever if
		// totalCount is never reached) or every policy has been consumed.
		if len(policies) == 0 || offset >= totalCount {
			return nil
		}
	}
}
// HandleError logs the given error at error level with a fixed message. The
// error itself is attached as a structured log field; nothing is returned or
// retried here. NOTE(review): presumably invoked by the periodic goroutine
// runner with errors returned from Handle — confirm against goroutine.ErrorHandler.
func (s *scheduler) HandleError(err error) {
s.logger.Error("Failed to schedule index jobs", log.Error(err))
}

View File

@ -1,6 +1,8 @@
package store
import (
"database/sql"
"sort"
"strconv"
"strings"
@ -110,3 +112,76 @@ func scanIndexWithCount(s dbutil.Scanner) (index shared.Index, count int, err er
return index, count, nil
}
// scanCounts reads (id, count) rows from the return value of `*Store.query`
// into a map keyed by id. A non-nil queryErr is returned untouched and rows is
// not read. Otherwise rows is always closed before returning, and the result of
// closing it becomes the returned error.
func scanCounts(rows *sql.Rows, queryErr error) (_ map[int]int, err error) {
	if queryErr != nil {
		return nil, queryErr
	}
	defer func() {
		err = basestore.CloseRows(rows, err)
	}()

	countsByID := make(map[int]int)
	for rows.Next() {
		var (
			id    int
			count int
		)
		if scanErr := rows.Scan(&id, &count); scanErr != nil {
			return nil, scanErr
		}

		// A later row with the same id overwrites the earlier count.
		countsByID[id] = count
	}

	return countsByID, nil
}
// scanSourcedCommits reads (repository id, repository name, commit) rows from
// the return value of `*Store.query` and groups the commits by repository. The
// result is ordered by repository identifier, and the commits within each
// repository are sorted lexicographically. A non-nil queryErr is returned
// untouched; otherwise rows is always closed before returning.
func scanSourcedCommits(rows *sql.Rows, queryErr error) (_ []shared.SourcedCommits, err error) {
	if queryErr != nil {
		return nil, queryErr
	}
	defer func() {
		err = basestore.CloseRows(rows, err)
	}()

	byRepository := map[int]*shared.SourcedCommits{}
	for rows.Next() {
		var (
			repositoryID   int
			repositoryName string
			commit         string
		)
		if scanErr := rows.Scan(&repositoryID, &repositoryName, &commit); scanErr != nil {
			return nil, scanErr
		}

		entry, seen := byRepository[repositoryID]
		if !seen {
			entry = &shared.SourcedCommits{RepositoryID: repositoryID}
			byRepository[repositoryID] = entry
		}

		// The name from the most recent row for a repository wins.
		entry.RepositoryName = repositoryName
		entry.Commits = append(entry.Commits, commit)
	}

	// Map iteration order is random, so impose a deterministic order on the output.
	flattened := make([]shared.SourcedCommits, 0, len(byRepository))
	for _, entry := range byRepository {
		sort.Strings(entry.Commits)
		flattened = append(flattened, *entry)
	}
	sort.Slice(flattened, func(i, j int) bool {
		return flattened[i].RepositoryID < flattened[j].RepositoryID
	})

	return flattened, nil
}
// scanCount reads a single integer column from the return value of
// `*Store.query`. With no rows the result is zero; with several rows the value
// of the last row is returned. A non-nil queryErr is returned untouched;
// otherwise rows is always closed before returning.
func scanCount(rows *sql.Rows, queryErr error) (value int, err error) {
	if queryErr != nil {
		return 0, queryErr
	}
	defer func() {
		err = basestore.CloseRows(rows, err)
	}()

	for rows.Next() {
		if scanErr := rows.Scan(&value); scanErr != nil {
			return 0, scanErr
		}
	}

	return value, nil
}

View File

@ -2,15 +2,12 @@ package store
import (
"context"
"database/sql"
"sort"
"time"
"github.com/keegancsmith/sqlf"
"github.com/opentracing/opentracing-go/log"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/shared"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/observation"
)
@ -172,76 +169,3 @@ delete_indexes AS (
SELECT
(SELECT COUNT(*) FROM delete_indexes) AS num_indexes_deleted
`
// scanCounts scans pairs of id/counts from the return value of `*Store.query`.
// A non-nil queryErr is returned as-is without reading rows. The result maps
// each scanned id to its count; a repeated id keeps the count of its last row.
func scanCounts(rows *sql.Rows, queryErr error) (_ map[int]int, err error) {
if queryErr != nil {
return nil, queryErr
}
// Runs on every exit path below; the value returned by CloseRows replaces the
// error being returned.
defer func() { err = basestore.CloseRows(rows, err) }()
visibilities := map[int]int{}
for rows.Next() {
var id int
var count int
if err := rows.Scan(&id, &count); err != nil {
return nil, err
}
visibilities[id] = count
}
return visibilities, nil
}
// scanSourcedCommits scans triples of repository ids/repository names/commits from the
// return value of `*Store.query`. The output of this function is ordered by repository
// identifier, then by commit. A non-nil queryErr is returned as-is without reading rows.
func scanSourcedCommits(rows *sql.Rows, queryErr error) (_ []shared.SourcedCommits, err error) {
if queryErr != nil {
return nil, queryErr
}
// Runs on every exit path below; the value returned by CloseRows replaces the
// error being returned.
defer func() { err = basestore.CloseRows(rows, err) }()
// Group commits by repository id. Each row rebuilds the entry, so the repository
// name of the most recent row is the one that is kept.
sourcedCommitsMap := map[int]shared.SourcedCommits{}
for rows.Next() {
var repositoryID int
var repositoryName string
var commit string
if err := rows.Scan(&repositoryID, &repositoryName, &commit); err != nil {
return nil, err
}
sourcedCommitsMap[repositoryID] = shared.SourcedCommits{
RepositoryID: repositoryID,
RepositoryName: repositoryName,
Commits: append(sourcedCommitsMap[repositoryID].Commits, commit),
}
}
// Map iteration order is random: sort the commits of each entry, then the
// entries themselves, to make the output deterministic.
flattened := make([]shared.SourcedCommits, 0, len(sourcedCommitsMap))
for _, sourcedCommits := range sourcedCommitsMap {
sort.Strings(sourcedCommits.Commits)
flattened = append(flattened, sourcedCommits)
}
sort.Slice(flattened, func(i, j int) bool {
return flattened[i].RepositoryID < flattened[j].RepositoryID
})
return flattened, nil
}
// scanCount scans a single integer column from the return value of `*Store.query`.
// It returns zero when there are no rows and the value of the last row when there
// are several. A non-nil queryErr is returned as-is without reading rows.
func scanCount(rows *sql.Rows, queryErr error) (value int, err error) {
if queryErr != nil {
return 0, queryErr
}
// Runs on every exit path below; the value returned by CloseRows replaces the
// error being returned.
defer func() { err = basestore.CloseRows(rows, err) }()
for rows.Next() {
if err := rows.Scan(&value); err != nil {
return 0, err
}
}
return value, nil
}

View File

@ -4,6 +4,8 @@ import (
"context"
"time"
"github.com/opentracing/opentracing-go/log"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/shared"
"github.com/sourcegraph/sourcegraph/internal/observation"
@ -42,35 +44,45 @@ func New(svc *autoindexing.Service, observationContext *observation.Context) Res
}
func (r *resolver) GetIndexByID(ctx context.Context, id int) (_ shared.Index, _ bool, err error) {
ctx, _, endObservation := r.operations.getIndexByID.With(ctx, &err, observation.Args{})
ctx, _, endObservation := r.operations.getIndexByID.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("id", id)},
})
defer endObservation(1, observation.Args{})
return r.svc.GetIndexByID(ctx, id)
}
func (r *resolver) GetIndexesByIDs(ctx context.Context, ids ...int) (_ []shared.Index, err error) {
ctx, _, endObservation := r.operations.getIndexesByIDs.With(ctx, &err, observation.Args{})
ctx, _, endObservation := r.operations.getIndexesByIDs.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("totalIds", len(ids))},
})
defer endObservation(1, observation.Args{})
return r.svc.GetIndexesByIDs(ctx, ids...)
}
func (r *resolver) GetRecentIndexesSummary(ctx context.Context, repositoryID int) (summaries []shared.IndexesWithRepositoryNamespace, err error) {
ctx, _, endObservation := r.operations.getRecentIndexesSummary.With(ctx, &err, observation.Args{})
ctx, _, endObservation := r.operations.getRecentIndexesSummary.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", repositoryID)},
})
defer endObservation(1, observation.Args{})
return r.svc.GetRecentIndexesSummary(ctx, repositoryID)
}
func (r *resolver) GetLastIndexScanForRepository(ctx context.Context, repositoryID int) (_ *time.Time, err error) {
ctx, _, endObservation := r.operations.getLastIndexScanForRepository.With(ctx, &err, observation.Args{})
ctx, _, endObservation := r.operations.getLastIndexScanForRepository.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", repositoryID)},
})
defer endObservation(1, observation.Args{})
return r.svc.GetLastIndexScanForRepository(ctx, repositoryID)
}
func (r *resolver) DeleteIndexByID(ctx context.Context, id int) (err error) {
ctx, _, endObservation := r.operations.deleteIndexByID.With(ctx, &err, observation.Args{})
ctx, _, endObservation := r.operations.deleteIndexByID.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("id", id)},
})
defer endObservation(1, observation.Args{})
_, err = r.svc.DeleteIndexByID(ctx, id)
@ -78,14 +90,18 @@ func (r *resolver) DeleteIndexByID(ctx context.Context, id int) (err error) {
}
func (r *resolver) QueueAutoIndexJobsForRepo(ctx context.Context, repositoryID int, rev, configuration string) (_ []shared.Index, err error) {
ctx, _, endObservation := r.operations.queueAutoIndexJobsForRepo.With(ctx, &err, observation.Args{})
ctx, _, endObservation := r.operations.queueAutoIndexJobsForRepo.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", repositoryID), log.String("rev", rev), log.String("configuration", configuration)},
})
defer endObservation(1, observation.Args{})
return r.svc.QueueIndexes(ctx, repositoryID, rev, configuration, true, true)
}
func (r *resolver) GetIndexConfiguration(ctx context.Context, repositoryID int) (_ []byte, _ bool, err error) {
ctx, _, endObservation := r.operations.getIndexConfiguration.With(ctx, &err, observation.Args{})
ctx, _, endObservation := r.operations.getIndexConfiguration.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", repositoryID)},
})
defer endObservation(1, observation.Args{})
configuration, exists, err := r.svc.GetIndexConfigurationByRepositoryID(ctx, repositoryID)
@ -100,7 +116,9 @@ func (r *resolver) GetIndexConfiguration(ctx context.Context, repositoryID int)
}
func (r *resolver) InferedIndexConfiguration(ctx context.Context, repositoryID int, commit string) (_ *config.IndexConfiguration, _ bool, err error) {
ctx, _, endObservation := r.operations.inferedIndexConfiguration.With(ctx, &err, observation.Args{})
ctx, _, endObservation := r.operations.inferedIndexConfiguration.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", repositoryID), log.String("commit", commit)},
})
defer endObservation(1, observation.Args{})
maybeConfig, _, err := r.svc.InferIndexConfiguration(ctx, repositoryID, commit, true)
@ -112,7 +130,9 @@ func (r *resolver) InferedIndexConfiguration(ctx context.Context, repositoryID i
}
func (r *resolver) InferedIndexConfigurationHints(ctx context.Context, repositoryID int, commit string) (_ []config.IndexJobHint, err error) {
ctx, _, endObservation := r.operations.inferedIndexConfigurationHints.With(ctx, &err, observation.Args{})
ctx, _, endObservation := r.operations.inferedIndexConfigurationHints.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", repositoryID), log.String("commit", commit)},
})
defer endObservation(1, observation.Args{})
_, hints, err := r.svc.InferIndexConfiguration(ctx, repositoryID, commit, true)
@ -124,7 +144,9 @@ func (r *resolver) InferedIndexConfigurationHints(ctx context.Context, repositor
}
func (r *resolver) UpdateIndexConfigurationByRepositoryID(ctx context.Context, repositoryID int, configuration string) (err error) {
ctx, _, endObservation := r.operations.updateIndexConfigurationByRepositoryID.With(ctx, &err, observation.Args{})
ctx, _, endObservation := r.operations.updateIndexConfigurationByRepositoryID.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", repositoryID), log.String("configuration", configuration)},
})
defer endObservation(1, observation.Args{})
if _, err := config.UnmarshalJSON([]byte(configuration)); err != nil {

View File

@ -68,7 +68,6 @@ type operations struct {
requeue *observation.Operation
requeueIndex *observation.Operation
selectPoliciesForRepositoryMembershipUpdate *observation.Operation
selectRepositoriesForIndexScan *observation.Operation
selectRepositoriesForRetentionScan *observation.Operation
selectRepositoriesForLockfileIndexScan *observation.Operation
updateCommitedAt *observation.Operation
@ -169,7 +168,6 @@ func newOperations(observationContext *observation.Context, metrics *metrics.RED
requeueIndex: op("RequeueIndex"),
selectPoliciesForRepositoryMembershipUpdate: op("selectPoliciesForRepositoryMembershipUpdate"),
selectRepositoriesForIndexScan: op("SelectRepositoriesForIndexScan"),
selectRepositoriesForRetentionScan: op("SelectRepositoriesForRetentionScan"),
selectRepositoriesForLockfileIndexScan: op("SelectRepositoriesForLockfileIndexScan"),
updateCommitedAt: op("UpdateCommitedAt"),

View File

@ -820,107 +820,6 @@ WITH locked_uploads AS (
DELETE FROM lsif_uploads WHERE id IN (SELECT id FROM locked_uploads)
`
// SelectRepositoriesForIndexScan returns a set of repository identifiers that should be considered
// for indexing jobs. Repositories that were returned previously from this call within the given
// process delay are not returned.
//
// If allowGlobalPolicies is false, then configuration policies that define neither a repository id
// nor a non-empty set of repository patterns will be ignored. When true, such policies apply over all
// repositories known to the instance.
//
// This is a thin wrapper that supplies the current time (timeutil.Now) to
// selectRepositoriesForIndexScan.
func (s *Store) SelectRepositoriesForIndexScan(ctx context.Context, table, column string, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int) (_ []int, err error) {
return s.selectRepositoriesForIndexScan(ctx, table, column, processDelay, allowGlobalPolicies, repositoryMatchLimit, limit, timeutil.Now())
}
// selectRepositoriesForIndexScan is the implementation behind
// SelectRepositoriesForIndexScan with an explicit clock value, which lets tests
// simulate the passage of time. It runs selectRepositoriesForIndexScanQuery and
// returns the repository ids produced by its RETURNING clause.
//
// NOTE(review): table and column are spliced into the SQL text verbatim (via
// quote and a string replacer), not bound as query parameters, so they must
// come from trusted, hard-coded identifiers — confirm all callers.
func (s *Store) selectRepositoriesForIndexScan(ctx context.Context, table, column string, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) (_ []int, err error) {
ctx, _, endObservation := s.operations.selectRepositoriesForIndexScan.With(ctx, &err, observation.Args{LogFields: []log.Field{
log.Bool("allowGlobalPolicies", allowGlobalPolicies),
log.Int("limit", limit),
}})
defer endObservation(1, observation.Args{})
// Optional cap on the number of repositories matched by global policies; an
// empty fragment when no limit is supplied.
limitExpression := sqlf.Sprintf("")
if repositoryMatchLimit != nil {
limitExpression = sqlf.Sprintf("LIMIT %s", *repositoryMatchLimit)
}
// Substitute the timestamp column name into the query text before it is parsed by sqlf.
replacer := strings.NewReplacer("{column_name}", column)
return basestore.ScanInts(s.Query(ctx, sqlf.Sprintf(
replacer.Replace(selectRepositoriesForIndexScanQuery),
// The remaining arguments fill the query's %s placeholders in order of appearance.
// Toggle for the global-policy branch.
allowGlobalPolicies,
// LIMIT clause of the global-policy branch (possibly empty).
limitExpression,
// Table joined to find the last scan time.
quote(table),
// Cooldown check: now - last scan > processDelay seconds.
now,
int(processDelay/time.Second),
// Maximum number of repositories returned.
limit,
// Table upserted with the new scan time.
quote(table),
// Timestamp written on insert, then on conflict.
now,
now,
)))
}
// quote turns s into a sqlf query fragment by using s itself as the format
// string. Despite its name it performs no quoting or escaping, so s must be a
// trusted SQL identifier (and must not contain sqlf format verbs).
func quote(s string) *sqlf.Query { return sqlf.Sprintf(s) }
// selectRepositoriesForIndexScanQuery selects repositories covered by an
// indexing-enabled configuration policy (global, repository-specific, or via
// the repository pattern lookup table), drops deleted and blocked repositories
// as well as those scanned within the process delay, and upserts the scan
// timestamp for the chosen repositories, returning their ids.
//
// The {column_name} token is replaced textually before the query is handed to
// sqlf; the %s placeholders are filled positionally by
// selectRepositoriesForIndexScan, so the two must be kept in sync.
const selectRepositoriesForIndexScanQuery = `
-- source: internal/codeintel/stores/dbstore/uploads.go:selectRepositoriesForIndexScan
WITH
repositories_matching_policy AS (
(
SELECT r.id FROM repo r WHERE EXISTS (
SELECT 1
FROM lsif_configuration_policies p
WHERE
p.indexing_enabled AND
p.repository_id IS NULL AND
p.repository_patterns IS NULL AND
%s -- completely enable or disable this query
)
ORDER BY stars DESC NULLS LAST, id
%s
)
UNION ALL
SELECT p.repository_id AS id
FROM lsif_configuration_policies p
WHERE
p.indexing_enabled AND
p.repository_id IS NOT NULL
UNION ALL
SELECT rpl.repo_id AS id
FROM lsif_configuration_policies p
JOIN lsif_configuration_policies_repository_pattern_lookup rpl ON rpl.policy_id = p.id
WHERE p.indexing_enabled
),
candidate_repositories AS (
SELECT r.id AS id
FROM repo r
WHERE
r.deleted_at IS NULL AND
r.blocked IS NULL AND
r.id IN (SELECT id FROM repositories_matching_policy)
),
repositories AS (
SELECT cr.id
FROM candidate_repositories cr
LEFT JOIN %s lrs ON lrs.repository_id = cr.id
-- Ignore records that have been checked recently. Note this condition is
-- true for a null {column_name} (which has never been checked).
WHERE (%s - lrs.{column_name} > (%s * '1 second'::interval)) IS DISTINCT FROM FALSE
ORDER BY
lrs.{column_name} NULLS FIRST,
cr.id -- tie breaker
LIMIT %s
)
INSERT INTO %s (repository_id, {column_name})
SELECT r.id, %s::timestamp FROM repositories r
ON CONFLICT (repository_id) DO UPDATE
SET {column_name} = %s
RETURNING repository_id
`
// SelectRepositoriesForRetentionScan returns a set of repository identifiers with live code intelligence
// data and a fresh associated commit graph. Repositories that were returned previously from this call
// within the given process delay are not returned.

View File

@ -904,242 +904,6 @@ func TestHardDeleteUploadByIDDuplicatePackageProvider(t *testing.T) {
})
}
// TestSelectRepositoriesForIndexScan checks the batching and cooldown behavior
// of selectRepositoriesForIndexScan with repository-specific policies. It calls
// the store repeatedly with a one-hour process delay and a simulated clock, and
// asserts which repository ids are returned at each point in time.
func TestSelectRepositoriesForIndexScan(t *testing.T) {
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(logger, t))
store := testStoreWithoutConfigurationPolicies(t, db)
now := timeutil.Now()
insertRepo(t, db, 50, "r0")
insertRepo(t, db, 51, "r1")
insertRepo(t, db, 52, "r2")
insertRepo(t, db, 53, "r3")
// Policies 101-104 enable indexing for repositories 50-53. Policy 105 targets
// repository 54, which is only inserted later in the test, and starts with
// indexing disabled.
query := `
INSERT INTO lsif_configuration_policies (
id,
repository_id,
name,
type,
pattern,
repository_patterns,
retention_enabled,
retention_duration_hours,
retain_intermediate_commits,
indexing_enabled,
index_commit_max_age_hours,
index_intermediate_commits
) VALUES
(101, 50, 'policy 1', 'GIT_TREE', 'ab/', null, true, 0, false, true, 0, false),
(102, 51, 'policy 2', 'GIT_TREE', 'cd/', null, true, 0, false, true, 0, false),
(103, 52, 'policy 3', 'GIT_TREE', 'ef/', null, true, 0, false, true, 0, false),
(104, 53, 'policy 4', 'GIT_TREE', 'gh/', null, true, 0, false, true, 0, false),
(105, 54, 'policy 5', 'GIT_TREE', 'gh/', null, true, 0, false, false, 0, false)
`
if _, err := db.ExecContext(context.Background(), query); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
// Can return null last_index_scan
// (the batch limit of 2 restricts the result to the first two repositories)
if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, nil, 2, now); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{50, 51}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 20 minutes later, first two repositories are still on cooldown
if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, nil, 100, now.Add(time.Minute*20)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{52, 53}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 30 minutes later, all repositories are still on cooldown
if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, nil, 100, now.Add(time.Minute*30)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int(nil), repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 90 minutes later, all repositories are visible
// (the scans at +0 and +20 minutes are both more than one hour old)
if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, nil, 100, now.Add(time.Minute*90)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{50, 51, 52, 53}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// Make new invisible repository
insertRepo(t, db, 54, "r4")
// 95 minutes later, new repository is not yet visible
// (its only policy, 105, still has indexing disabled)
if repositoryIDs, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, nil, 100, now.Add(time.Minute*95)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int(nil), repositoryIDs); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// Enable indexing on policy 105 so that repository 54 becomes eligible.
// NOTE(review): the failure message below says "inserting" although this
// statement is an UPDATE.
query = `UPDATE lsif_configuration_policies SET indexing_enabled = true WHERE id = 105`
if _, err := db.ExecContext(context.Background(), query); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
// 100 minutes later, only new repository is visible
if repositoryIDs, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, nil, 100, now.Add(time.Minute*100)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{54}, repositoryIDs); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
}
// TestSelectRepositoriesForIndexScanWithGlobalPolicy checks
// selectRepositoriesForIndexScan when the only policy is a global one (no
// repository id and no repository patterns). It covers the allowGlobalPolicies
// switch, the repository match limit, and the one-hour cooldown.
func TestSelectRepositoriesForIndexScanWithGlobalPolicy(t *testing.T) {
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(logger, t))
store := testStoreWithoutConfigurationPolicies(t, db)
now := timeutil.Now()
insertRepo(t, db, 50, "r0")
insertRepo(t, db, 51, "r1")
insertRepo(t, db, 52, "r2")
insertRepo(t, db, 53, "r3")
// A single indexing-enabled policy with a NULL repository_id and NULL
// repository_patterns.
query := `
INSERT INTO lsif_configuration_policies (
id,
repository_id,
name,
type,
pattern,
repository_patterns,
retention_enabled,
retention_duration_hours,
retain_intermediate_commits,
indexing_enabled,
index_commit_max_age_hours,
index_intermediate_commits
) VALUES
(101, NULL, 'policy 1', 'GIT_TREE', 'ab/', null, true, 0, false, true, 0, false)
`
if _, err := db.ExecContext(context.Background(), query); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
// Returns nothing when disabled
// (allowGlobalPolicies is false for this call)
if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, false, nil, 100, now); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int(nil), repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// Returns at most configured limit
// (passed as repositoryMatchLimit; the batch limit stays at 100)
limit := 2
// Can return null last_index_scan
if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, &limit, 100, now); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{50, 51}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 20 minutes later, first two repositories are still on cooldown
if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, nil, 100, now.Add(time.Minute*20)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{52, 53}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 30 minutes later, all repositories are still on cooldown
if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, nil, 100, now.Add(time.Minute*30)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int(nil), repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 90 minutes later, all repositories are visible
if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, nil, 100, now.Add(time.Minute*90)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{50, 51, 52, 53}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
}
// TestSelectRepositoriesForIndexScanInDifferentTable checks that
// selectRepositoriesForIndexScan tracks scan times in a caller-supplied table
// and column (created ad hoc by this test) rather than only in
// lsif_last_index_scan.
func TestSelectRepositoriesForIndexScanInDifferentTable(t *testing.T) {
	logger := logtest.Scoped(t)
	db := database.NewDB(logger, dbtest.NewDB(logger, t))
	store := testStoreWithoutConfigurationPolicies(t, db)

	now := timeutil.Now()
	insertRepo(t, db, 50, "r0")
	insertRepo(t, db, 51, "r1")
	insertRepo(t, db, 52, "r2")
	insertRepo(t, db, 53, "r3")

	// A single global policy (no repository id, no repository patterns).
	query := `
INSERT INTO lsif_configuration_policies (
id,
repository_id,
name,
type,
pattern,
repository_patterns,
retention_enabled,
retention_duration_hours,
retain_intermediate_commits,
indexing_enabled,
index_commit_max_age_hours,
index_intermediate_commits
) VALUES
(101, NULL, 'policy 1', 'GIT_TREE', 'ab/', null, true, 0, false, true, 0, false)
`
	if _, err := db.ExecContext(context.Background(), query); err != nil {
		t.Fatalf("unexpected error while inserting configuration policies: %s", err)
	}

	// Create a new table
	query = `
CREATE TABLE last_incredible_testing_scan (
repository_id integer NOT NULL PRIMARY KEY,
last_incredible_testing_scan_at timestamp with time zone NOT NULL
)
`
	if _, err := db.ExecContext(context.Background(), query); err != nil {
		// Report the operation that actually failed (this is DDL, not a policy insert).
		t.Fatalf("unexpected error while creating scan table: %s", err)
	}

	tableName := "last_incredible_testing_scan"
	columnName := "last_incredible_testing_scan_at"

	// Returns at most configured limit
	limit := 2

	if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), tableName, columnName, time.Hour, true, &limit, 100, now); err != nil {
		t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
	} else if diff := cmp.Diff([]int{50, 51}, repositories); diff != "" {
		t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
	}

	// 20 minutes later, first two repositories are still on cooldown
	if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), tableName, columnName, time.Hour, true, nil, 100, now.Add(time.Minute*20)); err != nil {
		t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
	} else if diff := cmp.Diff([]int{52, 53}, repositories); diff != "" {
		t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
	}

	// 30 minutes later, all repositories are still on cooldown
	if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), tableName, columnName, time.Hour, true, nil, 100, now.Add(time.Minute*30)); err != nil {
		t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
	} else if diff := cmp.Diff([]int(nil), repositories); diff != "" {
		t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
	}

	// 90 minutes later, all repositories are visible
	if repositories, err := store.selectRepositoriesForIndexScan(context.Background(), tableName, columnName, time.Hour, true, nil, 100, now.Add(time.Minute*90)); err != nil {
		t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
	} else if diff := cmp.Diff([]int{50, 51, 52, 53}, repositories); diff != "" {
		t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
	}
}
func TestUpdateUploadRetention(t *testing.T) {
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(logger, t))

View File

@ -21,6 +21,7 @@ type operations struct {
hasCommit *observation.Operation
// Repositories
getRepositoriesForIndexScan *observation.Operation
getRepositoriesMaxStaleAge *observation.Operation
setRepositoryAsDirty *observation.Operation
setRepositoryAsDirtyWithTx *observation.Operation
@ -93,6 +94,7 @@ func newOperations(observationContext *observation.Context) *operations {
hasCommit: op("HasCommit"),
// Repositories
getRepositoriesForIndexScan: op("GetRepositoriesForIndexScan"),
getRepositoriesMaxStaleAge: op("GetRepositoriesMaxStaleAge"),
getDirtyRepositories: op("GetDirtyRepositories"),
setRepositoryAsDirty: op("SetRepositoryAsDirty"),

View File

@ -30,6 +30,7 @@ type Store interface {
HasCommit(ctx context.Context, repositoryID int, commit string) (_ bool, err error)
// Repositories
GetRepositoriesForIndexScan(ctx context.Context, table, column string, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) (_ []int, err error)
GetRepositoriesMaxStaleAge(ctx context.Context) (_ time.Duration, err error)
SetRepositoryAsDirty(ctx context.Context, repositoryID int) (err error)
GetDirtyRepositories(ctx context.Context) (_ map[int]int, err error)

View File

@ -3,6 +3,7 @@ package store
import (
"context"
"errors"
"strings"
"time"
"github.com/keegancsmith/sqlf"
@ -14,6 +15,103 @@ import (
"github.com/sourcegraph/sourcegraph/internal/timeutil"
)
// GetRepositoriesForIndexScan returns the identifiers of repositories that should be
// considered for indexing jobs, and records `now` for each returned repository in the
// given bookkeeping table/column. A repository returned by an earlier call is not
// returned again until processDelay has elapsed since it was recorded.
//
// If allowGlobalPolicies is false, configuration policies that define neither a
// repository id nor a non-empty set of repository patterns will be ignored. When true,
// such policies apply over all repositories known to the instance; a non-nil
// repositoryMatchLimit then caps how many repositories those global policies select.
// limit caps the total number of identifiers returned.
//
// Note that table and column are spliced into the query text as-is rather than bound
// as query parameters, so callers must only pass trusted identifiers.
func (s *store) GetRepositoriesForIndexScan(ctx context.Context, table, column string, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) (_ []int, err error) {
	ctx, _, endObservation := s.operations.getRepositoriesForIndexScan.With(ctx, &err, observation.Args{LogFields: []log.Field{
		log.Bool("allowGlobalPolicies", allowGlobalPolicies),
		log.Int("limit", limit),
	}})
	defer endObservation(1, observation.Args{})

	// An empty fragment leaves the global-policy subquery unbounded.
	matchLimitClause := sqlf.Sprintf("")
	if repositoryMatchLimit != nil {
		matchLimitClause = sqlf.Sprintf("LIMIT %s", *repositoryMatchLimit)
	}

	// Substitute the timestamp column name before handing the text to sqlf.
	queryText := strings.ReplaceAll(getRepositoriesForIndexScanQuery, "{column_name}", column)
	tableFragment := quote(table)
	delaySeconds := int(processDelay / time.Second)

	rows, queryErr := s.db.Query(ctx, sqlf.Sprintf(
		queryText,
		allowGlobalPolicies,
		matchLimitClause,
		tableFragment,
		now,
		delaySeconds,
		limit,
		tableFragment,
		now,
		now,
	))
	return basestore.ScanInts(rows, queryErr)
}
// quote turns s into a sqlf query fragment so it can be embedded into a larger query as
// raw SQL text (e.g. a table name). Despite its name it performs no escaping: s is
// passed to sqlf.Sprintf as the format string itself.
func quote(s string) *sqlf.Query {
	fragment := sqlf.Sprintf(s)
	return fragment
}
// getRepositoriesForIndexScanQuery selects repositories matched by an indexing-enabled
// configuration policy, filters out deleted/blocked repositories and those scanned within
// the process delay, and upserts the scan timestamp for the rows it returns.
//
// The literal token {column_name} must be replaced with the timestamp column before the
// text is used as a format string. The %s placeholders are, in order:
//  1. boolean enabling the global-policy subquery
//  2. optional LIMIT fragment for the global-policy subquery
//  3. bookkeeping table (LEFT JOIN)
//  4. current time
//  5. process delay in seconds
//  6. maximum number of repositories to return
//  7. bookkeeping table (INSERT target)
//  8. current time (inserted value)
//  9. current time (value on conflict)
const getRepositoriesForIndexScanQuery = `
-- source: internal/codeintel/uploads/internal/store/store_repositories.go:getRepositoriesForIndexScanQuery
WITH
repositories_matching_policy AS (
(
SELECT r.id FROM repo r WHERE EXISTS (
SELECT 1
FROM lsif_configuration_policies p
WHERE
p.indexing_enabled AND
p.repository_id IS NULL AND
p.repository_patterns IS NULL AND
%s -- completely enable or disable this query
)
ORDER BY stars DESC NULLS LAST, id
%s
)
UNION ALL
SELECT p.repository_id AS id
FROM lsif_configuration_policies p
WHERE
p.indexing_enabled AND
p.repository_id IS NOT NULL
UNION ALL
SELECT rpl.repo_id AS id
FROM lsif_configuration_policies p
JOIN lsif_configuration_policies_repository_pattern_lookup rpl ON rpl.policy_id = p.id
WHERE p.indexing_enabled
),
candidate_repositories AS (
SELECT r.id AS id
FROM repo r
WHERE
r.deleted_at IS NULL AND
r.blocked IS NULL AND
r.id IN (SELECT id FROM repositories_matching_policy)
),
repositories AS (
SELECT cr.id
FROM candidate_repositories cr
LEFT JOIN %s lrs ON lrs.repository_id = cr.id
-- Ignore records that have been checked recently. Note this condition is
-- true for a null {column_name} (which has never been checked).
WHERE (%s - lrs.{column_name} > (%s * '1 second'::interval)) IS DISTINCT FROM FALSE
ORDER BY
lrs.{column_name} NULLS FIRST,
cr.id -- tie breaker
LIMIT %s
)
INSERT INTO %s (repository_id, {column_name})
SELECT r.id, %s::timestamp FROM repositories r
ON CONFLICT (repository_id) DO UPDATE
SET {column_name} = %s
RETURNING repository_id
`
// SetRepositoriesForRetentionScan returns a set of repository identifiers with live code intelligence
// data and a fresh associated commit graph. Repositories that were returned previously from this call
// within the given process delay are not returned.
@ -215,6 +313,6 @@ func (s *store) HasRepository(ctx context.Context, repositoryID int) (_ bool, er
}
// hasRepositoryQuery selects a single row when the given repository has at least one
// upload that is not in the 'deleted' or 'deleting' state. It takes one placeholder:
// the repository id.
const hasRepositoryQuery = `
-- source: internal/codeintel/uploads/internal/store/store_repositories.go:HasRepository
SELECT 1 FROM lsif_uploads WHERE state NOT IN ('deleted', 'deleting') AND repository_id = %s LIMIT 1
`

View File

@ -19,6 +19,242 @@ import (
"github.com/sourcegraph/sourcegraph/internal/timeutil"
)
// TestSelectRepositoriesForIndexScan exercises GetRepositoriesForIndexScan with
// repository-specific policies: it checks the result limit, the one-hour cooldown
// between scans, and that a repository only becomes visible once a policy with
// indexing enabled refers to it.
func TestSelectRepositoriesForIndexScan(t *testing.T) {
	logger := logtest.Scoped(t)
	db := database.NewDB(logger, dbtest.NewDB(logger, t))
	store := testStoreWithoutConfigurationPolicies(t, db)
	ctx := context.Background()

	now := timeutil.Now()
	insertRepo(t, db, 50, "r0")
	insertRepo(t, db, 51, "r1")
	insertRepo(t, db, 52, "r2")
	insertRepo(t, db, 53, "r3")

	// Policies 101-104 enable indexing for repos 50-53; policy 105 targets repo 54
	// (not inserted yet) and starts with indexing disabled.
	query := `
INSERT INTO lsif_configuration_policies (
id,
repository_id,
name,
type,
pattern,
repository_patterns,
retention_enabled,
retention_duration_hours,
retain_intermediate_commits,
indexing_enabled,
index_commit_max_age_hours,
index_intermediate_commits
) VALUES
(101, 50, 'policy 1', 'GIT_TREE', 'ab/', null, true, 0, false, true, 0, false),
(102, 51, 'policy 2', 'GIT_TREE', 'cd/', null, true, 0, false, true, 0, false),
(103, 52, 'policy 3', 'GIT_TREE', 'ef/', null, true, 0, false, true, 0, false),
(104, 53, 'policy 4', 'GIT_TREE', 'gh/', null, true, 0, false, true, 0, false),
(105, 54, 'policy 5', 'GIT_TREE', 'gh/', null, true, 0, false, false, 0, false)
`
	if _, err := db.ExecContext(ctx, query); err != nil {
		t.Fatalf("unexpected error while inserting configuration policies: %s", err)
	}

	// assertScan runs one scan against the default bookkeeping table at the given
	// instant and fails the test unless exactly the wanted ids come back.
	assertScan := func(limit int, at time.Time, want []int) {
		t.Helper()

		got, err := store.GetRepositoriesForIndexScan(ctx, "lsif_last_index_scan", "last_index_scan_at", time.Hour, true, nil, limit, at)
		if err != nil {
			t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
		}
		if diff := cmp.Diff(want, got); diff != "" {
			t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
		}
	}

	// Can return null last_index_scan
	assertScan(2, now, []int{50, 51})

	// 20 minutes later, first two repositories are still on cooldown
	assertScan(100, now.Add(time.Minute*20), []int{52, 53})

	// 30 minutes later, all repositories are still on cooldown
	assertScan(100, now.Add(time.Minute*30), nil)

	// 90 minutes later, all repositories are visible
	assertScan(100, now.Add(time.Minute*90), []int{50, 51, 52, 53})

	// Make new invisible repository
	insertRepo(t, db, 54, "r4")

	// 95 minutes later, new repository is not yet visible
	assertScan(100, now.Add(time.Minute*95), nil)

	query = `UPDATE lsif_configuration_policies SET indexing_enabled = true WHERE id = 105`
	if _, err := db.ExecContext(ctx, query); err != nil {
		t.Fatalf("unexpected error while updating configuration policies: %s", err)
	}

	// 100 minutes later, only new repository is visible
	assertScan(100, now.Add(time.Minute*100), []int{54})
}
// TestSelectRepositoriesForIndexScanWithGlobalPolicy covers a single policy that names
// neither a repository nor repository patterns: nothing is returned while global
// policies are disallowed, and once allowed the match limit and the scan cooldown
// both apply.
func TestSelectRepositoriesForIndexScanWithGlobalPolicy(t *testing.T) {
	logger := logtest.Scoped(t)
	db := database.NewDB(logger, dbtest.NewDB(logger, t))
	store := testStoreWithoutConfigurationPolicies(t, db)
	bg := context.Background()

	now := timeutil.Now()
	insertRepo(t, db, 50, "r0")
	insertRepo(t, db, 51, "r1")
	insertRepo(t, db, 52, "r2")
	insertRepo(t, db, 53, "r3")

	query := `
INSERT INTO lsif_configuration_policies (
id,
repository_id,
name,
type,
pattern,
repository_patterns,
retention_enabled,
retention_duration_hours,
retain_intermediate_commits,
indexing_enabled,
index_commit_max_age_hours,
index_intermediate_commits
) VALUES
(101, NULL, 'policy 1', 'GIT_TREE', 'ab/', null, true, 0, false, true, 0, false)
`
	if _, insertErr := db.ExecContext(bg, query); insertErr != nil {
		t.Fatalf("unexpected error while inserting configuration policies: %s", insertErr)
	}

	// expectScan performs one scan against the default bookkeeping table and compares
	// the returned ids with the expectation.
	expectScan := func(allowGlobal bool, matchLimit *int, at time.Time, expected []int) {
		ids, scanErr := store.GetRepositoriesForIndexScan(bg, "lsif_last_index_scan", "last_index_scan_at", time.Hour, allowGlobal, matchLimit, 100, at)
		if scanErr != nil {
			t.Fatalf("unexpected error fetching repositories for index scan: %s", scanErr)
		}
		if delta := cmp.Diff(expected, ids); delta != "" {
			t.Fatalf("unexpected repository list (-want +got):\n%s", delta)
		}
	}

	// With global policies disallowed the policy above matches nothing.
	expectScan(false, nil, now, []int(nil))

	// The global policy may match at most two repositories; neither has been
	// scanned before, so both are returned.
	limit := 2
	expectScan(true, &limit, now, []int{50, 51})

	// After 20 minutes the first pair is on cooldown, leaving the other two.
	expectScan(true, nil, now.Add(time.Minute*20), []int{52, 53})

	// After 30 minutes every repository is on cooldown.
	expectScan(true, nil, now.Add(time.Minute*30), []int(nil))

	// After 90 minutes every cooldown has expired.
	expectScan(true, nil, now.Add(time.Minute*90), []int{50, 51, 52, 53})
}
// TestSelectRepositoriesForIndexScanInDifferentTable checks that
// GetRepositoriesForIndexScan tracks scan times in a caller-supplied table and
// column (created ad hoc by this test) rather than only in lsif_last_index_scan.
func TestSelectRepositoriesForIndexScanInDifferentTable(t *testing.T) {
	logger := logtest.Scoped(t)
	db := database.NewDB(logger, dbtest.NewDB(logger, t))
	store := testStoreWithoutConfigurationPolicies(t, db)
	ctx := context.Background()

	now := timeutil.Now()
	insertRepo(t, db, 50, "r0")
	insertRepo(t, db, 51, "r1")
	insertRepo(t, db, 52, "r2")
	insertRepo(t, db, 53, "r3")

	// A single global policy (no repository id, no repository patterns).
	query := `
INSERT INTO lsif_configuration_policies (
id,
repository_id,
name,
type,
pattern,
repository_patterns,
retention_enabled,
retention_duration_hours,
retain_intermediate_commits,
indexing_enabled,
index_commit_max_age_hours,
index_intermediate_commits
) VALUES
(101, NULL, 'policy 1', 'GIT_TREE', 'ab/', null, true, 0, false, true, 0, false)
`
	if _, err := db.ExecContext(ctx, query); err != nil {
		t.Fatalf("unexpected error while inserting configuration policies: %s", err)
	}

	// Create a new table
	query = `
CREATE TABLE last_incredible_testing_scan (
repository_id integer NOT NULL PRIMARY KEY,
last_incredible_testing_scan_at timestamp with time zone NOT NULL
)
`
	if _, err := db.ExecContext(ctx, query); err != nil {
		// Report the operation that actually failed (this is DDL, not a policy insert).
		t.Fatalf("unexpected error while creating scan table: %s", err)
	}

	tableName := "last_incredible_testing_scan"
	columnName := "last_incredible_testing_scan_at"

	// assertScan runs one scan against the custom table at the given instant and
	// fails the test unless exactly the wanted ids come back.
	assertScan := func(matchLimit *int, at time.Time, want []int) {
		t.Helper()

		got, err := store.GetRepositoriesForIndexScan(ctx, tableName, columnName, time.Hour, true, matchLimit, 100, at)
		if err != nil {
			t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
		}
		if diff := cmp.Diff(want, got); diff != "" {
			t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
		}
	}

	// Returns at most configured limit
	limit := 2
	assertScan(&limit, now, []int{50, 51})

	// 20 minutes later, first two repositories are still on cooldown
	assertScan(nil, now.Add(time.Minute*20), []int{52, 53})

	// 30 minutes later, all repositories are still on cooldown
	assertScan(nil, now.Add(time.Minute*30), nil)

	// 90 minutes later, all repositories are visible
	assertScan(nil, now.Add(time.Minute*90), []int{50, 51, 52, 53})
}
func TestSetRepositoryAsDirty(t *testing.T) {
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(logger, t))
@ -230,3 +466,12 @@ func deleteRepo(t testing.TB, db database.DB, id int, deleted_at time.Time) {
t.Fatalf("unexpected error while deleting repository: %s", err)
}
}
// testStoreWithoutConfigurationPolicies empties the lsif_configuration_policies table
// and returns a Store backed by db using the test observation context, so that each
// test starts from a known set of policies.
func testStoreWithoutConfigurationPolicies(t *testing.T, db database.DB) Store {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	if _, err := db.ExecContext(context.Background(), `TRUNCATE lsif_configuration_policies`); err != nil {
		t.Fatalf("unexpected error while truncating configuration policies: %s", err)
	}

	return New(db, &observation.TestContext)
}

View File

@ -72,6 +72,10 @@ type MockStore struct {
// GetOldestCommitDateFunc is an instance of a mock function object
// controlling the behavior of the method GetOldestCommitDate.
GetOldestCommitDateFunc *StoreGetOldestCommitDateFunc
// GetRepositoriesForIndexScanFunc is an instance of a mock function
// object controlling the behavior of the method
// GetRepositoriesForIndexScan.
GetRepositoriesForIndexScanFunc *StoreGetRepositoriesForIndexScanFunc
// GetRepositoriesMaxStaleAgeFunc is an instance of a mock function
// object controlling the behavior of the method
// GetRepositoriesMaxStaleAge.
@ -225,6 +229,11 @@ func NewMockStore() *MockStore {
return
},
},
GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{
defaultHook: func(context.Context, string, string, time.Duration, bool, *int, int, time.Time) (r0 []int, r1 error) {
return
},
},
GetRepositoriesMaxStaleAgeFunc: &StoreGetRepositoriesMaxStaleAgeFunc{
defaultHook: func(context.Context) (r0 time.Duration, r1 error) {
return
@ -417,6 +426,11 @@ func NewStrictMockStore() *MockStore {
panic("unexpected invocation of MockStore.GetOldestCommitDate")
},
},
GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{
defaultHook: func(context.Context, string, string, time.Duration, bool, *int, int, time.Time) ([]int, error) {
panic("unexpected invocation of MockStore.GetRepositoriesForIndexScan")
},
},
GetRepositoriesMaxStaleAgeFunc: &StoreGetRepositoriesMaxStaleAgeFunc{
defaultHook: func(context.Context) (time.Duration, error) {
panic("unexpected invocation of MockStore.GetRepositoriesMaxStaleAge")
@ -581,6 +595,9 @@ func NewMockStoreFrom(i store.Store) *MockStore {
GetOldestCommitDateFunc: &StoreGetOldestCommitDateFunc{
defaultHook: i.GetOldestCommitDate,
},
GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{
defaultHook: i.GetRepositoriesForIndexScan,
},
GetRepositoriesMaxStaleAgeFunc: &StoreGetRepositoriesMaxStaleAgeFunc{
defaultHook: i.GetRepositoriesMaxStaleAge,
},
@ -2228,6 +2245,135 @@ func (c StoreGetOldestCommitDateFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1, c.Result2}
}
// StoreGetRepositoriesForIndexScanFunc describes the behavior when the
// GetRepositoriesForIndexScan method of the parent MockStore instance is
// invoked.
type StoreGetRepositoriesForIndexScanFunc struct {
	// defaultHook is invoked when the hook queue is empty.
	defaultHook func(context.Context, string, string, time.Duration, bool, *int, int, time.Time) ([]int, error)
	// hooks is a FIFO queue of one-shot hooks; each invocation consumes the front entry.
	hooks []func(context.Context, string, string, time.Duration, bool, *int, int, time.Time) ([]int, error)
	// history records the arguments and results of every invocation, in call order.
	history []StoreGetRepositoriesForIndexScanFuncCall
	// mutex guards hooks and history.
	mutex sync.Mutex
}
// GetRepositoriesForIndexScan invokes the next queued hook (or the default hook
// when the queue is empty) with the given arguments, records the arguments and
// results of the invocation, and returns the hook's results.
func (m *MockStore) GetRepositoriesForIndexScan(v0 context.Context, v1 string, v2 string, v3 time.Duration, v4 bool, v5 *int, v6 int, v7 time.Time) ([]int, error) {
	fn := m.GetRepositoriesForIndexScanFunc
	hook := fn.nextHook()
	r0, r1 := hook(v0, v1, v2, v3, v4, v5, v6, v7)
	fn.appendCall(StoreGetRepositoriesForIndexScanFuncCall{
		Arg0:    v0,
		Arg1:    v1,
		Arg2:    v2,
		Arg3:    v3,
		Arg4:    v4,
		Arg5:    v5,
		Arg6:    v6,
		Arg7:    v7,
		Result0: r0,
		Result1: r1,
	})
	return r0, r1
}
// SetDefaultHook installs the function that handles invocations of the
// GetRepositoriesForIndexScan method of the parent MockStore instance
// whenever the hook queue is empty.
func (f *StoreGetRepositoriesForIndexScanFunc) SetDefaultHook(hook func(context.Context, string, string, time.Duration, bool, *int, int, time.Time) ([]int, error)) {
	f.defaultHook = hook
}
// PushHook appends a function to the back of the hook queue. Every call to the
// GetRepositoriesForIndexScan method of the parent MockStore instance takes
// the hook at the front of the queue and discards it; once the queue has
// drained, the default hook handles all further calls.
func (f *StoreGetRepositoriesForIndexScanFunc) PushHook(hook func(context.Context, string, string, time.Duration, bool, *int, int, time.Time) ([]int, error)) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.hooks = append(f.hooks, hook)
}
// SetDefaultReturn installs, via SetDefaultHook, a hook that ignores its
// arguments and always returns the given values.
func (f *StoreGetRepositoriesForIndexScanFunc) SetDefaultReturn(r0 []int, r1 error) {
	fixed := func(context.Context, string, string, time.Duration, bool, *int, int, time.Time) ([]int, error) {
		return r0, r1
	}
	f.SetDefaultHook(fixed)
}
// PushReturn queues, via PushHook, a hook that ignores its arguments and
// returns the given values.
func (f *StoreGetRepositoriesForIndexScanFunc) PushReturn(r0 []int, r1 error) {
	fixed := func(context.Context, string, string, time.Duration, bool, *int, int, time.Time) ([]int, error) {
		return r0, r1
	}
	f.PushHook(fixed)
}
// nextHook removes and returns the hook at the front of the queue, or returns
// the default hook when the queue is empty.
func (f *StoreGetRepositoriesForIndexScanFunc) nextHook() func(context.Context, string, string, time.Duration, bool, *int, int, time.Time) ([]int, error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.hooks) > 0 {
		front := f.hooks[0]
		f.hooks = f.hooks[1:]
		return front
	}

	return f.defaultHook
}
// appendCall adds the given invocation record to the end of the call history.
func (f *StoreGetRepositoriesForIndexScanFunc) appendCall(r0 StoreGetRepositoriesForIndexScanFuncCall) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.history = append(f.history, r0)
}
// History returns a copy of the recorded
// StoreGetRepositoriesForIndexScanFuncCall objects, one per invocation of
// this function, in call order.
func (f *StoreGetRepositoriesForIndexScanFunc) History() []StoreGetRepositoriesForIndexScanFuncCall {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	// Copy so callers cannot observe or disturb later appends.
	snapshot := make([]StoreGetRepositoriesForIndexScanFuncCall, len(f.history))
	copy(snapshot, f.history)
	return snapshot
}
// StoreGetRepositoriesForIndexScanFuncCall is an object that describes an
// invocation of method GetRepositoriesForIndexScan on an instance of
// MockStore.
//
// The argument fields follow the positional order of the mocked method; the
// parameter names noted below are taken from the Store interface declaration
// of GetRepositoriesForIndexScan.
type StoreGetRepositoriesForIndexScanFuncCall struct {
	// Arg0 is the value of the 1st argument (ctx) passed to this method
	// invocation.
	Arg0 context.Context
	// Arg1 is the value of the 2nd argument (table) passed to this method
	// invocation.
	Arg1 string
	// Arg2 is the value of the 3rd argument (column) passed to this method
	// invocation.
	Arg2 string
	// Arg3 is the value of the 4th argument (processDelay) passed to this
	// method invocation.
	Arg3 time.Duration
	// Arg4 is the value of the 5th argument (allowGlobalPolicies) passed to
	// this method invocation.
	Arg4 bool
	// Arg5 is the value of the 6th argument (repositoryMatchLimit) passed to
	// this method invocation.
	Arg5 *int
	// Arg6 is the value of the 7th argument (limit) passed to this method
	// invocation.
	Arg6 int
	// Arg7 is the value of the 8th argument (now) passed to this method
	// invocation.
	Arg7 time.Time
	// Result0 is the value of the 1st result returned from this method
	// invocation.
	Result0 []int
	// Result1 is the value of the 2nd result returned from this method
	// invocation.
	Result1 error
}
// Args returns the arguments of this invocation, in positional order, as an
// interface slice.
func (c StoreGetRepositoriesForIndexScanFuncCall) Args() []interface{} {
	return []interface{}{
		c.Arg0,
		c.Arg1,
		c.Arg2,
		c.Arg3,
		c.Arg4,
		c.Arg5,
		c.Arg6,
		c.Arg7,
	}
}
// Results returns the results of this invocation, in positional order, as an
// interface slice.
func (c StoreGetRepositoriesForIndexScanFuncCall) Results() []interface{} {
	return []interface{}{
		c.Result0,
		c.Result1,
	}
}
// StoreGetRepositoriesMaxStaleAgeFunc describes the behavior when the
// GetRepositoriesMaxStaleAge method of the parent MockStore instance is
// invoked.

View File

@ -26,6 +26,7 @@ type operations struct {
// Repositories
getRepoName *observation.Operation
getRepositoriesForIndexScan *observation.Operation
getRepositoriesMaxStaleAge *observation.Operation
getDirtyRepositories *observation.Operation
setRepositoryAsDirty *observation.Operation
@ -96,6 +97,7 @@ func newOperations(observationContext *observation.Context) *operations {
// Repositories
getRepoName: op("GetRepoName"),
getRepositoriesForIndexScan: op("GetRepositoriesForIndexScan"),
getRepositoriesMaxStaleAge: op("GetRepositoriesMaxStaleAge"),
getDirtyRepositories: op("GetDirtyRepositories"),
setRepositoryAsDirty: op("SetRepositoryAsDirty"),

View File

@ -2,6 +2,7 @@ package uploads
import (
"context"
"fmt"
"io"
"sort"
"time"
@ -43,6 +44,7 @@ type service interface {
// Repositories
GetRepoName(ctx context.Context, repositoryID int) (_ string, err error)
GetRepositoriesForIndexScan(ctx context.Context, table, column string, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) (_ []int, err error)
GetRepositoriesMaxStaleAge(ctx context.Context) (_ time.Duration, err error)
GetDirtyRepositories(ctx context.Context) (_ map[int]int, err error)
SetRepositoryAsDirty(ctx context.Context, repositoryID int) (err error)
@ -154,42 +156,61 @@ func (s *Service) UploadsVisibleToCommit(ctx context.Context, commit string) (up
}
// GetStaleSourcedCommits forwards to the store's GetStaleSourcedCommits, attaching the
// call arguments to the observation as log fields.
func (s *Service) GetStaleSourcedCommits(ctx context.Context, minimumTimeSinceLastCheck time.Duration, limit int, now time.Time) (_ []shared.SourcedCommits, err error) {
	ctx, _, endObservation := s.operations.getStaleSourcedCommits.With(ctx, &err, observation.Args{
		LogFields: []log.Field{
			log.Int("minimumTimeSinceLastCheck in ms", int(minimumTimeSinceLastCheck.Milliseconds())),
			log.Int("limit", limit),
			log.String("now", now.String()),
		},
	})
	defer endObservation(1, observation.Args{})

	return s.store.GetStaleSourcedCommits(ctx, minimumTimeSinceLastCheck, limit, now)
}
// UpdateSourcedCommits forwards to the store's UpdateSourcedCommits, attaching the
// repository id, commit and timestamp to the observation as log fields.
func (s *Service) UpdateSourcedCommits(ctx context.Context, repositoryID int, commit string, now time.Time) (uploadsUpdated int, err error) {
	ctx, _, endObservation := s.operations.updateSourcedCommits.With(ctx, &err, observation.Args{
		LogFields: []log.Field{log.Int("repositoryID", repositoryID), log.String("commit", commit), log.String("now", now.String())},
	})
	defer endObservation(1, observation.Args{})

	return s.store.UpdateSourcedCommits(ctx, repositoryID, commit, now)
}
// DeleteSourcedCommits forwards to the store's DeleteSourcedCommits, attaching the
// repository id, commit, maximum commit lag and timestamp to the observation as log
// fields.
func (s *Service) DeleteSourcedCommits(ctx context.Context, repositoryID int, commit string, maximumCommitLag time.Duration, now time.Time) (uploadsUpdated int, uploadsDeleted int, err error) {
	ctx, _, endObservation := s.operations.deleteSourcedCommits.With(ctx, &err, observation.Args{
		LogFields: []log.Field{log.Int("repositoryID", repositoryID), log.String("commit", commit), log.Int("maximumCommitLag in ms", int(maximumCommitLag.Milliseconds())), log.String("now", now.String())},
	})
	defer endObservation(1, observation.Args{})

	return s.store.DeleteSourcedCommits(ctx, repositoryID, commit, maximumCommitLag, now)
}
// GetOldestCommitDate forwards to the store's GetOldestCommitDate, attaching the
// repository id to the observation as a log field.
func (s *Service) GetOldestCommitDate(ctx context.Context, repositoryID int) (time.Time, bool, error) {
	// NOTE(review): unlike the sibling methods, no error pointer is handed to With
	// here, so the returned error is not linked to the observation — confirm intended.
	ctx, _, endObservation := s.operations.getOldestCommitDate.With(ctx, nil, observation.Args{
		LogFields: []log.Field{log.Int("repositoryID", repositoryID)},
	})
	defer endObservation(1, observation.Args{})

	return s.store.GetOldestCommitDate(ctx, repositoryID)
}
// SetRepositoryAsDirty forwards to the store's SetRepositoryAsDirty, attaching the
// repository id to the observation as a log field.
func (s *Service) SetRepositoryAsDirty(ctx context.Context, repositoryID int) (err error) {
	ctx, _, endObservation := s.operations.setRepositoryAsDirty.With(ctx, &err, observation.Args{
		LogFields: []log.Field{log.Int("repositoryID", repositoryID)},
	})
	defer endObservation(1, observation.Args{})

	return s.store.SetRepositoryAsDirty(ctx, repositoryID)
}
func (s *Service) UpdateDirtyRepositories(ctx context.Context, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) (err error) {
ctx, _, endObservation := s.operations.updateDirtyRepositories.With(ctx, &err, observation.Args{})
ctx, _, endObservation := s.operations.updateDirtyRepositories.With(ctx, &err, observation.Args{
LogFields: []log.Field{
log.Int("maxAgeForNonStaleBranches in ms", int(maxAgeForNonStaleBranches.Milliseconds())),
log.Int("maxAgeForNonStaleTags in ms", int(maxAgeForNonStaleTags.Milliseconds())),
},
})
defer endObservation(1, observation.Args{})
repositoryIDs, err := s.GetDirtyRepositories(ctx)
@ -303,19 +324,38 @@ func (s *Service) getCommitGraph(ctx context.Context, repositoryID int) (*gitdom
}
// SetRepositoriesForRetentionScan runs inside the
// setRepositoriesForRetentionScan observation and delegates to
// s.store.SetRepositoriesForRetentionScan with the same processDelay and
// limit, returning the store's []int result unchanged.
//
// NOTE(review): the returned ints are presumably repository IDs; confirm
// against the store.
func (s *Service) SetRepositoriesForRetentionScan(ctx context.Context, processDelay time.Duration, limit int) (_ []int, err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.setRepositoriesForRetentionScan.With(ctx, &err, observation.Args{})
// Variant that logs processDelay (converted to whole milliseconds) and limit.
ctx, _, endObservation := s.operations.setRepositoriesForRetentionScan.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("processDelay in ms", int(processDelay.Milliseconds())), log.Int("limit", limit)},
})
defer endObservation(1, observation.Args{})
return s.store.SetRepositoriesForRetentionScan(ctx, processDelay, limit)
}
// GetRepoName runs inside the getRepoName observation and returns the result
// of s.store.RepoName for the given repository ID.
func (s *Service) GetRepoName(ctx context.Context, repositoryID int) (_ string, err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.getRepoName.With(ctx, &err, observation.Args{})
// Variant that attaches the repository ID as a log field.
ctx, _, endObservation := s.operations.getRepoName.With(ctx, &err, observation.Args{LogFields: []log.Field{log.Int("repositoryID", repositoryID)}})
defer endObservation(1, observation.Args{})
return s.store.RepoName(ctx, repositoryID)
}
// GetRepositoriesForIndexScan runs inside the getRepositoriesForIndexScan
// observation and delegates to s.store.GetRepositoriesForIndexScan with the
// arguments unchanged, returning the store's []int result.
//
// repositoryMatchLimit is a pointer and may be nil; it is forwarded to the
// store as-is. For logging only, a nil limit is recorded as 0 so that building
// the log fields never dereferences a nil pointer.
func (s *Service) GetRepositoriesForIndexScan(ctx context.Context, table, column string, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) (_ []int, err error) {
	// Resolve the optional limit to a plain int before logging: dereferencing
	// a nil *int here would panic before the store is ever called.
	var repositoryMatchLimitValue int
	if repositoryMatchLimit != nil {
		repositoryMatchLimitValue = *repositoryMatchLimit
	}

	ctx, _, endObservation := s.operations.getRepositoriesForIndexScan.With(ctx, &err, observation.Args{
		LogFields: []log.Field{
			log.String("table", table),
			log.String("column", column),
			log.Int("processDelay in ms", int(processDelay.Milliseconds())),
			log.Bool("allowGlobalPolicies", allowGlobalPolicies),
			log.Int("repositoryMatchLimit", repositoryMatchLimitValue),
			log.Int("limit", limit),
			log.String("now", now.String()),
		},
	})
	defer endObservation(1, observation.Args{})

	// Pass the original pointer through so the store still sees "unset" as nil.
	return s.store.GetRepositoriesForIndexScan(ctx, table, column, processDelay, allowGlobalPolicies, repositoryMatchLimit, limit, now)
}
func (s *Service) GetRepositoriesMaxStaleAge(ctx context.Context) (_ time.Duration, err error) {
ctx, _, endObservation := s.operations.getRepositoriesMaxStaleAge.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})
@ -331,42 +371,69 @@ func (s *Service) GetDirtyRepositories(ctx context.Context) (_ map[int]int, err
}
// GetUploads runs inside the getUploads observation and delegates to
// s.store.GetUploads with opts, returning the store's uploads, total count,
// and error unchanged.
func (s *Service) GetUploads(ctx context.Context, opts shared.GetUploadsOptions) (uploads []Upload, totalCount int, err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.getUploads.With(ctx, &err, observation.Args{})
// Variant that logs the RepositoryID, State, and Term fields of opts; other
// option fields are not logged.
ctx, _, endObservation := s.operations.getUploads.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", opts.RepositoryID), log.String("state", opts.State), log.String("term", opts.Term)},
})
defer endObservation(1, observation.Args{})
return s.store.GetUploads(ctx, opts)
}
// GetUploadIDsWithReferences delegates to s.store.GetUploadIDsWithReferences,
// forwarding all arguments plus the trace obtained from the observation, and
// returns the store's results unchanged.
//
// NOTE(review): the observation used here is named
// getVisibleUploadsMatchingMonikers, which differs from this method's name;
// confirm that is intentional.
func (s *Service) GetUploadIDsWithReferences(ctx context.Context, orderedMonikers []precise.QualifiedMonikerData, ignoreIDs []int, repositoryID int, commit string, limit int, offset int) (ids []int, recordsScanned int, totalCount int, err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, trace, endObservation := s.operations.getVisibleUploadsMatchingMonikers.With(ctx, &err, observation.Args{})
// Variant that logs every argument; the slice arguments are rendered to
// strings with fmt's %v verb.
ctx, trace, endObservation := s.operations.getVisibleUploadsMatchingMonikers.With(ctx, &err, observation.Args{
LogFields: []log.Field{
log.Int("repositoryID", repositoryID),
log.String("commit", commit),
log.Int("limit", limit),
log.Int("offset", offset),
log.String("orderedMonikers", fmt.Sprintf("%v", orderedMonikers)),
log.String("ignoreIDs", fmt.Sprintf("%v", ignoreIDs)),
},
})
defer endObservation(1, observation.Args{})
return s.store.GetUploadIDsWithReferences(ctx, orderedMonikers, ignoreIDs, repositoryID, commit, limit, offset, trace)
}
// UpdateUploadsVisibleToCommits runs inside the updateUploadsVisibleToCommits
// observation and delegates to s.store.UpdateUploadsVisibleToCommits with all
// arguments forwarded unchanged.
func (s *Service) UpdateUploadsVisibleToCommits(ctx context.Context, repositoryID int, graph *gitdomain.CommitGraph, refDescriptions map[string][]gitdomain.RefDescription, maxAgeForNonStaleBranches, maxAgeForNonStaleTags time.Duration, dirtyToken int, now time.Time) (err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.updateUploadsVisibleToCommits.With(ctx, &err, observation.Args{})
// Variant that logs every argument. graph and refDescriptions are formatted
// in full with %v on each call, and the durations are logged via String().
// NOTE(review): formatting the whole commit graph may be expensive for large
// repositories — confirm this is acceptable.
ctx, _, endObservation := s.operations.updateUploadsVisibleToCommits.With(ctx, &err, observation.Args{
LogFields: []log.Field{
log.Int("repositoryID", repositoryID),
log.String("graph", fmt.Sprintf("%v", graph)),
log.String("refDescriptions", fmt.Sprintf("%v", refDescriptions)),
log.String("maxAgeForNonStaleBranches", maxAgeForNonStaleBranches.String()),
log.String("maxAgeForNonStaleTags", maxAgeForNonStaleTags.String()),
log.Int("dirtyToken", dirtyToken),
log.String("now", now.String()),
},
})
defer endObservation(1, observation.Args{})
return s.store.UpdateUploadsVisibleToCommits(ctx, repositoryID, graph, refDescriptions, maxAgeForNonStaleBranches, maxAgeForNonStaleTags, dirtyToken, now)
}
// UpdateUploadRetention runs inside the updateUploadRetention observation and
// delegates to s.store.UpdateUploadRetention with the protected and expired
// upload ID lists.
func (s *Service) UpdateUploadRetention(ctx context.Context, protectedIDs, expiredIDs []int) (err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.updateUploadRetention.With(ctx, &err, observation.Args{})
// Variant that logs both ID slices, each rendered to a string with %v.
ctx, _, endObservation := s.operations.updateUploadRetention.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.String("protectedIDs", fmt.Sprintf("%v", protectedIDs)), log.String("expiredIDs", fmt.Sprintf("%v", expiredIDs))},
})
defer endObservation(1, observation.Args{})
return s.store.UpdateUploadRetention(ctx, protectedIDs, expiredIDs)
}
// BackfillReferenceCountBatch runs inside the backfillReferenceCountBatch
// observation and delegates to s.store.BackfillReferenceCountBatch with the
// given batch size.
func (s *Service) BackfillReferenceCountBatch(ctx context.Context, batchSize int) (err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.backfillReferenceCountBatch.With(ctx, &err, observation.Args{})
// Variant that attaches the batch size as a log field.
ctx, _, endObservation := s.operations.backfillReferenceCountBatch.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("batchSize", batchSize)},
})
defer endObservation(1, observation.Args{})
return s.store.BackfillReferenceCountBatch(ctx, batchSize)
}
func (s *Service) UpdateUploadsReferenceCounts(ctx context.Context, ids []int, dependencyUpdateType shared.DependencyReferenceCountUpdateType) (updated int, err error) {
ctx, _, endObservation := s.operations.updateUploadsReferenceCounts.With(ctx, &err, observation.Args{})
ctx, _, endObservation := s.operations.updateUploadsReferenceCounts.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.String("ids", fmt.Sprintf("%v", ids))},
})
defer endObservation(1, observation.Args{})
return s.store.UpdateUploadsReferenceCounts(ctx, ids, dependencyUpdateType)
@ -380,14 +447,18 @@ func (s *Service) SoftDeleteExpiredUploads(ctx context.Context) (count int, err
}
// DeleteUploadsStuckUploading runs inside the deleteUploadsStuckUploading
// observation and delegates to s.store.DeleteUploadsStuckUploading with the
// uploadedBefore cutoff, returning the store's int result unchanged.
//
// NOTE(review): the int is presumably the number of deleted uploads; confirm
// against the store.
func (s *Service) DeleteUploadsStuckUploading(ctx context.Context, uploadedBefore time.Time) (_ int, err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.deleteUploadsStuckUploading.With(ctx, &err, observation.Args{})
// Variant that logs the cutoff time via time.Time.String().
ctx, _, endObservation := s.operations.deleteUploadsStuckUploading.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.String("uploadedBefore", uploadedBefore.String())},
})
defer endObservation(1, observation.Args{})
return s.store.DeleteUploadsStuckUploading(ctx, uploadedBefore)
}
func (s *Service) DeleteUploadsWithoutRepository(ctx context.Context, now time.Time) (_ map[int]int, err error) {
ctx, _, endObservation := s.operations.deleteUploadsWithoutRepository.With(ctx, &err, observation.Args{})
ctx, _, endObservation := s.operations.deleteUploadsWithoutRepository.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.String("now", now.String())},
})
defer endObservation(1, observation.Args{})
return s.store.DeleteUploadsWithoutRepository(ctx, now)
@ -414,7 +485,9 @@ const numAncestors = 100
// this commit will.
//
func (s *Service) InferClosestUploads(ctx context.Context, repositoryID int, commit, path string, exactPath bool, indexer string) (_ []shared.Dump, err error) {
ctx, _, endObservation := s.operations.inferClosestUploads.With(ctx, &err, observation.Args{})
ctx, _, endObservation := s.operations.inferClosestUploads.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", repositoryID), log.String("commit", commit), log.String("path", path), log.Bool("exactPath", exactPath), log.String("indexer", indexer)},
})
defer endObservation(1, observation.Args{})
// The parameters exactPath and rootMustEnclosePath align here: if we're looking for dumps
@ -467,28 +540,42 @@ func (s *Service) InferClosestUploads(ctx context.Context, repositoryID int, com
}
// FindClosestDumps runs inside the findClosestDumps observation and delegates
// to s.store.FindClosestDumps with all arguments forwarded unchanged,
// returning the store's dumps and error.
func (s *Service) FindClosestDumps(ctx context.Context, repositoryID int, commit, path string, rootMustEnclosePath bool, indexer string) (_ []shared.Dump, err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.findClosestDumps.With(ctx, &err, observation.Args{})
// Variant that logs every argument.
ctx, _, endObservation := s.operations.findClosestDumps.With(ctx, &err, observation.Args{
LogFields: []log.Field{
log.Int("repositoryID", repositoryID), log.String("commit", commit), log.String("path", path),
log.Bool("rootMustEnclosePath", rootMustEnclosePath), log.String("indexer", indexer),
},
})
defer endObservation(1, observation.Args{})
return s.store.FindClosestDumps(ctx, repositoryID, commit, path, rootMustEnclosePath, indexer)
}
// FindClosestDumpsFromGraphFragment runs inside the
// findClosestDumpsFromGraphFragment observation and delegates to
// s.store.FindClosestDumpsFromGraphFragment with all arguments, including
// commitGraph, forwarded unchanged.
func (s *Service) FindClosestDumpsFromGraphFragment(ctx context.Context, repositoryID int, commit, path string, rootMustEnclosePath bool, indexer string, commitGraph *gitdomain.CommitGraph) (_ []shared.Dump, err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.findClosestDumpsFromGraphFragment.With(ctx, &err, observation.Args{})
// Variant that logs the scalar arguments; commitGraph is not logged.
ctx, _, endObservation := s.operations.findClosestDumpsFromGraphFragment.With(ctx, &err, observation.Args{
LogFields: []log.Field{
log.Int("repositoryID", repositoryID), log.String("commit", commit), log.String("path", path),
log.Bool("rootMustEnclosePath", rootMustEnclosePath), log.String("indexer", indexer),
},
})
defer endObservation(1, observation.Args{})
return s.store.FindClosestDumpsFromGraphFragment(ctx, repositoryID, commit, path, rootMustEnclosePath, indexer, commitGraph)
}
// GetDumpsWithDefinitionsForMonikers runs inside the
// getDumpsWithDefinitionsForMonikers observation and delegates to
// s.store.GetDumpsWithDefinitionsForMonikers with the given monikers,
// returning the store's dumps and error.
func (s *Service) GetDumpsWithDefinitionsForMonikers(ctx context.Context, monikers []precise.QualifiedMonikerData) (_ []shared.Dump, err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.getDumpsWithDefinitionsForMonikers.With(ctx, &err, observation.Args{})
// Variant that logs the monikers slice rendered to a string with %v.
ctx, _, endObservation := s.operations.getDumpsWithDefinitionsForMonikers.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.String("monikers", fmt.Sprintf("%v", monikers))},
})
defer endObservation(1, observation.Args{})
return s.store.GetDumpsWithDefinitionsForMonikers(ctx, monikers)
}
func (s *Service) GetDumpsByIDs(ctx context.Context, ids []int) (_ []shared.Dump, err error) {
ctx, _, endObservation := s.operations.getDumpsByIDs.With(ctx, &err, observation.Args{})
ctx, _, endObservation := s.operations.getDumpsByIDs.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("total_ids", len(ids)), log.String("ids", fmt.Sprintf("%v", ids))},
})
defer endObservation(1, observation.Args{})
return s.store.GetDumpsByIDs(ctx, ids)
@ -535,21 +622,27 @@ func (s *Service) HardDeleteExpiredUploads(ctx context.Context) (count int, err
}
// UpdatePackages runs inside the updatePackages observation and delegates to
// s.store.UpdatePackages with the dump ID and package list.
func (s *Service) UpdatePackages(ctx context.Context, dumpID int, packages []precise.Package) (err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.updatePackages.With(ctx, &err, observation.Args{})
// Variant that logs the dump ID and the packages slice rendered with %v.
ctx, _, endObservation := s.operations.updatePackages.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("dumpID", dumpID), log.String("packages", fmt.Sprintf("%v", packages))},
})
defer endObservation(1, observation.Args{})
return s.store.UpdatePackages(ctx, dumpID, packages)
}
// UpdatePackageReferences runs inside the updatePackageReferences observation
// and delegates to s.store.UpdatePackageReferences with the dump ID and
// package reference list.
func (s *Service) UpdatePackageReferences(ctx context.Context, dumpID int, references []precise.PackageReference) (err error) {
// Variant without log fields (appears to be the pre-change line of this diff).
ctx, _, endObservation := s.operations.updatePackageReferences.With(ctx, &err, observation.Args{})
// Variant that logs the dump ID and the references slice rendered with %v.
ctx, _, endObservation := s.operations.updatePackageReferences.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("dumpID", dumpID), log.String("references", fmt.Sprintf("%v", references))},
})
defer endObservation(1, observation.Args{})
return s.store.UpdatePackageReferences(ctx, dumpID, references)
}
func (s *Service) DeleteOldAuditLogs(ctx context.Context, maxAge time.Duration, now time.Time) (count int, err error) {
ctx, _, endObservation := s.operations.deleteOldAuditLogs.With(ctx, &err, observation.Args{})
ctx, _, endObservation := s.operations.deleteOldAuditLogs.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.String("maxAge", maxAge.String()), log.String("now", now.String())},
})
defer endObservation(1, observation.Args{})
return s.store.DeleteOldAuditLogs(ctx, maxAge, now)
@ -576,7 +669,7 @@ func (s *Service) BackfillCommittedAtBatch(ctx context.Context, batchSize int) (
for _, sourcedCommits := range batch {
for _, commit := range sourcedCommits.Commits {
commitDateString, err := s.getCommitDate(ctx, tx, sourcedCommits.RepositoryID, commit)
commitDateString, err := s.getCommitDate(ctx, sourcedCommits.RepositoryID, commit)
if err != nil {
return err
}
@ -596,7 +689,7 @@ func (s *Service) BackfillCommittedAtBatch(ctx context.Context, batchSize int) (
return nil
}
func (s *Service) getCommitDate(ctx context.Context, tx store.Store, repositoryID int, commit string) (string, error) {
func (s *Service) getCommitDate(ctx context.Context, repositoryID int, commit string) (string, error) {
_, commitDate, revisionExists, err := s.gitserverClient.CommitDate(ctx, repositoryID, commit)
if err != nil && !gitdomain.IsRepoNotExist(err) {
return "", errors.Wrap(err, "gitserver.CommitDate")

View File

@ -109,7 +109,6 @@
- filename: internal/codeintel/autoindexing/background/scheduler/mocks_test.go
path: github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/background/scheduler
interfaces:
- DBStore
- PolicyMatcher
- filename: internal/codeintel/autoindexing/internal/inference/mocks_test.go
path: github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/inference