Extract reposcheduler and add support for syntactic indexes (#60958)

This commit is contained in:
Anton Sviridov 2024-04-02 11:37:30 +01:00 committed by GitHub
parent b54845b687
commit 612e26da48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 1005 additions and 596 deletions

View File

@ -52,7 +52,8 @@ func (j *autoindexingScheduler) Routines(_ context.Context, observationCtx *obse
services.UploadsService,
services.PoliciesService,
matcher,
services.AutoIndexingService,
services.PreciseRepoSchedulingService,
*services.AutoIndexingService,
db.Repos(),
), nil
}

View File

@ -12,6 +12,7 @@ go_library(
"//internal/codeintel/dependencies",
"//internal/codeintel/policies",
"//internal/codeintel/ranking",
"//internal/codeintel/reposcheduler",
"//internal/codeintel/sentinel",
"//internal/codeintel/shared",
"//internal/codeintel/uploads",

View File

@ -24,6 +24,7 @@ go_library(
"//internal/codeintel/autoindexing/internal/store",
"//internal/codeintel/autoindexing/shared",
"//internal/codeintel/dependencies",
"//internal/codeintel/reposcheduler",
"//internal/codeintel/uploads/shared",
"//internal/database",
"//internal/gitserver",

View File

@ -7,6 +7,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/background/summary"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/inference"
autoindexingstore "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/store"
"github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/gitserver"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
@ -49,14 +50,15 @@ func NewIndexSchedulers(
uploadSvc UploadService,
policiesSvc PoliciesService,
policyMatcher PolicyMatcher,
autoindexingSvc *Service,
repoSchedulingSvc reposcheduler.RepositorySchedulingService,
autoindexingSvc Service,
repoStore database.RepoStore,
) []goroutine.BackgroundRoutine {
return background.NewIndexSchedulers(
scopedContext("scheduler", observationCtx),
policiesSvc,
policyMatcher,
autoindexingSvc,
repoSchedulingSvc,
autoindexingSvc.indexEnqueuer,
repoStore,
autoindexingSvc.store,

View File

@ -11,6 +11,7 @@ go_library(
"//internal/codeintel/autoindexing/internal/background/summary",
"//internal/codeintel/autoindexing/internal/jobselector",
"//internal/codeintel/autoindexing/internal/store",
"//internal/codeintel/reposcheduler",
"//internal/database",
"//internal/goroutine",
"//internal/observation",

View File

@ -1932,10 +1932,6 @@ type MockStore struct {
// GetQueuedRepoRevFunc is an instance of a mock function object
// controlling the behavior of the method GetQueuedRepoRev.
GetQueuedRepoRevFunc *StoreGetQueuedRepoRevFunc
// GetRepositoriesForIndexScanFunc is an instance of a mock function
// object controlling the behavior of the method
// GetRepositoriesForIndexScan.
GetRepositoriesForIndexScanFunc *StoreGetRepositoriesForIndexScanFunc
// InsertDependencyIndexingJobFunc is an instance of a mock function
// object controlling the behavior of the method
// InsertDependencyIndexingJob.
@ -2012,11 +2008,6 @@ func NewMockStore() *MockStore {
return
},
},
GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{
defaultHook: func(context.Context, time.Duration, bool, *int, int, time.Time) (r0 []int, r1 error) {
return
},
},
InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{
defaultHook: func(context.Context, int, string, time.Time) (r0 int, r1 error) {
return
@ -2119,11 +2110,6 @@ func NewStrictMockStore() *MockStore {
panic("unexpected invocation of MockStore.GetQueuedRepoRev")
},
},
GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{
defaultHook: func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) {
panic("unexpected invocation of MockStore.GetRepositoriesForIndexScan")
},
},
InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{
defaultHook: func(context.Context, int, string, time.Time) (int, error) {
panic("unexpected invocation of MockStore.InsertDependencyIndexingJob")
@ -2218,9 +2204,6 @@ func NewMockStoreFrom(i store.Store) *MockStore {
GetQueuedRepoRevFunc: &StoreGetQueuedRepoRevFunc{
defaultHook: i.GetQueuedRepoRev,
},
GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{
defaultHook: i.GetRepositoriesForIndexScan,
},
InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{
defaultHook: i.InsertDependencyIndexingJob,
},
@ -2709,129 +2692,6 @@ func (c StoreGetQueuedRepoRevFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// StoreGetRepositoriesForIndexScanFunc describes the behavior when the
// GetRepositoriesForIndexScan method of the parent MockStore instance is
// invoked.
type StoreGetRepositoriesForIndexScanFunc struct {
defaultHook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)
hooks []func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)
history []StoreGetRepositoriesForIndexScanFuncCall
mutex sync.Mutex
}
// GetRepositoriesForIndexScan delegates to the next hook function in the
// queue and stores the parameter and result values of this invocation.
func (m *MockStore) GetRepositoriesForIndexScan(v0 context.Context, v1 time.Duration, v2 bool, v3 *int, v4 int, v5 time.Time) ([]int, error) {
r0, r1 := m.GetRepositoriesForIndexScanFunc.nextHook()(v0, v1, v2, v3, v4, v5)
m.GetRepositoriesForIndexScanFunc.appendCall(StoreGetRepositoriesForIndexScanFuncCall{v0, v1, v2, v3, v4, v5, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the
// GetRepositoriesForIndexScan method of the parent MockStore instance is
// invoked and the hook queue is empty.
func (f *StoreGetRepositoriesForIndexScanFunc) SetDefaultHook(hook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// GetRepositoriesForIndexScan method of the parent MockStore instance
// invokes the hook at the front of the queue and discards it. After the
// queue is empty, the default hook function is invoked for any future
// action.
func (f *StoreGetRepositoriesForIndexScanFunc) PushHook(hook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *StoreGetRepositoriesForIndexScanFunc) SetDefaultReturn(r0 []int, r1 error) {
f.SetDefaultHook(func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *StoreGetRepositoriesForIndexScanFunc) PushReturn(r0 []int, r1 error) {
f.PushHook(func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) {
return r0, r1
})
}
func (f *StoreGetRepositoriesForIndexScanFunc) nextHook() func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *StoreGetRepositoriesForIndexScanFunc) appendCall(r0 StoreGetRepositoriesForIndexScanFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of StoreGetRepositoriesForIndexScanFuncCall
// objects describing the invocations of this function.
func (f *StoreGetRepositoriesForIndexScanFunc) History() []StoreGetRepositoriesForIndexScanFuncCall {
f.mutex.Lock()
history := make([]StoreGetRepositoriesForIndexScanFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// StoreGetRepositoriesForIndexScanFuncCall is an object that describes an
// invocation of method GetRepositoriesForIndexScan on an instance of
// MockStore.
type StoreGetRepositoriesForIndexScanFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 time.Duration
// Arg2 is the value of the 3rd argument passed to this method
// invocation.
Arg2 bool
// Arg3 is the value of the 4th argument passed to this method
// invocation.
Arg3 *int
// Arg4 is the value of the 5th argument passed to this method
// invocation.
Arg4 int
// Arg5 is the value of the 6th argument passed to this method
// invocation.
Arg5 time.Time
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []int
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c StoreGetRepositoriesForIndexScanFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1, c.Arg2, c.Arg3, c.Arg4, c.Arg5}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c StoreGetRepositoriesForIndexScanFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// StoreInsertDependencyIndexingJobFunc describes the behavior when the
// InsertDependencyIndexingJob method of the parent MockStore instance is
// invoked.

View File

@ -6,6 +6,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/background/summary"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/jobselector"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/store"
"github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/observation"
@ -23,7 +24,7 @@ func NewIndexSchedulers(
observationCtx *observation.Context,
policiesSvc scheduler.PoliciesService,
policyMatcher scheduler.PolicyMatcher,
autoindexingSvc scheduler.AutoIndexingService,
repoSchedulingSvc reposcheduler.RepositorySchedulingService,
indexEnqueuer scheduler.IndexEnqueuer,
repoStore database.RepoStore,
store store.Store,
@ -32,7 +33,7 @@ func NewIndexSchedulers(
return []goroutine.BackgroundRoutine{
scheduler.NewScheduler(
observationCtx,
autoindexingSvc,
repoSchedulingSvc,
policiesSvc,
policyMatcher,
indexEnqueuer,

View File

@ -17,6 +17,7 @@ go_library(
"//internal/codeintel/dependencies",
"//internal/codeintel/policies",
"//internal/codeintel/policies/shared",
"//internal/codeintel/reposcheduler",
"//internal/codeintel/uploads/shared",
"//internal/conf",
"//internal/database",

View File

@ -23,7 +23,3 @@ type IndexEnqueuer interface {
QueueIndexes(ctx context.Context, repositoryID int, rev, configuration string, force, bypassLimit bool) (_ []uploadsshared.Index, err error)
QueueIndexesForPackage(ctx context.Context, pkg dependencies.MinimialVersionedPackageRepo) (err error)
}
type AutoIndexingService interface {
GetRepositoriesForIndexScan(ctx context.Context, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) (_ []int, err error)
}

View File

@ -12,6 +12,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/inference"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/store"
policiesshared "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/shared"
"github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
@ -23,18 +24,18 @@ import (
)
type indexSchedulerJob struct {
autoindexingSvc AutoIndexingService
policiesSvc PoliciesService
policyMatcher PolicyMatcher
indexEnqueuer IndexEnqueuer
repoStore database.RepoStore
repoSchedulingSvc reposcheduler.RepositorySchedulingService
policiesSvc PoliciesService
policyMatcher PolicyMatcher
indexEnqueuer IndexEnqueuer
repoStore database.RepoStore
}
var m = new(metrics.SingletonREDMetrics)
func NewScheduler(
observationCtx *observation.Context,
autoindexingSvc AutoIndexingService,
repoSchedulingSvc reposcheduler.RepositorySchedulingService,
policiesSvc PoliciesService,
policyMatcher PolicyMatcher,
indexEnqueuer IndexEnqueuer,
@ -42,11 +43,11 @@ func NewScheduler(
config *Config,
) goroutine.BackgroundRoutine {
job := indexSchedulerJob{
autoindexingSvc: autoindexingSvc,
policiesSvc: policiesSvc,
policyMatcher: policyMatcher,
indexEnqueuer: indexEnqueuer,
repoStore: repoStore,
repoSchedulingSvc: repoSchedulingSvc,
policiesSvc: policiesSvc,
policyMatcher: policyMatcher,
indexEnqueuer: indexEnqueuer,
repoStore: repoStore,
}
redMetrics := m.Get(func() *metrics.REDMetrics {
@ -103,12 +104,13 @@ func (b indexSchedulerJob) handleScheduler(
// set should contain repositories that have yet to be updated, or that have been updated least recently.
// This allows us to update every repository reliably, even if it takes a long time to process through
// the backlog.
repositories, err := b.autoindexingSvc.GetRepositoriesForIndexScan(
repositories, err := b.repoSchedulingSvc.GetRepositoriesForIndexScan(
ctx,
repositoryProcessDelay,
conf.CodeIntelAutoIndexingAllowGlobalPolicies(),
repositoryMatchLimit,
repositoryBatchSize,
reposcheduler.NewBatchOptions(
repositoryProcessDelay,
conf.CodeIntelAutoIndexingAllowGlobalPolicies(),
repositoryMatchLimit,
repositoryBatchSize),
time.Now(),
)
if err != nil {
@ -128,13 +130,13 @@ func (b indexSchedulerJob) handleScheduler(
errMu sync.Mutex
)
for _, repositoryID := range repositories {
for _, repository := range repositories {
if err := sema.Acquire(ctx, 1); err != nil {
return err
}
go func() {
defer sema.Release(1)
if repositoryErr := b.handleRepository(ctx, repositoryID, policyBatchSize, now); repositoryErr != nil {
if repositoryErr := b.handleRepository(ctx, repository.ID, policyBatchSize, now); repositoryErr != nil {
if !errors.As(err, &inference.LimitError{}) {
errMu.Lock()
errs = errors.Append(errs, repositoryErr)

View File

@ -45,8 +45,8 @@ go_test(
"coverage_test.go",
"dependencies_test.go",
"enqueuer_test.go",
"scheduler_test.go",
"store_helpers_test.go",
"store_test.go",
],
embed = [":store"],
tags = [
@ -61,7 +61,6 @@ go_test(
"//internal/database/dbtest",
"//internal/executor",
"//internal/observation",
"//internal/timeutil",
"@com_github_google_go_cmp//cmp",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_lib_pq//:pq",

View File

@ -1,260 +0,0 @@
package store
import (
"context"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/log/logtest"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
)
func TestSelectRepositoriesForIndexScan(t *testing.T) {
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(t))
store := testStoreWithoutConfigurationPolicies(t, db)
now := timeutil.Now()
insertRepo(t, db, 50, "r0")
insertRepo(t, db, 51, "r1")
insertRepo(t, db, 52, "r2")
insertRepo(t, db, 53, "r3")
updateGitserverUpdatedAt(t, db, now)
query := `
INSERT INTO lsif_configuration_policies (
id,
repository_id,
name,
type,
pattern,
repository_patterns,
retention_enabled,
retention_duration_hours,
retain_intermediate_commits,
indexing_enabled,
index_commit_max_age_hours,
index_intermediate_commits
) VALUES
(101, 50, 'policy 1', 'GIT_COMMIT', 'HEAD', null, true, 0, false, true, 0, false),
(102, 51, 'policy 2', 'GIT_COMMIT', 'HEAD', null, true, 0, false, true, 0, false),
(103, 52, 'policy 3', 'GIT_TREE', 'ef/', null, true, 0, false, true, 0, false),
(104, 53, 'policy 4', 'GIT_TREE', 'gh/', null, true, 0, false, true, 0, false),
(105, 54, 'policy 5', 'GIT_TREE', 'gh/', null, true, 0, false, false, 0, false)
`
if _, err := db.ExecContext(context.Background(), query); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
// Can return null last_index_scan
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 2, now); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{50, 51}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 20 minutes later, first two repositories are still on cooldown
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*20)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{52, 53}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 30 minutes later, all repositories are still on cooldown
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*30)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int(nil), repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 90 minutes later, all repositories are visible
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*90)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{50, 51, 52, 53}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// Make new invisible repository
insertRepo(t, db, 54, "r4")
// 95 minutes later, new repository is not yet visible
if repositoryIDs, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*95)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int(nil), repositoryIDs); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
query = `UPDATE lsif_configuration_policies SET indexing_enabled = true WHERE id = 105`
if _, err := db.ExecContext(context.Background(), query); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
// 100 minutes later, only new repository is visible
if repositoryIDs, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*100)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{54}, repositoryIDs); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 110 minutes later, nothing is ready to go (too close to last index scan)
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*110)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int(nil), repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// Update repo 50 (GIT_COMMIT/HEAD policy), and 51 (GIT_TREE policy)
gitserverReposQuery := sqlf.Sprintf(`UPDATE gitserver_repos SET last_changed = %s WHERE repo_id IN (50, 52)`, now.Add(time.Minute*105))
if _, err := db.ExecContext(context.Background(), gitserverReposQuery.Query(sqlf.PostgresBindVar), gitserverReposQuery.Args()...); err != nil {
t.Fatalf("unexpected error while upodating gitserver_repos last updated time: %s", err)
}
// 110 minutes later, updated repositories are ready for re-indexing
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*110)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{50}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
}
func TestSelectRepositoriesForIndexScanWithGlobalPolicy(t *testing.T) {
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(t))
store := testStoreWithoutConfigurationPolicies(t, db)
now := timeutil.Now()
insertRepo(t, db, 50, "r0")
insertRepo(t, db, 51, "r1")
insertRepo(t, db, 52, "r2")
insertRepo(t, db, 53, "r3")
updateGitserverUpdatedAt(t, db, now)
query := `
INSERT INTO lsif_configuration_policies (
id,
repository_id,
name,
type,
pattern,
repository_patterns,
retention_enabled,
retention_duration_hours,
retain_intermediate_commits,
indexing_enabled,
index_commit_max_age_hours,
index_intermediate_commits
) VALUES
(101, NULL, 'policy 1', 'GIT_TREE', 'ab/', null, true, 0, false, true, 0, false)
`
if _, err := db.ExecContext(context.Background(), query); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
// Returns nothing when disabled
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, false, nil, 100, now); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int(nil), repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// Returns at most configured limit
limit := 2
// Can return null last_index_scan
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, &limit, 100, now); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{50, 51}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 20 minutes later, first two repositories are still on cooldown
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*20)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{52, 53}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 30 minutes later, all repositories are still on cooldown
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*30)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int(nil), repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
// 90 minutes later, all repositories are visible
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*90)); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff([]int{50, 51, 52, 53}, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
}
func TestMarkRepoRevsAsProcessed(t *testing.T) {
ctx := context.Background()
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(t))
store := New(&observation.TestContext, db)
expected := []RepoRev{
{1, 50, "HEAD"},
{2, 50, "HEAD~1"},
{3, 50, "HEAD~2"},
{4, 51, "HEAD"},
{5, 51, "HEAD~1"},
{6, 51, "HEAD~2"},
{7, 52, "HEAD"},
{8, 52, "HEAD~1"},
{9, 52, "HEAD~2"},
}
for _, repoRev := range expected {
if err := store.QueueRepoRev(ctx, repoRev.RepositoryID, repoRev.Rev); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
// entire set
repoRevs, err := store.GetQueuedRepoRev(ctx, 50)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if diff := cmp.Diff(expected, repoRevs); diff != "" {
t.Errorf("unexpected repo revs (-want +got):\n%s", diff)
}
// mark first elements as complete; re-request remaining
if err := store.MarkRepoRevsAsProcessed(ctx, []int{1, 2, 3, 4, 5}); err != nil {
t.Fatalf("unexpected error: %s", err)
}
repoRevs, err = store.GetQueuedRepoRev(ctx, 50)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if diff := cmp.Diff(expected[5:], repoRevs); diff != "" {
t.Errorf("unexpected repo revs (-want +got):\n%s", diff)
}
}
//
//
// removes default configuration policies
func testStoreWithoutConfigurationPolicies(t *testing.T, db database.DB) Store {
if _, err := db.ExecContext(context.Background(), `TRUNCATE lsif_configuration_policies`); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
return New(&observation.TestContext, db)
}
func updateGitserverUpdatedAt(t *testing.T, db database.DB, now time.Time) {
gitserverReposQuery := sqlf.Sprintf(`UPDATE gitserver_repos SET last_changed = %s`, now.Add(-time.Hour*24))
if _, err := db.ExecContext(context.Background(), gitserverReposQuery.Query(sqlf.PostgresBindVar), gitserverReposQuery.Args()...); err != nil {
t.Fatalf("unexpected error while upodating gitserver_repos last updated time: %s", err)
}
}

View File

@ -34,7 +34,6 @@ type Store interface {
TruncateConfigurationSummary(ctx context.Context, numRecordsToRetain int) error
// Scheduler
GetRepositoriesForIndexScan(ctx context.Context, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) ([]int, error)
GetQueuedRepoRev(ctx context.Context, batchSize int) ([]RepoRev, error)
MarkRepoRevsAsProcessed(ctx context.Context, ids []int) error

View File

@ -0,0 +1,76 @@
package store
import (
"context"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/log/logtest"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
)
func TestMarkRepoRevsAsProcessed(t *testing.T) {
ctx := context.Background()
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(t))
store := New(&observation.TestContext, db)
expected := []RepoRev{
{1, 50, "HEAD"},
{2, 50, "HEAD~1"},
{3, 50, "HEAD~2"},
{4, 51, "HEAD"},
{5, 51, "HEAD~1"},
{6, 51, "HEAD~2"},
{7, 52, "HEAD"},
{8, 52, "HEAD~1"},
{9, 52, "HEAD~2"},
}
for _, repoRev := range expected {
if err := store.QueueRepoRev(ctx, repoRev.RepositoryID, repoRev.Rev); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
// entire set
repoRevs, err := store.GetQueuedRepoRev(ctx, 50)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if diff := cmp.Diff(expected, repoRevs); diff != "" {
t.Errorf("unexpected repo revs (-want +got):\n%s", diff)
}
// mark first elements as complete; re-request remaining
if err := store.MarkRepoRevsAsProcessed(ctx, []int{1, 2, 3, 4, 5}); err != nil {
t.Fatalf("unexpected error: %s", err)
}
repoRevs, err = store.GetQueuedRepoRev(ctx, 50)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if diff := cmp.Diff(expected[5:], repoRevs); diff != "" {
t.Errorf("unexpected repo revs (-want +got):\n%s", diff)
}
}
// removes default configuration policies
func testStoreWithoutConfigurationPolicies(t *testing.T, db database.DB) Store {
if _, err := db.ExecContext(context.Background(), `TRUNCATE lsif_configuration_policies`); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
return New(&observation.TestContext, db)
}
func updateGitserverUpdatedAt(t *testing.T, db database.DB, now time.Time) {
gitserverReposQuery := sqlf.Sprintf(`UPDATE gitserver_repos SET last_changed = %s`, now.Add(-time.Hour*24))
if _, err := db.ExecContext(context.Background(), gitserverReposQuery.Query(sqlf.PostgresBindVar), gitserverReposQuery.Args()...); err != nil {
t.Fatalf("unexpected error while upodating gitserver_repos last updated time: %s", err)
}
}

View File

@ -37,10 +37,6 @@ type MockStore struct {
// GetQueuedRepoRevFunc is an instance of a mock function object
// controlling the behavior of the method GetQueuedRepoRev.
GetQueuedRepoRevFunc *StoreGetQueuedRepoRevFunc
// GetRepositoriesForIndexScanFunc is an instance of a mock function
// object controlling the behavior of the method
// GetRepositoriesForIndexScan.
GetRepositoriesForIndexScanFunc *StoreGetRepositoriesForIndexScanFunc
// InsertDependencyIndexingJobFunc is an instance of a mock function
// object controlling the behavior of the method
// InsertDependencyIndexingJob.
@ -117,11 +113,6 @@ func NewMockStore() *MockStore {
return
},
},
GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{
defaultHook: func(context.Context, time.Duration, bool, *int, int, time.Time) (r0 []int, r1 error) {
return
},
},
InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{
defaultHook: func(context.Context, int, string, time.Time) (r0 int, r1 error) {
return
@ -224,11 +215,6 @@ func NewStrictMockStore() *MockStore {
panic("unexpected invocation of MockStore.GetQueuedRepoRev")
},
},
GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{
defaultHook: func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) {
panic("unexpected invocation of MockStore.GetRepositoriesForIndexScan")
},
},
InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{
defaultHook: func(context.Context, int, string, time.Time) (int, error) {
panic("unexpected invocation of MockStore.InsertDependencyIndexingJob")
@ -323,9 +309,6 @@ func NewMockStoreFrom(i store.Store) *MockStore {
GetQueuedRepoRevFunc: &StoreGetQueuedRepoRevFunc{
defaultHook: i.GetQueuedRepoRev,
},
GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{
defaultHook: i.GetRepositoriesForIndexScan,
},
InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{
defaultHook: i.InsertDependencyIndexingJob,
},
@ -814,129 +797,6 @@ func (c StoreGetQueuedRepoRevFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// StoreGetRepositoriesForIndexScanFunc describes the behavior when the
// GetRepositoriesForIndexScan method of the parent MockStore instance is
// invoked.
type StoreGetRepositoriesForIndexScanFunc struct {
defaultHook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)
hooks []func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)
history []StoreGetRepositoriesForIndexScanFuncCall
mutex sync.Mutex
}
// GetRepositoriesForIndexScan delegates to the next hook function in the
// queue and stores the parameter and result values of this invocation.
func (m *MockStore) GetRepositoriesForIndexScan(v0 context.Context, v1 time.Duration, v2 bool, v3 *int, v4 int, v5 time.Time) ([]int, error) {
r0, r1 := m.GetRepositoriesForIndexScanFunc.nextHook()(v0, v1, v2, v3, v4, v5)
m.GetRepositoriesForIndexScanFunc.appendCall(StoreGetRepositoriesForIndexScanFuncCall{v0, v1, v2, v3, v4, v5, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the
// GetRepositoriesForIndexScan method of the parent MockStore instance is
// invoked and the hook queue is empty.
func (f *StoreGetRepositoriesForIndexScanFunc) SetDefaultHook(hook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// GetRepositoriesForIndexScan method of the parent MockStore instance
// invokes the hook at the front of the queue and discards it. After the
// queue is empty, the default hook function is invoked for any future
// action.
func (f *StoreGetRepositoriesForIndexScanFunc) PushHook(hook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *StoreGetRepositoriesForIndexScanFunc) SetDefaultReturn(r0 []int, r1 error) {
f.SetDefaultHook(func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *StoreGetRepositoriesForIndexScanFunc) PushReturn(r0 []int, r1 error) {
f.PushHook(func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) {
return r0, r1
})
}
func (f *StoreGetRepositoriesForIndexScanFunc) nextHook() func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *StoreGetRepositoriesForIndexScanFunc) appendCall(r0 StoreGetRepositoriesForIndexScanFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of StoreGetRepositoriesForIndexScanFuncCall
// objects describing the invocations of this function.
func (f *StoreGetRepositoriesForIndexScanFunc) History() []StoreGetRepositoriesForIndexScanFuncCall {
f.mutex.Lock()
history := make([]StoreGetRepositoriesForIndexScanFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// StoreGetRepositoriesForIndexScanFuncCall is an object that describes an
// invocation of method GetRepositoriesForIndexScan on an instance of
// MockStore.
type StoreGetRepositoriesForIndexScanFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 time.Duration
// Arg2 is the value of the 3rd argument passed to this method
// invocation.
Arg2 bool
// Arg3 is the value of the 4th argument passed to this method
// invocation.
Arg3 *int
// Arg4 is the value of the 5th argument passed to this method
// invocation.
Arg4 int
// Arg5 is the value of the 6th argument passed to this method
// invocation.
Arg5 time.Time
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []int
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c StoreGetRepositoriesForIndexScanFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1, c.Arg2, c.Arg3, c.Arg4, c.Arg5}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c StoreGetRepositoriesForIndexScanFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// StoreInsertDependencyIndexingJobFunc describes the behavior when the
// InsertDependencyIndexingJob method of the parent MockStore instance is
// invoked.

View File

@ -145,10 +145,6 @@ func IsLimitError(err error) bool {
return errors.As(err, &inference.LimitError{})
}
func (s *Service) GetRepositoriesForIndexScan(ctx context.Context, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) ([]int, error) {
return s.store.GetRepositoriesForIndexScan(ctx, processDelay, allowGlobalPolicies, repositoryMatchLimit, limit, now)
}
func (s *Service) RepositoryIDsWithConfiguration(ctx context.Context, offset, limit int) ([]uploadsshared.RepositoryWithAvailableIndexers, int, error) {
return s.store.RepositoryIDsWithConfiguration(ctx, offset, limit)
}

View File

@ -0,0 +1,40 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//dev:go_defs.bzl", "go_test")
go_library(
name = "reposcheduler",
srcs = [
"init.go",
"options.go",
"store.go",
"store_observability.go",
],
importpath = "github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler",
visibility = ["//:__subpackages__"],
deps = [
"//internal/database",
"//internal/database/basestore",
"//internal/database/dbutil",
"//internal/metrics",
"//internal/observation",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_sourcegraph_log//:log",
"@io_opentelemetry_go_otel//attribute",
],
)
go_test(
name = "reposcheduler_test",
srcs = ["store_test.go"],
embed = [":reposcheduler"],
tags = ["requires-network"],
deps = [
"//internal/database",
"//internal/database/dbtest",
"//internal/observation",
"//internal/timeutil",
"@com_github_google_go_cmp//cmp",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_sourcegraph_log//logtest",
],
)

View File

@ -0,0 +1,30 @@
package reposcheduler
import (
"context"
"time"
)
type RepositoryToIndex struct {
ID int
}
type RepositorySchedulingService interface {
GetRepositoriesForIndexScan(ctx context.Context, _ RepositoryBatchOptions, now time.Time) (_ []RepositoryToIndex, err error)
}
type service struct {
store RepositorySchedulingStore
}
var _ RepositorySchedulingService = &service{}
func NewService(store RepositorySchedulingStore) RepositorySchedulingService {
return &service{
store: store,
}
}
func (s *service) GetRepositoriesForIndexScan(ctx context.Context, batchOptions RepositoryBatchOptions, now time.Time) ([]RepositoryToIndex, error) {
return s.store.GetRepositoriesForIndexScan(ctx, batchOptions, now)
}

View File

@ -0,0 +1,33 @@
package reposcheduler
import "time"
type RepositoryBatchOptions struct {
// Time to wait before making repository visible again
// after previous scan. Note that this delay can be superseded if repository
// has been changed in the meantime
ProcessDelay time.Duration
// If allowGlobalPolicies is false, then configuration policies that do not specify a repository name
// or patterns will be ignored.
// When true, such policies apply over all repositories known to the instance.
AllowGlobalPolicies bool
// This optional limit controls how many repositores will be considered by matching
// via global policy. As global policy matches large sets of repositories,
// this limit allows reducing the number of repositories that will be considered
// for scanning.
GlobalPolicyRepositoriesMatchLimit *int
// The maximum number repositories that will be returned in a batch
Limit int
}
func NewBatchOptions(processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int) RepositoryBatchOptions {
return RepositoryBatchOptions{
ProcessDelay: processDelay,
AllowGlobalPolicies: allowGlobalPolicies,
GlobalPolicyRepositoriesMatchLimit: repositoryMatchLimit,
Limit: limit,
}
}

View File

@ -0,0 +1,289 @@
package reposcheduler
import (
"context"
"fmt"
"time"
"github.com/keegancsmith/sqlf"
logger "github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/observation"
"go.opentelemetry.io/otel/attribute"
)
type RepoRev struct {
ID int
RepositoryID int
Rev string
}
type RepositorySchedulingStore interface {
WithTransaction(ctx context.Context, f func(tx RepositorySchedulingStore) error) error
GetRepositoriesForIndexScan(ctx context.Context, batchOptions RepositoryBatchOptions, now time.Time) ([]RepositoryToIndex, error)
}
type store struct {
db *basestore.Store
logger logger.Logger
operations *operations
dbLayout dbLayout
}
var _ RepositorySchedulingStore = &store{}
type storeType int8
const (
preciseStore storeType = 1
syntacticStore storeType = 2
)
func NewPreciseStore(observationCtx *observation.Context, db database.DB) RepositorySchedulingStore {
return &store{
db: basestore.NewWithHandle(db.Handle()),
logger: observationCtx.Logger.Scoped("reposcheduler.syntactic_store"),
operations: newOperations(observationCtx, preciseStore),
dbLayout: dbLayout{
policyEnablementFieldName: "indexing_enabled",
lastScanTableName: "lsif_last_index_scan",
},
}
}
func NewSyntacticStore(observationCtx *observation.Context, db database.DB) RepositorySchedulingStore {
return &store{
db: basestore.NewWithHandle(db.Handle()),
logger: observationCtx.Logger.Scoped("reposcheduler.precise_store"),
operations: newOperations(observationCtx, syntacticStore),
dbLayout: dbLayout{
policyEnablementFieldName: "syntactic_indexing_enabled",
lastScanTableName: "syntactic_scip_last_index_scan",
},
}
}
func (s *store) WithTransaction(ctx context.Context, f func(s RepositorySchedulingStore) error) error {
return s.withTransaction(ctx, func(s *store) error { return f(s) })
}
func (s *store) withTransaction(ctx context.Context, f func(s *store) error) error {
return basestore.InTransaction(ctx, s, f)
}
func (s *store) Transact(ctx context.Context) (*store, error) {
tx, err := s.db.Transact(ctx)
if err != nil {
return nil, err
}
return &store{
logger: s.logger,
db: tx,
operations: s.operations,
}, nil
}
func (s *store) Done(err error) error {
return s.db.Done(err)
}
type dbLayout struct {
policyEnablementFieldName string
lastScanTableName string
}
// GetRepositoriesForIndexScan returns a set of repository identifiers that should be considered
// for indexing jobs. Repositories that were returned previously from this call within the given
// process delay are not returned.
func (store *store) GetRepositoriesForIndexScan(
ctx context.Context,
batchOptions RepositoryBatchOptions,
now time.Time,
) (_ []RepositoryToIndex, err error) {
var globalPolicyRepositoryMatchLimitValue int
if batchOptions.GlobalPolicyRepositoriesMatchLimit != nil {
globalPolicyRepositoryMatchLimitValue = *batchOptions.GlobalPolicyRepositoriesMatchLimit
}
ctx, _, endObservation := store.operations.getRepositoriesForIndexScan.With(ctx, &err,
observation.Args{Attrs: []attribute.KeyValue{
attribute.Bool("allowGlobalPolicies", batchOptions.AllowGlobalPolicies),
attribute.Int("globalPolicyRepositoryMatchLimit", globalPolicyRepositoryMatchLimitValue),
attribute.Int("limit", batchOptions.Limit),
}})
defer endObservation(1, observation.Args{})
queries := make([]*sqlf.Query, 0, 3)
if batchOptions.AllowGlobalPolicies {
queries = append(queries, sqlf.Sprintf(
getRepositoriesForIndexScanGlobalRepositoriesQuery,
optionalLimit(batchOptions.GlobalPolicyRepositoriesMatchLimit),
))
}
queries = append(queries, sqlf.Sprintf(getRepositoriesForIndexScanRepositoriesWithPolicyQuery(store.dbLayout)))
queries = append(queries, sqlf.Sprintf(getRepositoriesForIndexScanRepositoriesWithPolicyViaPatternQuery(store.dbLayout)))
for i, query := range queries {
queries[i] = sqlf.Sprintf("(%s)", query)
}
query := getRepositoriesForIndexScanQuery(store.dbLayout)
finalQuery := sqlf.Sprintf(
query,
sqlf.Join(queries, " UNION ALL "),
now,
int(batchOptions.ProcessDelay/time.Second),
batchOptions.Limit,
now,
now,
)
repositoryIds, err := basestore.ScanInts(store.db.Query(ctx, finalQuery))
if err != nil {
return nil, err
}
repos := make([]RepositoryToIndex, len(repositoryIds))
for i, repoId := range repositoryIds {
repos[i] = RepositoryToIndex{ID: repoId}
}
return repos, nil
}
func getRepositoriesForIndexScanQuery(layout dbLayout) string {
return fmt.Sprintf(`
WITH
-- This CTE will contain a single row if there is at least one global policy, and will return an empty
-- result set otherwise. If any global policy is for HEAD, the value for the column is_head_policy will
-- be true.
global_policy_descriptor AS MATERIALIZED (
SELECT (p.type = 'GIT_COMMIT' AND p.pattern = 'HEAD') AS is_head_policy
FROM lsif_configuration_policies p
WHERE
p.%s AND
p.repository_id IS NULL AND
p.repository_patterns IS NULL
ORDER BY is_head_policy DESC
LIMIT 1
),
repositories_matching_policy AS (
%%s
),
repositories AS (
SELECT rmp.id
FROM repositories_matching_policy rmp
LEFT JOIN %s lrs ON lrs.repository_id = rmp.id
WHERE
-- Records that have not been checked within the global reindex threshold are also eligible for
-- indexing. Note that condition here is true for a record that has never been indexed.
(%%s - lrs.last_index_scan_at > (%%s * '1 second'::interval)) IS DISTINCT FROM FALSE OR
-- Records that have received an update since their last scan are also eligible for re-indexing.
-- Note that last_changed is NULL unless the repository is attached to a policy for HEAD.
(rmp.last_changed > lrs.last_index_scan_at)
ORDER BY
lrs.last_index_scan_at NULLS FIRST,
rmp.id -- tie breaker
LIMIT %%s
)
INSERT INTO %s (repository_id, last_index_scan_at)
SELECT DISTINCT r.id, %%s::timestamp FROM repositories r
ON CONFLICT (repository_id) DO UPDATE
SET last_index_scan_at = %%s
RETURNING repository_id
`, layout.policyEnablementFieldName,
layout.lastScanTableName,
layout.lastScanTableName,
)
}
const getRepositoriesForIndexScanGlobalRepositoriesQuery = `
SELECT
r.id,
CASE
-- Return non-NULL last_changed only for policies that are attached to a HEAD commit.
-- We don't want to superfluously return the same repos because they had an update, but
-- we only (for example) index a branch that doesn't have many active commits.
WHEN gpd.is_head_policy THEN gr.last_changed
ELSE NULL
END AS last_changed
FROM repo r
JOIN gitserver_repos gr ON gr.repo_id = r.id
JOIN global_policy_descriptor gpd ON TRUE
WHERE
r.deleted_at IS NULL AND
r.blocked IS NULL AND
gr.clone_status = 'cloned'
ORDER BY stars DESC NULLS LAST, id
%s
`
func getRepositoriesForIndexScanRepositoriesWithPolicyQuery(layout dbLayout) string {
return fmt.Sprintf(`
SELECT
r.id,
CASE
-- Return non-NULL last_changed only for policies that are attached to a HEAD commit.
-- We don't want to superfluously return the same repos because they had an update, but
-- we only (for example) index a branch that doesn't have many active commits.
WHEN p.type = 'GIT_COMMIT' AND p.pattern = 'HEAD' THEN gr.last_changed
ELSE NULL
END AS last_changed
FROM repo r
JOIN gitserver_repos gr ON gr.repo_id = r.id
JOIN lsif_configuration_policies p ON p.repository_id = r.id
WHERE
r.deleted_at IS NULL AND
r.blocked IS NULL AND
p.%s AND
gr.clone_status = 'cloned'
`, layout.policyEnablementFieldName)
}
func getRepositoriesForIndexScanRepositoriesWithPolicyViaPatternQuery(layout dbLayout) string {
return fmt.Sprintf(`
SELECT
r.id,
CASE
-- Return non-NULL last_changed only for policies that are attached to a HEAD commit.
-- We don't want to superfluously return the same repos because they had an update, but
-- we only (for example) index a branch that doesn't have many active commits.
WHEN p.type = 'GIT_COMMIT' AND p.pattern = 'HEAD' THEN gr.last_changed
ELSE NULL
END AS last_changed
FROM repo r
JOIN gitserver_repos gr ON gr.repo_id = r.id
JOIN lsif_configuration_policies_repository_pattern_lookup rpl ON rpl.repo_id = r.id
JOIN lsif_configuration_policies p ON p.id = rpl.policy_id
WHERE
r.deleted_at IS NULL AND
r.blocked IS NULL AND
p.%s AND
gr.clone_status = 'cloned'
`, layout.policyEnablementFieldName)
}
func scanRepoRev(s dbutil.Scanner) (rr RepoRev, err error) {
err = s.Scan(&rr.ID, &rr.RepositoryID, &rr.Rev)
return rr, err
}
var scanRepoRevs = basestore.NewSliceScanner(scanRepoRev)
func optionalLimit(limit *int) *sqlf.Query {
if limit != nil {
return sqlf.Sprintf("LIMIT %d", *limit)
}
return sqlf.Sprintf("")
}

View File

@ -0,0 +1,58 @@
package reposcheduler
import (
"fmt"
"github.com/sourcegraph/sourcegraph/internal/metrics"
"github.com/sourcegraph/sourcegraph/internal/observation"
)
type operations struct {
getRepositoriesForIndexScan *observation.Operation
getQueuedRepoRev *observation.Operation
markRepoRevsAsProcessed *observation.Operation
queueRepoRev *observation.Operation
isQueued *observation.Operation
}
var (
m = new(metrics.SingletonREDMetrics)
)
func newOperations(observationCtx *observation.Context, storeType storeType) *operations {
var metricPrefix string
var operationNamespace string
if storeType == preciseStore {
metricPrefix = "codeintel_precise_reposcheduler_store"
operationNamespace = "precise_reposcheduler"
} else {
metricPrefix = "codeintel_syntactic_reposcheduler_store"
operationNamespace = "syntactic_reposcheduler"
}
m := m.Get(func() *metrics.REDMetrics {
return metrics.NewREDMetrics(
observationCtx.Registerer,
metricPrefix,
metrics.WithLabels("op"),
metrics.WithCountHelp("Total number of method invocations."),
)
})
op := func(name string) *observation.Operation {
return observationCtx.Operation(observation.Op{
Name: fmt.Sprintf("codeintel.%s.store.%s", operationNamespace, name),
MetricLabelValues: []string{name},
Metrics: m,
})
}
return &operations{
getRepositoriesForIndexScan: op("GetRepositoriesForIndexScan"),
getQueuedRepoRev: op("GetQueuedRepoRev"),
markRepoRevsAsProcessed: op("MarkRepoRevsAsProcessed"),
queueRepoRev: op("QueueRepoRev"),
isQueued: op("IsQueued"),
}
}

View File

@ -0,0 +1,303 @@
package reposcheduler
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/log/logtest"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
)
// Helper function to setup a configuration policies table
// for either precise or syntactic indexing exclusively
func databaseSetupQuery(enabledColumn string) string {
var disabledColumn string
if enabledColumn == "indexing_enabled" {
disabledColumn = "syntactic_indexing_enabled"
} else {
disabledColumn = "indexing_enabled"
}
base := `
INSERT INTO lsif_configuration_policies (
id,
repository_id,
name,
type,
pattern,
repository_patterns,
retention_enabled,
retention_duration_hours,
retain_intermediate_commits,
%s, -- enabled column
%s, -- disabled column
index_commit_max_age_hours,
index_intermediate_commits
) VALUES
-- enabled | | disabled
-- v v
(101, 50, 'policy 1', 'GIT_COMMIT', 'HEAD', null, true, 0, false, true, false, 0, false),
(102, 51, 'policy 2', 'GIT_COMMIT', 'HEAD', null, true, 0, false, true, false, 0, false),
(103, 52, 'policy 3', 'GIT_TREE', 'ef/', null, true, 0, false, true, false, 0, false),
(104, 53, 'policy 4', 'GIT_TREE', 'gh/', null, true, 0, false, true, false, 0, false),
(105, 54, 'policy 5', 'GIT_TREE', 'gh/', null, true, 0, false, false, false, 0, false)
`
return fmt.Sprintf(base, enabledColumn, disabledColumn)
}
func TestSelectRepositoriesForSyntacticIndexing(t *testing.T) {
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(t))
preciseStore := testPreciseStoreWithoutConfigurationPolicies(t, db)
syntacticStore := testSyntacticStoreWithoutConfigurationPolicies(t, db)
insertRepo(t, db, 50, "r0")
insertRepo(t, db, 51, "r1")
insertRepo(t, db, 52, "r2")
insertRepo(t, db, 53, "r3")
var tests = []struct {
name string
store RepositorySchedulingStore
enabledColumn string
}{
{"precise store", preciseStore, "indexing_enabled"},
{"syntactic store", syntacticStore, "syntactic_indexing_enabled"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if _, err := db.ExecContext(context.Background(), "TRUNCATE lsif_configuration_policies"); err != nil {
t.Fatalf("unexpected error while truncating configuration policies: %s", err)
}
setupQuery := databaseSetupQuery(tt.enabledColumn)
if _, err := db.ExecContext(context.Background(), setupQuery); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
now := timeutil.Now()
updateGitserverUpdatedAt(t, db, now)
// The following tests simulate the passage of time (i.e. repeated scheduled invocations of the repo scheduling logic)
// T is time
// N.B. We use 1 hour process delay in all those tests,
// which means that IF the repository id was returned, it will be put on cooldown for an hour
// T = 0: No records in last index scan table, so we return all repos permitted by the limit parameter
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 2), now, []int{50, 51})
// T + 20 minutes: first two repositories are still on cooldown
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*20), []int{52, 53})
// T + 30 minutes: all repositories are on cooldown
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*30), []int(nil))
// T + 90 minutes: all repositories are visible again
// Repos 50, 51 are visible because they were scheduled at (T + 0) - which is more than 1 hour ago
// Repos 52, 53 are visible because they were scheduled at (T + 20) - which is more than 1 hour ago
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*90), []int{50, 51, 52, 53})
// Make a new repository, not yet covered by the configuration policies
insertRepo(t, db, 54, "r4")
// T + 95: no repositories are visible
// Repos 50,51,52,53 are invisible because they were scheduled at (T + 90), which is less than 1 hour ago
// Repo 54 is invisible because it doesn't have `indexing_enabled=true` in the configuration policies table
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*95), []int(nil))
// Explicitly enable indexing for repo 54
query := fmt.Sprintf(`UPDATE lsif_configuration_policies SET %s = true WHERE id = 105`, tt.enabledColumn)
if _, err := db.ExecContext(context.Background(), query); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
// T + 100: only repository 54 is visible
// Repos 50-53 are still on cooldown as they were scheduled at (T + 90), less than 1 hour ago
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*100), []int{54})
// T + 110: no repositories are visible
// Repos 50,51,52,53 are invisible because they were scheduled at (T + 90), which is less than 1 hour ago
// Repo 54 is invisible because it was scheduled at (T + 100), which is less than 1 hour ago
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*110), []int(nil))
// Update repo 50 (GIT_COMMIT/HEAD policy), and 51 (GIT_TREE policy)
gitserverReposQuery := sqlf.Sprintf(`UPDATE gitserver_repos SET last_changed = %s WHERE repo_id IN (50, 52)`, now.Add(time.Minute*105))
if _, err := db.ExecContext(context.Background(), gitserverReposQuery.Query(sqlf.PostgresBindVar), gitserverReposQuery.Args()...); err != nil {
t.Fatalf("unexpected error while upodating gitserver_repos last updated time: %s", err)
}
// T + 110: only repository 50 is visible
// Repos 51-54 are invisible because they were scheduled less than 1 hour ago
// Repo 50 is visible despite it being scheduled less than 1 hour ago - it was recently updated, so that takes precedence
assertRepoList(t, tt.store, NewBatchOptions(time.Hour, true, nil, 100), now.Add(time.Minute*110), []int{50})
})
}
}
/*
This test verifies that repository scheduling works when there's only a single global policy
*/
func TestSelectRepositoriesForIndexScanWithGlobalPolicy(t *testing.T) {
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(t))
preciseStore := testPreciseStoreWithoutConfigurationPolicies(t, db)
syntacticStore := testSyntacticStoreWithoutConfigurationPolicies(t, db)
now := timeutil.Now()
insertRepo(t, db, 50, "r0")
insertRepo(t, db, 51, "r1")
insertRepo(t, db, 52, "r2")
insertRepo(t, db, 53, "r3")
updateGitserverUpdatedAt(t, db, now)
query := `
INSERT INTO lsif_configuration_policies (
id,
repository_id,
name,
type,
pattern,
repository_patterns,
retention_enabled,
retention_duration_hours,
retain_intermediate_commits,
indexing_enabled,
syntactic_indexing_enabled,
index_commit_max_age_hours,
index_intermediate_commits
) VALUES
-- indexing | | syntactic indexing
-- v v
(101, NULL, 'policy 1', 'GIT_TREE', 'ab/', null, true, 0, false, true, true, 0, false)
`
if _, err := db.ExecContext(context.Background(), query); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
var tests = []struct {
name string
store RepositorySchedulingStore
}{
{"precise store", preciseStore},
{"syntactic store", syntacticStore},
}
// Returns at most configured limit
limit := 2
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// The following tests simulate the passage of time (i.e. repeated scheduled invocations of the repo scheduling logic)
// T is time
// N.B. We use 1 hour process delay in all those tests,
// which means that IF the repository id was returned, it will be put on cooldown for an hour
// Returns nothing when disabled
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, false, nil, 100), now, []int(nil))
// T = 0: No records in last index scan table, so we return all repos permitted by the limit parameter
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, &limit, 100), now, []int{50, 51})
// T + 20 minutes: first two repositories are still on cooldown
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*20), []int{52, 53})
// T + 30 minutes: all repositories are on cooldown
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*30), []int(nil))
// T + 90 minutes: all repositories are visible again
// Repos 50, 51 are visible because they were scheduled at (T + 0) - which is more than 1 hour ago
// Repos 52, 53 are visible because they were scheduled at (T + 20) - which is more than 1 hour ago
assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*90), []int{50, 51, 52, 53})
})
}
}
// removes default configuration policies
func testPreciseStoreWithoutConfigurationPolicies(t *testing.T, db database.DB) RepositorySchedulingStore {
if _, err := db.ExecContext(context.Background(), `TRUNCATE lsif_configuration_policies`); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
return NewPreciseStore(&observation.TestContext, db)
}
func testSyntacticStoreWithoutConfigurationPolicies(t *testing.T, db database.DB) RepositorySchedulingStore {
if _, err := db.ExecContext(context.Background(), `TRUNCATE lsif_configuration_policies`); err != nil {
t.Fatalf("unexpected error while inserting configuration policies: %s", err)
}
return NewSyntacticStore(&observation.TestContext, db)
}
func assertRepoList(t *testing.T, store RepositorySchedulingStore, batchOptions RepositoryBatchOptions, now time.Time, want []int) {
t.Helper()
wantedRepos := make([]RepositoryToIndex, len(want))
for i, repoId := range want {
wantedRepos[i] = RepositoryToIndex{ID: repoId}
}
if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), batchOptions, now); err != nil {
t.Fatalf("unexpected error fetching repositories for index scan: %s", err)
} else if diff := cmp.Diff(wantedRepos, repositories); diff != "" {
t.Fatalf("unexpected repository list (-want +got):\n%s", diff)
}
}
func updateGitserverUpdatedAt(t *testing.T, db database.DB, now time.Time) {
gitserverReposQuery := sqlf.Sprintf(`UPDATE gitserver_repos SET last_changed = %s`, now.Add(-time.Hour*24))
if _, err := db.ExecContext(context.Background(), gitserverReposQuery.Query(sqlf.PostgresBindVar), gitserverReposQuery.Args()...); err != nil {
t.Fatalf("unexpected error while upodating gitserver_repos last updated time: %s", err)
}
}
func insertRepo(t testing.TB, db database.DB, id int, name string) {
if name == "" {
name = fmt.Sprintf("n-%d", id)
}
deletedAt := sqlf.Sprintf("NULL")
if strings.HasPrefix(name, "DELETED-") {
deletedAt = sqlf.Sprintf("%s", time.Unix(1587396557, 0).UTC())
}
insertRepoQuery := sqlf.Sprintf(
`INSERT INTO repo (id, name, deleted_at, private) VALUES (%s, %s, %s, %s) ON CONFLICT (id) DO NOTHING`,
id,
name,
deletedAt,
false,
)
if _, err := db.ExecContext(context.Background(), insertRepoQuery.Query(sqlf.PostgresBindVar), insertRepoQuery.Args()...); err != nil {
t.Fatalf("unexpected error while upserting repository: %s", err)
}
status := "cloned"
if strings.HasPrefix(name, "DELETED-") {
status = "not_cloned"
}
updateGitserverRepoQuery := sqlf.Sprintf(
`UPDATE gitserver_repos SET clone_status = %s WHERE repo_id = %s`,
status,
id,
)
if _, err := db.ExecContext(context.Background(), updateGitserverRepoQuery.Query(sqlf.PostgresBindVar), updateGitserverRepoQuery.Args()...); err != nil {
t.Fatalf("unexpected error while upserting gitserver repository: %s", err)
}
}

