From 0a73bcb36980a012cddcf169aa79ad75bbf926cd Mon Sep 17 00:00:00 2001 From: chwarwick Date: Fri, 15 Apr 2022 13:44:53 -0400 Subject: [PATCH] Insights: Optional batching of commit indexer (#33666) Setting to allow insights commit indexer to retrieve historic commits in batches based on a number of days --- .../insights/background/background.go | 3 +- .../internal/insights/compression/commits.go | 14 +- .../insights/compression/mock_commit_store.go | 60 +++-- .../internal/insights/compression/worker.go | 92 ++++--- .../insights/compression/worker_test.go | 249 ++++++++++++++++-- schema/schema.go | 2 + schema/site.schema.json | 7 + 7 files changed, 339 insertions(+), 88 deletions(-) diff --git a/enterprise/internal/insights/background/background.go b/enterprise/internal/insights/background/background.go index 785d621c12d..9bd14766080 100644 --- a/enterprise/internal/insights/background/background.go +++ b/enterprise/internal/insights/background/background.go @@ -5,6 +5,7 @@ import ( "database/sql" "os" "strconv" + "time" "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background/pings" "github.com/sourcegraph/sourcegraph/internal/database" @@ -56,7 +57,7 @@ func GetBackgroundJobs(ctx context.Context, mainAppDB *sql.DB, insightsDB *sql.D } // todo(insights) add setting to disable this indexer - routines = append(routines, compression.NewCommitIndexerWorker(ctx, database.NewDB(mainAppDB), insightsDB, observationContext)) + routines = append(routines, compression.NewCommitIndexerWorker(ctx, database.NewDB(mainAppDB), insightsDB, time.Now, observationContext)) // Register the background goroutine which discovers historical gaps in data and enqueues // work to fill them - if not disabled. diff --git a/enterprise/internal/insights/compression/commits.go b/enterprise/internal/insights/compression/commits.go index e77cc084545..cd87a84414e 100644 --- a/enterprise/internal/insights/compression/commits.go +++ b/enterprise/internal/insights/compression/commits.go @@ -23,8 +23,8 @@ type CommitStore interface { Save(ctx context.Context, id api.RepoID, commit *gitdomain.Commit, debugInfo string) error Get(ctx context.Context, id api.RepoID, start time.Time, end time.Time) ([]CommitStamp, error) GetMetadata(ctx context.Context, id api.RepoID) (CommitIndexMetadata, error) - UpsertMetadataStamp(ctx context.Context, id api.RepoID) (CommitIndexMetadata, error) - InsertCommits(ctx context.Context, id api.RepoID, commits []*gitdomain.Commit, debugInfo string) error + UpsertMetadataStamp(ctx context.Context, id api.RepoID, indexedThrough time.Time) (CommitIndexMetadata, error) + InsertCommits(ctx context.Context, id api.RepoID, commits []*gitdomain.Commit, indexedThrough time.Time, debugInfo string) error } func NewCommitStore(db dbutil.DB) *DBCommitStore { @@ -52,7 +52,7 @@ func (c *DBCommitStore) Save(ctx context.Context, id api.RepoID, commit *gitdoma return nil } -func (c *DBCommitStore) InsertCommits(ctx context.Context, id api.RepoID, commits []*gitdomain.Commit, debugInfo string) (err error) { +func (c *DBCommitStore) InsertCommits(ctx context.Context, id api.RepoID, commits []*gitdomain.Commit, indexedThrough time.Time, debugInfo string) (err error) { tx, err := c.Transact(ctx) if err != nil { return err @@ -66,7 +66,7 @@ func (c *DBCommitStore) InsertCommits(ctx context.Context, id api.RepoID, commit } } - if _, err = tx.UpsertMetadataStamp(ctx, id); err != nil { + if _, err = tx.UpsertMetadataStamp(ctx, id, indexedThrough); err != nil { return err } @@ -108,8 +108,8 @@ func (c *DBCommitStore) GetMetadata(ctx context.Context, id api.RepoID) (CommitI } // UpsertMetadataStamp inserts (or updates, if the row already exists) the index metadata timestamp for a given repository -func (c *DBCommitStore) UpsertMetadataStamp(ctx context.Context, id api.RepoID) (CommitIndexMetadata, error) { - row := c.QueryRow(ctx, sqlf.Sprintf(upsertCommitIndexMetadataStampStr, id)) +func (c *DBCommitStore) UpsertMetadataStamp(ctx context.Context, id api.RepoID, indexedThrough time.Time) (CommitIndexMetadata, error) { + row := c.QueryRow(ctx, sqlf.Sprintf(upsertCommitIndexMetadataStampStr, id, indexedThrough)) var metadata CommitIndexMetadata if err := row.Scan(&metadata.RepoId, &metadata.Enabled, &metadata.LastIndexedAt); err != nil { @@ -154,6 +154,6 @@ const upsertCommitIndexMetadataStampStr = ` INSERT INTO commit_index_metadata(repo_id) VALUES (%v) ON CONFLICT (repo_id) DO UPDATE -SET last_indexed_at = CURRENT_TIMESTAMP +SET last_indexed_at = %v RETURNING repo_id, enabled, last_indexed_at; ` diff --git a/enterprise/internal/insights/compression/mock_commit_store.go b/enterprise/internal/insights/compression/mock_commit_store.go index bee259496c3..4f70ce4459e 100644 --- a/enterprise/internal/insights/compression/mock_commit_store.go +++ b/enterprise/internal/insights/compression/mock_commit_store.go @@ -48,7 +48,7 @@ func NewMockCommitStore() *MockCommitStore { }, }, InsertCommitsFunc: &CommitStoreInsertCommitsFunc{ - defaultHook: func(context.Context, api.RepoID, []*gitdomain.Commit, string) error { + defaultHook: func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error { return nil }, }, @@ -58,7 +58,7 @@ func NewMockCommitStore() *MockCommitStore { }, }, UpsertMetadataStampFunc: &CommitStoreUpsertMetadataStampFunc{ - defaultHook: func(context.Context, api.RepoID) (CommitIndexMetadata, error) { + defaultHook: func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) { return CommitIndexMetadata{}, nil }, }, @@ -80,7 +80,7 @@ func NewStrictMockCommitStore() *MockCommitStore { }, }, InsertCommitsFunc: &CommitStoreInsertCommitsFunc{ - defaultHook: func(context.Context, api.RepoID, []*gitdomain.Commit, string) error { + defaultHook: func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error { panic("unexpected invocation of MockCommitStore.InsertCommits") }, }, @@ -90,7 +90,7 @@ func NewStrictMockCommitStore() *MockCommitStore { }, }, UpsertMetadataStampFunc: &CommitStoreUpsertMetadataStampFunc{ - defaultHook: func(context.Context, api.RepoID) (CommitIndexMetadata, error) { + defaultHook: func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) { panic("unexpected invocation of MockCommitStore.UpsertMetadataStamp") }, }, @@ -344,24 +344,24 @@ func (c CommitStoreGetMetadataFuncCall) Results() []interface{} { // CommitStoreInsertCommitsFunc describes the behavior when the // InsertCommits method of the parent MockCommitStore instance is invoked. type CommitStoreInsertCommitsFunc struct { - defaultHook func(context.Context, api.RepoID, []*gitdomain.Commit, string) error - hooks []func(context.Context, api.RepoID, []*gitdomain.Commit, string) error + defaultHook func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error + hooks []func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error history []CommitStoreInsertCommitsFuncCall mutex sync.Mutex } // InsertCommits delegates to the next hook function in the queue and stores // the parameter and result values of this invocation. -func (m *MockCommitStore) InsertCommits(v0 context.Context, v1 api.RepoID, v2 []*gitdomain.Commit, v3 string) error { - r0 := m.InsertCommitsFunc.nextHook()(v0, v1, v2, v3) - m.InsertCommitsFunc.appendCall(CommitStoreInsertCommitsFuncCall{v0, v1, v2, v3, r0}) +func (m *MockCommitStore) InsertCommits(v0 context.Context, v1 api.RepoID, v2 []*gitdomain.Commit, v3 time.Time, v4 string) error { + r0 := m.InsertCommitsFunc.nextHook()(v0, v1, v2, v3, v4) + m.InsertCommitsFunc.appendCall(CommitStoreInsertCommitsFuncCall{v0, v1, v2, v3, v4, r0}) return r0 } // SetDefaultHook sets function that is called when the InsertCommits method // of the parent MockCommitStore instance is invoked and the hook queue is // empty. -func (f *CommitStoreInsertCommitsFunc) SetDefaultHook(hook func(context.Context, api.RepoID, []*gitdomain.Commit, string) error) { +func (f *CommitStoreInsertCommitsFunc) SetDefaultHook(hook func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error) { f.defaultHook = hook } @@ -369,7 +369,7 @@ func (f *CommitStoreInsertCommitsFunc) SetDefaultHook(hook func(context.Context, // InsertCommits method of the parent MockCommitStore instance invokes the // hook at the front of the queue and discards it. After the queue is empty, // the default hook function is invoked for any future action. -func (f *CommitStoreInsertCommitsFunc) PushHook(hook func(context.Context, api.RepoID, []*gitdomain.Commit, string) error) { +func (f *CommitStoreInsertCommitsFunc) PushHook(hook func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error) { f.mutex.Lock() f.hooks = append(f.hooks, hook) f.mutex.Unlock() @@ -378,19 +378,19 @@ func (f *CommitStoreInsertCommitsFunc) PushHook(hook func(context.Context, api.R // SetDefaultReturn calls SetDefaultHook with a function that returns the // given values. func (f *CommitStoreInsertCommitsFunc) SetDefaultReturn(r0 error) { - f.SetDefaultHook(func(context.Context, api.RepoID, []*gitdomain.Commit, string) error { + f.SetDefaultHook(func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error { return r0 }) } // PushReturn calls PushHook with a function that returns the given values. func (f *CommitStoreInsertCommitsFunc) PushReturn(r0 error) { - f.PushHook(func(context.Context, api.RepoID, []*gitdomain.Commit, string) error { + f.PushHook(func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error { return r0 }) } -func (f *CommitStoreInsertCommitsFunc) nextHook() func(context.Context, api.RepoID, []*gitdomain.Commit, string) error { +func (f *CommitStoreInsertCommitsFunc) nextHook() func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error { f.mutex.Lock() defer f.mutex.Unlock() @@ -434,7 +434,10 @@ type CommitStoreInsertCommitsFuncCall struct { Arg2 []*gitdomain.Commit // Arg3 is the value of the 4th argument passed to this method // invocation. - Arg3 string + Arg3 time.Time + // Arg4 is the value of the 5th argument passed to this method + // invocation. + Arg4 string // Result0 is the value of the 1st result returned from this method // invocation. Result0 error @@ -443,7 +446,7 @@ type CommitStoreInsertCommitsFuncCall struct { // Args returns an interface slice containing the arguments of this // invocation. func (c CommitStoreInsertCommitsFuncCall) Args() []interface{} { - return []interface{}{c.Arg0, c.Arg1, c.Arg2, c.Arg3} + return []interface{}{c.Arg0, c.Arg1, c.Arg2, c.Arg3, c.Arg4} } // Results returns an interface slice containing the results of this @@ -566,24 +569,24 @@ func (c CommitStoreSaveFuncCall) Results() []interface{} { // UpsertMetadataStamp method of the parent MockCommitStore instance is // invoked. type CommitStoreUpsertMetadataStampFunc struct { - defaultHook func(context.Context, api.RepoID) (CommitIndexMetadata, error) - hooks []func(context.Context, api.RepoID) (CommitIndexMetadata, error) + defaultHook func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) + hooks []func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) history []CommitStoreUpsertMetadataStampFuncCall mutex sync.Mutex } // UpsertMetadataStamp delegates to the next hook function in the queue and // stores the parameter and result values of this invocation. -func (m *MockCommitStore) UpsertMetadataStamp(v0 context.Context, v1 api.RepoID) (CommitIndexMetadata, error) { - r0, r1 := m.UpsertMetadataStampFunc.nextHook()(v0, v1) - m.UpsertMetadataStampFunc.appendCall(CommitStoreUpsertMetadataStampFuncCall{v0, v1, r0, r1}) +func (m *MockCommitStore) UpsertMetadataStamp(v0 context.Context, v1 api.RepoID, v2 time.Time) (CommitIndexMetadata, error) { + r0, r1 := m.UpsertMetadataStampFunc.nextHook()(v0, v1, v2) + m.UpsertMetadataStampFunc.appendCall(CommitStoreUpsertMetadataStampFuncCall{v0, v1, v2, r0, r1}) return r0, r1 } // SetDefaultHook sets function that is called when the UpsertMetadataStamp // method of the parent MockCommitStore instance is invoked and the hook // queue is empty. -func (f *CommitStoreUpsertMetadataStampFunc) SetDefaultHook(hook func(context.Context, api.RepoID) (CommitIndexMetadata, error)) { +func (f *CommitStoreUpsertMetadataStampFunc) SetDefaultHook(hook func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error)) { f.defaultHook = hook } @@ -591,7 +594,7 @@ func (f *CommitStoreUpsertMetadataStampFunc) SetDefaultHook(hook func(context.Co // UpsertMetadataStamp method of the parent MockCommitStore instance invokes // the hook at the front of the queue and discards it. After the queue is // empty, the default hook function is invoked for any future action. -func (f *CommitStoreUpsertMetadataStampFunc) PushHook(hook func(context.Context, api.RepoID) (CommitIndexMetadata, error)) { +func (f *CommitStoreUpsertMetadataStampFunc) PushHook(hook func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error)) { f.mutex.Lock() f.hooks = append(f.hooks, hook) f.mutex.Unlock() @@ -600,19 +603,19 @@ func (f *CommitStoreUpsertMetadataStampFunc) PushHook(hook func(context.Context, // SetDefaultReturn calls SetDefaultHook with a function that returns the // given values. func (f *CommitStoreUpsertMetadataStampFunc) SetDefaultReturn(r0 CommitIndexMetadata, r1 error) { - f.SetDefaultHook(func(context.Context, api.RepoID) (CommitIndexMetadata, error) { + f.SetDefaultHook(func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) { return r0, r1 }) } // PushReturn calls PushHook with a function that returns the given values. func (f *CommitStoreUpsertMetadataStampFunc) PushReturn(r0 CommitIndexMetadata, r1 error) { - f.PushHook(func(context.Context, api.RepoID) (CommitIndexMetadata, error) { + f.PushHook(func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) { return r0, r1 }) } -func (f *CommitStoreUpsertMetadataStampFunc) nextHook() func(context.Context, api.RepoID) (CommitIndexMetadata, error) { +func (f *CommitStoreUpsertMetadataStampFunc) nextHook() func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) { f.mutex.Lock() defer f.mutex.Unlock() @@ -652,6 +655,9 @@ type CommitStoreUpsertMetadataStampFuncCall struct { // Arg1 is the value of the 2nd argument passed to this method // invocation. Arg1 api.RepoID + // Arg2 is the value of the 3rd argument passed to this method + // invocation. + Arg2 time.Time // Result0 is the value of the 1st result returned from this method // invocation. Result0 CommitIndexMetadata @@ -663,7 +669,7 @@ type CommitStoreUpsertMetadataStampFuncCall struct { // Args returns an interface slice containing the arguments of this // invocation. func (c CommitStoreUpsertMetadataStampFuncCall) Args() []interface{} { - return []interface{}{c.Arg0, c.Arg1} + return []interface{}{c.Arg0, c.Arg1, c.Arg2} } // Results returns an interface slice containing the results of this diff --git a/enterprise/internal/insights/compression/worker.go b/enterprise/internal/insights/compression/worker.go index 24af8f5aaaf..689bbcc5ec4 100644 --- a/enterprise/internal/insights/compression/worker.go +++ b/enterprise/internal/insights/compression/worker.go @@ -34,14 +34,15 @@ type CommitIndexer struct { db database.DB limiter *rate.Limiter allReposIterator func(ctx context.Context, each func(repoName string, id api.RepoID) error) error - getCommits func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error) + getCommits func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, until *time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error) commitStore CommitStore maxHistoricalTime time.Time background context.Context operations *operations + clock func() time.Time } -func NewCommitIndexer(background context.Context, base database.DB, insights dbutil.DB, observationContext *observation.Context) *CommitIndexer { +func NewCommitIndexer(background context.Context, base database.DB, insights dbutil.DB, clock func() time.Time, observationContext *observation.Context) *CommitIndexer { //TODO(insights): add a setting for historical index length startTime := time.Now().AddDate(-1, 0, 0) @@ -74,12 +75,14 @@ func NewCommitIndexer(background context.Context, base database.DB, insights dbu background: background, getCommits: getCommits, operations: operations, + clock: clock, } + return &indexer } -func NewCommitIndexerWorker(ctx context.Context, base database.DB, insights dbutil.DB, observationContext *observation.Context) goroutine.BackgroundRoutine { - indexer := NewCommitIndexer(ctx, base, insights, observationContext) +func NewCommitIndexerWorker(ctx context.Context, base database.DB, insights dbutil.DB, clock func() time.Time, observationContext *observation.Context) goroutine.BackgroundRoutine { + indexer := NewCommitIndexer(ctx, base, insights, clock, observationContext) return indexer.Handler(ctx, observationContext) } @@ -106,24 +109,38 @@ func (i *CommitIndexer) indexAll(ctx context.Context) error { return nil } -// indexRepository attempts to index the commits given a repository name. This method will absorb any errors that -// occur during execution and skip the index for this repository. +// maxWindowsPerRepo Limits the number of windows of commits indexRepository can process per run +const maxWindowsPerRepo = 25 + +// indexRepository attempts to index the commits given a repository name one time window at a time. +// This method will absorb any errors that occur during execution and skip any remaining windows. // If this repository already has some commits indexed, only commits made more recently than the previous index will be added. func (i *CommitIndexer) indexRepository(name string, id api.RepoID) error { - err := i.index(name, id) - if err != nil { - log15.Error(err.Error()) + windowsProccssed := 0 + additionalWindows := true + // It is important that the window size stays consistent during processing + // so that it can correctly determine the time the repository has been indexed though + windowDuration := conf.Get().InsightsCommitIndexerWindowDuration + for additionalWindows && windowsProccssed < maxWindowsPerRepo { + var err error + additionalWindows, err = i.indexNextWindow(name, id, windowDuration) + windowsProccssed++ + if err != nil { + log15.Error(err.Error()) + return nil + } } return nil + } -func (i *CommitIndexer) index(name string, id api.RepoID) (err error) { +func (i *CommitIndexer) indexNextWindow(name string, id api.RepoID, windowDuration int) (moreWindows bool, err error) { ctx, cancel := context.WithTimeout(i.background, time.Second*45) defer cancel() err = i.limiter.Wait(ctx) if err != nil { - return nil + return false, err } logger := log15.Root().New("worker", "insights-commit-indexer") @@ -133,50 +150,63 @@ func (i *CommitIndexer) index(name string, id api.RepoID) (err error) { metadata, err := getMetadata(ctx, repoId, i.commitStore) if err != nil { - return errors.Wrapf(err, "unable to fetch commit index metadata repo_id: %v", repoId) + return false, errors.Wrapf(err, "unable to fetch commit index metadata repo_id: %v", repoId) } if !metadata.Enabled { logger.Debug("commit indexing disabled", "repo_id", repoId) - return nil + return false, nil } - searchTime := max(i.maxHistoricalTime, metadata.LastIndexedAt) + searchStartTime := max(i.maxHistoricalTime, metadata.LastIndexedAt) + commitLogRequestTime := i.clock().UTC() - logger.Debug("fetching commits", "repo_id", repoId, "after", searchTime) - commits, err := i.getCommits(ctx, i.db, repoName, searchTime, i.operations.getCommits) + var searchEndTime *time.Time + if windowDuration > 0 { + endTime := searchStartTime.Add(time.Duration(24*windowDuration) * time.Hour) + searchEndTime = &endTime + } + + logger.Debug("fetching commits", "repo_id", repoId, "after", searchStartTime, "until", searchEndTime) + commits, err := i.getCommits(ctx, i.db, repoName, searchStartTime, searchEndTime, i.operations.getCommits) if err != nil { - return errors.Wrapf(err, "error fetching commits from gitserver repo_id: %v", repoId) + return false, errors.Wrapf(err, "error fetching commits from gitserver repo_id: %v", repoId) } i.operations.countCommits.WithLabelValues().Add(float64(len(commits))) - if len(commits) == 0 { - logger.Debug("commit index up to date", "repo_id", repoId) - - if _, err = i.commitStore.UpsertMetadataStamp(ctx, repoId); err != nil { - return err + // default to thinking indexing is done + indexedThrough := commitLogRequestTime // The time we issued the git log request + moreWindows = false + // If we are looking at a window of time determine if reached the end + if searchEndTime != nil { + moreWindows = searchEndTime.Before(commitLogRequestTime) + if moreWindows { + indexedThrough = *searchEndTime } - - return nil } - log15.Debug("indexing commits", "repo_id", repoId, "count", len(commits)) - err = i.commitStore.InsertCommits(ctx, repoId, commits, fmt.Sprintf("|repoName:%s|repoId:%d", repoName, repoId)) + log15.Debug("indexing commits", "repo_id", repoId, "count", len(commits), "indexedThrough", indexedThrough) + err = i.commitStore.InsertCommits(ctx, repoId, commits, indexedThrough, fmt.Sprintf("|repoName:%s|repoId:%d", repoName, repoId)) if err != nil { - return errors.Wrapf(err, "unable to update commit index repo_id: %v", repoId) + return false, errors.Wrapf(err, "unable to update commit index repo_id: %v", repoId) } - return nil + return moreWindows, nil } // getCommits fetches the commits from the remote gitserver for a repository after a certain time. -func getCommits(ctx context.Context, db database.DB, name api.RepoName, after time.Time, operation *observation.Operation) (_ []*gitdomain.Commit, err error) { +func getCommits(ctx context.Context, db database.DB, name api.RepoName, after time.Time, until *time.Time, operation *observation.Operation) (_ []*gitdomain.Commit, err error) { ctx, endObservation := operation.With(ctx, &err, observation.Args{}) defer endObservation(1, observation.Args{}) - return git.Commits(ctx, db, name, git.CommitsOptions{N: 0, DateOrder: true, NoEnsureRevision: true, After: after.Format(time.RFC3339)}, authz.DefaultSubRepoPermsChecker) + before := "" + if until != nil { + before = until.Format(time.RFC3339) + } + + return git.Commits(ctx, db, name, git.CommitsOptions{N: 0, DateOrder: true, NoEnsureRevision: true, After: after.Format(time.RFC3339), Before: before}, authz.DefaultSubRepoPermsChecker) } // getMetadata gets the index metadata for a repository. The metadata will be generated if it doesn't already exist, such as @@ -184,7 +214,7 @@ func getCommits(ctx context.Context, db database.DB, name api.RepoName, after ti func getMetadata(ctx context.Context, id api.RepoID, store CommitStore) (CommitIndexMetadata, error) { metadata, err := store.GetMetadata(ctx, id) if errors.Is(err, sql.ErrNoRows) { - metadata, err = store.UpsertMetadataStamp(ctx, id) + metadata, err = store.UpsertMetadataStamp(ctx, id, time.Time{}.UTC()) } if err != nil { return CommitIndexMetadata{}, err diff --git a/enterprise/internal/insights/compression/worker_test.go b/enterprise/internal/insights/compression/worker_test.go index 82bca52009f..846537fa6af 100644 --- a/enterprise/internal/insights/compression/worker_test.go +++ b/enterprise/internal/insights/compression/worker_test.go @@ -10,23 +10,29 @@ import ( "golang.org/x/time/rate" "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain" "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/schema" ) +var ops *operations = newOperations(&observation.TestContext) + func TestCommitIndexer_indexAll(t *testing.T) { ctx := context.Background() commitStore := NewMockCommitStore() maxHistorical := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) + clock := func() time.Time { return time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) } indexer := CommitIndexer{ limiter: rate.NewLimiter(10, 1), commitStore: commitStore, maxHistoricalTime: maxHistorical, background: context.Background(), - operations: newOperations(&observation.TestContext), + operations: ops, + clock: clock, } // Testing a scenario with 3 repos @@ -54,22 +60,29 @@ func TestCommitIndexer_indexAll(t *testing.T) { commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ RepoId: 1, Enabled: false, - LastIndexedAt: time.Now(), + LastIndexedAt: time.Date(1999, time.January, 1, 0, 0, 0, 0, time.UTC), }, nil) commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ RepoId: 2, Enabled: true, - LastIndexedAt: time.Now(), + LastIndexedAt: time.Date(1999, time.January, 1, 0, 0, 0, 0, time.UTC), }, nil) commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ RepoId: 3, Enabled: true, - LastIndexedAt: time.Now(), + LastIndexedAt: time.Date(1999, time.January, 1, 0, 0, 0, 0, time.UTC), }, nil) t.Run("multi_repository", func(t *testing.T) { + windowDuration := 0 + conf.Mock(&conf.Unified{ + SiteConfiguration: schema.SiteConfiguration{ + InsightsCommitIndexerWindowDuration: windowDuration, + }, + }) + defer conf.Mock(nil) err := indexer.indexAll(ctx) if err != nil { t.Fatal(err) @@ -80,27 +93,28 @@ func TestCommitIndexer_indexAll(t *testing.T) { t.Errorf("got GetMetadata invocations: %v want %v", got, want) } - // Only one repository should actually update any commits - if got, want := len(commitStore.InsertCommitsFunc.history), 1; got != want { + // Both enabled repositories should call insert commits + if got, want := len(commitStore.InsertCommitsFunc.history), 2; got != want { t.Errorf("got InsertCommits invocations: %v want %v", got, want) } else { - call := commitStore.InsertCommitsFunc.history[0] - for i, got := range call.Arg2 { - if diff := cmp.Diff(commits["really-big-repo"][i], got); diff != "" { - t.Errorf("unexpected commit\n%s", diff) + calls := map[string]CommitStoreInsertCommitsFuncCall{ + "really-big-repo": commitStore.InsertCommitsFunc.history[0], + "no-commits": commitStore.InsertCommitsFunc.history[1], + } + for repo, call := range calls { + // Check Indexed though is the clock time + if diff := cmp.Diff(clock(), call.Arg3); diff != "" { + t.Errorf("unexpected indexed though date/time") + } + // Check the correct commits + for i, got := range call.Arg2 { + if diff := cmp.Diff(commits[repo][i], got); diff != "" { + t.Errorf("unexpected commit\n%s", diff) + } } } } - // One repository had no commits, so only the timestamp would get updated - if got, want := len(commitStore.UpsertMetadataStampFunc.history), 1; got != want { - t.Errorf("got UpsertMetadataStamp invocations: %v want %v", got, want) - } else { - call := commitStore.UpsertMetadataStampFunc.history[0] - if call.Arg1 != 2 { - t.Errorf("unexpected repository for UpsertMetadataStamp repo_id: %v", call.Arg1) - } - } }) } @@ -171,6 +185,187 @@ func Test_getMetadata_NoInsertRequired(t *testing.T) { }) } +func TestCommitIndexer_windowing(t *testing.T) { + ctx := context.Background() + commitStore := NewMockCommitStore() + + maxHistorical := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) + clock := func() time.Time { return time.Date(2020, time.June, 1, 0, 0, 0, 0, time.UTC) } + + indexer := CommitIndexer{ + limiter: rate.NewLimiter(10, 1), + commitStore: commitStore, + maxHistoricalTime: maxHistorical, + background: context.Background(), + operations: ops, + clock: clock, + } + + // Testing a scenario with 3 repos and a window of 30 days + // "repo-one" has been recently indexed and all commits are in one window + // "really-big-repo" has 2 windows of commits + // "no-commits-recent" has no commits and was recently indexed + // "no-commits-not-recent" has no commits but is 2 windows behind on indexing + commits := map[string][]*gitdomain.Commit{ + "repo-one": { + commit("ref1", "2020-05-10T00:00:00+00:00"), + commit("ref2", "2020-05-12T00:00:00+00:00"), + }, + "really-big-repo": { + commit("bigref1", "2020-04-17T00:00:00+00:00"), + commit("bigref2", "2020-04-18T00:00:00+00:00"), + commit("bigref3", "2020-05-17T00:00:00+00:00"), + commit("bigref4", "2020-05-18T00:00:00+00:00"), + }, + "no-commits-recent": {}, + "no-commits-not-recent": {}, + "only-recent": { + commit("bigref4", "2020-05-18T00:00:00+00:00"), + }, + } + indexer.getCommits = mockCommits(commits) + indexer.allReposIterator = mockIterator([]string{"repo-one", "really-big-repo", "no-commits-recent", "no-commits-not-recent", "only-recent"}) + + commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ + RepoId: 1, + Enabled: true, + LastIndexedAt: time.Date(2020, time.May, 5, 0, 0, 0, 0, time.UTC), + }, nil) + + commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ + RepoId: 2, + Enabled: true, + LastIndexedAt: time.Date(2020, time.April, 5, 0, 0, 0, 0, time.UTC), + }, nil) + + commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ + RepoId: 2, + Enabled: true, + LastIndexedAt: time.Date(2020, time.May, 5, 0, 0, 0, 0, time.UTC), + }, nil) + + commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ + RepoId: 3, + Enabled: true, + LastIndexedAt: time.Date(2020, time.May, 5, 0, 0, 0, 0, time.UTC), + }, nil) + + commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ + RepoId: 4, + Enabled: true, + LastIndexedAt: time.Date(2020, time.April, 5, 0, 0, 0, 0, time.UTC), + }, nil) + + commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ + RepoId: 4, + Enabled: true, + LastIndexedAt: time.Date(2020, time.May, 5, 0, 0, 0, 0, time.UTC), + }, nil) + + commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ + RepoId: 5, + Enabled: true, + LastIndexedAt: time.Date(2020, time.April, 5, 0, 0, 0, 0, time.UTC), + }, nil) + + commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{ + RepoId: 5, + Enabled: true, + LastIndexedAt: time.Date(2020, time.May, 5, 0, 0, 0, 0, time.UTC), + }, nil) + + endOfApril5Window := time.Date(2020, time.April, 5, 0, 0, 0, 0, time.UTC).Add(24 * 30 * time.Hour) + + t.Run("multi_repository_paging", func(t *testing.T) { + + conf.Mock(&conf.Unified{ + SiteConfiguration: schema.SiteConfiguration{ + InsightsCommitIndexerWindowDuration: 30, + }, + }) + defer conf.Mock(nil) + err := indexer.indexAll(ctx) + if err != nil { + t.Fatal(err) + } + + // 4 enabled repos get metadata, repo 2, 4 and 5 need 2 windows all others just 1 + if got, want := len(commitStore.GetMetadataFunc.history), 8; got != want { + t.Errorf("got GetMetadata invocations: %v want %v", got, want) + } + + // Each time though we call insert commits even if there are none repo 2, 4 and 5 need 2 windows so 8 total + if got, want := len(commitStore.InsertCommitsFunc.history), 8; got != want { + t.Errorf("got InsertCommits invocations: %v want %v", got, want) + } else { + + /* repo one + ** All commits present and sets last indexed to the clock time + */ + checkCommits(t, commits["repo-one"], commitStore.InsertCommitsFunc.history[0].Arg2) + checkIndexedThough(t, clock().UTC(), commitStore.InsertCommitsFunc.history[0].Arg3) + + /* really-big-repo + ** Last indexed more than 1 window ago so needs to make 2 passes + ** First Pass: + ** First two commits and sets last indxed to the end of the time window (last_indexed + 30 days) + ** Second Pass: + ** Last two commits and sets last indexed to clock time because end of window was greater than clock + */ + checkCommits(t, commits["really-big-repo"][:2], commitStore.InsertCommitsFunc.history[1].Arg2) + checkIndexedThough(t, endOfApril5Window, commitStore.InsertCommitsFunc.history[1].Arg3) + checkCommits(t, commits["really-big-repo"][2:], commitStore.InsertCommitsFunc.history[2].Arg2) + checkIndexedThough(t, clock().UTC(), commitStore.InsertCommitsFunc.history[2].Arg3) + + /* no-commits-recent + ** There are no commits to save and sets last indexed to the clock time + */ + checkCommits(t, []*gitdomain.Commit{}, commitStore.InsertCommitsFunc.history[3].Arg2) + checkIndexedThough(t, clock().UTC(), commitStore.InsertCommitsFunc.history[3].Arg3) + + /* no-commits-not-recent + ** Last indexed is more than 1 window agao so need to make 2 passes + ** First Pass: + ** No commits to save and sets last indxed to the end of the time window (last_indexed + 30 days) + ** Second Pass: + ** Still no commits and sets last indexed to clock time + */ + checkCommits(t, []*gitdomain.Commit{}, commitStore.InsertCommitsFunc.history[4].Arg2) + checkIndexedThough(t, endOfApril5Window, commitStore.InsertCommitsFunc.history[4].Arg3) + checkCommits(t, []*gitdomain.Commit{}, commitStore.InsertCommitsFunc.history[5].Arg2) + checkIndexedThough(t, clock().UTC(), commitStore.InsertCommitsFunc.history[5].Arg3) + + /* only-recent + ** Last indexed is more than 1 window agao so need to make 2 passes + ** First Pass: + ** No commits to save and sets last indxed to the end of the time window (last_indexed + 30 days) + ** Second Pass: + ** Saves the 1 commit and sets last indexed to clock time + */ + checkCommits(t, []*gitdomain.Commit{}, commitStore.InsertCommitsFunc.history[6].Arg2) + checkIndexedThough(t, endOfApril5Window, commitStore.InsertCommitsFunc.history[6].Arg3) + checkCommits(t, commits["only-recent"], commitStore.InsertCommitsFunc.history[7].Arg2) + checkIndexedThough(t, clock().UTC(), commitStore.InsertCommitsFunc.history[7].Arg3) + + } + + }) +} + +func checkCommits(t *testing.T, want []*gitdomain.Commit, got []*gitdomain.Commit) { + for i, commit := range got { + if diff := cmp.Diff(want[i], commit); diff != "" { + t.Errorf("unexpected commit\n%s", diff) + } + } +} + +func checkIndexedThough(t *testing.T, want time.Time, got time.Time) { + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected indexed through date\n%s", diff) + } +} + // mockIterator generates iterator methods given a list of repo names for test scenarios func mockIterator(repos []string) func(ctx context.Context, each func(repoName string, id api.RepoID) error) error { return func(ctx context.Context, each func(repoName string, id api.RepoID) error) error { @@ -194,8 +389,18 @@ func commit(ref string, commitTime string) *gitdomain.Commit { } } -func mockCommits(commits map[string][]*gitdomain.Commit) func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error) { - return func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error) { - return commits[(string(name))], nil +func mockCommits(commits map[string][]*gitdomain.Commit) func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, until *time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error) { + return func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, until *time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error) { + filteredCommits := make([]*gitdomain.Commit, 0) + for _, commit := range commits[string(name)] { + if commit.Committer.Date.Before(after) { + continue + } + if until != nil && commit.Committer.Date.After(*until) { + continue + } + filteredCommits = append(filteredCommits, commit) + } + return filteredCommits, nil } } diff --git a/schema/schema.go b/schema/schema.go index a37be4bb67e..6a105c8cb59 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -1809,6 +1809,8 @@ type SiteConfiguration struct { HtmlHeadTop string `json:"htmlHeadTop,omitempty"` // InsightsCommitIndexerInterval description: The interval (in minutes) at which the insights commit indexer will check for new commits. InsightsCommitIndexerInterval int `json:"insights.commit.indexer.interval,omitempty"` + // InsightsCommitIndexerWindowDuration description: The number of days of commits the insights commit indexer will pull during each request (0 is no limit). + InsightsCommitIndexerWindowDuration int `json:"insights.commit.indexer.windowDuration,omitempty"` // InsightsHistoricalFrameLength description: (debug) duration of historical insights timeframes, one point per repository will be recorded in each timeframe. InsightsHistoricalFrameLength string `json:"insights.historical.frameLength,omitempty"` // InsightsHistoricalFrames description: (debug) number of historical insights timeframes to populate diff --git a/schema/site.schema.json b/schema/site.schema.json index 4c96633caed..63c365e2f11 100644 --- a/schema/site.schema.json +++ b/schema/site.schema.json @@ -1195,6 +1195,13 @@ "default": 60, "examples": [120] }, + "insights.commit.indexer.windowDuration": { + "description": "The number of days of commits the insights commit indexer will pull during each request (0 is no limit).", + "type": "integer", + "group": "CodeInsights", + "default": 0, + "examples": [30] + }, "htmlHeadTop": { "description": "HTML to inject at the top of the `` element on each page, for analytics scripts", "type": "string",