mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 17:31:43 +00:00
Insights: Optional batching of commit indexer (#33666)
Setting to allow insights commit indexer to retrieve historic commits in batches based on a number of days
This commit is contained in:
parent
2b7071956c
commit
0a73bcb369
@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background/pings"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
@ -56,7 +57,7 @@ func GetBackgroundJobs(ctx context.Context, mainAppDB *sql.DB, insightsDB *sql.D
|
||||
}
|
||||
|
||||
// todo(insights) add setting to disable this indexer
|
||||
routines = append(routines, compression.NewCommitIndexerWorker(ctx, database.NewDB(mainAppDB), insightsDB, observationContext))
|
||||
routines = append(routines, compression.NewCommitIndexerWorker(ctx, database.NewDB(mainAppDB), insightsDB, time.Now, observationContext))
|
||||
|
||||
// Register the background goroutine which discovers historical gaps in data and enqueues
|
||||
// work to fill them - if not disabled.
|
||||
|
||||
@ -23,8 +23,8 @@ type CommitStore interface {
|
||||
Save(ctx context.Context, id api.RepoID, commit *gitdomain.Commit, debugInfo string) error
|
||||
Get(ctx context.Context, id api.RepoID, start time.Time, end time.Time) ([]CommitStamp, error)
|
||||
GetMetadata(ctx context.Context, id api.RepoID) (CommitIndexMetadata, error)
|
||||
UpsertMetadataStamp(ctx context.Context, id api.RepoID) (CommitIndexMetadata, error)
|
||||
InsertCommits(ctx context.Context, id api.RepoID, commits []*gitdomain.Commit, debugInfo string) error
|
||||
UpsertMetadataStamp(ctx context.Context, id api.RepoID, indexedThrough time.Time) (CommitIndexMetadata, error)
|
||||
InsertCommits(ctx context.Context, id api.RepoID, commits []*gitdomain.Commit, indexedThrough time.Time, debugInfo string) error
|
||||
}
|
||||
|
||||
func NewCommitStore(db dbutil.DB) *DBCommitStore {
|
||||
@ -52,7 +52,7 @@ func (c *DBCommitStore) Save(ctx context.Context, id api.RepoID, commit *gitdoma
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *DBCommitStore) InsertCommits(ctx context.Context, id api.RepoID, commits []*gitdomain.Commit, debugInfo string) (err error) {
|
||||
func (c *DBCommitStore) InsertCommits(ctx context.Context, id api.RepoID, commits []*gitdomain.Commit, indexedThrough time.Time, debugInfo string) (err error) {
|
||||
tx, err := c.Transact(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -66,7 +66,7 @@ func (c *DBCommitStore) InsertCommits(ctx context.Context, id api.RepoID, commit
|
||||
}
|
||||
}
|
||||
|
||||
if _, err = tx.UpsertMetadataStamp(ctx, id); err != nil {
|
||||
if _, err = tx.UpsertMetadataStamp(ctx, id, indexedThrough); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -108,8 +108,8 @@ func (c *DBCommitStore) GetMetadata(ctx context.Context, id api.RepoID) (CommitI
|
||||
}
|
||||
|
||||
// UpsertMetadataStamp inserts (or updates, if the row already exists) the index metadata timestamp for a given repository
|
||||
func (c *DBCommitStore) UpsertMetadataStamp(ctx context.Context, id api.RepoID) (CommitIndexMetadata, error) {
|
||||
row := c.QueryRow(ctx, sqlf.Sprintf(upsertCommitIndexMetadataStampStr, id))
|
||||
func (c *DBCommitStore) UpsertMetadataStamp(ctx context.Context, id api.RepoID, indexedThrough time.Time) (CommitIndexMetadata, error) {
|
||||
row := c.QueryRow(ctx, sqlf.Sprintf(upsertCommitIndexMetadataStampStr, id, indexedThrough))
|
||||
|
||||
var metadata CommitIndexMetadata
|
||||
if err := row.Scan(&metadata.RepoId, &metadata.Enabled, &metadata.LastIndexedAt); err != nil {
|
||||
@ -154,6 +154,6 @@ const upsertCommitIndexMetadataStampStr = `
|
||||
INSERT INTO commit_index_metadata(repo_id)
|
||||
VALUES (%v)
|
||||
ON CONFLICT (repo_id) DO UPDATE
|
||||
SET last_indexed_at = CURRENT_TIMESTAMP
|
||||
SET last_indexed_at = %v
|
||||
RETURNING repo_id, enabled, last_indexed_at;
|
||||
`
|
||||
|
||||
@ -48,7 +48,7 @@ func NewMockCommitStore() *MockCommitStore {
|
||||
},
|
||||
},
|
||||
InsertCommitsFunc: &CommitStoreInsertCommitsFunc{
|
||||
defaultHook: func(context.Context, api.RepoID, []*gitdomain.Commit, string) error {
|
||||
defaultHook: func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error {
|
||||
return nil
|
||||
},
|
||||
},
|
||||
@ -58,7 +58,7 @@ func NewMockCommitStore() *MockCommitStore {
|
||||
},
|
||||
},
|
||||
UpsertMetadataStampFunc: &CommitStoreUpsertMetadataStampFunc{
|
||||
defaultHook: func(context.Context, api.RepoID) (CommitIndexMetadata, error) {
|
||||
defaultHook: func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) {
|
||||
return CommitIndexMetadata{}, nil
|
||||
},
|
||||
},
|
||||
@ -80,7 +80,7 @@ func NewStrictMockCommitStore() *MockCommitStore {
|
||||
},
|
||||
},
|
||||
InsertCommitsFunc: &CommitStoreInsertCommitsFunc{
|
||||
defaultHook: func(context.Context, api.RepoID, []*gitdomain.Commit, string) error {
|
||||
defaultHook: func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error {
|
||||
panic("unexpected invocation of MockCommitStore.InsertCommits")
|
||||
},
|
||||
},
|
||||
@ -90,7 +90,7 @@ func NewStrictMockCommitStore() *MockCommitStore {
|
||||
},
|
||||
},
|
||||
UpsertMetadataStampFunc: &CommitStoreUpsertMetadataStampFunc{
|
||||
defaultHook: func(context.Context, api.RepoID) (CommitIndexMetadata, error) {
|
||||
defaultHook: func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) {
|
||||
panic("unexpected invocation of MockCommitStore.UpsertMetadataStamp")
|
||||
},
|
||||
},
|
||||
@ -344,24 +344,24 @@ func (c CommitStoreGetMetadataFuncCall) Results() []interface{} {
|
||||
// CommitStoreInsertCommitsFunc describes the behavior when the
|
||||
// InsertCommits method of the parent MockCommitStore instance is invoked.
|
||||
type CommitStoreInsertCommitsFunc struct {
|
||||
defaultHook func(context.Context, api.RepoID, []*gitdomain.Commit, string) error
|
||||
hooks []func(context.Context, api.RepoID, []*gitdomain.Commit, string) error
|
||||
defaultHook func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error
|
||||
hooks []func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error
|
||||
history []CommitStoreInsertCommitsFuncCall
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// InsertCommits delegates to the next hook function in the queue and stores
|
||||
// the parameter and result values of this invocation.
|
||||
func (m *MockCommitStore) InsertCommits(v0 context.Context, v1 api.RepoID, v2 []*gitdomain.Commit, v3 string) error {
|
||||
r0 := m.InsertCommitsFunc.nextHook()(v0, v1, v2, v3)
|
||||
m.InsertCommitsFunc.appendCall(CommitStoreInsertCommitsFuncCall{v0, v1, v2, v3, r0})
|
||||
func (m *MockCommitStore) InsertCommits(v0 context.Context, v1 api.RepoID, v2 []*gitdomain.Commit, v3 time.Time, v4 string) error {
|
||||
r0 := m.InsertCommitsFunc.nextHook()(v0, v1, v2, v3, v4)
|
||||
m.InsertCommitsFunc.appendCall(CommitStoreInsertCommitsFuncCall{v0, v1, v2, v3, v4, r0})
|
||||
return r0
|
||||
}
|
||||
|
||||
// SetDefaultHook sets function that is called when the InsertCommits method
|
||||
// of the parent MockCommitStore instance is invoked and the hook queue is
|
||||
// empty.
|
||||
func (f *CommitStoreInsertCommitsFunc) SetDefaultHook(hook func(context.Context, api.RepoID, []*gitdomain.Commit, string) error) {
|
||||
func (f *CommitStoreInsertCommitsFunc) SetDefaultHook(hook func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error) {
|
||||
f.defaultHook = hook
|
||||
}
|
||||
|
||||
@ -369,7 +369,7 @@ func (f *CommitStoreInsertCommitsFunc) SetDefaultHook(hook func(context.Context,
|
||||
// InsertCommits method of the parent MockCommitStore instance invokes the
|
||||
// hook at the front of the queue and discards it. After the queue is empty,
|
||||
// the default hook function is invoked for any future action.
|
||||
func (f *CommitStoreInsertCommitsFunc) PushHook(hook func(context.Context, api.RepoID, []*gitdomain.Commit, string) error) {
|
||||
func (f *CommitStoreInsertCommitsFunc) PushHook(hook func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error) {
|
||||
f.mutex.Lock()
|
||||
f.hooks = append(f.hooks, hook)
|
||||
f.mutex.Unlock()
|
||||
@ -378,19 +378,19 @@ func (f *CommitStoreInsertCommitsFunc) PushHook(hook func(context.Context, api.R
|
||||
// SetDefaultReturn calls SetDefaultHook with a function that returns the
|
||||
// given values.
|
||||
func (f *CommitStoreInsertCommitsFunc) SetDefaultReturn(r0 error) {
|
||||
f.SetDefaultHook(func(context.Context, api.RepoID, []*gitdomain.Commit, string) error {
|
||||
f.SetDefaultHook(func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
// PushReturn calls PushHook with a function that returns the given values.
|
||||
func (f *CommitStoreInsertCommitsFunc) PushReturn(r0 error) {
|
||||
f.PushHook(func(context.Context, api.RepoID, []*gitdomain.Commit, string) error {
|
||||
f.PushHook(func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
func (f *CommitStoreInsertCommitsFunc) nextHook() func(context.Context, api.RepoID, []*gitdomain.Commit, string) error {
|
||||
func (f *CommitStoreInsertCommitsFunc) nextHook() func(context.Context, api.RepoID, []*gitdomain.Commit, time.Time, string) error {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
@ -434,7 +434,10 @@ type CommitStoreInsertCommitsFuncCall struct {
|
||||
Arg2 []*gitdomain.Commit
|
||||
// Arg3 is the value of the 4th argument passed to this method
|
||||
// invocation.
|
||||
Arg3 string
|
||||
Arg3 time.Time
|
||||
// Arg4 is the value of the 5th argument passed to this method
|
||||
// invocation.
|
||||
Arg4 string
|
||||
// Result0 is the value of the 1st result returned from this method
|
||||
// invocation.
|
||||
Result0 error
|
||||
@ -443,7 +446,7 @@ type CommitStoreInsertCommitsFuncCall struct {
|
||||
// Args returns an interface slice containing the arguments of this
|
||||
// invocation.
|
||||
func (c CommitStoreInsertCommitsFuncCall) Args() []interface{} {
|
||||
return []interface{}{c.Arg0, c.Arg1, c.Arg2, c.Arg3}
|
||||
return []interface{}{c.Arg0, c.Arg1, c.Arg2, c.Arg3, c.Arg4}
|
||||
}
|
||||
|
||||
// Results returns an interface slice containing the results of this
|
||||
@ -566,24 +569,24 @@ func (c CommitStoreSaveFuncCall) Results() []interface{} {
|
||||
// UpsertMetadataStamp method of the parent MockCommitStore instance is
|
||||
// invoked.
|
||||
type CommitStoreUpsertMetadataStampFunc struct {
|
||||
defaultHook func(context.Context, api.RepoID) (CommitIndexMetadata, error)
|
||||
hooks []func(context.Context, api.RepoID) (CommitIndexMetadata, error)
|
||||
defaultHook func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error)
|
||||
hooks []func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error)
|
||||
history []CommitStoreUpsertMetadataStampFuncCall
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// UpsertMetadataStamp delegates to the next hook function in the queue and
|
||||
// stores the parameter and result values of this invocation.
|
||||
func (m *MockCommitStore) UpsertMetadataStamp(v0 context.Context, v1 api.RepoID) (CommitIndexMetadata, error) {
|
||||
r0, r1 := m.UpsertMetadataStampFunc.nextHook()(v0, v1)
|
||||
m.UpsertMetadataStampFunc.appendCall(CommitStoreUpsertMetadataStampFuncCall{v0, v1, r0, r1})
|
||||
func (m *MockCommitStore) UpsertMetadataStamp(v0 context.Context, v1 api.RepoID, v2 time.Time) (CommitIndexMetadata, error) {
|
||||
r0, r1 := m.UpsertMetadataStampFunc.nextHook()(v0, v1, v2)
|
||||
m.UpsertMetadataStampFunc.appendCall(CommitStoreUpsertMetadataStampFuncCall{v0, v1, v2, r0, r1})
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// SetDefaultHook sets function that is called when the UpsertMetadataStamp
|
||||
// method of the parent MockCommitStore instance is invoked and the hook
|
||||
// queue is empty.
|
||||
func (f *CommitStoreUpsertMetadataStampFunc) SetDefaultHook(hook func(context.Context, api.RepoID) (CommitIndexMetadata, error)) {
|
||||
func (f *CommitStoreUpsertMetadataStampFunc) SetDefaultHook(hook func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error)) {
|
||||
f.defaultHook = hook
|
||||
}
|
||||
|
||||
@ -591,7 +594,7 @@ func (f *CommitStoreUpsertMetadataStampFunc) SetDefaultHook(hook func(context.Co
|
||||
// UpsertMetadataStamp method of the parent MockCommitStore instance invokes
|
||||
// the hook at the front of the queue and discards it. After the queue is
|
||||
// empty, the default hook function is invoked for any future action.
|
||||
func (f *CommitStoreUpsertMetadataStampFunc) PushHook(hook func(context.Context, api.RepoID) (CommitIndexMetadata, error)) {
|
||||
func (f *CommitStoreUpsertMetadataStampFunc) PushHook(hook func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error)) {
|
||||
f.mutex.Lock()
|
||||
f.hooks = append(f.hooks, hook)
|
||||
f.mutex.Unlock()
|
||||
@ -600,19 +603,19 @@ func (f *CommitStoreUpsertMetadataStampFunc) PushHook(hook func(context.Context,
|
||||
// SetDefaultReturn calls SetDefaultHook with a function that returns the
|
||||
// given values.
|
||||
func (f *CommitStoreUpsertMetadataStampFunc) SetDefaultReturn(r0 CommitIndexMetadata, r1 error) {
|
||||
f.SetDefaultHook(func(context.Context, api.RepoID) (CommitIndexMetadata, error) {
|
||||
f.SetDefaultHook(func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) {
|
||||
return r0, r1
|
||||
})
|
||||
}
|
||||
|
||||
// PushReturn calls PushHook with a function that returns the given values.
|
||||
func (f *CommitStoreUpsertMetadataStampFunc) PushReturn(r0 CommitIndexMetadata, r1 error) {
|
||||
f.PushHook(func(context.Context, api.RepoID) (CommitIndexMetadata, error) {
|
||||
f.PushHook(func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) {
|
||||
return r0, r1
|
||||
})
|
||||
}
|
||||
|
||||
func (f *CommitStoreUpsertMetadataStampFunc) nextHook() func(context.Context, api.RepoID) (CommitIndexMetadata, error) {
|
||||
func (f *CommitStoreUpsertMetadataStampFunc) nextHook() func(context.Context, api.RepoID, time.Time) (CommitIndexMetadata, error) {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
@ -652,6 +655,9 @@ type CommitStoreUpsertMetadataStampFuncCall struct {
|
||||
// Arg1 is the value of the 2nd argument passed to this method
|
||||
// invocation.
|
||||
Arg1 api.RepoID
|
||||
// Arg2 is the value of the 3rd argument passed to this method
|
||||
// invocation.
|
||||
Arg2 time.Time
|
||||
// Result0 is the value of the 1st result returned from this method
|
||||
// invocation.
|
||||
Result0 CommitIndexMetadata
|
||||
@ -663,7 +669,7 @@ type CommitStoreUpsertMetadataStampFuncCall struct {
|
||||
// Args returns an interface slice containing the arguments of this
|
||||
// invocation.
|
||||
func (c CommitStoreUpsertMetadataStampFuncCall) Args() []interface{} {
|
||||
return []interface{}{c.Arg0, c.Arg1}
|
||||
return []interface{}{c.Arg0, c.Arg1, c.Arg2}
|
||||
}
|
||||
|
||||
// Results returns an interface slice containing the results of this
|
||||
|
||||
@ -34,14 +34,15 @@ type CommitIndexer struct {
|
||||
db database.DB
|
||||
limiter *rate.Limiter
|
||||
allReposIterator func(ctx context.Context, each func(repoName string, id api.RepoID) error) error
|
||||
getCommits func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error)
|
||||
getCommits func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, until *time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error)
|
||||
commitStore CommitStore
|
||||
maxHistoricalTime time.Time
|
||||
background context.Context
|
||||
operations *operations
|
||||
clock func() time.Time
|
||||
}
|
||||
|
||||
func NewCommitIndexer(background context.Context, base database.DB, insights dbutil.DB, observationContext *observation.Context) *CommitIndexer {
|
||||
func NewCommitIndexer(background context.Context, base database.DB, insights dbutil.DB, clock func() time.Time, observationContext *observation.Context) *CommitIndexer {
|
||||
//TODO(insights): add a setting for historical index length
|
||||
startTime := time.Now().AddDate(-1, 0, 0)
|
||||
|
||||
@ -74,12 +75,14 @@ func NewCommitIndexer(background context.Context, base database.DB, insights dbu
|
||||
background: background,
|
||||
getCommits: getCommits,
|
||||
operations: operations,
|
||||
clock: clock,
|
||||
}
|
||||
|
||||
return &indexer
|
||||
}
|
||||
|
||||
func NewCommitIndexerWorker(ctx context.Context, base database.DB, insights dbutil.DB, observationContext *observation.Context) goroutine.BackgroundRoutine {
|
||||
indexer := NewCommitIndexer(ctx, base, insights, observationContext)
|
||||
func NewCommitIndexerWorker(ctx context.Context, base database.DB, insights dbutil.DB, clock func() time.Time, observationContext *observation.Context) goroutine.BackgroundRoutine {
|
||||
indexer := NewCommitIndexer(ctx, base, insights, clock, observationContext)
|
||||
|
||||
return indexer.Handler(ctx, observationContext)
|
||||
}
|
||||
@ -106,24 +109,38 @@ func (i *CommitIndexer) indexAll(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// indexRepository attempts to index the commits given a repository name. This method will absorb any errors that
|
||||
// occur during execution and skip the index for this repository.
|
||||
// maxWindowsPerRepo Limits the number of windows of commits indexRepository can process per run
|
||||
const maxWindowsPerRepo = 25
|
||||
|
||||
// indexRepository attempts to index the commits given a repository name one time window at a time.
|
||||
// This method will absorb any errors that occur during execution and skip any remaining windows.
|
||||
// If this repository already has some commits indexed, only commits made more recently than the previous index will be added.
|
||||
func (i *CommitIndexer) indexRepository(name string, id api.RepoID) error {
|
||||
err := i.index(name, id)
|
||||
if err != nil {
|
||||
log15.Error(err.Error())
|
||||
windowsProccssed := 0
|
||||
additionalWindows := true
|
||||
// It is important that the window size stays consistent during processing
|
||||
// so that it can correctly determine the time the repository has been indexed though
|
||||
windowDuration := conf.Get().InsightsCommitIndexerWindowDuration
|
||||
for additionalWindows && windowsProccssed < maxWindowsPerRepo {
|
||||
var err error
|
||||
additionalWindows, err = i.indexNextWindow(name, id, windowDuration)
|
||||
windowsProccssed++
|
||||
if err != nil {
|
||||
log15.Error(err.Error())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (i *CommitIndexer) index(name string, id api.RepoID) (err error) {
|
||||
func (i *CommitIndexer) indexNextWindow(name string, id api.RepoID, windowDuration int) (moreWindows bool, err error) {
|
||||
ctx, cancel := context.WithTimeout(i.background, time.Second*45)
|
||||
defer cancel()
|
||||
|
||||
err = i.limiter.Wait(ctx)
|
||||
if err != nil {
|
||||
return nil
|
||||
return false, err
|
||||
}
|
||||
|
||||
logger := log15.Root().New("worker", "insights-commit-indexer")
|
||||
@ -133,50 +150,63 @@ func (i *CommitIndexer) index(name string, id api.RepoID) (err error) {
|
||||
|
||||
metadata, err := getMetadata(ctx, repoId, i.commitStore)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unable to fetch commit index metadata repo_id: %v", repoId)
|
||||
return false, errors.Wrapf(err, "unable to fetch commit index metadata repo_id: %v", repoId)
|
||||
}
|
||||
|
||||
if !metadata.Enabled {
|
||||
logger.Debug("commit indexing disabled", "repo_id", repoId)
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
searchTime := max(i.maxHistoricalTime, metadata.LastIndexedAt)
|
||||
searchStartTime := max(i.maxHistoricalTime, metadata.LastIndexedAt)
|
||||
commitLogRequestTime := i.clock().UTC()
|
||||
|
||||
logger.Debug("fetching commits", "repo_id", repoId, "after", searchTime)
|
||||
commits, err := i.getCommits(ctx, i.db, repoName, searchTime, i.operations.getCommits)
|
||||
var searchEndTime *time.Time
|
||||
if windowDuration > 0 {
|
||||
endTime := searchStartTime.Add(time.Duration(24*windowDuration) * time.Hour)
|
||||
searchEndTime = &endTime
|
||||
}
|
||||
|
||||
logger.Debug("fetching commits", "repo_id", repoId, "after", searchStartTime, "until", searchEndTime)
|
||||
commits, err := i.getCommits(ctx, i.db, repoName, searchStartTime, searchEndTime, i.operations.getCommits)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error fetching commits from gitserver repo_id: %v", repoId)
|
||||
return false, errors.Wrapf(err, "error fetching commits from gitserver repo_id: %v", repoId)
|
||||
}
|
||||
|
||||
i.operations.countCommits.WithLabelValues().Add(float64(len(commits)))
|
||||
|
||||
if len(commits) == 0 {
|
||||
logger.Debug("commit index up to date", "repo_id", repoId)
|
||||
|
||||
if _, err = i.commitStore.UpsertMetadataStamp(ctx, repoId); err != nil {
|
||||
return err
|
||||
// default to thinking indexing is done
|
||||
indexedThrough := commitLogRequestTime // The time we issued the git log request
|
||||
moreWindows = false
|
||||
|
||||
// If we are looking at a window of time determine if reached the end
|
||||
if searchEndTime != nil {
|
||||
moreWindows = searchEndTime.Before(commitLogRequestTime)
|
||||
if moreWindows {
|
||||
indexedThrough = *searchEndTime
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
log15.Debug("indexing commits", "repo_id", repoId, "count", len(commits))
|
||||
err = i.commitStore.InsertCommits(ctx, repoId, commits, fmt.Sprintf("|repoName:%s|repoId:%d", repoName, repoId))
|
||||
log15.Debug("indexing commits", "repo_id", repoId, "count", len(commits), "indexedThrough", indexedThrough)
|
||||
err = i.commitStore.InsertCommits(ctx, repoId, commits, indexedThrough, fmt.Sprintf("|repoName:%s|repoId:%d", repoName, repoId))
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unable to update commit index repo_id: %v", repoId)
|
||||
return false, errors.Wrapf(err, "unable to update commit index repo_id: %v", repoId)
|
||||
}
|
||||
|
||||
return nil
|
||||
return moreWindows, nil
|
||||
}
|
||||
|
||||
// getCommits fetches the commits from the remote gitserver for a repository after a certain time.
|
||||
func getCommits(ctx context.Context, db database.DB, name api.RepoName, after time.Time, operation *observation.Operation) (_ []*gitdomain.Commit, err error) {
|
||||
func getCommits(ctx context.Context, db database.DB, name api.RepoName, after time.Time, until *time.Time, operation *observation.Operation) (_ []*gitdomain.Commit, err error) {
|
||||
ctx, endObservation := operation.With(ctx, &err, observation.Args{})
|
||||
defer endObservation(1, observation.Args{})
|
||||
|
||||
return git.Commits(ctx, db, name, git.CommitsOptions{N: 0, DateOrder: true, NoEnsureRevision: true, After: after.Format(time.RFC3339)}, authz.DefaultSubRepoPermsChecker)
|
||||
before := ""
|
||||
if until != nil {
|
||||
before = until.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
return git.Commits(ctx, db, name, git.CommitsOptions{N: 0, DateOrder: true, NoEnsureRevision: true, After: after.Format(time.RFC3339), Before: before}, authz.DefaultSubRepoPermsChecker)
|
||||
}
|
||||
|
||||
// getMetadata gets the index metadata for a repository. The metadata will be generated if it doesn't already exist, such as
|
||||
@ -184,7 +214,7 @@ func getCommits(ctx context.Context, db database.DB, name api.RepoName, after ti
|
||||
func getMetadata(ctx context.Context, id api.RepoID, store CommitStore) (CommitIndexMetadata, error) {
|
||||
metadata, err := store.GetMetadata(ctx, id)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
metadata, err = store.UpsertMetadataStamp(ctx, id)
|
||||
metadata, err = store.UpsertMetadataStamp(ctx, id, time.Time{}.UTC())
|
||||
}
|
||||
if err != nil {
|
||||
return CommitIndexMetadata{}, err
|
||||
|
||||
@ -10,23 +10,29 @@ import (
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
|
||||
"github.com/sourcegraph/sourcegraph/internal/observation"
|
||||
"github.com/sourcegraph/sourcegraph/schema"
|
||||
)
|
||||
|
||||
var ops *operations = newOperations(&observation.TestContext)
|
||||
|
||||
func TestCommitIndexer_indexAll(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
commitStore := NewMockCommitStore()
|
||||
|
||||
maxHistorical := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
clock := func() time.Time { return time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) }
|
||||
|
||||
indexer := CommitIndexer{
|
||||
limiter: rate.NewLimiter(10, 1),
|
||||
commitStore: commitStore,
|
||||
maxHistoricalTime: maxHistorical,
|
||||
background: context.Background(),
|
||||
operations: newOperations(&observation.TestContext),
|
||||
operations: ops,
|
||||
clock: clock,
|
||||
}
|
||||
|
||||
// Testing a scenario with 3 repos
|
||||
@ -54,22 +60,29 @@ func TestCommitIndexer_indexAll(t *testing.T) {
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 1,
|
||||
Enabled: false,
|
||||
LastIndexedAt: time.Now(),
|
||||
LastIndexedAt: time.Date(1999, time.January, 1, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 2,
|
||||
Enabled: true,
|
||||
LastIndexedAt: time.Now(),
|
||||
LastIndexedAt: time.Date(1999, time.January, 1, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 3,
|
||||
Enabled: true,
|
||||
LastIndexedAt: time.Now(),
|
||||
LastIndexedAt: time.Date(1999, time.January, 1, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
t.Run("multi_repository", func(t *testing.T) {
|
||||
windowDuration := 0
|
||||
conf.Mock(&conf.Unified{
|
||||
SiteConfiguration: schema.SiteConfiguration{
|
||||
InsightsCommitIndexerWindowDuration: windowDuration,
|
||||
},
|
||||
})
|
||||
defer conf.Mock(nil)
|
||||
err := indexer.indexAll(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -80,27 +93,28 @@ func TestCommitIndexer_indexAll(t *testing.T) {
|
||||
t.Errorf("got GetMetadata invocations: %v want %v", got, want)
|
||||
}
|
||||
|
||||
// Only one repository should actually update any commits
|
||||
if got, want := len(commitStore.InsertCommitsFunc.history), 1; got != want {
|
||||
// Both enabled repositories should call insert commits
|
||||
if got, want := len(commitStore.InsertCommitsFunc.history), 2; got != want {
|
||||
t.Errorf("got InsertCommits invocations: %v want %v", got, want)
|
||||
} else {
|
||||
call := commitStore.InsertCommitsFunc.history[0]
|
||||
for i, got := range call.Arg2 {
|
||||
if diff := cmp.Diff(commits["really-big-repo"][i], got); diff != "" {
|
||||
t.Errorf("unexpected commit\n%s", diff)
|
||||
calls := map[string]CommitStoreInsertCommitsFuncCall{
|
||||
"really-big-repo": commitStore.InsertCommitsFunc.history[0],
|
||||
"no-commits": commitStore.InsertCommitsFunc.history[1],
|
||||
}
|
||||
for repo, call := range calls {
|
||||
// Check Indexed though is the clock time
|
||||
if diff := cmp.Diff(clock(), call.Arg3); diff != "" {
|
||||
t.Errorf("unexpected indexed though date/time")
|
||||
}
|
||||
// Check the correct commits
|
||||
for i, got := range call.Arg2 {
|
||||
if diff := cmp.Diff(commits[repo][i], got); diff != "" {
|
||||
t.Errorf("unexpected commit\n%s", diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// One repository had no commits, so only the timestamp would get updated
|
||||
if got, want := len(commitStore.UpsertMetadataStampFunc.history), 1; got != want {
|
||||
t.Errorf("got UpsertMetadataStamp invocations: %v want %v", got, want)
|
||||
} else {
|
||||
call := commitStore.UpsertMetadataStampFunc.history[0]
|
||||
if call.Arg1 != 2 {
|
||||
t.Errorf("unexpected repository for UpsertMetadataStamp repo_id: %v", call.Arg1)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ -171,6 +185,187 @@ func Test_getMetadata_NoInsertRequired(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestCommitIndexer_windowing(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
commitStore := NewMockCommitStore()
|
||||
|
||||
maxHistorical := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
clock := func() time.Time { return time.Date(2020, time.June, 1, 0, 0, 0, 0, time.UTC) }
|
||||
|
||||
indexer := CommitIndexer{
|
||||
limiter: rate.NewLimiter(10, 1),
|
||||
commitStore: commitStore,
|
||||
maxHistoricalTime: maxHistorical,
|
||||
background: context.Background(),
|
||||
operations: ops,
|
||||
clock: clock,
|
||||
}
|
||||
|
||||
// Testing a scenario with 3 repos and a window of 30 days
|
||||
// "repo-one" has been recently indexed and all commits are in one window
|
||||
// "really-big-repo" has 2 windows of commits
|
||||
// "no-commits-recent" has no commits and was recently indexed
|
||||
// "no-commits-not-recent" has no commits but is 2 windows behind on indexing
|
||||
commits := map[string][]*gitdomain.Commit{
|
||||
"repo-one": {
|
||||
commit("ref1", "2020-05-10T00:00:00+00:00"),
|
||||
commit("ref2", "2020-05-12T00:00:00+00:00"),
|
||||
},
|
||||
"really-big-repo": {
|
||||
commit("bigref1", "2020-04-17T00:00:00+00:00"),
|
||||
commit("bigref2", "2020-04-18T00:00:00+00:00"),
|
||||
commit("bigref3", "2020-05-17T00:00:00+00:00"),
|
||||
commit("bigref4", "2020-05-18T00:00:00+00:00"),
|
||||
},
|
||||
"no-commits-recent": {},
|
||||
"no-commits-not-recent": {},
|
||||
"only-recent": {
|
||||
commit("bigref4", "2020-05-18T00:00:00+00:00"),
|
||||
},
|
||||
}
|
||||
indexer.getCommits = mockCommits(commits)
|
||||
indexer.allReposIterator = mockIterator([]string{"repo-one", "really-big-repo", "no-commits-recent", "no-commits-not-recent", "only-recent"})
|
||||
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 1,
|
||||
Enabled: true,
|
||||
LastIndexedAt: time.Date(2020, time.May, 5, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 2,
|
||||
Enabled: true,
|
||||
LastIndexedAt: time.Date(2020, time.April, 5, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 2,
|
||||
Enabled: true,
|
||||
LastIndexedAt: time.Date(2020, time.May, 5, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 3,
|
||||
Enabled: true,
|
||||
LastIndexedAt: time.Date(2020, time.May, 5, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 4,
|
||||
Enabled: true,
|
||||
LastIndexedAt: time.Date(2020, time.April, 5, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 4,
|
||||
Enabled: true,
|
||||
LastIndexedAt: time.Date(2020, time.May, 5, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 5,
|
||||
Enabled: true,
|
||||
LastIndexedAt: time.Date(2020, time.April, 5, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
commitStore.GetMetadataFunc.PushReturn(CommitIndexMetadata{
|
||||
RepoId: 5,
|
||||
Enabled: true,
|
||||
LastIndexedAt: time.Date(2020, time.May, 5, 0, 0, 0, 0, time.UTC),
|
||||
}, nil)
|
||||
|
||||
endOfApril5Window := time.Date(2020, time.April, 5, 0, 0, 0, 0, time.UTC).Add(24 * 30 * time.Hour)
|
||||
|
||||
t.Run("multi_repository_paging", func(t *testing.T) {
|
||||
|
||||
conf.Mock(&conf.Unified{
|
||||
SiteConfiguration: schema.SiteConfiguration{
|
||||
InsightsCommitIndexerWindowDuration: 30,
|
||||
},
|
||||
})
|
||||
defer conf.Mock(nil)
|
||||
err := indexer.indexAll(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// 4 enabled repos get metadata, repo 2, 4 and 5 need 2 windows all others just 1
|
||||
if got, want := len(commitStore.GetMetadataFunc.history), 8; got != want {
|
||||
t.Errorf("got GetMetadata invocations: %v want %v", got, want)
|
||||
}
|
||||
|
||||
// Each time though we call insert commits even if there are none repo 2, 4 and 5 need 2 windows so 8 total
|
||||
if got, want := len(commitStore.InsertCommitsFunc.history), 8; got != want {
|
||||
t.Errorf("got InsertCommits invocations: %v want %v", got, want)
|
||||
} else {
|
||||
|
||||
/* repo one
|
||||
** All commits present and sets last indexed to the clock time
|
||||
*/
|
||||
checkCommits(t, commits["repo-one"], commitStore.InsertCommitsFunc.history[0].Arg2)
|
||||
checkIndexedThough(t, clock().UTC(), commitStore.InsertCommitsFunc.history[0].Arg3)
|
||||
|
||||
/* really-big-repo
|
||||
** Last indexed more than 1 window ago so needs to make 2 passes
|
||||
** First Pass:
|
||||
** First two commits and sets last indxed to the end of the time window (last_indexed + 30 days)
|
||||
** Second Pass:
|
||||
** Last two commits and sets last indexed to clock time because end of window was greater than clock
|
||||
*/
|
||||
checkCommits(t, commits["really-big-repo"][:2], commitStore.InsertCommitsFunc.history[1].Arg2)
|
||||
checkIndexedThough(t, endOfApril5Window, commitStore.InsertCommitsFunc.history[1].Arg3)
|
||||
checkCommits(t, commits["really-big-repo"][2:], commitStore.InsertCommitsFunc.history[2].Arg2)
|
||||
checkIndexedThough(t, clock().UTC(), commitStore.InsertCommitsFunc.history[2].Arg3)
|
||||
|
||||
/* no-commits-recent
|
||||
** There are no commits to save and sets last indexed to the clock time
|
||||
*/
|
||||
checkCommits(t, []*gitdomain.Commit{}, commitStore.InsertCommitsFunc.history[3].Arg2)
|
||||
checkIndexedThough(t, clock().UTC(), commitStore.InsertCommitsFunc.history[3].Arg3)
|
||||
|
||||
/* no-commits-not-recent
|
||||
** Last indexed is more than 1 window agao so need to make 2 passes
|
||||
** First Pass:
|
||||
** No commits to save and sets last indxed to the end of the time window (last_indexed + 30 days)
|
||||
** Second Pass:
|
||||
** Still no commits and sets last indexed to clock time
|
||||
*/
|
||||
checkCommits(t, []*gitdomain.Commit{}, commitStore.InsertCommitsFunc.history[4].Arg2)
|
||||
checkIndexedThough(t, endOfApril5Window, commitStore.InsertCommitsFunc.history[4].Arg3)
|
||||
checkCommits(t, []*gitdomain.Commit{}, commitStore.InsertCommitsFunc.history[5].Arg2)
|
||||
checkIndexedThough(t, clock().UTC(), commitStore.InsertCommitsFunc.history[5].Arg3)
|
||||
|
||||
/* only-recent
|
||||
** Last indexed is more than 1 window agao so need to make 2 passes
|
||||
** First Pass:
|
||||
** No commits to save and sets last indxed to the end of the time window (last_indexed + 30 days)
|
||||
** Second Pass:
|
||||
** Saves the 1 commit and sets last indexed to clock time
|
||||
*/
|
||||
checkCommits(t, []*gitdomain.Commit{}, commitStore.InsertCommitsFunc.history[6].Arg2)
|
||||
checkIndexedThough(t, endOfApril5Window, commitStore.InsertCommitsFunc.history[6].Arg3)
|
||||
checkCommits(t, commits["only-recent"], commitStore.InsertCommitsFunc.history[7].Arg2)
|
||||
checkIndexedThough(t, clock().UTC(), commitStore.InsertCommitsFunc.history[7].Arg3)
|
||||
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
func checkCommits(t *testing.T, want []*gitdomain.Commit, got []*gitdomain.Commit) {
|
||||
for i, commit := range got {
|
||||
if diff := cmp.Diff(want[i], commit); diff != "" {
|
||||
t.Errorf("unexpected commit\n%s", diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func checkIndexedThough(t *testing.T, want time.Time, got time.Time) {
|
||||
if diff := cmp.Diff(want, got); diff != "" {
|
||||
t.Errorf("unexpected indexed through date\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// mockIterator generates iterator methods given a list of repo names for test scenarios
|
||||
func mockIterator(repos []string) func(ctx context.Context, each func(repoName string, id api.RepoID) error) error {
|
||||
return func(ctx context.Context, each func(repoName string, id api.RepoID) error) error {
|
||||
@ -194,8 +389,18 @@ func commit(ref string, commitTime string) *gitdomain.Commit {
|
||||
}
|
||||
}
|
||||
|
||||
func mockCommits(commits map[string][]*gitdomain.Commit) func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error) {
|
||||
return func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error) {
|
||||
return commits[(string(name))], nil
|
||||
func mockCommits(commits map[string][]*gitdomain.Commit) func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, until *time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error) {
|
||||
return func(ctx context.Context, db database.DB, name api.RepoName, after time.Time, until *time.Time, operation *observation.Operation) ([]*gitdomain.Commit, error) {
|
||||
filteredCommits := make([]*gitdomain.Commit, 0)
|
||||
for _, commit := range commits[string(name)] {
|
||||
if commit.Committer.Date.Before(after) {
|
||||
continue
|
||||
}
|
||||
if until != nil && commit.Committer.Date.After(*until) {
|
||||
continue
|
||||
}
|
||||
filteredCommits = append(filteredCommits, commit)
|
||||
}
|
||||
return filteredCommits, nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -1809,6 +1809,8 @@ type SiteConfiguration struct {
|
||||
HtmlHeadTop string `json:"htmlHeadTop,omitempty"`
|
||||
// InsightsCommitIndexerInterval description: The interval (in minutes) at which the insights commit indexer will check for new commits.
|
||||
InsightsCommitIndexerInterval int `json:"insights.commit.indexer.interval,omitempty"`
|
||||
// InsightsCommitIndexerWindowDuration description: The number of days of commits the insights commit indexer will pull during each request (0 is no limit).
|
||||
InsightsCommitIndexerWindowDuration int `json:"insights.commit.indexer.windowDuration,omitempty"`
|
||||
// InsightsHistoricalFrameLength description: (debug) duration of historical insights timeframes, one point per repository will be recorded in each timeframe.
|
||||
InsightsHistoricalFrameLength string `json:"insights.historical.frameLength,omitempty"`
|
||||
// InsightsHistoricalFrames description: (debug) number of historical insights timeframes to populate
|
||||
|
||||
@ -1195,6 +1195,13 @@
|
||||
"default": 60,
|
||||
"examples": [120]
|
||||
},
|
||||
"insights.commit.indexer.windowDuration": {
|
||||
"description": "The number of days of commits the insights commit indexer will pull during each request (0 is no limit).",
|
||||
"type": "integer",
|
||||
"group": "CodeInsights",
|
||||
"default": 0,
|
||||
"examples": [30]
|
||||
},
|
||||
"htmlHeadTop": {
|
||||
"description": "HTML to inject at the top of the `<head>` element on each page, for analytics scripts",
|
||||
"type": "string",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user