View File

@ -8,6 +8,7 @@ import (
ossdependencies "github.com/sourcegraph/sourcegraph/internal/codeintel/dependencies"
"github.com/sourcegraph/sourcegraph/internal/codeintel/policies"
"github.com/sourcegraph/sourcegraph/internal/codeintel/ranking"
"github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler"
"github.com/sourcegraph/sourcegraph/internal/codeintel/sentinel"
codeintelshared "github.com/sourcegraph/sourcegraph/internal/codeintel/shared"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads"
@ -17,15 +18,16 @@ import (
)
type Services struct {
AutoIndexingService *autoindexing.Service
CodenavService *codenav.Service
DependenciesService *ossdependencies.Service
PoliciesService *policies.Service
RankingService *ranking.Service
UploadsService *uploads.Service
SentinelService *sentinel.Service
ContextService *context.Service
GitserverClient gitserver.Client
AutoIndexingService *autoindexing.Service
PreciseRepoSchedulingService reposcheduler.RepositorySchedulingService
CodenavService *codenav.Service
DependenciesService *ossdependencies.Service
PoliciesService *policies.Service
RankingService *ranking.Service
UploadsService *uploads.Service
SentinelService *sentinel.Service
ContextService *context.Service
GitserverClient gitserver.Client
}
type ServiceDependencies struct {
@ -46,16 +48,18 @@ func NewServices(deps ServiceDependencies) (Services, error) {
rankingSvc := ranking.NewService(deps.ObservationCtx, db, codeIntelDB)
sentinelService := sentinel.NewService(deps.ObservationCtx, db)
contextService := context.NewService(deps.ObservationCtx, db)
reposchedulingService := reposcheduler.NewService(reposcheduler.NewPreciseStore(deps.ObservationCtx, db))
return Services{
AutoIndexingService: autoIndexingSvc,
CodenavService: codenavSvc,
DependenciesService: dependenciesSvc,
PoliciesService: policiesSvc,
RankingService: rankingSvc,
UploadsService: uploadsSvc,
SentinelService: sentinelService,
ContextService: contextService,
GitserverClient: gitserverClient,
AutoIndexingService: autoIndexingSvc,
PreciseRepoSchedulingService: reposchedulingService,
CodenavService: codenavSvc,
DependenciesService: dependenciesSvc,
PoliciesService: policiesSvc,
RankingService: rankingSvc,
UploadsService: uploadsSvc,
SentinelService: sentinelService,
ContextService: contextService,
GitserverClient: gitserverClient,
}, nil
}

View File

@ -11154,6 +11154,19 @@
"IndexName": "access_tokens_lookup_double_hash"
}
},
{
"ID": 1709738515,
"Name": "Add syntactic indexing support to policies",
"UpQuery": "alter table lsif_configuration_policies add column if not exists syntactic_indexing_enabled bool not null default false;\n\nCREATE TABLE IF NOT EXISTS syntactic_scip_last_index_scan(\n repository_id int NOT NULL,\n last_index_scan_at timestamp with time zone NOT NULL,\n PRIMARY KEY(repository_id)\n);\n\nCOMMENT ON TABLE syntactic_scip_last_index_scan IS 'Tracks the last time repository was checked for syntactic indexing job scheduling.';\nCOMMENT ON COLUMN syntactic_scip_last_index_scan.last_index_scan_at IS 'The last time uploads of this repository were considered for syntactic indexing job scheduling.';",
"DownQuery": "alter table lsif_configuration_policies drop column if exists syntactic_indexing_enabled;\ndrop table if exists syntactic_scip_last_index_scan;",
"Privileged": false,
"NonIdempotent": false,
"Parents": [
1708596613
],
"IsCreateIndexConcurrently": false,
"IndexMetadata": null
},
{
"ID": 1711003437,
"Name": "remove feature flag foreign key",
@ -11430,6 +11443,7 @@
"v5.3.0": {
"RootID": 1648051770,
"LeafIDs": [
1709738515,
1711003437,
1711538234
],

View File

@ -15552,6 +15552,19 @@
"GenerationExpression": "",
"Comment": "Whether or not this configuration policy affects data retention rules."
},
{
"Name": "syntactic_indexing_enabled",
"Index": 17,
"TypeName": "boolean",
"IsNullable": false,
"Default": "false",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "type",
"Index": 4,
@ -26586,6 +26599,52 @@
],
"Triggers": []
},
{
"Name": "syntactic_scip_last_index_scan",
"Comment": "Tracks the last time repository was checked for syntactic indexing job scheduling.",
"Columns": [
{
"Name": "last_index_scan_at",
"Index": 2,
"TypeName": "timestamp with time zone",
"IsNullable": false,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": "The last time uploads of this repository were considered for syntactic indexing job scheduling."
},
{
"Name": "repository_id",
"Index": 1,
"TypeName": "integer",
"IsNullable": false,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
}
],
"Indexes": [
{
"Name": "syntactic_scip_last_index_scan_pkey",
"IsPrimaryKey": true,
"IsUnique": true,
"IsExclusion": false,
"IsDeferrable": false,
"IndexDefinition": "CREATE UNIQUE INDEX syntactic_scip_last_index_scan_pkey ON syntactic_scip_last_index_scan USING btree (repository_id)",
"ConstraintType": "p",
"ConstraintDefinition": "PRIMARY KEY (repository_id)"
}
],
"Constraints": null,
"Triggers": []
},
{
"Name": "team_members",
"Comment": "",

View File

@ -2120,6 +2120,7 @@ Stores data points for a code insight that do not need to be queried directly, b
repository_patterns | text[] | | |
last_resolved_at | timestamp with time zone | | |
embeddings_enabled | boolean | | not null | false
syntactic_indexing_enabled | boolean | | not null | false
Indexes:
"lsif_configuration_policies_pkey" PRIMARY KEY, btree (id)
"lsif_configuration_policies_repository_id" btree (repository_id)
@ -4050,6 +4051,21 @@ Stores metadata about a code intel syntactic index job.
**execution_logs**: An array of [log entries](https://sourcegraph.com/github.com/sourcegraph/sourcegraph@3.23/-/blob/internal/workerutil/store.go#L48:6) (encoded as JSON) from the most recent execution.
# Table "public.syntactic_scip_last_index_scan"
```
Column | Type | Collation | Nullable | Default
--------------------+--------------------------+-----------+----------+---------
repository_id | integer | | not null |
last_index_scan_at | timestamp with time zone | | not null |
Indexes:
"syntactic_scip_last_index_scan_pkey" PRIMARY KEY, btree (repository_id)
```
Tracks the last time repository was checked for syntactic indexing job scheduling.
**last_index_scan_at**: The last time uploads of this repository were considered for syntactic indexing job scheduling.
# Table "public.team_members"
```
Column | Type | Collation | Nullable | Default

View File

@ -0,0 +1,2 @@
alter table lsif_configuration_policies drop column if exists syntactic_indexing_enabled;
drop table if exists syntactic_scip_last_index_scan;

View File

@ -0,0 +1,2 @@
name: Add syntactic indexing support to policies
parents: [1708596613]

View File

@ -0,0 +1,10 @@
alter table lsif_configuration_policies add column if not exists syntactic_indexing_enabled bool not null default false;
CREATE TABLE IF NOT EXISTS syntactic_scip_last_index_scan(
repository_id int NOT NULL,
last_index_scan_at timestamp with time zone NOT NULL,
PRIMARY KEY(repository_id)
);
COMMENT ON TABLE syntactic_scip_last_index_scan IS 'Tracks the last time repository was checked for syntactic indexing job scheduling.';
COMMENT ON COLUMN syntactic_scip_last_index_scan.last_index_scan_at IS 'The last time uploads of this repository were considered for syntactic indexing job scheduling.';

View File

@ -1722,7 +1722,8 @@ CREATE TABLE lsif_configuration_policies (
protected boolean DEFAULT false NOT NULL,
repository_patterns text[],
last_resolved_at timestamp with time zone,
embeddings_enabled boolean DEFAULT false NOT NULL
embeddings_enabled boolean DEFAULT false NOT NULL,
syntactic_indexing_enabled boolean DEFAULT false NOT NULL
);
COMMENT ON COLUMN lsif_configuration_policies.repository_id IS 'The identifier of the repository to which this configuration policy applies. If absent, this policy is applied globally.';
@ -4685,6 +4686,15 @@ CREATE VIEW syntactic_scip_indexing_jobs_with_repository_name AS
JOIN repo r ON ((r.id = u.repository_id)))
WHERE (r.deleted_at IS NULL);
CREATE TABLE syntactic_scip_last_index_scan (
repository_id integer NOT NULL,
last_index_scan_at timestamp with time zone NOT NULL
);
COMMENT ON TABLE syntactic_scip_last_index_scan IS 'Tracks the last time repository was checked for syntactic indexing job scheduling.';
COMMENT ON COLUMN syntactic_scip_last_index_scan.last_index_scan_at IS 'The last time uploads of this repository were considered for syntactic indexing job scheduling.';
CREATE TABLE team_members (
team_id integer NOT NULL,
user_id integer NOT NULL,
@ -5803,6 +5813,9 @@ ALTER TABLE ONLY survey_responses
ALTER TABLE ONLY syntactic_scip_indexing_jobs
ADD CONSTRAINT syntactic_scip_indexing_jobs_pkey PRIMARY KEY (id);
ALTER TABLE ONLY syntactic_scip_last_index_scan
ADD CONSTRAINT syntactic_scip_last_index_scan_pkey PRIMARY KEY (repository_id);
ALTER TABLE ONLY team_members
ADD CONSTRAINT team_members_team_id_user_id_key PRIMARY KEY (team_id, user_id);
@ -7043,9 +7056,9 @@ ALTER TABLE ONLY webhooks
ALTER TABLE ONLY zoekt_repos
ADD CONSTRAINT zoekt_repos_repo_id_fkey FOREIGN KEY (repo_id) REFERENCES repo(id) ON DELETE CASCADE;
INSERT INTO lsif_configuration_policies VALUES (1, NULL, 'Default tip-of-branch retention policy', 'GIT_TREE', '*', true, 2016, false, false, 0, false, true, NULL, NULL, false);
INSERT INTO lsif_configuration_policies VALUES (2, NULL, 'Default tag retention policy', 'GIT_TAG', '*', true, 8064, false, false, 0, false, true, NULL, NULL, false);
INSERT INTO lsif_configuration_policies VALUES (3, NULL, 'Default commit retention policy', 'GIT_TREE', '*', true, 168, true, false, 0, false, true, NULL, NULL, false);
INSERT INTO lsif_configuration_policies VALUES (1, NULL, 'Default tip-of-branch retention policy', 'GIT_TREE', '*', true, 2016, false, false, 0, false, true, NULL, NULL, false, false);
INSERT INTO lsif_configuration_policies VALUES (2, NULL, 'Default tag retention policy', 'GIT_TAG', '*', true, 8064, false, false, 0, false, true, NULL, NULL, false, false);
INSERT INTO lsif_configuration_policies VALUES (3, NULL, 'Default commit retention policy', 'GIT_TREE', '*', true, 168, true, false, 0, false, true, NULL, NULL, false, false);
SELECT pg_catalog.setval('lsif_configuration_policies_id_seq', 3, true);