codeintel: Add job to reconcile lsif_uploads records with missing codeintel-db data (#43838)

This commit is contained in:
Eric Fritz 2022-11-04 07:47:19 -05:00 committed by GitHub
parent fc48070b62
commit fb3e745a58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 901 additions and 75 deletions

View File

@ -28,12 +28,12 @@ func TestGetUploadsForRanking(t *testing.T) {
INSERT INTO repo (id, name, deleted_at) VALUES (51, 'bar', NULL);
INSERT INTO repo (id, name, deleted_at) VALUES (52, 'baz', NULL);
INSERT INTO repo (id, name, deleted_at) VALUES (53, 'del', NOW());
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (100, 50, '0000000000000000000000000000000000000001', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (101, 50, '0000000000000000000000000000000000000002', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (102, 51, '0000000000000000000000000000000000000003', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (103, 52, '0000000000000000000000000000000000000004', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (104, 52, '0000000000000000000000000000000000000005', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (105, 53, '0000000000000000000000000000000000000006', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (100, 50, '0000000000000000000000000000000000000001', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (101, 50, '0000000000000000000000000000000000000002', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (102, 51, '0000000000000000000000000000000000000003', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (103, 52, '0000000000000000000000000000000000000004', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (104, 52, '0000000000000000000000000000000000000005', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (105, 53, '0000000000000000000000000000000000000006', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads_visible_at_tip (upload_id, repository_id, is_default_branch) VALUES (100, 50, true);
INSERT INTO lsif_uploads_visible_at_tip (upload_id, repository_id, is_default_branch) VALUES (102, 51, true);
INSERT INTO lsif_uploads_visible_at_tip (upload_id, repository_id, is_default_branch) VALUES (103, 52, true);
@ -81,12 +81,12 @@ func TestProcessStaleExportedUplods(t *testing.T) {
INSERT INTO repo (id, name, deleted_at) VALUES (50, 'foo', NULL);
INSERT INTO repo (id, name, deleted_at) VALUES (51, 'bar', NULL);
INSERT INTO repo (id, name, deleted_at) VALUES (52, 'baz', NULL);
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (100, 50, '0000000000000000000000000000000000000001', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (101, 50, '0000000000000000000000000000000000000002', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (102, 50, '0000000000000000000000000000000000000003', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (103, 51, '0000000000000000000000000000000000000004', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (104, 51, '0000000000000000000000000000000000000005', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts) VALUES (105, 52, '0000000000000000000000000000000000000006', 'lsif-test', 1, '{}');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (100, 50, '0000000000000000000000000000000000000001', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (101, 50, '0000000000000000000000000000000000000002', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (102, 50, '0000000000000000000000000000000000000003', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (103, 51, '0000000000000000000000000000000000000004', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (104, 51, '0000000000000000000000000000000000000005', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (105, 52, '0000000000000000000000000000000000000006', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads_visible_at_tip (upload_id, repository_id, is_default_branch) VALUES (100, 50, true);
INSERT INTO lsif_uploads_visible_at_tip (upload_id, repository_id, is_default_branch) VALUES (103, 51, true);
INSERT INTO lsif_uploads_visible_at_tip (upload_id, repository_id, is_default_branch) VALUES (105, 52, true);

View File

@ -53,8 +53,10 @@ type UploadService interface {
// Utils
GetWorkerutilStore() dbworkerstore.Store
ReconcileCandidates(ctx context.Context, batchSize int) ([]int, error)
FrontendReconcileCandidates(ctx context.Context, batchSize int) ([]int, error)
CodeIntelDBReconcileCandidates(ctx context.Context, batchSize int) ([]int, error)
GetDumpsByIDs(ctx context.Context, ids []int) ([]types.Dump, error)
IDsWithMeta(ctx context.Context, ids []int) ([]int, error)
}
type GitserverClient interface {

View File

@ -17,12 +17,61 @@ func (b backgroundJob) NewReconciler(
}
func (b backgroundJob) handleReconcile(ctx context.Context, batchSize int) (err error) {
ids, err := b.uploadSvc.ReconcileCandidates(ctx, batchSize)
if err := b.handleReconcileFromFrontend(ctx, batchSize); err != nil {
return err
}
if err := b.handleReconcileFromCodeintelDB(ctx, batchSize); err != nil {
return err
}
return nil
}
// handleReconcileFromFrontend marks upload records that has no resolvable data in the codeintel-db.
func (b backgroundJob) handleReconcileFromFrontend(ctx context.Context, batchSize int) (err error) {
ids, err := b.uploadSvc.FrontendReconcileCandidates(ctx, batchSize)
if err != nil {
return err
}
b.operations.numReconcileScans.Add(float64(len(ids)))
b.operations.numReconcileScansFromFrontend.Add(float64(len(ids)))
idsWithMeta, err := b.uploadSvc.IDsWithMeta(ctx, ids)
if err != nil {
return err
}
found := map[int]struct{}{}
for _, id := range idsWithMeta {
found[id] = struct{}{}
}
abandoned := ids[:0]
for _, id := range ids {
if _, ok := found[id]; ok {
continue
}
abandoned = append(abandoned, id)
}
// In the future we'll also want to explicitly mark these uploads as missing precise data so that
// they can be re-indexed or removed by an automatic janitor process. For now we just want to know
// *IF* this condition happens, so a Prometheus metric is sufficient.
b.operations.numReconcileDeletesFromFrontend.Add(float64(len(abandoned)))
return nil
}
// handleReconcileFromCodeintelDB removes data from the codeintel-db that has no correlated upload
// in the frontend database.
func (b backgroundJob) handleReconcileFromCodeintelDB(ctx context.Context, batchSize int) (err error) {
ids, err := b.uploadSvc.CodeIntelDBReconcileCandidates(ctx, batchSize)
if err != nil {
return err
}
b.operations.numReconcileScansFromCodeIntelDB.Add(float64(len(ids)))
dumps, err := b.uploadSvc.GetDumpsByIDs(ctx, ids)
if err != nil {
@ -47,6 +96,6 @@ func (b backgroundJob) handleReconcile(ctx context.Context, batchSize int) (err
return err
}
b.operations.numReconcileDeletes.Add(float64(len(abandoned)))
b.operations.numReconcileDeletesFromCodeIntelDB.Add(float64(len(abandoned)))
return nil
}

View File

@ -2030,6 +2030,10 @@ type MockUploadService struct {
// BackfillCommittedAtBatchFunc is an instance of a mock function object
// controlling the behavior of the method BackfillCommittedAtBatch.
BackfillCommittedAtBatchFunc *UploadServiceBackfillCommittedAtBatchFunc
// CodeIntelDBReconcileCandidatesFunc is an instance of a mock function
// object controlling the behavior of the method
// CodeIntelDBReconcileCandidates.
CodeIntelDBReconcileCandidatesFunc *UploadServiceCodeIntelDBReconcileCandidatesFunc
// DeleteLsifDataByUploadIdsFunc is an instance of a mock function
// object controlling the behavior of the method
// DeleteLsifDataByUploadIds.
@ -2048,6 +2052,10 @@ type MockUploadService struct {
// object controlling the behavior of the method
// DeleteUploadsWithoutRepository.
DeleteUploadsWithoutRepositoryFunc *UploadServiceDeleteUploadsWithoutRepositoryFunc
// FrontendReconcileCandidatesFunc is an instance of a mock function
// object controlling the behavior of the method
// FrontendReconcileCandidates.
FrontendReconcileCandidatesFunc *UploadServiceFrontendReconcileCandidatesFunc
// GetDirtyRepositoriesFunc is an instance of a mock function object
// controlling the behavior of the method GetDirtyRepositories.
GetDirtyRepositoriesFunc *UploadServiceGetDirtyRepositoriesFunc
@ -2077,9 +2085,9 @@ type MockUploadService struct {
// HardDeleteUploadsByIDsFunc is an instance of a mock function object
// controlling the behavior of the method HardDeleteUploadsByIDs.
HardDeleteUploadsByIDsFunc *UploadServiceHardDeleteUploadsByIDsFunc
// ReconcileCandidatesFunc is an instance of a mock function object
// controlling the behavior of the method ReconcileCandidates.
ReconcileCandidatesFunc *UploadServiceReconcileCandidatesFunc
// IDsWithMetaFunc is an instance of a mock function object controlling
// the behavior of the method IDsWithMeta.
IDsWithMetaFunc *UploadServiceIDsWithMetaFunc
// SetRepositoriesForRetentionScanFunc is an instance of a mock function
// object controlling the behavior of the method
// SetRepositoriesForRetentionScan.
@ -2109,6 +2117,11 @@ func NewMockUploadService() *MockUploadService {
return
},
},
CodeIntelDBReconcileCandidatesFunc: &UploadServiceCodeIntelDBReconcileCandidatesFunc{
defaultHook: func(context.Context, int) (r0 []int, r1 error) {
return
},
},
DeleteLsifDataByUploadIdsFunc: &UploadServiceDeleteLsifDataByUploadIdsFunc{
defaultHook: func(context.Context, ...int) (r0 error) {
return
@ -2134,6 +2147,11 @@ func NewMockUploadService() *MockUploadService {
return
},
},
FrontendReconcileCandidatesFunc: &UploadServiceFrontendReconcileCandidatesFunc{
defaultHook: func(context.Context, int) (r0 []int, r1 error) {
return
},
},
GetDirtyRepositoriesFunc: &UploadServiceGetDirtyRepositoriesFunc{
defaultHook: func(context.Context) (r0 map[int]int, r1 error) {
return
@ -2179,8 +2197,8 @@ func NewMockUploadService() *MockUploadService {
return
},
},
ReconcileCandidatesFunc: &UploadServiceReconcileCandidatesFunc{
defaultHook: func(context.Context, int) (r0 []int, r1 error) {
IDsWithMetaFunc: &UploadServiceIDsWithMetaFunc{
defaultHook: func(context.Context, []int) (r0 []int, r1 error) {
return
},
},
@ -2221,6 +2239,11 @@ func NewStrictMockUploadService() *MockUploadService {
panic("unexpected invocation of MockUploadService.BackfillCommittedAtBatch")
},
},
CodeIntelDBReconcileCandidatesFunc: &UploadServiceCodeIntelDBReconcileCandidatesFunc{
defaultHook: func(context.Context, int) ([]int, error) {
panic("unexpected invocation of MockUploadService.CodeIntelDBReconcileCandidates")
},
},
DeleteLsifDataByUploadIdsFunc: &UploadServiceDeleteLsifDataByUploadIdsFunc{
defaultHook: func(context.Context, ...int) error {
panic("unexpected invocation of MockUploadService.DeleteLsifDataByUploadIds")
@ -2246,6 +2269,11 @@ func NewStrictMockUploadService() *MockUploadService {
panic("unexpected invocation of MockUploadService.DeleteUploadsWithoutRepository")
},
},
FrontendReconcileCandidatesFunc: &UploadServiceFrontendReconcileCandidatesFunc{
defaultHook: func(context.Context, int) ([]int, error) {
panic("unexpected invocation of MockUploadService.FrontendReconcileCandidates")
},
},
GetDirtyRepositoriesFunc: &UploadServiceGetDirtyRepositoriesFunc{
defaultHook: func(context.Context) (map[int]int, error) {
panic("unexpected invocation of MockUploadService.GetDirtyRepositories")
@ -2291,9 +2319,9 @@ func NewStrictMockUploadService() *MockUploadService {
panic("unexpected invocation of MockUploadService.HardDeleteUploadsByIDs")
},
},
ReconcileCandidatesFunc: &UploadServiceReconcileCandidatesFunc{
defaultHook: func(context.Context, int) ([]int, error) {
panic("unexpected invocation of MockUploadService.ReconcileCandidates")
IDsWithMetaFunc: &UploadServiceIDsWithMetaFunc{
defaultHook: func(context.Context, []int) ([]int, error) {
panic("unexpected invocation of MockUploadService.IDsWithMeta")
},
},
SetRepositoriesForRetentionScanFunc: &UploadServiceSetRepositoriesForRetentionScanFunc{
@ -2332,6 +2360,9 @@ func NewMockUploadServiceFrom(i UploadService) *MockUploadService {
BackfillCommittedAtBatchFunc: &UploadServiceBackfillCommittedAtBatchFunc{
defaultHook: i.BackfillCommittedAtBatch,
},
CodeIntelDBReconcileCandidatesFunc: &UploadServiceCodeIntelDBReconcileCandidatesFunc{
defaultHook: i.CodeIntelDBReconcileCandidates,
},
DeleteLsifDataByUploadIdsFunc: &UploadServiceDeleteLsifDataByUploadIdsFunc{
defaultHook: i.DeleteLsifDataByUploadIds,
},
@ -2347,6 +2378,9 @@ func NewMockUploadServiceFrom(i UploadService) *MockUploadService {
DeleteUploadsWithoutRepositoryFunc: &UploadServiceDeleteUploadsWithoutRepositoryFunc{
defaultHook: i.DeleteUploadsWithoutRepository,
},
FrontendReconcileCandidatesFunc: &UploadServiceFrontendReconcileCandidatesFunc{
defaultHook: i.FrontendReconcileCandidates,
},
GetDirtyRepositoriesFunc: &UploadServiceGetDirtyRepositoriesFunc{
defaultHook: i.GetDirtyRepositories,
},
@ -2374,8 +2408,8 @@ func NewMockUploadServiceFrom(i UploadService) *MockUploadService {
HardDeleteUploadsByIDsFunc: &UploadServiceHardDeleteUploadsByIDsFunc{
defaultHook: i.HardDeleteUploadsByIDs,
},
ReconcileCandidatesFunc: &UploadServiceReconcileCandidatesFunc{
defaultHook: i.ReconcileCandidates,
IDsWithMetaFunc: &UploadServiceIDsWithMetaFunc{
defaultHook: i.IDsWithMeta,
},
SetRepositoriesForRetentionScanFunc: &UploadServiceSetRepositoriesForRetentionScanFunc{
defaultHook: i.SetRepositoriesForRetentionScan,
@ -2504,6 +2538,118 @@ func (c UploadServiceBackfillCommittedAtBatchFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}
// UploadServiceCodeIntelDBReconcileCandidatesFunc describes the behavior
// when the CodeIntelDBReconcileCandidates method of the parent
// MockUploadService instance is invoked.
type UploadServiceCodeIntelDBReconcileCandidatesFunc struct {
defaultHook func(context.Context, int) ([]int, error)
hooks []func(context.Context, int) ([]int, error)
history []UploadServiceCodeIntelDBReconcileCandidatesFuncCall
mutex sync.Mutex
}
// CodeIntelDBReconcileCandidates delegates to the next hook function in the
// queue and stores the parameter and result values of this invocation.
func (m *MockUploadService) CodeIntelDBReconcileCandidates(v0 context.Context, v1 int) ([]int, error) {
r0, r1 := m.CodeIntelDBReconcileCandidatesFunc.nextHook()(v0, v1)
m.CodeIntelDBReconcileCandidatesFunc.appendCall(UploadServiceCodeIntelDBReconcileCandidatesFuncCall{v0, v1, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the
// CodeIntelDBReconcileCandidates method of the parent MockUploadService
// instance is invoked and the hook queue is empty.
func (f *UploadServiceCodeIntelDBReconcileCandidatesFunc) SetDefaultHook(hook func(context.Context, int) ([]int, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// CodeIntelDBReconcileCandidates method of the parent MockUploadService
// instance invokes the hook at the front of the queue and discards it.
// After the queue is empty, the default hook function is invoked for any
// future action.
func (f *UploadServiceCodeIntelDBReconcileCandidatesFunc) PushHook(hook func(context.Context, int) ([]int, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *UploadServiceCodeIntelDBReconcileCandidatesFunc) SetDefaultReturn(r0 []int, r1 error) {
f.SetDefaultHook(func(context.Context, int) ([]int, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *UploadServiceCodeIntelDBReconcileCandidatesFunc) PushReturn(r0 []int, r1 error) {
f.PushHook(func(context.Context, int) ([]int, error) {
return r0, r1
})
}
func (f *UploadServiceCodeIntelDBReconcileCandidatesFunc) nextHook() func(context.Context, int) ([]int, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *UploadServiceCodeIntelDBReconcileCandidatesFunc) appendCall(r0 UploadServiceCodeIntelDBReconcileCandidatesFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of
// UploadServiceCodeIntelDBReconcileCandidatesFuncCall objects describing
// the invocations of this function.
func (f *UploadServiceCodeIntelDBReconcileCandidatesFunc) History() []UploadServiceCodeIntelDBReconcileCandidatesFuncCall {
f.mutex.Lock()
history := make([]UploadServiceCodeIntelDBReconcileCandidatesFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// UploadServiceCodeIntelDBReconcileCandidatesFuncCall is an object that
// describes an invocation of method CodeIntelDBReconcileCandidates on an
// instance of MockUploadService.
type UploadServiceCodeIntelDBReconcileCandidatesFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 int
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []int
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c UploadServiceCodeIntelDBReconcileCandidatesFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c UploadServiceCodeIntelDBReconcileCandidatesFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// UploadServiceDeleteLsifDataByUploadIdsFunc describes the behavior when
// the DeleteLsifDataByUploadIds method of the parent MockUploadService
// instance is invoked.
@ -3081,6 +3227,118 @@ func (c UploadServiceDeleteUploadsWithoutRepositoryFuncCall) Results() []interfa
return []interface{}{c.Result0, c.Result1}
}
// UploadServiceFrontendReconcileCandidatesFunc describes the behavior when
// the FrontendReconcileCandidates method of the parent MockUploadService
// instance is invoked.
type UploadServiceFrontendReconcileCandidatesFunc struct {
defaultHook func(context.Context, int) ([]int, error)
hooks []func(context.Context, int) ([]int, error)
history []UploadServiceFrontendReconcileCandidatesFuncCall
mutex sync.Mutex
}
// FrontendReconcileCandidates delegates to the next hook function in the
// queue and stores the parameter and result values of this invocation.
func (m *MockUploadService) FrontendReconcileCandidates(v0 context.Context, v1 int) ([]int, error) {
r0, r1 := m.FrontendReconcileCandidatesFunc.nextHook()(v0, v1)
m.FrontendReconcileCandidatesFunc.appendCall(UploadServiceFrontendReconcileCandidatesFuncCall{v0, v1, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the
// FrontendReconcileCandidates method of the parent MockUploadService
// instance is invoked and the hook queue is empty.
func (f *UploadServiceFrontendReconcileCandidatesFunc) SetDefaultHook(hook func(context.Context, int) ([]int, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// FrontendReconcileCandidates method of the parent MockUploadService
// instance invokes the hook at the front of the queue and discards it.
// After the queue is empty, the default hook function is invoked for any
// future action.
func (f *UploadServiceFrontendReconcileCandidatesFunc) PushHook(hook func(context.Context, int) ([]int, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *UploadServiceFrontendReconcileCandidatesFunc) SetDefaultReturn(r0 []int, r1 error) {
f.SetDefaultHook(func(context.Context, int) ([]int, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *UploadServiceFrontendReconcileCandidatesFunc) PushReturn(r0 []int, r1 error) {
f.PushHook(func(context.Context, int) ([]int, error) {
return r0, r1
})
}
func (f *UploadServiceFrontendReconcileCandidatesFunc) nextHook() func(context.Context, int) ([]int, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *UploadServiceFrontendReconcileCandidatesFunc) appendCall(r0 UploadServiceFrontendReconcileCandidatesFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of
// UploadServiceFrontendReconcileCandidatesFuncCall objects describing the
// invocations of this function.
func (f *UploadServiceFrontendReconcileCandidatesFunc) History() []UploadServiceFrontendReconcileCandidatesFuncCall {
f.mutex.Lock()
history := make([]UploadServiceFrontendReconcileCandidatesFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// UploadServiceFrontendReconcileCandidatesFuncCall is an object that
// describes an invocation of method FrontendReconcileCandidates on an
// instance of MockUploadService.
type UploadServiceFrontendReconcileCandidatesFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 int
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []int
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c UploadServiceFrontendReconcileCandidatesFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c UploadServiceFrontendReconcileCandidatesFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// UploadServiceGetDirtyRepositoriesFunc describes the behavior when the
// GetDirtyRepositories method of the parent MockUploadService instance is
// invoked.
@ -4081,37 +4339,35 @@ func (c UploadServiceHardDeleteUploadsByIDsFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}
// UploadServiceReconcileCandidatesFunc describes the behavior when the
// ReconcileCandidates method of the parent MockUploadService instance is
// invoked.
type UploadServiceReconcileCandidatesFunc struct {
defaultHook func(context.Context, int) ([]int, error)
hooks []func(context.Context, int) ([]int, error)
history []UploadServiceReconcileCandidatesFuncCall
// UploadServiceIDsWithMetaFunc describes the behavior when the IDsWithMeta
// method of the parent MockUploadService instance is invoked.
type UploadServiceIDsWithMetaFunc struct {
defaultHook func(context.Context, []int) ([]int, error)
hooks []func(context.Context, []int) ([]int, error)
history []UploadServiceIDsWithMetaFuncCall
mutex sync.Mutex
}
// ReconcileCandidates delegates to the next hook function in the queue and
// stores the parameter and result values of this invocation.
func (m *MockUploadService) ReconcileCandidates(v0 context.Context, v1 int) ([]int, error) {
r0, r1 := m.ReconcileCandidatesFunc.nextHook()(v0, v1)
m.ReconcileCandidatesFunc.appendCall(UploadServiceReconcileCandidatesFuncCall{v0, v1, r0, r1})
// IDsWithMeta delegates to the next hook function in the queue and stores
// the parameter and result values of this invocation.
func (m *MockUploadService) IDsWithMeta(v0 context.Context, v1 []int) ([]int, error) {
r0, r1 := m.IDsWithMetaFunc.nextHook()(v0, v1)
m.IDsWithMetaFunc.appendCall(UploadServiceIDsWithMetaFuncCall{v0, v1, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the ReconcileCandidates
// method of the parent MockUploadService instance is invoked and the hook
// queue is empty.
func (f *UploadServiceReconcileCandidatesFunc) SetDefaultHook(hook func(context.Context, int) ([]int, error)) {
// SetDefaultHook sets function that is called when the IDsWithMeta method
// of the parent MockUploadService instance is invoked and the hook queue is
// empty.
func (f *UploadServiceIDsWithMetaFunc) SetDefaultHook(hook func(context.Context, []int) ([]int, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// ReconcileCandidates method of the parent MockUploadService instance
// invokes the hook at the front of the queue and discards it. After the
// queue is empty, the default hook function is invoked for any future
// action.
func (f *UploadServiceReconcileCandidatesFunc) PushHook(hook func(context.Context, int) ([]int, error)) {
// IDsWithMeta method of the parent MockUploadService instance invokes the
// hook at the front of the queue and discards it. After the queue is empty,
// the default hook function is invoked for any future action.
func (f *UploadServiceIDsWithMetaFunc) PushHook(hook func(context.Context, []int) ([]int, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
@ -4119,20 +4375,20 @@ func (f *UploadServiceReconcileCandidatesFunc) PushHook(hook func(context.Contex
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *UploadServiceReconcileCandidatesFunc) SetDefaultReturn(r0 []int, r1 error) {
f.SetDefaultHook(func(context.Context, int) ([]int, error) {
func (f *UploadServiceIDsWithMetaFunc) SetDefaultReturn(r0 []int, r1 error) {
f.SetDefaultHook(func(context.Context, []int) ([]int, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *UploadServiceReconcileCandidatesFunc) PushReturn(r0 []int, r1 error) {
f.PushHook(func(context.Context, int) ([]int, error) {
func (f *UploadServiceIDsWithMetaFunc) PushReturn(r0 []int, r1 error) {
f.PushHook(func(context.Context, []int) ([]int, error) {
return r0, r1
})
}
func (f *UploadServiceReconcileCandidatesFunc) nextHook() func(context.Context, int) ([]int, error) {
func (f *UploadServiceIDsWithMetaFunc) nextHook() func(context.Context, []int) ([]int, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
@ -4145,33 +4401,32 @@ func (f *UploadServiceReconcileCandidatesFunc) nextHook() func(context.Context,
return hook
}
func (f *UploadServiceReconcileCandidatesFunc) appendCall(r0 UploadServiceReconcileCandidatesFuncCall) {
func (f *UploadServiceIDsWithMetaFunc) appendCall(r0 UploadServiceIDsWithMetaFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of UploadServiceReconcileCandidatesFuncCall
// objects describing the invocations of this function.
func (f *UploadServiceReconcileCandidatesFunc) History() []UploadServiceReconcileCandidatesFuncCall {
// History returns a sequence of UploadServiceIDsWithMetaFuncCall objects
// describing the invocations of this function.
func (f *UploadServiceIDsWithMetaFunc) History() []UploadServiceIDsWithMetaFuncCall {
f.mutex.Lock()
history := make([]UploadServiceReconcileCandidatesFuncCall, len(f.history))
history := make([]UploadServiceIDsWithMetaFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// UploadServiceReconcileCandidatesFuncCall is an object that describes an
// invocation of method ReconcileCandidates on an instance of
// MockUploadService.
type UploadServiceReconcileCandidatesFuncCall struct {
// UploadServiceIDsWithMetaFuncCall is an object that describes an
// invocation of method IDsWithMeta on an instance of MockUploadService.
type UploadServiceIDsWithMetaFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 int
Arg1 []int
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []int
@ -4182,13 +4437,13 @@ type UploadServiceReconcileCandidatesFuncCall struct {
// Args returns an interface slice containing the arguments of this
// invocation.
func (c UploadServiceReconcileCandidatesFuncCall) Args() []interface{} {
func (c UploadServiceIDsWithMetaFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c UploadServiceReconcileCandidatesFuncCall) Results() []interface{} {
func (c UploadServiceIDsWithMetaFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}

View File

@ -17,8 +17,10 @@ type operations struct {
uploadProcessor *observation.Operation
uploadSizeGuage prometheus.Gauge
numReconcileScans prometheus.Counter
numReconcileDeletes prometheus.Counter
numReconcileScansFromFrontend prometheus.Counter
numReconcileDeletesFromFrontend prometheus.Counter
numReconcileScansFromCodeIntelDB prometheus.Counter
numReconcileDeletesFromCodeIntelDB prometheus.Counter
}
func newOperations(observationContext *observation.Context) *operations {
@ -47,12 +49,20 @@ func newOperations(observationContext *observation.Context) *operations {
return counter
}
numReconcileScans := counter(
"src_codeintel_uploads_reconciliation_records_scanned_total",
"The number of upload records read from codeintel-db for reconciliation.",
numReconcileScansFromFrontend := counter(
"src_codeintel_uploads_frontend_reconciliation_records_scanned_total",
"The number of upload records read from the frontend db for reconciliation.",
)
numReconcileDeletes := counter(
"src_codeintel_uploads_reconciliation_records_deleted_total",
numReconcileDeletesFromFrontend := counter(
"src_codeintel_uploads_frontend_reconciliation_records_deleted_total",
"The number of abandoned uploads deleted from the frontend db.",
)
numReconcileScansFromCodeIntelDB := counter(
"src_codeintel_uploads_codeinteldb_reconciliation_records_scanned_total",
"The number of upload records read from the codeintel-db for reconciliation.",
)
numReconcileDeletesFromCodeIntelDB := counter(
"src_codeintel_uploads_codeinteldb_reconciliation_records_deleted_total",
"The number of abandoned uploads deleted from the codeintel-db.",
)
@ -78,7 +88,9 @@ func newOperations(observationContext *observation.Context) *operations {
uploadProcessor: uploadProcessor,
uploadSizeGuage: uploadSizeGuage,
numReconcileScans: numReconcileScans,
numReconcileDeletes: numReconcileDeletes,
numReconcileScansFromFrontend: numReconcileScansFromFrontend,
numReconcileDeletesFromFrontend: numReconcileDeletesFromFrontend,
numReconcileScansFromCodeIntelDB: numReconcileScansFromCodeIntelDB,
numReconcileDeletesFromCodeIntelDB: numReconcileDeletesFromCodeIntelDB,
}
}

View File

@ -23,6 +23,7 @@ type LsifStore interface {
WriteReferences(ctx context.Context, bundleID int, monikerLocations chan precise.MonikerLocations) (count uint32, err error)
WriteImplementations(ctx context.Context, bundleID int, monikerLocations chan precise.MonikerLocations) (count uint32, err error)
IDsWithMeta(ctx context.Context, ids []int) ([]int, error)
ReconcileCandidates(ctx context.Context, batchSize int) ([]int, error)
}

View File

@ -4,10 +4,19 @@ import (
"context"
"github.com/keegancsmith/sqlf"
"github.com/lib/pq"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
)
func (s *store) IDsWithMeta(ctx context.Context, ids []int) (_ []int, error error) {
return basestore.ScanInts(s.db.Query(ctx, sqlf.Sprintf(idsWithMetaQuery, pq.Array(ids))))
}
const idsWithMetaQuery = `
SELECT m.dump_id FROM lsif_data_metadata m WHERE m.dump_id = ANY(%s)
`
func (s *store) ReconcileCandidates(ctx context.Context, batchSize int) (_ []int, err error) {
return basestore.ScanInts(s.db.Query(ctx, sqlf.Sprintf(reconcileQuery, batchSize)))
}
@ -17,7 +26,7 @@ WITH candidates AS (
SELECT m.dump_id
FROM lsif_data_metadata m
LEFT JOIN codeintel_last_reconcile lr ON lr.dump_id = m.dump_id
ORDER BY lr.last_reconcile_at DESC NULLS FIRST
ORDER BY lr.last_reconcile_at DESC NULLS FIRST, m.dump_id
LIMIT %s
)
INSERT INTO codeintel_last_reconcile

View File

@ -0,0 +1,99 @@
package lsifstore
import (
"context"
"sort"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/sourcegraph/log/logtest"
codeintelshared "github.com/sourcegraph/sourcegraph/internal/codeintel/shared"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
)
func TestIDsWithMeta(t *testing.T) {
logger := logtest.Scoped(t)
codeIntelDB := codeintelshared.NewCodeIntelDB(dbtest.NewDB(logger, t))
store := New(codeIntelDB, &observation.TestContext)
ctx := context.Background()
if _, err := codeIntelDB.ExecContext(ctx, `
INSERT INTO lsif_data_metadata (dump_id, num_result_chunks) VALUES (100, 0);
INSERT INTO lsif_data_metadata (dump_id, num_result_chunks) VALUES (102, 0);
INSERT INTO lsif_data_metadata (dump_id, num_result_chunks) VALUES (104, 0);
`); err != nil {
t.Fatalf("unexpected error setting up test: %s", err)
}
candidates := []int{
100, // exists
101,
103,
104, // exists
105,
}
ids, err := store.IDsWithMeta(ctx, candidates)
if err != nil {
t.Fatalf("failed to find upload IDs with metadata: %s", err)
}
expectedIDs := []int{
100,
104,
}
sort.Ints(ids)
if diff := cmp.Diff(expectedIDs, ids); diff != "" {
t.Fatalf("unexpected IDs (-want +got):\n%s", diff)
}
}
func TestReconcileCandidates(t *testing.T) {
logger := logtest.Scoped(t)
codeIntelDB := codeintelshared.NewCodeIntelDB(dbtest.NewDB(logger, t))
store := New(codeIntelDB, &observation.TestContext)
ctx := context.Background()
if _, err := codeIntelDB.ExecContext(ctx, `
INSERT INTO lsif_data_metadata (dump_id, num_result_chunks) VALUES (100, 0);
INSERT INTO lsif_data_metadata (dump_id, num_result_chunks) VALUES (101, 0);
INSERT INTO lsif_data_metadata (dump_id, num_result_chunks) VALUES (102, 0);
INSERT INTO lsif_data_metadata (dump_id, num_result_chunks) VALUES (103, 0);
INSERT INTO lsif_data_metadata (dump_id, num_result_chunks) VALUES (104, 0);
INSERT INTO lsif_data_metadata (dump_id, num_result_chunks) VALUES (105, 0);
`); err != nil {
t.Fatalf("unexpected error setting up test: %s", err)
}
// Initial batch of records
ids, err := store.ReconcileCandidates(ctx, 4)
if err != nil {
t.Fatalf("failed to get candidate IDs for reconciliation: %s", err)
}
expectedIDs := []int{
100,
101,
102,
103,
}
sort.Ints(ids)
if diff := cmp.Diff(expectedIDs, ids); diff != "" {
t.Fatalf("unexpected IDs (-want +got):\n%s", diff)
}
// Remaining records, wrap around
ids, err = store.ReconcileCandidates(ctx, 4)
if err != nil {
t.Fatalf("failed to get candidate IDs for reconciliation: %s", err)
}
expectedIDs = []int{
100,
101,
104,
105,
}
sort.Ints(ids)
if diff := cmp.Diff(expectedIDs, ids); diff != "" {
t.Fatalf("unexpected IDs (-want +got):\n%s", diff)
}
}

View File

@ -91,6 +91,8 @@ type Store interface {
// Workerutil
WorkerutilStore(observationContext *observation.Context) dbworkerstore.Store
ReconcileCandidates(ctx context.Context, batchSize int) (_ []int, err error)
}
// store manages the database operations for uploads.

View File

@ -0,0 +1,35 @@
package store
import (
"context"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
)
func (s *store) ReconcileCandidates(ctx context.Context, batchSize int) (_ []int, err error) {
return basestore.ScanInts(s.db.Query(ctx, sqlf.Sprintf(reconcileQuery, batchSize)))
}
const reconcileQuery = `
WITH
candidates AS (
SELECT u.id
FROM lsif_uploads u
WHERE u.state = 'completed'
ORDER BY u.last_reconcile_at DESC NULLS FIRST, u.id
LIMIT %s
),
locked_candidates AS (
SELECT u.id
FROM lsif_uploads u
WHERE id = ANY(SELECT id FROM candidates)
ORDER BY u.id
FOR UPDATE
)
UPDATE lsif_uploads
SET last_reconcile_at = NOW()
WHERE id = ANY(SELECT id FROM locked_candidates)
RETURNING id
`

View File

@ -0,0 +1,64 @@
package store
import (
"context"
"sort"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/sourcegraph/log/logtest"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
)
func TestReconcileCandidates(t *testing.T) {
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(logger, t))
store := New(db, &observation.TestContext)
ctx := context.Background()
if _, err := db.ExecContext(ctx, `
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (100, 50, '0000000000000000000000000000000000000001', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (101, 50, '0000000000000000000000000000000000000002', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (102, 50, '0000000000000000000000000000000000000003', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (103, 50, '0000000000000000000000000000000000000004', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (104, 50, '0000000000000000000000000000000000000005', 'lsif-test', 1, '{}', 'completed');
INSERT INTO lsif_uploads (id, repository_id, commit, indexer, num_parts, uploaded_parts, state) VALUES (105, 50, '0000000000000000000000000000000000000006', 'lsif-test', 1, '{}', 'completed');
`); err != nil {
t.Fatalf("unexpected error setting up test: %s", err)
}
// Initial batch of records
ids, err := store.ReconcileCandidates(ctx, 4)
if err != nil {
t.Fatalf("failed to get candidate IDs for reconciliation: %s", err)
}
expectedIDs := []int{
100,
101,
102,
103,
}
sort.Ints(ids)
if diff := cmp.Diff(expectedIDs, ids); diff != "" {
t.Fatalf("unexpected IDs (-want +got):\n%s", diff)
}
// Remaining records, wrap around
ids, err = store.ReconcileCandidates(ctx, 4)
if err != nil {
t.Fatalf("failed to get candidate IDs for reconciliation: %s", err)
}
expectedIDs = []int{
100,
101,
104,
105,
}
sort.Ints(ids)
if diff := cmp.Diff(expectedIDs, ids); diff != "" {
t.Fatalf("unexpected IDs (-want +got):\n%s", diff)
}
}

View File

@ -154,6 +154,9 @@ type MockStore struct {
// MarkQueuedFunc is an instance of a mock function object controlling
// the behavior of the method MarkQueued.
MarkQueuedFunc *StoreMarkQueuedFunc
// ReconcileCandidatesFunc is an instance of a mock function object
// controlling the behavior of the method ReconcileCandidates.
ReconcileCandidatesFunc *StoreReconcileCandidatesFunc
// ReferencesForUploadFunc is an instance of a mock function object
// controlling the behavior of the method ReferencesForUpload.
ReferencesForUploadFunc *StoreReferencesForUploadFunc
@ -391,6 +394,11 @@ func NewMockStore() *MockStore {
return
},
},
ReconcileCandidatesFunc: &StoreReconcileCandidatesFunc{
defaultHook: func(context.Context, int) (r0 []int, r1 error) {
return
},
},
ReferencesForUploadFunc: &StoreReferencesForUploadFunc{
defaultHook: func(context.Context, int) (r0 shared.PackageReferenceScanner, r1 error) {
return
@ -658,6 +666,11 @@ func NewStrictMockStore() *MockStore {
panic("unexpected invocation of MockStore.MarkQueued")
},
},
ReconcileCandidatesFunc: &StoreReconcileCandidatesFunc{
defaultHook: func(context.Context, int) ([]int, error) {
panic("unexpected invocation of MockStore.ReconcileCandidates")
},
},
ReferencesForUploadFunc: &StoreReferencesForUploadFunc{
defaultHook: func(context.Context, int) (shared.PackageReferenceScanner, error) {
panic("unexpected invocation of MockStore.ReferencesForUpload")
@ -855,6 +868,9 @@ func NewMockStoreFrom(i store.Store) *MockStore {
MarkQueuedFunc: &StoreMarkQueuedFunc{
defaultHook: i.MarkQueued,
},
ReconcileCandidatesFunc: &StoreReconcileCandidatesFunc{
defaultHook: i.ReconcileCandidates,
},
ReferencesForUploadFunc: &StoreReferencesForUploadFunc{
defaultHook: i.ReferencesForUpload,
},
@ -4851,6 +4867,114 @@ func (c StoreMarkQueuedFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}
// StoreReconcileCandidatesFunc describes the behavior when the
// ReconcileCandidates method of the parent MockStore instance is invoked.
type StoreReconcileCandidatesFunc struct {
defaultHook func(context.Context, int) ([]int, error)
hooks []func(context.Context, int) ([]int, error)
history []StoreReconcileCandidatesFuncCall
mutex sync.Mutex
}
// ReconcileCandidates delegates to the next hook function in the queue and
// stores the parameter and result values of this invocation.
func (m *MockStore) ReconcileCandidates(v0 context.Context, v1 int) ([]int, error) {
r0, r1 := m.ReconcileCandidatesFunc.nextHook()(v0, v1)
m.ReconcileCandidatesFunc.appendCall(StoreReconcileCandidatesFuncCall{v0, v1, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the ReconcileCandidates
// method of the parent MockStore instance is invoked and the hook queue is
// empty.
func (f *StoreReconcileCandidatesFunc) SetDefaultHook(hook func(context.Context, int) ([]int, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// ReconcileCandidates method of the parent MockStore instance invokes the
// hook at the front of the queue and discards it. After the queue is empty,
// the default hook function is invoked for any future action.
func (f *StoreReconcileCandidatesFunc) PushHook(hook func(context.Context, int) ([]int, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *StoreReconcileCandidatesFunc) SetDefaultReturn(r0 []int, r1 error) {
f.SetDefaultHook(func(context.Context, int) ([]int, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *StoreReconcileCandidatesFunc) PushReturn(r0 []int, r1 error) {
f.PushHook(func(context.Context, int) ([]int, error) {
return r0, r1
})
}
func (f *StoreReconcileCandidatesFunc) nextHook() func(context.Context, int) ([]int, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *StoreReconcileCandidatesFunc) appendCall(r0 StoreReconcileCandidatesFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of StoreReconcileCandidatesFuncCall objects
// describing the invocations of this function.
func (f *StoreReconcileCandidatesFunc) History() []StoreReconcileCandidatesFuncCall {
f.mutex.Lock()
history := make([]StoreReconcileCandidatesFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// StoreReconcileCandidatesFuncCall is an object that describes an
// invocation of method ReconcileCandidates on an instance of MockStore.
type StoreReconcileCandidatesFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 int
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []int
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c StoreReconcileCandidatesFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c StoreReconcileCandidatesFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// StoreReferencesForUploadFunc describes the behavior when the
// ReferencesForUpload method of the parent MockStore instance is invoked.
type StoreReferencesForUploadFunc struct {
@ -7029,6 +7153,9 @@ type MockLsifStore struct {
// object controlling the behavior of the method
// GetUploadDocumentsForPath.
GetUploadDocumentsForPathFunc *LsifStoreGetUploadDocumentsForPathFunc
// IDsWithMetaFunc is an instance of a mock function object controlling
// the behavior of the method IDsWithMeta.
IDsWithMetaFunc *LsifStoreIDsWithMetaFunc
// ReconcileCandidatesFunc is an instance of a mock function object
// controlling the behavior of the method ReconcileCandidates.
ReconcileCandidatesFunc *LsifStoreReconcileCandidatesFunc
@ -7074,6 +7201,11 @@ func NewMockLsifStore() *MockLsifStore {
return
},
},
IDsWithMetaFunc: &LsifStoreIDsWithMetaFunc{
defaultHook: func(context.Context, []int) (r0 []int, r1 error) {
return
},
},
ReconcileCandidatesFunc: &LsifStoreReconcileCandidatesFunc{
defaultHook: func(context.Context, int) (r0 []int, r1 error) {
return
@ -7136,6 +7268,11 @@ func NewStrictMockLsifStore() *MockLsifStore {
panic("unexpected invocation of MockLsifStore.GetUploadDocumentsForPath")
},
},
IDsWithMetaFunc: &LsifStoreIDsWithMetaFunc{
defaultHook: func(context.Context, []int) ([]int, error) {
panic("unexpected invocation of MockLsifStore.IDsWithMeta")
},
},
ReconcileCandidatesFunc: &LsifStoreReconcileCandidatesFunc{
defaultHook: func(context.Context, int) ([]int, error) {
panic("unexpected invocation of MockLsifStore.ReconcileCandidates")
@ -7192,6 +7329,9 @@ func NewMockLsifStoreFrom(i lsifstore.LsifStore) *MockLsifStore {
GetUploadDocumentsForPathFunc: &LsifStoreGetUploadDocumentsForPathFunc{
defaultHook: i.GetUploadDocumentsForPath,
},
IDsWithMetaFunc: &LsifStoreIDsWithMetaFunc{
defaultHook: i.IDsWithMeta,
},
ReconcileCandidatesFunc: &LsifStoreReconcileCandidatesFunc{
defaultHook: i.ReconcileCandidates,
},
@ -7552,6 +7692,114 @@ func (c LsifStoreGetUploadDocumentsForPathFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1, c.Result2}
}
// LsifStoreIDsWithMetaFunc describes the behavior when the IDsWithMeta
// method of the parent MockLsifStore instance is invoked.
type LsifStoreIDsWithMetaFunc struct {
defaultHook func(context.Context, []int) ([]int, error)
hooks []func(context.Context, []int) ([]int, error)
history []LsifStoreIDsWithMetaFuncCall
mutex sync.Mutex
}
// IDsWithMeta delegates to the next hook function in the queue and stores
// the parameter and result values of this invocation.
func (m *MockLsifStore) IDsWithMeta(v0 context.Context, v1 []int) ([]int, error) {
r0, r1 := m.IDsWithMetaFunc.nextHook()(v0, v1)
m.IDsWithMetaFunc.appendCall(LsifStoreIDsWithMetaFuncCall{v0, v1, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the IDsWithMeta method
// of the parent MockLsifStore instance is invoked and the hook queue is
// empty.
func (f *LsifStoreIDsWithMetaFunc) SetDefaultHook(hook func(context.Context, []int) ([]int, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// IDsWithMeta method of the parent MockLsifStore instance invokes the hook
// at the front of the queue and discards it. After the queue is empty, the
// default hook function is invoked for any future action.
func (f *LsifStoreIDsWithMetaFunc) PushHook(hook func(context.Context, []int) ([]int, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *LsifStoreIDsWithMetaFunc) SetDefaultReturn(r0 []int, r1 error) {
f.SetDefaultHook(func(context.Context, []int) ([]int, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *LsifStoreIDsWithMetaFunc) PushReturn(r0 []int, r1 error) {
f.PushHook(func(context.Context, []int) ([]int, error) {
return r0, r1
})
}
func (f *LsifStoreIDsWithMetaFunc) nextHook() func(context.Context, []int) ([]int, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *LsifStoreIDsWithMetaFunc) appendCall(r0 LsifStoreIDsWithMetaFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of LsifStoreIDsWithMetaFuncCall objects
// describing the invocations of this function.
func (f *LsifStoreIDsWithMetaFunc) History() []LsifStoreIDsWithMetaFuncCall {
f.mutex.Lock()
history := make([]LsifStoreIDsWithMetaFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// LsifStoreIDsWithMetaFuncCall is an object that describes an invocation of
// method IDsWithMeta on an instance of MockLsifStore.
type LsifStoreIDsWithMetaFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 []int
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []int
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c LsifStoreIDsWithMetaFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c LsifStoreIDsWithMetaFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// LsifStoreReconcileCandidatesFunc describes the behavior when the
// ReconcileCandidates method of the parent MockLsifStore instance is
// invoked.

View File

@ -82,7 +82,11 @@ func (s *Service) GetWorkerutilStore() dbworkerstore.Store {
return s.workerutilStore
}
func (s *Service) ReconcileCandidates(ctx context.Context, batchSize int) ([]int, error) {
func (s *Service) FrontendReconcileCandidates(ctx context.Context, batchSize int) ([]int, error) {
return s.store.ReconcileCandidates(ctx, batchSize)
}
func (s *Service) CodeIntelDBReconcileCandidates(ctx context.Context, batchSize int) ([]int, error) {
return s.lsifstore.ReconcileCandidates(ctx, batchSize)
}
@ -546,6 +550,10 @@ func (s *Service) UpdateAllDirtyCommitGraphs(ctx context.Context, maxAgeForNonSt
return updateErr
}
func (s *Service) IDsWithMeta(ctx context.Context, ids []int) ([]int, error) {
return s.lsifstore.IDsWithMeta(ctx, ids)
}
// lockAndUpdateUploadsVisibleToCommits will call UpdateUploadsVisibleToCommits while holding an advisory lock to give exclusive access to the
// update procedure for this repository. If the lock is already held, this method will simply do nothing.
func (s *Service) lockAndUpdateUploadsVisibleToCommits(ctx context.Context, repositoryID, dirtyToken int, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) (err error) {

View File

@ -13607,6 +13607,19 @@
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "last_reconcile_at",
"Index": 33,
"TypeName": "timestamp with time zone",
"IsNullable": true,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "last_referenced_scan_at",
"Index": 31,
@ -13906,6 +13919,16 @@
"ConstraintType": "",
"ConstraintDefinition": ""
},
{
"Name": "lsif_uploads_last_reconcile_at",
"IsPrimaryKey": false,
"IsUnique": false,
"IsExclusion": false,
"IsDeferrable": false,
"IndexDefinition": "CREATE INDEX lsif_uploads_last_reconcile_at ON lsif_uploads USING btree (last_reconcile_at, id) WHERE state = 'completed'::text",
"ConstraintType": "",
"ConstraintDefinition": ""
},
{
"Name": "lsif_uploads_repository_id_commit",
"IsPrimaryKey": false,

View File

@ -2052,12 +2052,14 @@ Stores the retention policy of code intellience data for a repository.
uncompressed_size | bigint | | |
last_referenced_scan_at | timestamp with time zone | | |
last_traversal_scan_at | timestamp with time zone | | |
last_reconcile_at | timestamp with time zone | | |
Indexes:
"lsif_uploads_pkey" PRIMARY KEY, btree (id)
"lsif_uploads_repository_id_commit_root_indexer" UNIQUE, btree (repository_id, commit, root, indexer) WHERE state = 'completed'::text
"lsif_uploads_associated_index_id" btree (associated_index_id)
"lsif_uploads_commit_last_checked_at" btree (commit_last_checked_at) WHERE state <> 'deleted'::text
"lsif_uploads_committed_at" btree (committed_at) WHERE state = 'completed'::text
"lsif_uploads_last_reconcile_at" btree (last_reconcile_at, id) WHERE state = 'completed'::text
"lsif_uploads_repository_id_commit" btree (repository_id, commit)
"lsif_uploads_state" btree (state)
"lsif_uploads_uploaded_at" btree (uploaded_at)

View File

@ -0,0 +1,4 @@
DROP INDEX IF EXISTS lsif_uploads_last_reconcile_at;
ALTER TABLE
lsif_uploads DROP COLUMN IF EXISTS last_reconcile_at;

View File

@ -0,0 +1,2 @@
name: Add last_reconcile to lsif_uploads table
parents: [1667395984]

View File

@ -0,0 +1,8 @@
ALTER TABLE
lsif_uploads
ADD
COLUMN IF NOT EXISTS last_reconcile_at timestamp with time zone;
CREATE INDEX IF NOT EXISTS lsif_uploads_last_reconcile_at ON lsif_uploads(last_reconcile_at, id)
WHERE
state = 'completed';

View File

@ -2471,6 +2471,7 @@ CREATE TABLE lsif_uploads (
uncompressed_size bigint,
last_referenced_scan_at timestamp with time zone,
last_traversal_scan_at timestamp with time zone,
last_reconcile_at timestamp with time zone,
CONSTRAINT lsif_uploads_commit_valid_chars CHECK ((commit ~ '^[a-z0-9]{40}$'::text))
);
@ -4471,6 +4472,8 @@ CREATE INDEX lsif_uploads_commit_last_checked_at ON lsif_uploads USING btree (co
CREATE INDEX lsif_uploads_committed_at ON lsif_uploads USING btree (committed_at) WHERE (state = 'completed'::text);
CREATE INDEX lsif_uploads_last_reconcile_at ON lsif_uploads USING btree (last_reconcile_at, id) WHERE (state = 'completed'::text);
CREATE INDEX lsif_uploads_repository_id_commit ON lsif_uploads USING btree (repository_id, commit);
CREATE UNIQUE INDEX lsif_uploads_repository_id_commit_root_indexer ON lsif_uploads USING btree (repository_id, commit, root, indexer) WHERE (state = 'completed'::text);