codeintel: Collapse background job structs into the service (#42610)

Eric Fritz 2022-10-06 07:48:57 -05:00 committed by GitHub
parent 3f3d8f4d49
commit f6ada2ffb2
24 changed files with 640 additions and 902 deletions

View File

@ -0,0 +1,37 @@
package autoindexing
import (
"time"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
)
// NewIndexResetter returns a background routine that periodically resets index
// records that are marked as being processed but are no longer being processed
// by a worker.
func (s *Service) NewIndexResetter(interval time.Duration) *dbworker.Resetter {
return dbworker.NewResetter(s.logger, s.workerutilStore, dbworker.ResetterOptions{
Name: "precise_code_intel_index_worker_resetter",
Interval: interval,
Metrics: dbworker.ResetterMetrics{
RecordResets: s.metrics.numIndexResets,
RecordResetFailures: s.metrics.numIndexResetFailures,
Errors: s.metrics.numIndexResetErrors,
},
})
}
// NewDependencyIndexResetter returns a background routine that periodically resets
// dependency index records that are marked as being processed but are no longer being
// processed by a worker.
func (s *Service) NewDependencyIndexResetter(interval time.Duration) *dbworker.Resetter {
return dbworker.NewResetter(s.logger, s.dependencyIndexingStore, dbworker.ResetterOptions{
Name: "precise_code_intel_dependency_index_worker_resetter",
Interval: interval,
Metrics: dbworker.ResetterMetrics{
RecordResets: s.metrics.numDependencyIndexResets,
RecordResetFailures: s.metrics.numDependencyIndexResetFailures,
Errors: s.metrics.numDependencyIndexResetErrors,
},
})
}
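
The two constructors above only describe the resetter's job in their doc comments: find records that claim to be processing but whose worker has gone away, and put them back in the queue so another worker can retry them. Below is a minimal, self-contained sketch of that idea; the record shape and resetStalled helper are illustrative stand-ins, not the dbworker package's actual implementation.

package main

import (
	"fmt"
	"time"
)

type record struct {
	id            int
	state         string // "queued", "processing", "completed", ...
	lastHeartbeat time.Time
}

// resetStalled moves records that claim to be processing, but whose worker has
// stopped heartbeating, back to the queued state so another worker can retry them.
func resetStalled(records []record, staleAfter time.Duration, now time.Time) (resets int) {
	for i := range records {
		if records[i].state == "processing" && now.Sub(records[i].lastHeartbeat) > staleAfter {
			records[i].state = "queued"
			resets++
		}
	}
	return resets
}

func main() {
	now := time.Now()
	recs := []record{
		{id: 1, state: "processing", lastHeartbeat: now.Add(-10 * time.Minute)}, // stalled
		{id: 2, state: "processing", lastHeartbeat: now.Add(-30 * time.Second)}, // still alive
	}
	fmt.Println("reset:", resetStalled(recs, 5*time.Minute, now)) // reset: 1
}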

View File

@ -1,40 +0,0 @@
package autoindexing
import (
"context"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func (s *Service) NewOnDemandScheduler(interval time.Duration, batchSize int) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, &onDemandScheduler{
autoindexingSvc: s,
batchSize: batchSize,
})
}
type onDemandScheduler struct {
autoindexingSvc *Service
batchSize int
logger log.Logger
}
var (
_ goroutine.Handler = &scheduler{}
_ goroutine.ErrorHandler = &scheduler{}
)
func (s *onDemandScheduler) Handle(ctx context.Context) error {
if !autoIndexingEnabled() {
return nil
}
return s.autoindexingSvc.ProcessRepoRevs(ctx, s.batchSize)
}
func (s *onDemandScheduler) HandleError(err error) {
s.logger.Error("Failed to schedule on-demand index jobs", log.Error(err))
}

View File

@ -4,9 +4,6 @@ import (
"context"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/shared"
"github.com/sourcegraph/sourcegraph/internal/codeintel/types"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
@ -21,35 +18,17 @@ func (s *Service) NewScheduler(
repositoryBatchSize int,
policyBatchSize int,
) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutineWithMetrics(context.Background(), interval, &scheduler{
autoindexingSvc: s,
policySvc: s.policiesSvc,
uploadSvc: s.uploadSvc,
policyMatcher: s.policyMatcher,
logger: log.Scoped("autoindexing-scheduler", ""),
repositoryProcessDelay: repositoryProcessDelay,
repositoryBatchSize: repositoryBatchSize,
policyBatchSize: policyBatchSize,
}, s.operations.handleIndexScheduler)
return goroutine.NewPeriodicGoroutineWithMetrics(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.handleScheduler(ctx, repositoryProcessDelay, repositoryBatchSize, policyBatchSize)
}), s.operations.handleIndexScheduler)
}
type scheduler struct {
autoindexingSvc *Service
policySvc PoliciesService
uploadSvc shared.UploadService
policyMatcher PolicyMatcher
logger log.Logger
repositoryProcessDelay time.Duration
repositoryBatchSize int
policyBatchSize int
}
var (
_ goroutine.Handler = &scheduler{}
_ goroutine.ErrorHandler = &scheduler{}
)
func (s *scheduler) Handle(ctx context.Context) error {
func (s *Service) handleScheduler(
ctx context.Context,
repositoryProcessDelay time.Duration,
repositoryBatchSize int,
policyBatchSize int,
) error {
if !autoIndexingEnabled() {
return nil
}
@ -67,10 +46,10 @@ func (s *scheduler) Handle(ctx context.Context) error {
ctx,
"lsif_last_index_scan",
"last_index_scan_at",
s.repositoryProcessDelay,
repositoryProcessDelay,
conf.CodeIntelAutoIndexingAllowGlobalPolicies(),
repositoryMatchLimit,
s.repositoryBatchSize,
repositoryBatchSize,
time.Now(),
)
if err != nil {
@ -84,7 +63,7 @@ func (s *scheduler) Handle(ctx context.Context) error {
now := timeutil.Now()
for _, repositoryID := range repositories {
if repositoryErr := s.handleRepository(ctx, repositoryID, now); repositoryErr != nil {
if repositoryErr := s.handleRepository(ctx, repositoryID, policyBatchSize, now); repositoryErr != nil {
if err == nil {
err = repositoryErr
} else {
@ -96,15 +75,15 @@ func (s *scheduler) Handle(ctx context.Context) error {
return err
}
func (s *scheduler) handleRepository(ctx context.Context, repositoryID int, now time.Time) error {
func (s *Service) handleRepository(ctx context.Context, repositoryID, policyBatchSize int, now time.Time) error {
offset := 0
for {
// Retrieve the set of configuration policies that affect indexing for this repository.
policies, totalCount, err := s.policySvc.GetConfigurationPolicies(ctx, types.GetConfigurationPoliciesOptions{
policies, totalCount, err := s.policiesSvc.GetConfigurationPolicies(ctx, types.GetConfigurationPoliciesOptions{
RepositoryID: repositoryID,
ForIndexing: true,
Limit: s.policyBatchSize,
Limit: policyBatchSize,
Offset: offset,
})
if err != nil {
@ -124,7 +103,7 @@ func (s *scheduler) handleRepository(ctx context.Context, repositoryID int, now
}
// Attempt to queue an index if one does not exist for each of the matching commits
if _, err := s.autoindexingSvc.QueueIndexes(ctx, repositoryID, commit, "", false, false); err != nil {
if _, err := s.QueueIndexes(ctx, repositoryID, commit, "", false, false); err != nil {
if errors.HasType(err, &gitdomain.RevisionNotFoundError{}) {
continue
}
@ -139,6 +118,12 @@ func (s *scheduler) handleRepository(ctx context.Context, repositoryID int, now
}
}
func (s *scheduler) HandleError(err error) {
s.logger.Error("Failed to schedule index jobs", log.Error(err))
func (s *Service) NewOnDemandScheduler(interval time.Duration, batchSize int) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
if !autoIndexingEnabled() {
return nil
}
return s.ProcessRepoRevs(ctx, batchSize)
}))
}
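
This hunk shows the pattern the whole commit applies: the dedicated scheduler struct (with its copies of the service, config, and logger) is deleted, and the service method hands a closure wrapped in goroutine.HandlerFunc straight to the periodic goroutine. A self-contained sketch of that adapter trick follows; the Handler and HandlerFunc shapes mirror the internal goroutine package but are re-declared locally so the example compiles on its own.

package main

import (
	"context"
	"fmt"
)

type Handler interface {
	Handle(ctx context.Context) error
}

// HandlerFunc adapts an ordinary function to the Handler interface,
// the same trick http.HandlerFunc uses.
type HandlerFunc func(ctx context.Context) error

func (f HandlerFunc) Handle(ctx context.Context) error { return f(ctx) }

type Service struct{}

func (s *Service) ProcessRepoRevs(ctx context.Context, batchSize int) error {
	fmt.Println("processing", batchSize, "repo revs")
	return nil
}

// Before: a dedicated struct held copies of the service and its config.
// After: the service returns a closure over its own fields and parameters.
func (s *Service) NewOnDemandScheduler(batchSize int) Handler {
	return HandlerFunc(func(ctx context.Context) error {
		return s.ProcessRepoRevs(ctx, batchSize)
	})
}

func main() {
	h := (&Service{}).NewOnDemandScheduler(100)
	_ = h.Handle(context.Background())
}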

View File

@ -140,36 +140,6 @@ func (s *Service) WorkerutilStore() dbworkerstore.Store { return s.worke
func (s *Service) DependencySyncStore() dbworkerstore.Store { return s.dependencySyncStore }
func (s *Service) DependencyIndexingStore() dbworkerstore.Store { return s.dependencyIndexingStore }
// NewIndexResetter returns a background routine that periodically resets index
// records that are marked as being processed but are no longer being processed
// by a worker.
func (s *Service) NewIndexResetter(interval time.Duration) *dbworker.Resetter {
return dbworker.NewResetter(s.logger, s.workerutilStore, dbworker.ResetterOptions{
Name: "precise_code_intel_index_worker_resetter",
Interval: interval,
Metrics: dbworker.ResetterMetrics{
RecordResets: s.metrics.numIndexResets,
RecordResetFailures: s.metrics.numIndexResetFailures,
Errors: s.metrics.numIndexResetErrors,
},
})
}
// NewDependencyIndexResetter returns a background routine that periodically resets
// dependency index records that are marked as being processed but are no longer being
// processed by a worker.
func (s *Service) NewDependencyIndexResetter(interval time.Duration) *dbworker.Resetter {
return dbworker.NewResetter(s.logger, s.dependencyIndexingStore, dbworker.ResetterOptions{
Name: "precise_code_intel_dependency_index_worker_resetter",
Interval: interval,
Metrics: dbworker.ResetterMetrics{
RecordResets: s.metrics.numDependencyIndexResets,
RecordResetFailures: s.metrics.numDependencyIndexResetFailures,
Errors: s.metrics.numDependencyIndexResetErrors,
},
})
}
func (s *Service) InsertDependencyIndexingJob(ctx context.Context, uploadID int, externalServiceKind string, syncTime time.Time) (id int, err error) {
ctx, _, endObservation := s.operations.insertDependencyIndexingJob.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})

View File

@ -24,15 +24,6 @@ import (
"github.com/sourcegraph/sourcegraph/schema"
)
type syncer struct {
depsSvc *Service
externalServicesStore ExternalServiceStore
gitClient GitserverClient
interval time.Duration
}
var _ goroutine.Handler = &syncer{}
func (s *Service) NewCrateSyncer() goroutine.BackgroundRoutine {
if s.gitClient == nil {
panic("illegal service construction - NewCrateSyncer called without a registered gitClient")
@ -52,16 +43,13 @@ func (s *Service) NewCrateSyncer() goroutine.BackgroundRoutine {
}
}
return goroutine.NewPeriodicGoroutine(context.Background(), interval, &syncer{
depsSvc: s,
externalServicesStore: s.extSvcStore,
gitClient: s.gitClient,
interval: interval,
})
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.handleCrateSyncer(ctx, interval)
}))
}
func (s *syncer) Handle(ctx context.Context) error {
exists, externalService, err := singleRustExternalService(ctx, s.externalServicesStore)
func (s *Service) handleCrateSyncer(ctx context.Context, interval time.Duration) error {
exists, externalService, err := singleRustExternalService(ctx, s.extSvcStore)
if !exists || err != nil {
// err can be nil when there is no RUSTPACKAGES code host.
return err
@ -78,7 +66,7 @@ func (s *syncer) Handle(ctx context.Context) error {
}
repoName := api.RepoName(config.IndexRepositoryName)
update, err := s.gitClient.RequestRepoUpdate(ctx, repoName, s.interval)
update, err := s.gitClient.RequestRepoUpdate(ctx, repoName, interval)
if err != nil {
return err
}
@ -141,7 +129,7 @@ func (s *syncer) Handle(ctx context.Context) error {
if err != nil {
return err
}
new, err := s.depsSvc.UpsertDependencyRepos(ctx, pkgs)
new, err := s.dependenciesStore.UpsertDependencyRepos(ctx, pkgs)
if err != nil {
return errors.Wrapf(err, "failed to insert Rust crate")
}
@ -152,7 +140,7 @@ func (s *syncer) Handle(ctx context.Context) error {
// We picked up new crates so we trigger a new sync for the RUSTPACKAGES code host.
nextSync := time.Now()
externalService.NextSyncAt = nextSync
if err := s.externalServicesStore.Upsert(ctx, externalService); err != nil {
if err := s.extSvcStore.Upsert(ctx, externalService); err != nil {
return err
}
}
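
The syncer's final step, per the comment above, is to pull the external service's NextSyncAt forward whenever new crates were inserted so the RUSTPACKAGES code host re-syncs immediately. Below is a hedged sketch of just that decision; the ExternalService type here is a stand-in, not the real store model.

package main

import (
	"fmt"
	"time"
)

type ExternalService struct {
	Kind       string
	NextSyncAt time.Time
}

// markForImmediateSync pulls the next-sync timestamp forward when new
// dependency repos were inserted, so the code host picks them up right away.
func markForImmediateSync(svc *ExternalService, newRepos int, now time.Time) bool {
	if newRepos == 0 {
		return false // nothing new; keep the existing schedule
	}
	svc.NextSyncAt = now
	return true
}

func main() {
	svc := &ExternalService{Kind: "RUSTPACKAGES", NextSyncAt: time.Now().Add(time.Hour)}
	fmt.Println(markForImmediateSync(svc, 3, time.Now())) // true
}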

View File

@ -8,27 +8,14 @@ import (
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
type matcher struct {
policySvc *Service
configurationPolicyMembershipBatchSize int
metrics *matcherMetrics
}
var (
_ goroutine.Handler = &matcher{}
_ goroutine.ErrorHandler = &matcher{}
)
func (s *Service) NewRepoMatcher(interval time.Duration, configurationPolicyMembershipBatchSize int) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, &matcher{
policySvc: s,
configurationPolicyMembershipBatchSize: configurationPolicyMembershipBatchSize,
metrics: s.matcherMetrics,
})
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.handleRepoMatcherBatch(ctx, configurationPolicyMembershipBatchSize)
}))
}
func (m *matcher) Handle(ctx context.Context) error {
policies, err := m.policySvc.SelectPoliciesForRepositoryMembershipUpdate(ctx, m.configurationPolicyMembershipBatchSize)
func (s *Service) handleRepoMatcherBatch(ctx context.Context, batchSize int) error {
policies, err := s.SelectPoliciesForRepositoryMembershipUpdate(ctx, batchSize)
if err != nil {
return err
}
@ -47,14 +34,12 @@ func (m *matcher) Handle(ctx context.Context) error {
// Always call this even if patterns are not supplied. Otherwise we run into the
// situation where we have deleted all of the patterns associated with a policy
// but it still has entries in the lookup table.
if err := m.policySvc.UpdateReposMatchingPatterns(ctx, patterns, policy.ID, repositoryMatchLimit); err != nil {
if err := s.UpdateReposMatchingPatterns(ctx, patterns, policy.ID, repositoryMatchLimit); err != nil {
return err
}
m.metrics.numPoliciesUpdated.Inc()
s.matcherMetrics.numPoliciesUpdated.Inc()
}
return nil
}
func (m *matcher) HandleError(err error) {}

View File

@ -1,29 +0,0 @@
package uploads
import (
"context"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// HandleAbandonedUpload removes upload records which have not left the uploading state within the given TTL.
func (j *janitor) HandleAbandonedUpload(ctx context.Context) error {
count, err := j.uploadSvc.DeleteUploadsStuckUploading(ctx, time.Now().UTC().Add(-j.uploadTimeout))
if err != nil {
return errors.Wrap(err, "dbstore.DeleteUploadsStuckUploading")
}
if count > 0 {
j.logger.Debug("Deleted abandoned upload records", log.Int("count", count))
j.metrics.numUploadRecordsRemoved.Add(float64(count))
}
return nil
}
// func (j *janitor) HandleError(err error) {
// h.metrics.numErrors.Inc()
// log.Error("Failed to delete abandoned uploads", "error", err)
// }

View File

@ -1,23 +0,0 @@
package uploads
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (j *janitor) HandleAuditLog(ctx context.Context) (err error) {
count, err := j.uploadSvc.DeleteOldAuditLogs(ctx, j.auditLogMaxAge, time.Now())
if err != nil {
return errors.Wrap(err, "dbstore.DeleteOldAuditLogs")
}
j.metrics.numAuditLogRecordsExpired.Add(float64(count))
return nil
}
// func (j *janitor) HandleError(err error) {
// j.metrics.numErrors.Inc()
// log15.Error("Failed to delete codeintel audit log records", "error", err)
// }

View File

@ -0,0 +1,14 @@
package uploads
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func (s *Service) NewCommittedAtBackfiller(interval time.Duration, batchSize int) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.BackfillCommittedAtBatch(ctx, batchSize)
}))
}

View File

@ -0,0 +1,20 @@
package uploads
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
// Handle periodically re-calculates the commit and upload visibility graph for repositories
// that are marked as dirty by the worker process. This is done out-of-band from the rest of
// the upload processing as it is likely that we are processing multiple uploads concurrently
// for the same repository and should not repeat the work since the last calculation performed
// will always be the one we want.
func (s *Service) NewUpdater(interval time.Duration, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.UpdateDirtyRepositories(ctx, maxAgeForNonStaleBranches, maxAgeForNonStaleTags)
}))
}
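
The doc comment explains why this runs out-of-band: many concurrent uploads can mark the same repository dirty, and only the most recent recalculation matters, so a periodic pass that drains the dirty set does each repository at most once. Below is a small illustrative sketch of that debounce, with a hypothetical dirty set and recalc callback rather than the real store.

package main

import "fmt"

type updater struct {
	dirtyRepos map[int]struct{} // repo ID -> marked dirty
}

func (u *updater) markDirty(repoID int) { u.dirtyRepos[repoID] = struct{}{} }

// updateDirtyRepositories drains the dirty set; repeated marks between runs
// collapse into a single recalculation, which is why this runs out-of-band
// from upload processing.
func (u *updater) updateDirtyRepositories(recalc func(repoID int)) {
	for repoID := range u.dirtyRepos {
		recalc(repoID)
		delete(u.dirtyRepos, repoID)
	}
}

func main() {
	u := &updater{dirtyRepos: map[int]struct{}{}}
	u.markDirty(50)
	u.markDirty(50) // second upload for the same repo; no extra work
	u.markDirty(51)
	u.updateDirtyRepositories(func(id int) { fmt.Println("recalculated visibility graph for repo", id) })
}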

View File

@ -1,32 +0,0 @@
package uploads
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
type committedAtBackfiller struct {
uploadSvc *Service
batchSize int
}
var (
_ goroutine.Handler = &committedAtBackfiller{}
_ goroutine.ErrorHandler = &committedAtBackfiller{}
)
func (s *Service) NewCommittedAtBackfiller(interval time.Duration, batchSize int) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, &committedAtBackfiller{
uploadSvc: s,
batchSize: batchSize,
})
}
func (u *committedAtBackfiller) Handle(ctx context.Context) error {
return u.uploadSvc.BackfillCommittedAtBatch(ctx, u.batchSize)
}
func (u *committedAtBackfiller) HandleError(err error) {
}

View File

@ -1,75 +0,0 @@
package uploads
import (
"context"
"sort"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (j *janitor) HandleDeletedRepository(ctx context.Context) (err error) {
uploadsCounts, err := j.uploadSvc.DeleteUploadsWithoutRepository(ctx, time.Now())
if err != nil {
return errors.Wrap(err, "uploadSvc.DeleteUploadsWithoutRepository")
}
indexesCounts, err := j.indexSvc.DeleteIndexesWithoutRepository(ctx, time.Now())
if err != nil {
return errors.Wrap(err, "indexSvc.DeleteIndexesWithoutRepository")
}
for _, counts := range gatherCounts(uploadsCounts, indexesCounts) {
j.logger.Debug(
"Deleted codeintel records with a deleted repository",
log.Int("repository_id", counts.repoID),
log.Int("uploads_count", counts.uploadsCount),
log.Int("indexes_count", counts.indexesCount),
)
j.metrics.numUploadRecordsRemoved.Add(float64(counts.uploadsCount))
j.metrics.numIndexRecordsRemoved.Add(float64(counts.indexesCount))
}
return nil
}
// func (j *janitor) HandleError(err error) {
// j.metrics.numErrors.Inc()
// log15.Error("Failed to delete codeintel records with a deleted repository", "error", err)
// }
type recordCount struct {
repoID int
uploadsCount int
indexesCount int
}
func gatherCounts(uploadsCounts, indexesCounts map[int]int) []recordCount {
repoIDsMap := map[int]struct{}{}
for repoID := range uploadsCounts {
repoIDsMap[repoID] = struct{}{}
}
for repoID := range indexesCounts {
repoIDsMap[repoID] = struct{}{}
}
var repoIDs []int
for repoID := range repoIDsMap {
repoIDs = append(repoIDs, repoID)
}
sort.Ints(repoIDs)
recordCounts := make([]recordCount, 0, len(repoIDs))
for _, repoID := range repoIDs {
recordCounts = append(recordCounts, recordCount{
repoID: repoID,
uploadsCount: uploadsCounts[repoID],
indexesCount: indexesCounts[repoID],
})
}
return recordCounts
}

View File

@ -1,27 +0,0 @@
package uploads
import (
"context"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (j *janitor) HandleExpiredUploadDeleter(ctx context.Context) error {
count, err := j.uploadSvc.SoftDeleteExpiredUploads(ctx)
if err != nil {
return errors.Wrap(err, "SoftDeleteExpiredUploads")
}
if count > 0 {
j.logger.Info("Deleted expired codeintel uploads", log.Int("count", count))
j.metrics.numUploadRecordsRemoved.Add(float64(count))
}
return nil
}
// func (j *janitor) HandleError(err error) {
// j.metrics.numErrors.Inc()
// log.Error("Failed to delete expired codeintel uploads", "error", err)
// }

View File

@ -6,27 +6,11 @@ import (
"github.com/sourcegraph/log"
policiesEnterprise "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
"github.com/sourcegraph/sourcegraph/internal/codeintel/types"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
type expirer struct {
uploadSvc UploadServiceForExpiration
policySvc PolicyService
metrics *expirationMetrics
policyMatcher PolicyMatcher
logger log.Logger
repositoryProcessDelay time.Duration
repositoryBatchSize int
uploadProcessDelay time.Duration
uploadBatchSize int
commitBatchSize int
policyBatchSize int
}
var (
_ goroutine.Handler = &expirer{}
_ goroutine.ErrorHandler = &expirer{}
"github.com/sourcegraph/sourcegraph/internal/timeutil"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (s *Service) NewExpirer(
@ -38,29 +22,245 @@ func (s *Service) NewExpirer(
commitBatchSize int,
policyBatchSize int,
) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, &expirer{
uploadSvc: s,
policySvc: s.policySvc,
policyMatcher: s.policyMatcher,
metrics: s.expirationMetrics,
logger: log.Scoped("Expirer", ""),
repositoryProcessDelay: repositoryProcessDelay,
repositoryBatchSize: repositoryBatchSize,
uploadProcessDelay: uploadProcessDelay,
uploadBatchSize: uploadBatchSize,
commitBatchSize: commitBatchSize,
policyBatchSize: policyBatchSize,
})
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.handleUploadExpirer(ctx, expirerConfig{
repositoryProcessDelay: repositoryProcessDelay,
repositoryBatchSize: repositoryBatchSize,
uploadProcessDelay: uploadProcessDelay,
uploadBatchSize: uploadBatchSize,
commitBatchSize: commitBatchSize,
policyBatchSize: policyBatchSize,
})
}))
}
func (r *expirer) Handle(ctx context.Context) error {
if err := r.HandleUploadExpirer(ctx); err != nil {
type expirerConfig struct {
repositoryProcessDelay time.Duration
repositoryBatchSize int
uploadProcessDelay time.Duration
uploadBatchSize int
commitBatchSize int
policyBatchSize int
}
// HandleUploadExpirer compares the age of upload records against the age of uploads
// protected by global and repository specific data retention policies.
//
// Uploads that are older than the protected retention age are marked as expired. Expired records with
// no dependents will be removed by the expiredUploadDeleter.
func (e *Service) handleUploadExpirer(ctx context.Context, cfg expirerConfig) (err error) {
// Get the batch of repositories that we'll handle in this invocation of the periodic goroutine. This
// set should contain repositories that have yet to be updated, or that have been updated least recently.
// This allows us to update every repository reliably, even if it takes a long time to process through
// the backlog. Note that this set of repositories requires a fresh commit graph, so we're not trying to
// process records that have been uploaded but whose visible commits have yet to be determined (and which
// would therefore appear visible to no commit).
repositories, err := e.SetRepositoriesForRetentionScan(ctx, cfg.repositoryProcessDelay, cfg.repositoryBatchSize)
if err != nil {
return errors.Wrap(err, "uploadSvc.SelectRepositoriesForRetentionScan")
}
if len(repositories) == 0 {
// All repositories updated recently enough
return nil
}
now := timeutil.Now()
for _, repositoryID := range repositories {
if repositoryErr := e.handleRepository(ctx, repositoryID, cfg, now); repositoryErr != nil {
if err == nil {
err = repositoryErr
} else {
err = errors.Append(err, repositoryErr)
}
}
}
return err
}
func (e *Service) handleRepository(ctx context.Context, repositoryID int, cfg expirerConfig, now time.Time) error {
e.expirationMetrics.numRepositoriesScanned.Inc()
// Build a map from commits to the set of policies that affect them. Note that this map should
// never be empty as we have multiple protected data retention policies on the global scope so
// that all data visible from a tag or branch tip is protected for at least a short amount of
// time after upload.
commitMap, err := e.buildCommitMap(ctx, repositoryID, cfg, now)
if err != nil {
return err
}
return nil
// Mark the time after which all unprocessed uploads for this repository will not be touched.
// This timestamp field is used as a rate limiting device so we do not busy-loop over the same
// protected records in the background.
//
// This value should be assigned OUTSIDE of the following loop to prevent the case where the
// upload process delay is shorter than the time it takes to process one batch of uploads. This
// is obviously a mis-configuration, but one we can make a bit less catastrophic by not updating
// this value in the loop.
lastRetentionScanBefore := now.Add(-cfg.uploadProcessDelay)
for {
// Each record pulled back by this query will either have its expired flag or its last
// retention scan timestamp updated by the following handleUploads call. This guarantees
// that the loop will terminate naturally after the entire set of candidate uploads have
// been seen and updated with a time necessarily greater than lastRetentionScanBefore.
//
// Additionally, we skip the set of uploads that have finished processing strictly after
// the last update to the commit graph for that repository. This ensures we do not throw
// out new uploads that would happen to be visible to no commits since they were never
// installed into the commit graph.
uploads, _, err := e.GetUploads(ctx, types.GetUploadsOptions{
State: "completed",
RepositoryID: repositoryID,
AllowExpired: false,
OldestFirst: true,
Limit: cfg.uploadBatchSize,
LastRetentionScanBefore: &lastRetentionScanBefore,
InCommitGraph: true,
})
if err != nil || len(uploads) == 0 {
return err
}
if err := e.handleUploads(ctx, commitMap, uploads, cfg, now); err != nil {
// Note that we collect errors in the loop of the handleUploads call, but we will still terminate
// this loop on any non-nil error from that function. This is required to prevent us from pulling
// back the same set of failing records from the database in a tight loop.
return err
}
}
}
func (r *expirer) HandleError(err error) {
// buildCommitMap will iterate the complete set of configuration policies that apply to a particular
// repository and build a map from commits to the policies that apply to them.
func (e *Service) buildCommitMap(ctx context.Context, repositoryID int, cfg expirerConfig, now time.Time) (map[string][]policiesEnterprise.PolicyMatch, error) {
var (
offset int
policies []types.ConfigurationPolicy
)
for {
// Retrieve the complete set of configuration policies that affect data retention for this repository
policyBatch, totalCount, err := e.policySvc.GetConfigurationPolicies(ctx, types.GetConfigurationPoliciesOptions{
RepositoryID: repositoryID,
ForDataRetention: true,
Limit: cfg.policyBatchSize,
Offset: offset,
})
if err != nil {
return nil, errors.Wrap(err, "policySvc.GetConfigurationPolicies")
}
offset += len(policyBatch)
policies = append(policies, policyBatch...)
if len(policyBatch) == 0 || offset >= totalCount {
break
}
}
// Get the set of commits within this repository that match a data retention policy
return e.policyMatcher.CommitsDescribedByPolicy(ctx, repositoryID, policies, now)
}
func (e *Service) handleUploads(
ctx context.Context,
commitMap map[string][]policiesEnterprise.PolicyMatch,
uploads []types.Upload,
cfg expirerConfig,
now time.Time,
) (err error) {
// Categorize each upload as protected or expired
var (
protectedUploadIDs = make([]int, 0, len(uploads))
expiredUploadIDs = make([]int, 0, len(uploads))
)
for _, upload := range uploads {
protected, checkErr := e.isUploadProtectedByPolicy(ctx, commitMap, upload, cfg, now)
if checkErr != nil {
if err == nil {
err = checkErr
} else {
err = errors.Append(err, checkErr)
}
// Collect errors but do not prevent other commits from being successfully processed. We'll leave the
// ones that fail here alone to be re-checked the next time records for this repository are scanned.
continue
}
if protected {
protectedUploadIDs = append(protectedUploadIDs, upload.ID)
} else {
expiredUploadIDs = append(expiredUploadIDs, upload.ID)
}
}
// Update the last data retention scan timestamp on the upload records with the given protected identifiers
// (so that we do not re-select the same uploads on the next batch) and set the expired field on the upload
// records with the given expired identifiers so that the expiredUploadDeleter process can remove them once
// they are no longer referenced.
if updateErr := e.UpdateUploadRetention(ctx, protectedUploadIDs, expiredUploadIDs); updateErr != nil {
if updateErr := errors.Wrap(updateErr, "uploadSvc.UpdateUploadRetention"); err == nil {
err = updateErr
} else {
err = errors.Append(err, updateErr)
}
}
if count := len(expiredUploadIDs); count > 0 {
e.logger.Info("Expiring codeintel uploads", log.Int("count", count))
e.expirationMetrics.numUploadsExpired.Add(float64(count))
}
return err
}
func (e *Service) isUploadProtectedByPolicy(
ctx context.Context,
commitMap map[string][]policiesEnterprise.PolicyMatch,
upload types.Upload,
cfg expirerConfig,
now time.Time,
) (bool, error) {
e.expirationMetrics.numUploadsScanned.Inc()
var token *string
for first := true; first || token != nil; first = false {
// Fetch the set of commits for which this upload can resolve code intelligence queries. This will necessarily
// include the exact commit indicated by the upload, but may also provide best-effort code intelligence to
// nearby commits.
//
// We need to consider all visible commits, as we may otherwise delete the uploads providing code intelligence
// for the tip of a branch between the time gitserver is updated and the time the new associated code
// intelligence index is processed.
//
// We check the set of commits visible to an upload in batches as in some cases it can be very large; for
// example, a single historic commit providing code intelligence for all descendants.
commits, nextToken, err := e.GetCommitsVisibleToUpload(ctx, upload.ID, cfg.commitBatchSize, token)
if err != nil {
return false, errors.Wrap(err, "uploadSvc.CommitsVisibleToUpload")
}
token = nextToken
e.expirationMetrics.numCommitsScanned.Add(float64(len(commits)))
for _, commit := range commits {
if policyMatches, ok := commitMap[commit]; ok {
for _, policyMatch := range policyMatches {
if policyMatch.PolicyDuration == nil || now.Sub(upload.UploadedAt) < *policyMatch.PolicyDuration {
return true, nil
}
}
}
}
}
return false, nil
}
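
The heart of the expirer is the protection check spelled out in the comments: an upload stays protected if any retention policy attached to a commit it is visible from still covers the upload's age (a nil duration means retain forever). Below is a simplified, self-contained sketch of that decision; the PolicyMatch shape is trimmed down and the token-based commit pagination is omitted.

package main

import (
	"fmt"
	"time"
)

type PolicyMatch struct {
	// PolicyDuration is nil for "retain forever" policies.
	PolicyDuration *time.Duration
}

// isProtected reports whether any policy attached to a commit visible to the
// upload still covers the upload's age. A nil duration always protects.
func isProtected(commitMap map[string][]PolicyMatch, visibleCommits []string, uploadedAt, now time.Time) bool {
	age := now.Sub(uploadedAt)
	for _, commit := range visibleCommits {
		for _, match := range commitMap[commit] {
			if match.PolicyDuration == nil || age < *match.PolicyDuration {
				return true
			}
		}
	}
	return false
}

func main() {
	week := 7 * 24 * time.Hour
	commitMap := map[string][]PolicyMatch{"deadbeef01": {{PolicyDuration: &week}}}
	now := time.Now()

	fmt.Println(isProtected(commitMap, []string{"deadbeef01"}, now.Add(-48*time.Hour), now))      // true: younger than the policy window
	fmt.Println(isProtected(commitMap, []string{"deadbeef01"}, now.Add(-30*24*time.Hour), now))   // false: older than the policy window
	fmt.Println(isProtected(commitMap, []string{"cafebabe"}, now.Add(-time.Hour), now))           // false: no matching policy
}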

View File

@ -1,18 +0,0 @@
package uploads
import (
"context"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (j *janitor) HandleHardDeleter(ctx context.Context) error {
count, err := j.uploadSvc.HardDeleteExpiredUploads(ctx)
if err != nil {
return errors.Wrap(err, "uploadSvc.HardDeleteExpiredUploads")
}
j.metrics.numUploadsPurged.Add(float64(count))
return nil
}

View File

@ -2,24 +2,19 @@ package uploads
import (
"context"
"sort"
"time"
"github.com/derision-test/glock"
"github.com/sourcegraph/log"
autoindexingshared "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/shared"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/shared"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
type janitor struct {
gsc GitserverClient
uploadSvc UploadServiceForCleanup
indexSvc AutoIndexingService
metrics *janitorMetrics
logger log.Logger
clock glock.Clock
type janitorConfig struct {
uploadTimeout time.Duration
auditLogMaxAge time.Duration
minimumTimeSinceLastCheck time.Duration
@ -27,11 +22,6 @@ type janitor struct {
commitResolverMaximumCommitLag time.Duration
}
var (
_ goroutine.Handler = &janitor{}
_ goroutine.ErrorHandler = &janitor{}
)
func (s *Service) NewJanitor(
interval time.Duration,
uploadTimeout time.Duration,
@ -40,47 +30,257 @@ func (s *Service) NewJanitor(
commitResolverBatchSize int,
commitResolverMaximumCommitLag time.Duration,
) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, &janitor{
gsc: s.gitserverClient,
uploadSvc: s,
indexSvc: s.autoIndexingSvc,
metrics: s.janitorMetrics,
clock: glock.NewRealClock(),
logger: s.logger,
uploadTimeout: uploadTimeout,
auditLogMaxAge: auditLogMaxAge,
minimumTimeSinceLastCheck: minimumTimeSinceLastCheck,
commitResolverBatchSize: commitResolverBatchSize,
commitResolverMaximumCommitLag: commitResolverMaximumCommitLag,
})
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.handleCleanup(ctx, janitorConfig{
uploadTimeout: uploadTimeout,
auditLogMaxAge: auditLogMaxAge,
minimumTimeSinceLastCheck: minimumTimeSinceLastCheck,
commitResolverBatchSize: commitResolverBatchSize,
commitResolverMaximumCommitLag: commitResolverMaximumCommitLag,
})
}))
}
func (j *janitor) Handle(ctx context.Context) (errs error) {
func (s *Service) handleCleanup(ctx context.Context, cfg janitorConfig) (errs error) {
// Reconciliation and denormalization
if err := j.HandleDeletedRepository(ctx); err != nil {
if err := s.handleDeletedRepository(ctx); err != nil {
errs = errors.Append(errs, err)
}
if err := j.HandleUnknownCommit(ctx); err != nil {
if err := s.handleUnknownCommit(ctx, cfg); err != nil {
errs = errors.Append(errs, err)
}
// Expiration
if err := j.HandleAbandonedUpload(ctx); err != nil {
if err := s.handleAbandonedUpload(ctx, cfg); err != nil {
errs = errors.Append(errs, err)
}
if err := j.HandleExpiredUploadDeleter(ctx); err != nil {
if err := s.handleExpiredUploadDeleter(ctx); err != nil {
errs = errors.Append(errs, err)
}
if err := j.HandleHardDeleter(ctx); err != nil {
if err := s.handleHardDeleter(ctx); err != nil {
errs = errors.Append(errs, err)
}
if err := j.HandleAuditLog(ctx); err != nil {
if err := s.handleAuditLog(ctx, cfg); err != nil {
errs = errors.Append(errs, err)
}
return errs
}
func (r *janitor) HandleError(err error) {
func (j *Service) handleDeletedRepository(ctx context.Context) (err error) {
uploadsCounts, err := j.DeleteUploadsWithoutRepository(ctx, time.Now())
if err != nil {
return errors.Wrap(err, "uploadSvc.DeleteUploadsWithoutRepository")
}
indexesCounts, err := j.autoIndexingSvc.DeleteIndexesWithoutRepository(ctx, time.Now())
if err != nil {
return errors.Wrap(err, "indexSvc.DeleteIndexesWithoutRepository")
}
for _, counts := range gatherCounts(uploadsCounts, indexesCounts) {
j.logger.Debug(
"Deleted codeintel records with a deleted repository",
log.Int("repository_id", counts.repoID),
log.Int("uploads_count", counts.uploadsCount),
log.Int("indexes_count", counts.indexesCount),
)
j.janitorMetrics.numUploadRecordsRemoved.Add(float64(counts.uploadsCount))
j.janitorMetrics.numIndexRecordsRemoved.Add(float64(counts.indexesCount))
}
return nil
}
type recordCount struct {
repoID int
uploadsCount int
indexesCount int
}
func gatherCounts(uploadsCounts, indexesCounts map[int]int) []recordCount {
repoIDsMap := map[int]struct{}{}
for repoID := range uploadsCounts {
repoIDsMap[repoID] = struct{}{}
}
for repoID := range indexesCounts {
repoIDsMap[repoID] = struct{}{}
}
var repoIDs []int
for repoID := range repoIDsMap {
repoIDs = append(repoIDs, repoID)
}
sort.Ints(repoIDs)
recordCounts := make([]recordCount, 0, len(repoIDs))
for _, repoID := range repoIDs {
recordCounts = append(recordCounts, recordCount{
repoID: repoID,
uploadsCount: uploadsCounts[repoID],
indexesCount: indexesCounts[repoID],
})
}
return recordCounts
}
func (j *Service) handleUnknownCommit(ctx context.Context, cfg janitorConfig) (err error) {
staleUploads, err := j.GetStaleSourcedCommits(ctx, cfg.minimumTimeSinceLastCheck, cfg.commitResolverBatchSize, j.clock.Now())
if err != nil {
return errors.Wrap(err, "uploadSvc.StaleSourcedCommits")
}
staleIndexes, err := j.autoIndexingSvc.GetStaleSourcedCommits(ctx, cfg.minimumTimeSinceLastCheck, cfg.commitResolverBatchSize, j.clock.Now())
if err != nil {
return errors.Wrap(err, "indexSvc.StaleSourcedCommits")
}
batch := mergeSourceCommits(staleUploads, staleIndexes)
for _, sourcedCommits := range batch {
if err := j.handleSourcedCommits(ctx, sourcedCommits, cfg); err != nil {
return err
}
}
return nil
}
func mergeSourceCommits(usc []shared.SourcedCommits, isc []autoindexingshared.SourcedCommits) []SourcedCommits {
var sourceCommits []SourcedCommits
for _, uc := range usc {
sourceCommits = append(sourceCommits, SourcedCommits{
RepositoryID: uc.RepositoryID,
RepositoryName: uc.RepositoryName,
Commits: uc.Commits,
})
}
for _, ic := range isc {
sourceCommits = append(sourceCommits, SourcedCommits{
RepositoryID: ic.RepositoryID,
RepositoryName: ic.RepositoryName,
Commits: ic.Commits,
})
}
return sourceCommits
}
type SourcedCommits struct {
RepositoryID int
RepositoryName string
Commits []string
}
func (j *Service) handleSourcedCommits(ctx context.Context, sc SourcedCommits, cfg janitorConfig) error {
for _, commit := range sc.Commits {
if err := j.handleCommit(ctx, sc.RepositoryID, sc.RepositoryName, commit, cfg); err != nil {
return err
}
}
return nil
}
func (j *Service) handleCommit(ctx context.Context, repositoryID int, repositoryName, commit string, cfg janitorConfig) error {
var shouldDelete bool
_, err := j.gitserverClient.ResolveRevision(ctx, repositoryID, commit)
if err == nil {
// If we have no error then the commit is resolvable and we shouldn't touch it.
shouldDelete = false
} else if gitdomain.IsRepoNotExist(err) {
// If we have a repository not found error, then we'll just update the timestamp
// of the record so we can move on to other data; we delete records associated
// with deleted repositories in a separate janitor process.
shouldDelete = false
} else if errors.HasType(err, &gitdomain.RevisionNotFoundError{}) {
// Target condition: repository is resolvable but the commit is not; it was probably
// force-pushed away and the commit was gc'd after some time or after a re-clone
// in gitserver.
shouldDelete = true
} else {
// unexpected error
return errors.Wrap(err, "git.ResolveRevision")
}
if shouldDelete {
_, uploadsDeleted, err := j.DeleteSourcedCommits(ctx, repositoryID, commit, cfg.commitResolverMaximumCommitLag, j.clock.Now())
if err != nil {
return errors.Wrap(err, "uploadSvc.DeleteSourcedCommits")
}
if uploadsDeleted > 0 {
// log.Debug("Deleted upload records with unresolvable commits", "count", uploadsDeleted)
j.janitorMetrics.numUploadRecordsRemoved.Add(float64(uploadsDeleted))
}
indexesDeleted, err := j.autoIndexingSvc.DeleteSourcedCommits(ctx, repositoryID, commit, cfg.commitResolverMaximumCommitLag, j.clock.Now())
if err != nil {
return errors.Wrap(err, "indexSvc.DeleteSourcedCommits")
}
if indexesDeleted > 0 {
// log.Debug("Deleted index records with unresolvable commits", "count", indexesDeleted)
j.janitorMetrics.numIndexRecordsRemoved.Add(float64(indexesDeleted))
}
return nil
}
if _, err := j.UpdateSourcedCommits(ctx, repositoryID, commit, j.clock.Now()); err != nil {
return errors.Wrap(err, "uploadSvc.UpdateSourcedCommits")
}
if _, err := j.autoIndexingSvc.UpdateSourcedCommits(ctx, repositoryID, commit, j.clock.Now()); err != nil {
return errors.Wrap(err, "indexSvc.UpdateSourcedCommits")
}
return nil
}
// handleAbandonedUpload removes upload records which have not left the uploading state within the given TTL.
func (j *Service) handleAbandonedUpload(ctx context.Context, cfg janitorConfig) error {
count, err := j.DeleteUploadsStuckUploading(ctx, time.Now().UTC().Add(-cfg.uploadTimeout))
if err != nil {
return errors.Wrap(err, "dbstore.DeleteUploadsStuckUploading")
}
if count > 0 {
j.logger.Debug("Deleted abandoned upload records", log.Int("count", count))
j.janitorMetrics.numUploadRecordsRemoved.Add(float64(count))
}
return nil
}
func (j *Service) handleExpiredUploadDeleter(ctx context.Context) error {
count, err := j.SoftDeleteExpiredUploads(ctx)
if err != nil {
return errors.Wrap(err, "SoftDeleteExpiredUploads")
}
if count > 0 {
j.logger.Info("Deleted expired codeintel uploads", log.Int("count", count))
j.janitorMetrics.numUploadRecordsRemoved.Add(float64(count))
}
return nil
}
func (j *Service) handleHardDeleter(ctx context.Context) error {
count, err := j.HardDeleteExpiredUploads(ctx)
if err != nil {
return errors.Wrap(err, "uploadSvc.HardDeleteExpiredUploads")
}
j.janitorMetrics.numUploadsPurged.Add(float64(count))
return nil
}
func (j *Service) handleAuditLog(ctx context.Context, cfg janitorConfig) (err error) {
count, err := j.DeleteOldAuditLogs(ctx, cfg.auditLogMaxAge, time.Now())
if err != nil {
return errors.Wrap(err, "dbstore.DeleteOldAuditLogs")
}
j.janitorMetrics.numAuditLogRecordsExpired.Add(float64(count))
return nil
}
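
handleCommit's branching boils down to three outcomes of resolving a commit: it resolves (leave the record, just bump its timestamp), the repository is gone (also leave it, since a separate janitor pass handles deleted repositories), or the repository exists but the commit does not (delete the sourced records). Below is a sketch of that classification; the sentinel errors stand in for gitdomain.IsRepoNotExist and the RevisionNotFoundError check and are not the real helpers.

package main

import (
	"errors"
	"fmt"
)

var (
	errRepoNotFound     = errors.New("repository not found")
	errRevisionNotFound = errors.New("revision not found")
)

type action int

const (
	keep          action = iota // commit resolved, or repo gone (handled elsewhere): just bump the timestamp
	deleteRecords               // repo exists but the commit does not: drop uploads/indexes sourced from it
	fail                        // unexpected error: surface it
)

func classify(resolveErr error) action {
	switch {
	case resolveErr == nil:
		return keep
	case errors.Is(resolveErr, errRepoNotFound):
		return keep // deleted repositories are cleaned up by a separate janitor pass
	case errors.Is(resolveErr, errRevisionNotFound):
		return deleteRecords // commit was likely force-pushed away and garbage-collected
	default:
		return fail
	}
}

func main() {
	fmt.Println(classify(nil), classify(errRepoNotFound), classify(errRevisionNotFound), classify(errors.New("timeout")))
}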

View File

@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/derision-test/glock"
"github.com/google/go-cmp/cmp"
"github.com/sourcegraph/log/logtest"
@ -18,25 +19,29 @@ import (
func TestUploadExpirer(t *testing.T) {
now := timeutil.Now()
clock := glock.NewMockClock()
clock.SetCurrent(now)
uploadSvc := setupMockUploadService(now)
policySvc := setupMockPolicyService()
policyMatcher := testUploadExpirerMockPolicyMatcher()
uploadExpirer := &expirer{
uploadSvc: uploadSvc,
policySvc: policySvc,
policyMatcher: policyMatcher,
metrics: newExpirationMetrics(&observation.TestContext),
logger: logtest.Scoped(t),
uploadExpirer := &Service{
store: uploadSvc,
policySvc: policySvc,
policyMatcher: policyMatcher,
expirationMetrics: newExpirationMetrics(&observation.TestContext),
logger: logtest.Scoped(t),
operations: newOperations(&observation.TestContext),
clock: clock,
}
if err := uploadExpirer.handleUploadExpirer(context.Background(), expirerConfig{
repositoryProcessDelay: 24 * time.Hour,
repositoryBatchSize: 100,
uploadProcessDelay: 24 * time.Hour,
uploadBatchSize: 100,
commitBatchSize: 100,
}
if err := uploadExpirer.Handle(context.Background()); err != nil {
}); err != nil {
t.Fatalf("unexpected error from handle: %s", err)
}
@ -110,7 +115,7 @@ func setupMockPolicyService() *MockPolicyService {
return policySvc
}
func setupMockUploadService(now time.Time) *MockUploadServiceForExpiration {
func setupMockUploadService(now time.Time) *MockStore {
uploads := []types.Upload{
{ID: 11, State: "completed", RepositoryID: 50, Commit: "deadbeef01", UploadedAt: daysAgo(now, 1)}, // repo 50
{ID: 12, State: "completed", RepositoryID: 50, Commit: "deadbeef02", UploadedAt: daysAgo(now, 2)},
@ -205,7 +210,7 @@ func setupMockUploadService(now time.Time) *MockUploadServiceForExpiration {
return nil, nil, nil
}
uploadSvc := NewMockUploadServiceForExpiration()
uploadSvc := NewMockStore()
uploadSvc.SetRepositoriesForRetentionScanFunc.SetDefaultHook(setRepositoriesForRetentionScanFunc)
uploadSvc.GetUploadsFunc.SetDefaultHook(getUploads)
uploadSvc.UpdateUploadRetentionFunc.SetDefaultHook(updateUploadRetention)

View File

@ -98,37 +98,41 @@ func testUnknownCommitsJanitor(t *testing.T, resolveRevisionFunc func(commit str
return api.CommitID(spec), resolveRevisionFunc(spec)
})
uploadSvc := NewMockUploadServiceForCleanup()
uploadSvc.GetStaleSourcedCommitsFunc.SetDefaultReturn(testSourcedCommits, nil)
store := NewMockStore()
lsifStore := NewMockLsifStore()
store.GetStaleSourcedCommitsFunc.SetDefaultReturn(testSourcedCommits, nil)
autoIndexingSvc := NewMockAutoIndexingService()
janitor := &janitor{
gsc: gitserverClient,
uploadSvc: uploadSvc,
indexSvc: autoIndexingSvc,
clock: glock.NewRealClock(),
logger: logtest.Scoped(t),
metrics: newJanitorMetrics(&observation.TestContext),
minimumTimeSinceLastCheck: 1 * time.Hour,
commitResolverBatchSize: 10,
auditLogMaxAge: 1 * time.Hour,
commitResolverMaximumCommitLag: 1 * time.Hour,
uploadTimeout: 1 * time.Hour,
janitor := &Service{
store: store,
lsifstore: lsifStore,
gitserverClient: gitserverClient,
autoIndexingSvc: autoIndexingSvc,
clock: glock.NewRealClock(),
logger: logtest.Scoped(t),
operations: newOperations(&observation.TestContext),
janitorMetrics: newJanitorMetrics(&observation.TestContext),
}
if err := janitor.Handle(context.Background()); err != nil {
if err := janitor.handleCleanup(
context.Background(), janitorConfig{
minimumTimeSinceLastCheck: 1 * time.Hour,
commitResolverBatchSize: 10,
auditLogMaxAge: 1 * time.Hour,
commitResolverMaximumCommitLag: 1 * time.Hour,
uploadTimeout: 1 * time.Hour,
}); err != nil {
t.Fatalf("unexpected error running janitor: %s", err)
}
var sanitizedCalls []updateInvocation
for _, call := range uploadSvc.UpdateSourcedCommitsFunc.History() {
for _, call := range store.UpdateSourcedCommitsFunc.History() {
sanitizedCalls = append(sanitizedCalls, updateInvocation{
RepositoryID: call.Arg1,
Commit: call.Arg2,
Delete: false,
})
}
for _, call := range uploadSvc.DeleteSourcedCommitsFunc.History() {
for _, call := range store.DeleteSourcedCommitsFunc.History() {
sanitizedCalls = append(sanitizedCalls, updateInvocation{
RepositoryID: call.Arg1,
Commit: call.Arg2,

View File

@ -0,0 +1,14 @@
package uploads
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func (s *Service) NewReferenceCountUpdater(interval time.Duration, batchSize int) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.BackfillReferenceCountBatch(ctx, batchSize)
}))
}

View File

@ -1,32 +0,0 @@
package uploads
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
type referenceCountUpdater struct {
uploadSvc UploadServiceForExpiration
batchSize int
}
var (
_ goroutine.Handler = &referenceCountUpdater{}
_ goroutine.ErrorHandler = &referenceCountUpdater{}
)
func (s *Service) NewReferenceCountUpdater(interval time.Duration, batchSize int) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, &referenceCountUpdater{
uploadSvc: s,
batchSize: batchSize,
})
}
func (u *referenceCountUpdater) Handle(ctx context.Context) error {
return u.uploadSvc.BackfillReferenceCountBatch(ctx, u.batchSize)
}
func (u *referenceCountUpdater) HandleError(err error) {
}

View File

@ -6,6 +6,7 @@ import (
"sort"
"time"
"github.com/derision-test/glock"
"github.com/opentracing/opentracing-go/log"
logger "github.com/sourcegraph/log"
@ -98,6 +99,7 @@ type Service struct {
locker Locker
logger logger.Logger
operations *operations
clock glock.Clock
}
func newService(
@ -131,6 +133,7 @@ func newService(
locker: locker,
logger: observationContext.Logger,
operations: newOperations(observationContext),
clock: glock.NewRealClock(),
}
}
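
The new clock field is what lets the janitor and expirer tests above pin time: newService wires in glock.NewRealClock(), while tests substitute glock.NewMockClock() and SetCurrent. Below is a short sketch of that injection pattern; the staleCutoff helper is illustrative, not part of the service.

package main

import (
	"fmt"
	"time"

	"github.com/derision-test/glock"
)

type service struct {
	clock glock.Clock
}

// staleCutoff derives a deterministic cutoff from the injected clock.
func (s *service) staleCutoff(minimumTimeSinceLastCheck time.Duration) time.Time {
	return s.clock.Now().Add(-minimumTimeSinceLastCheck)
}

func main() {
	// Production wiring.
	_ = &service{clock: glock.NewRealClock()}

	// Test wiring: pin "now" so assertions on cutoffs are stable.
	mock := glock.NewMockClock()
	mock.SetCurrent(time.Date(2022, 10, 6, 0, 0, 0, 0, time.UTC))
	s := &service{clock: mock}
	fmt.Println(s.staleCutoff(time.Hour)) // 2022-10-05 23:00:00 +0000 UTC
}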

View File

@ -1,127 +0,0 @@
package uploads
import (
"context"
autoindexing "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/shared"
uploads "github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/shared"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (j *janitor) HandleUnknownCommit(ctx context.Context) (err error) {
staleUploads, err := j.uploadSvc.GetStaleSourcedCommits(ctx, j.minimumTimeSinceLastCheck, j.commitResolverBatchSize, j.clock.Now())
if err != nil {
return errors.Wrap(err, "uploadSvc.StaleSourcedCommits")
}
staleIndexes, err := j.indexSvc.GetStaleSourcedCommits(ctx, j.minimumTimeSinceLastCheck, j.commitResolverBatchSize, j.clock.Now())
if err != nil {
return errors.Wrap(err, "indexSvc.StaleSourcedCommits")
}
batch := mergeSourceCommits(staleUploads, staleIndexes)
for _, sourcedCommits := range batch {
if err := j.handleSourcedCommits(ctx, sourcedCommits); err != nil {
return err
}
}
return nil
}
func mergeSourceCommits(usc []uploads.SourcedCommits, isc []autoindexing.SourcedCommits) []SourcedCommits {
var sourceCommits []SourcedCommits
for _, uc := range usc {
sourceCommits = append(sourceCommits, SourcedCommits{
RepositoryID: uc.RepositoryID,
RepositoryName: uc.RepositoryName,
Commits: uc.Commits,
})
}
for _, ic := range isc {
sourceCommits = append(sourceCommits, SourcedCommits{
RepositoryID: ic.RepositoryID,
RepositoryName: ic.RepositoryName,
Commits: ic.Commits,
})
}
return sourceCommits
}
// func (j *janitor) HandleError(err error) {
// j.metrics.numErrors.Inc()
// log.Error("Failed to delete codeintel records with an unknown commit", "error", err)
// }
type SourcedCommits struct {
RepositoryID int
RepositoryName string
Commits []string
}
func (j *janitor) handleSourcedCommits(ctx context.Context, sc SourcedCommits) error {
for _, commit := range sc.Commits {
if err := j.handleCommit(ctx, sc.RepositoryID, sc.RepositoryName, commit); err != nil {
return err
}
}
return nil
}
func (j *janitor) handleCommit(ctx context.Context, repositoryID int, repositoryName, commit string) error {
var shouldDelete bool
_, err := j.gsc.ResolveRevision(ctx, repositoryID, commit)
if err == nil {
// If we have no error then the commit is resolvable and we shouldn't touch it.
shouldDelete = false
} else if gitdomain.IsRepoNotExist(err) {
// If we have a repository not found error, then we'll just update the timestamp
// of the record so we can move on to other data; we delete records associated
// with deleted repositories in a separate janitor process.
shouldDelete = false
} else if errors.HasType(err, &gitdomain.RevisionNotFoundError{}) {
// Target condition: repository is resolvable but the commit is not; it was probably
// force-pushed away and the commit was gc'd after some time or after a re-clone
// in gitserver.
shouldDelete = true
} else {
// unexpected error
return errors.Wrap(err, "git.ResolveRevision")
}
if shouldDelete {
_, uploadsDeleted, err := j.uploadSvc.DeleteSourcedCommits(ctx, repositoryID, commit, j.commitResolverMaximumCommitLag, j.clock.Now())
if err != nil {
return errors.Wrap(err, "uploadSvc.DeleteSourcedCommits")
}
if uploadsDeleted > 0 {
// log.Debug("Deleted upload records with unresolvable commits", "count", uploadsDeleted)
j.metrics.numUploadRecordsRemoved.Add(float64(uploadsDeleted))
}
indexesDeleted, err := j.indexSvc.DeleteSourcedCommits(ctx, repositoryID, commit, j.commitResolverMaximumCommitLag, j.clock.Now())
if err != nil {
return errors.Wrap(err, "indexSvc.DeleteSourcedCommits")
}
if indexesDeleted > 0 {
// log.Debug("Deleted index records with unresolvable commits", "count", indexesDeleted)
j.metrics.numIndexRecordsRemoved.Add(float64(indexesDeleted))
}
return nil
}
if _, err := j.uploadSvc.UpdateSourcedCommits(ctx, repositoryID, commit, j.clock.Now()); err != nil {
return errors.Wrap(err, "uploadSvc.UpdateSourcedCommits")
}
if _, err := j.indexSvc.UpdateSourcedCommits(ctx, repositoryID, commit, j.clock.Now()); err != nil {
return errors.Wrap(err, "indexSvc.UpdateSourcedCommits")
}
return nil
}

View File

@ -1,46 +0,0 @@
package uploads
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
type updater struct {
uploadSvc *Service
maxAgeForNonStaleBranches time.Duration
maxAgeForNonStaleTags time.Duration
}
var (
_ goroutine.Handler = &updater{}
_ goroutine.ErrorHandler = &updater{}
)
func (s *Service) NewUpdater(interval time.Duration, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, &updater{
uploadSvc: s,
maxAgeForNonStaleBranches: maxAgeForNonStaleBranches,
maxAgeForNonStaleTags: maxAgeForNonStaleTags,
})
}
// Handle periodically re-calculates the commit and upload visibility graph for repositories
// that are marked as dirty by the worker process. This is done out-of-band from the rest of
// the upload processing as it is likely that we are processing multiple uploads concurrently
// for the same repository and should not repeat the work since the last calculation performed
// will always be the one we want.
// Handle checks for dirty repositories and invokes the underlying updater on each one.
func (u *updater) Handle(ctx context.Context) error {
err := u.uploadSvc.UpdateDirtyRepositories(ctx, u.maxAgeForNonStaleBranches, u.maxAgeForNonStaleTags)
if err != nil {
return errors.Wrap(err, "uploadSvc.UpdateDirtyRepositories")
}
return nil
}
func (u *updater) HandleError(err error) {}

View File

@ -1,238 +0,0 @@
package uploads
import (
"context"
"time"
"github.com/sourcegraph/log"
policiesEnterprise "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
"github.com/sourcegraph/sourcegraph/internal/codeintel/types"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// HandleUploadExpirer compares the age of upload records against the age of uploads
// protected by global and repository specific data retention policies.
//
// Uploads that are older than the protected retention age are marked as expired. Expired records with
// no dependents will be removed by the expiredUploadDeleter.
func (e *expirer) HandleUploadExpirer(ctx context.Context) (err error) {
// Get the batch of repositories that we'll handle in this invocation of the periodic goroutine. This
// set should contain repositories that have yet to be updated, or that have been updated least recently.
// This allows us to update every repository reliably, even if it takes a long time to process through
// the backlog. Note that this set of repositories requires a fresh commit graph, so we're not trying to
// process records that have been uploaded but whose visible commits have yet to be determined (and which
// would therefore appear visible to no commit).
repositories, err := e.uploadSvc.SetRepositoriesForRetentionScan(ctx, e.repositoryProcessDelay, e.repositoryBatchSize)
if err != nil {
return errors.Wrap(err, "uploadSvc.SelectRepositoriesForRetentionScan")
}
if len(repositories) == 0 {
// All repositories updated recently enough
return nil
}
now := timeutil.Now()
for _, repositoryID := range repositories {
if repositoryErr := e.handleRepository(ctx, repositoryID, now); repositoryErr != nil {
if err == nil {
err = repositoryErr
} else {
err = errors.Append(err, repositoryErr)
}
}
}
return err
}
// func (e *expirer) HandleError(err error) {
// e.metrics.numErrors.Inc()
// log15.Error("Failed to expire old codeintel records", "error", err)
// }
func (e *expirer) handleRepository(ctx context.Context, repositoryID int, now time.Time) error {
e.metrics.numRepositoriesScanned.Inc()
// Build a map from commits to the set of policies that affect them. Note that this map should
// never be empty as we have multiple protected data retention policies on the global scope so
// that all data visible from a tag or branch tip is protected for at least a short amount of
// time after upload.
commitMap, err := e.buildCommitMap(ctx, repositoryID, now)
if err != nil {
return err
}
// Mark the time after which all unprocessed uploads for this repository will not be touched.
// This timestamp field is used as a rate limiting device so we do not busy-loop over the same
// protected records in the background.
//
// This value should be assigned OUTSIDE of the following loop to prevent the case where the
// upload process delay is shorter than the time it takes to process one batch of uploads. This
// is obviously a mis-configuration, but one we can make a bit less catastrophic by not updating
// this value in the loop.
lastRetentionScanBefore := now.Add(-e.uploadProcessDelay)
for {
// Each record pulled back by this query will either have its expired flag or its last
// retention scan timestamp updated by the following handleUploads call. This guarantees
// that the loop will terminate naturally after the entire set of candidate uploads have
// been seen and updated with a time necessarily greater than lastRetentionScanBefore.
//
// Additionally, we skip the set of uploads that have finished processing strictly after
// the last update to the commit graph for that repository. This ensures we do not throw
// out new uploads that would happen to be visible to no commits since they were never
// installed into the commit graph.
uploads, _, err := e.uploadSvc.GetUploads(ctx, types.GetUploadsOptions{
State: "completed",
RepositoryID: repositoryID,
AllowExpired: false,
OldestFirst: true,
Limit: e.uploadBatchSize,
LastRetentionScanBefore: &lastRetentionScanBefore,
InCommitGraph: true,
})
if err != nil || len(uploads) == 0 {
return err
}
if err := e.handleUploads(ctx, commitMap, uploads, now); err != nil {
// Note that we collect errors in the loop of the handleUploads call, but we will still terminate
// this loop on any non-nil error from that function. This is required to prevent us from pulling
// back the same set of failing records from the database in a tight loop.
return err
}
}
}
// buildCommitMap will iterate the complete set of configuration policies that apply to a particular
// repository and build a map from commits to the policies that apply to them.
func (e *expirer) buildCommitMap(ctx context.Context, repositoryID int, now time.Time) (map[string][]policiesEnterprise.PolicyMatch, error) {
var (
offset int
policies []types.ConfigurationPolicy
)
for {
// Retrieve the complete set of configuration policies that affect data retention for this repository
policyBatch, totalCount, err := e.policySvc.GetConfigurationPolicies(ctx, types.GetConfigurationPoliciesOptions{
RepositoryID: repositoryID,
ForDataRetention: true,
Limit: e.policyBatchSize,
Offset: offset,
})
if err != nil {
return nil, errors.Wrap(err, "policySvc.GetConfigurationPolicies")
}
offset += len(policyBatch)
policies = append(policies, policyBatch...)
if len(policyBatch) == 0 || offset >= totalCount {
break
}
}
// Get the set of commits within this repository that match a data retention policy
return e.policyMatcher.CommitsDescribedByPolicy(ctx, repositoryID, policies, now)
}
func (e *expirer) handleUploads(
ctx context.Context,
commitMap map[string][]policiesEnterprise.PolicyMatch,
uploads []types.Upload,
now time.Time,
) (err error) {
// Categorize each upload as protected or expired
var (
protectedUploadIDs = make([]int, 0, len(uploads))
expiredUploadIDs = make([]int, 0, len(uploads))
)
for _, upload := range uploads {
protected, checkErr := e.isUploadProtectedByPolicy(ctx, commitMap, upload, now)
if checkErr != nil {
if err == nil {
err = checkErr
} else {
err = errors.Append(err, checkErr)
}
// Collect errors but do not prevent other commits from being successfully processed. We'll leave the
// ones that fail here alone to be re-checked the next time records for this repository are scanned.
continue
}
if protected {
protectedUploadIDs = append(protectedUploadIDs, upload.ID)
} else {
expiredUploadIDs = append(expiredUploadIDs, upload.ID)
}
}
// Update the last data retention scan timestamp on the upload records with the given protected identifiers
// (so that we do not re-select the same uploads on the next batch) and set the expired field on the upload
// records with the given expired identifiers so that the expiredUploadDeleter process can remove them once
// they are no longer referenced.
if updateErr := e.uploadSvc.UpdateUploadRetention(ctx, protectedUploadIDs, expiredUploadIDs); updateErr != nil {
if updateErr := errors.Wrap(err, "uploadSvc.UpdateUploadRetention"); err == nil {
err = updateErr
} else {
err = errors.Append(err, updateErr)
}
}
if count := len(expiredUploadIDs); count > 0 {
e.logger.Info("Expiring codeintel uploads", log.Int("count", count))
e.metrics.numUploadsExpired.Add(float64(count))
}
return err
}
func (e *expirer) isUploadProtectedByPolicy(
ctx context.Context,
commitMap map[string][]policiesEnterprise.PolicyMatch,
upload types.Upload,
now time.Time,
) (bool, error) {
e.metrics.numUploadsScanned.Inc()
var token *string
for first := true; first || token != nil; first = false {
// Fetch the set of commits for which this upload can resolve code intelligence queries. This will necessarily
// include the exact commit indicated by the upload, but may also provide best-effort code intelligence to
// nearby commits.
//
// We need to consider all visible commits, as we may otherwise delete the uploads providing code intelligence
// for the tip of a branch between the time gitserver is updated and the time the new associated code
// intelligence index is processed.
//
// We check the set of commits visible to an upload in batches as in some cases it can be very large; for
// example, a single historic commit providing code intelligence for all descendants.
commits, nextToken, err := e.uploadSvc.GetCommitsVisibleToUpload(ctx, upload.ID, e.commitBatchSize, token)
if err != nil {
return false, errors.Wrap(err, "uploadSvc.CommitsVisibleToUpload")
}
token = nextToken
e.metrics.numCommitsScanned.Add(float64(len(commits)))
for _, commit := range commits {
if policyMatches, ok := commitMap[commit]; ok {
for _, policyMatch := range policyMatches {
if policyMatch.PolicyDuration == nil || now.Sub(upload.UploadedAt) < *policyMatch.PolicyDuration {
return true, nil
}
}
}
}
}
return false, nil
}