[language-platform] move uploads background jobs to its own layer (#43016)

Cesar Jimenez 2022-10-17 13:41:13 -04:00 committed by GitHub
parent fee8647eb8
commit e680320987
45 changed files with 5765 additions and 3183 deletions

View File

@ -20549,7 +20549,7 @@ Query: `sum by (op)(increase(src_codeintel_uploads_errors_total{job=~"^.*"}[5m])
<br />
### Code Intelligence > Uploads: Codeintel: Uploads > Store
### Code Intelligence > Uploads: Codeintel: Uploads > Store (internal)
#### codeintel-uploads: codeintel_uploads_store_total
@ -20703,6 +20703,160 @@ Query: `sum by (op)(increase(src_codeintel_uploads_store_errors_total{job=~"^.*"
<br />
### Code Intelligence > Uploads: Codeintel: Uploads > Background (internal)
#### codeintel-uploads: codeintel_uploads_background_total
<p class="subtitle">Aggregate background operations every 5m</p>
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100200` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
<details>
<summary>Technical details</summary>
Query: `sum(increase(src_codeintel_uploads_background_total{job=~"^.*"}[5m]))`
</details>
<br />
#### codeintel-uploads: codeintel_uploads_background_99th_percentile_duration
<p class="subtitle">Aggregate successful background operation duration distribution over 5m</p>
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100201` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
<details>
<summary>Technical details</summary>
Query: `sum by (le)(rate(src_codeintel_uploads_background_duration_seconds_bucket{job=~"^.*"}[5m]))`
</details>
<br />
#### codeintel-uploads: codeintel_uploads_background_errors_total
<p class="subtitle">Aggregate background operation errors every 5m</p>
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100202` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
<details>
<summary>Technical details</summary>
Query: `sum(increase(src_codeintel_uploads_background_errors_total{job=~"^.*"}[5m]))`
</details>
<br />
#### codeintel-uploads: codeintel_uploads_background_error_rate
<p class="subtitle">Aggregate background operation error rate over 5m</p>
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100203` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
<details>
<summary>Technical details</summary>
Query: `sum(increase(src_codeintel_uploads_background_errors_total{job=~"^.*"}[5m])) / (sum(increase(src_codeintel_uploads_background_total{job=~"^.*"}[5m])) + sum(increase(src_codeintel_uploads_background_errors_total{job=~"^.*"}[5m]))) * 100`
</details>
<br />
#### codeintel-uploads: codeintel_uploads_background_total
<p class="subtitle">Background operations every 5m</p>
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100210` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
<details>
<summary>Technical details</summary>
Query: `sum by (op)(increase(src_codeintel_uploads_background_total{job=~"^.*"}[5m]))`
</details>
<br />
#### codeintel-uploads: codeintel_uploads_background_99th_percentile_duration
<p class="subtitle">99th percentile successful background operation duration over 5m</p>
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100211` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
<details>
<summary>Technical details</summary>
Query: `histogram_quantile(0.99, sum by (le,op)(rate(src_codeintel_uploads_background_duration_seconds_bucket{job=~"^.*"}[5m])))`
</details>
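Note that the `le` label must survive the `sum by` aggregation for `histogram_quantile` to reconstruct the distribution from the bucket series. A hypothetical p95 variant of this panel would change only the quantile argument: `histogram_quantile(0.95, sum by (le,op)(rate(src_codeintel_uploads_background_duration_seconds_bucket{job=~"^.*"}[5m])))`.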
<br />
#### codeintel-uploads: codeintel_uploads_background_errors_total
<p class="subtitle">Background operation errors every 5m</p>
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100212` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
<details>
<summary>Technical details</summary>
Query: `sum by (op)(increase(src_codeintel_uploads_background_errors_total{job=~"^.*"}[5m]))`
</details>
<br />
#### codeintel-uploads: codeintel_uploads_background_error_rate
<p class="subtitle">Background operation error rate over 5m</p>
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100213` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
<details>
<summary>Technical details</summary>
Query: `sum by (op)(increase(src_codeintel_uploads_background_errors_total{job=~"^.*"}[5m])) / (sum by (op)(increase(src_codeintel_uploads_background_total{job=~"^.*"}[5m])) + sum by (op)(increase(src_codeintel_uploads_background_errors_total{job=~"^.*"}[5m]))) * 100`
</details>
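As a worked example of the error-rate formula: an operation recording 3 errors and 57 successful calls in a 5m window reports 3 / (57 + 3) * 100 = 5%. The denominator sums both counters, so the rate is computed over all attempts, successful and failed.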
<br />
### Code Intelligence > Uploads: Codeintel: Uploads > GQL Transport
#### codeintel-uploads: codeintel_uploads_transport_graphql_total
@ -20711,7 +20865,7 @@ Query: `sum by (op)(increase(src_codeintel_uploads_store_errors_total{job=~"^.*"
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100200` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100300` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20730,7 +20884,7 @@ Query: `sum(increase(src_codeintel_uploads_transport_graphql_total{job=~"^.*"}[5
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100201` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100301` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20749,7 +20903,7 @@ Query: `sum by (le)(rate(src_codeintel_uploads_transport_graphql_duration_secon
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100202` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100302` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20768,7 +20922,7 @@ Query: `sum(increase(src_codeintel_uploads_transport_graphql_errors_total{job=~"
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100203` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100303` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20787,7 +20941,7 @@ Query: `sum(increase(src_codeintel_uploads_transport_graphql_errors_total{job=~"
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100210` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100310` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20806,7 +20960,7 @@ Query: `sum by (op)(increase(src_codeintel_uploads_transport_graphql_total{job=~
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100211` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100311` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20825,7 +20979,7 @@ Query: `histogram_quantile(0.99, sum by (le,op)(rate(src_codeintel_uploads_tran
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100212` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100312` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20844,7 +20998,7 @@ Query: `sum by (op)(increase(src_codeintel_uploads_transport_graphql_errors_tota
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100213` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100313` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20865,7 +21019,7 @@ Query: `sum by (op)(increase(src_codeintel_uploads_transport_graphql_errors_tota
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100300` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100400` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20884,7 +21038,7 @@ Query: `sum(increase(src_codeintel_uploads_transport_http_total{job=~"^.*"}[5m])
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100301` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100401` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20903,7 +21057,7 @@ Query: `sum by (le)(rate(src_codeintel_uploads_transport_http_duration_seconds_
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100302` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100402` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20922,7 +21076,7 @@ Query: `sum(increase(src_codeintel_uploads_transport_http_errors_total{job=~"^.*
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100303` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100403` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20941,7 +21095,7 @@ Query: `sum(increase(src_codeintel_uploads_transport_http_errors_total{job=~"^.*
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100310` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100410` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20960,7 +21114,7 @@ Query: `sum by (op)(increase(src_codeintel_uploads_transport_http_total{job=~"^.
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100311` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100411` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20979,7 +21133,7 @@ Query: `histogram_quantile(0.99, sum by (le,op)(rate(src_codeintel_uploads_tran
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100312` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100412` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -20998,7 +21152,7 @@ Query: `sum by (op)(increase(src_codeintel_uploads_transport_http_errors_total{j
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100313` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100413` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21021,7 +21175,7 @@ Number of LSIF upload records deleted due to expiration or unreachability every
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100400` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100500` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21042,7 +21196,7 @@ Number of LSIF index records deleted due to expiration or unreachability every 5
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100401` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100501` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21063,7 +21217,7 @@ Number of LSIF upload data bundles purged from the codeintel-db database every 5
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100402` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100502` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21084,7 +21238,7 @@ Number of LSIF upload audit log records deleted due to expiration every 5m
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100403` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100503` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21105,7 +21259,7 @@ Number of code intelligence uploads cleanup task errors every 5m
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100410` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100510` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21126,7 +21280,7 @@ Number of code intelligence autoindexing cleanup task errors every 5m
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100411` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100511` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21147,7 +21301,7 @@ Query: `sum(increase(src_codeintel_autoindexing_background_cleanup_errors_total{
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100500` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100600` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21172,7 +21326,7 @@ This value compares the rate of enqueues against the rate of finished jobs.
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100501` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100601` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21191,7 +21345,7 @@ Query: `sum(increase(src_codeintel_commit_graph_total{job=~"^.*"}[30m])) / sum(i
Refer to the [alerts reference](./alerts.md#codeintel-uploads-codeintel-commit-graph-queued-max-age) for 1 alert related to this panel.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100502` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100602` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21214,7 +21368,7 @@ Number of repositories scanned for data retention
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100600` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100700` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21235,7 +21389,7 @@ Number of codeintel upload records scanned for data retention
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100601` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100701` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21256,7 +21410,7 @@ Number of commits reachable from a codeintel upload record scanned for data rete
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100602` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100702` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>
@ -21277,7 +21431,7 @@ Number of codeintel upload records marked as expired
This panel has no related alerts.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100603` on your Sourcegraph instance.
To see this panel, visit `/-/debug/grafana/d/codeintel-uploads/codeintel-uploads?viewPanel=100703` on your Sourcegraph instance.
<sub>*Managed by the [Sourcegraph Code intelligence team](https://handbook.sourcegraph.com/departments/engineering/teams/code-intelligence).*</sub>

View File

@ -16,6 +16,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/codeintel"
codeintelshared "github.com/sourcegraph/sourcegraph/internal/codeintel/shared"
"github.com/sourcegraph/sourcegraph/internal/codeintel/shared/lsifuploadstore"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
"github.com/sourcegraph/sourcegraph/internal/database"
@ -116,7 +117,7 @@ func main() {
}
// Initialize worker
worker := services.UploadsService.NewWorker(
worker := uploads.GetBackgroundJob(services.UploadsService).NewWorker(
uploadStore,
config.WorkerConcurrency,
config.WorkerBudget,
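Worker commands now reach the background layer through `uploads.GetBackgroundJob` instead of calling constructors on the service directly. The accessor's body is not shown in this diff; a minimal sketch of the shape implied by the call sites, with the field name assumed rather than taken from the commit:
package uploads
import "github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/background"
// Hypothetical sketch: expose the service's background-job layer so worker
// commands can build routines without widening the Service API surface.
// The backgroundJobs field name is an assumption, not shown in this diff.
func GetBackgroundJob(s *Service) background.BackgroundJob {
return s.backgroundJobs
}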

View File

@ -7,6 +7,7 @@ import (
"github.com/sourcegraph/sourcegraph/cmd/worker/job"
"github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/codeintel"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/background/commitgraph"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
@ -34,5 +35,5 @@ func (j *commitGraphUpdaterJob) Routines(startupCtx context.Context, logger log.
return nil, err
}
return commitgraph.NewUpdater(services.UploadsService), nil
return commitgraph.NewUpdater(uploads.GetBackgroundJob(services.UploadsService)), nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/cmd/worker/internal/executorqueue"
"github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/observation"
@ -42,7 +43,7 @@ func (j *metricsReporterJob) Routines(startupCtx context.Context, logger log.Log
logger.Scoped("routines", "metrics reporting routines"),
)
services.UploadsService.MetricReporters(observationContext)
uploads.GetBackgroundJob(services.UploadsService).SetMetricReporters(observationContext)
dbworker.InitPrometheusMetric(observationContext, autoindexing.GetDependencySyncStore(services.AutoIndexingService), "codeintel", "dependency_sync", nil)
dbworker.InitPrometheusMetric(observationContext, autoindexing.GetDependencyIndexingStore(services.AutoIndexingService), "codeintel", "dependency_index", nil)

View File

@ -8,6 +8,7 @@ import (
"github.com/sourcegraph/sourcegraph/cmd/worker/job"
"github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/codeintel"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/background/backfill"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
@ -35,5 +36,5 @@ func (j *uploadBackfillerJob) Routines(startupCtx context.Context, logger log.Lo
return nil, err
}
return backfill.NewCommittedAtBackfiller(services.UploadsService), nil
return backfill.NewCommittedAtBackfiller(uploads.GetBackgroundJob(services.UploadsService)), nil
}

View File

@ -8,6 +8,7 @@ import (
"github.com/sourcegraph/sourcegraph/cmd/worker/job"
"github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/codeintel"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/background/expiration"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
@ -35,5 +36,5 @@ func (j *uploadExpirerJob) Routines(startupCtx context.Context, logger log.Logge
return nil, err
}
return expiration.NewExpirationTasks(services.UploadsService), nil
return expiration.NewExpirationTasks(uploads.GetBackgroundJob(services.UploadsService)), nil
}

View File

@ -7,6 +7,7 @@ import (
"github.com/sourcegraph/sourcegraph/cmd/worker/job"
"github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/codeintel"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/background/cleanup"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
@ -35,7 +36,7 @@ func (j *uploadJanitorJob) Routines(startupCtx context.Context, logger log.Logge
}
return append(
cleanup.NewJanitor(services.UploadsService),
cleanup.NewResetters(services.UploadsService)...,
cleanup.NewJanitor(uploads.GetBackgroundJob(services.UploadsService)),
cleanup.NewResetters(uploads.GetBackgroundJob(services.UploadsService))...,
), nil
}

View File

@ -1,70 +0,0 @@
package uploads
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (s *Service) NewCommittedAtBackfiller(interval time.Duration, batchSize int) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.backfillCommittedAtBatch(ctx, batchSize)
}))
}
// backfillCommittedAtBatch calculates the commit dates for a batch of upload records that do not have this value
// set. This method is used to backfill old upload records prior to this value being reliably set during processing.
func (s *Service) backfillCommittedAtBatch(ctx context.Context, batchSize int) (err error) {
tx, err := s.store.Transact(ctx)
if err != nil {
return err
}
defer func() {
err = tx.Done(err)
}()
batch, err := tx.SourcedCommitsWithoutCommittedAt(ctx, batchSize)
if err != nil {
return errors.Wrap(err, "store.SourcedCommitsWithoutCommittedAt")
}
for _, sourcedCommits := range batch {
for _, commit := range sourcedCommits.Commits {
commitDateString, err := s.getCommitDate(ctx, sourcedCommits.RepositoryID, commit)
if err != nil {
return err
}
// Update commit date of all uploads attached to this repository and commit
if err := tx.UpdateCommittedAt(ctx, sourcedCommits.RepositoryID, commit, commitDateString); err != nil {
return errors.Wrap(err, "store.UpdateCommittedAt")
}
}
// Mark repository as dirty so the commit graph is recalculated with fresh data
if err := tx.SetRepositoryAsDirty(ctx, sourcedCommits.RepositoryID); err != nil {
return errors.Wrap(err, "store.SetRepositoryAsDirty")
}
}
return nil
}
func (s *Service) getCommitDate(ctx context.Context, repositoryID int, commit string) (string, error) {
_, commitDate, revisionExists, err := s.gitserverClient.CommitDate(ctx, repositoryID, commit)
if err != nil {
return "", errors.Wrap(err, "gitserver.CommitDate")
}
var commitDateString string
if revisionExists {
commitDateString = commitDate.Format(time.RFC3339)
} else {
// Set a value here that we'll filter out on the query side so that we don't
// reprocess the same failing batch infinitely. We could alternatively soft
// delete the record, but it would be better to keep record deletion behavior
// together in the same place (so we have unified metrics on that event).
commitDateString = "-infinity"
}
return commitDateString, nil
}
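A side note on the "-infinity" sentinel above: Postgres accepts '-infinity' as a valid timestamptz literal that sorts before every real date, so the query side can cheaply exclude these rows; a hypothetical filter (not shown in this diff) would be along the lines of `committed_at != '-infinity'`.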

View File

@ -6,7 +6,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
type UploadService interface {
type UploadServiceBackgroundJobs interface {
NewCommittedAtBackfiller(
interval time.Duration,
batchSize int,

View File

@ -4,9 +4,9 @@ import (
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func NewCommittedAtBackfiller(uploadSvc UploadService) []goroutine.BackgroundRoutine {
func NewCommittedAtBackfiller(backgroundJobs UploadServiceBackgroundJobs) []goroutine.BackgroundRoutine {
return []goroutine.BackgroundRoutine{
uploadSvc.NewCommittedAtBackfiller(
backgroundJobs.NewCommittedAtBackfiller(
ConfigInst.Interval,
ConfigInst.BatchSize,
),

View File

@ -7,7 +7,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
)
type UploadService interface {
type UploadServiceBackgroundJobs interface {
NewJanitor(
interval time.Duration,
uploadTimeout time.Duration,

View File

@ -4,9 +4,9 @@ import (
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func NewJanitor(uploadSvc UploadService) []goroutine.BackgroundRoutine {
func NewJanitor(backgroundJobs UploadServiceBackgroundJobs) []goroutine.BackgroundRoutine {
return []goroutine.BackgroundRoutine{
uploadSvc.NewJanitor(
backgroundJobs.NewJanitor(
ConfigInst.Interval,
ConfigInst.UploadTimeout,
ConfigInst.AuditLogMaxAge,
@ -17,8 +17,8 @@ func NewJanitor(uploadSvc UploadService) []goroutine.BackgroundRoutine {
}
}
func NewResetters(uploadSvc UploadService) []goroutine.BackgroundRoutine {
func NewResetters(backgroundJobs UploadServiceBackgroundJobs) []goroutine.BackgroundRoutine {
return []goroutine.BackgroundRoutine{
uploadSvc.NewUploadResetter(ConfigInst.Interval),
backgroundJobs.NewUploadResetter(ConfigInst.Interval),
}
}

View File

@ -6,7 +6,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
type UploadService interface {
type UploadServiceBackgroundJobs interface {
NewCommitGraphUpdater(
interval time.Duration,
maxAgeForNonStaleBranches time.Duration,

View File

@ -4,9 +4,9 @@ import (
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func NewUpdater(uploadSvc UploadService) []goroutine.BackgroundRoutine {
func NewUpdater(backgroundJobs UploadServiceBackgroundJobs) []goroutine.BackgroundRoutine {
return []goroutine.BackgroundRoutine{
uploadSvc.NewCommitGraphUpdater(
backgroundJobs.NewCommitGraphUpdater(
ConfigInst.CommitGraphUpdateTaskInterval,
ConfigInst.MaxAgeForNonStaleBranches,
ConfigInst.MaxAgeForNonStaleTags,

View File

@ -6,7 +6,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
type UploadService interface {
type UploadServiceBackgroundJobs interface {
NewUploadExpirer(
interval time.Duration,
repositoryProcessDelay time.Duration,

View File

@ -4,9 +4,9 @@ import (
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func NewExpirationTasks(uploadSvc UploadService) []goroutine.BackgroundRoutine {
func NewExpirationTasks(backgroundJobs UploadServiceBackgroundJobs) []goroutine.BackgroundRoutine {
return []goroutine.BackgroundRoutine{
uploadSvc.NewUploadExpirer(
backgroundJobs.NewUploadExpirer(
ConfigInst.ExpirerInterval,
ConfigInst.RepositoryProcessDelay,
ConfigInst.RepositoryBatchSize,
@ -15,7 +15,7 @@ func NewExpirationTasks(uploadSvc UploadService) []goroutine.BackgroundRoutine {
ConfigInst.CommitBatchSize,
ConfigInst.PolicyBatchSize,
),
uploadSvc.NewReferenceCountUpdater(
backgroundJobs.NewReferenceCountUpdater(
ConfigInst.ReferenceCountUpdaterInterval,
ConfigInst.ReferenceCountUpdaterBatchSize,
),

View File

@ -1,133 +0,0 @@
package uploads
import (
"context"
"time"
"github.com/opentracing/opentracing-go/log"
gitserverOptions "github.com/sourcegraph/sourcegraph/internal/gitserver"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (s *Service) NewCommitGraphUpdater(interval time.Duration, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.updateAllDirtyCommitGraphs(ctx, maxAgeForNonStaleBranches, maxAgeForNonStaleTags)
}))
}
// updateAllDirtyCommitGraphs periodically re-calculates the commit and upload visibility graph for repositories
// that are marked as dirty by the worker process. This is done out-of-band from the rest of
// the upload processing as it is likely that we are processing multiple uploads concurrently
// for the same repository and should not repeat the work since the last calculation performed
// will always be the one we want.
func (s *Service) updateAllDirtyCommitGraphs(ctx context.Context, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) (err error) {
repositoryIDs, err := s.GetDirtyRepositories(ctx)
if err != nil {
return errors.Wrap(err, "uploadSvc.DirtyRepositories")
}
var updateErr error
for repositoryID, dirtyFlag := range repositoryIDs {
if err := s.lockAndUpdateUploadsVisibleToCommits(ctx, repositoryID, dirtyFlag, maxAgeForNonStaleBranches, maxAgeForNonStaleTags); err != nil {
if updateErr == nil {
updateErr = err
} else {
updateErr = errors.Append(updateErr, err)
}
}
}
return updateErr
}
// lockAndUpdateUploadsVisibleToCommits will call UpdateUploadsVisibleToCommits while holding an advisory lock to give exclusive access to the
// update procedure for this repository. If the lock is already held, this method will simply do nothing.
func (s *Service) lockAndUpdateUploadsVisibleToCommits(ctx context.Context, repositoryID, dirtyToken int, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) (err error) {
ctx, trace, endObservation := s.operations.updateUploadsVisibleToCommits.With(ctx, &err, observation.Args{
LogFields: []log.Field{
log.Int("repositoryID", repositoryID),
log.Int("dirtyToken", dirtyToken),
},
})
defer endObservation(1, observation.Args{})
ok, unlock, err := s.locker.Lock(ctx, int32(repositoryID), false)
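// If the lock is already held, ok is false with a nil err; errors.Wrap returns nil for a nil error, so the early return below silently skips this repository.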
if err != nil || !ok {
return errors.Wrap(err, "locker.Lock")
}
defer func() {
err = unlock(err)
}()
// The following process pulls the commit graph for the given repository from gitserver, pulls the set of LSIF
// upload objects for the given repository from Postgres, and correlates them into a visibility
// graph. This graph is then upserted back into Postgres for use by find closest dumps queries.
//
// The user should supply a dirty token that is associated with the given repository so that
// the repository can be unmarked as long as the repository is not marked as dirty again before
// the update completes.
// Construct a view of the git graph that we will later decorate with upload information.
commitGraph, err := s.getCommitGraph(ctx, repositoryID)
if err != nil {
return err
}
trace.Log(log.Int("numCommitGraphKeys", len(commitGraph.Order())))
refDescriptions, err := s.gitserverClient.RefDescriptions(ctx, repositoryID)
if err != nil {
return errors.Wrap(err, "gitserver.RefDescriptions")
}
trace.Log(log.Int("numRefDescriptions", len(refDescriptions)))
// Decorate the commit graph with the set of processed uploads that are visible from each commit,
// then bulk update the denormalized view in Postgres. We call this with an empty graph as well
// so that we end up clearing the stale data and bulk inserting nothing.
if err := s.store.UpdateUploadsVisibleToCommits(ctx, repositoryID, commitGraph, refDescriptions, maxAgeForNonStaleBranches, maxAgeForNonStaleTags, dirtyToken, time.Time{}); err != nil {
return errors.Wrap(err, "uploadSvc.UpdateUploadsVisibleToCommits")
}
return nil
}
// getCommitGraph builds a partial commit graph that includes the most recent commits on each branch
// extending back as far as the date of the oldest commit for which we have a processed upload for this
// repository.
//
// This optimization is necessary as decorating the commit graph is an operation that scales with
// the size of both the git graph and the number of uploads (multiplicatively). For repositories with
// a very large number of commits or distinct roots (most monorepos) this is a necessary optimization.
//
// The number of commits pulled back here should not grow over time unless the repo is growing at an
// accelerating rate, as we routinely expire old information for active repositories in a janitor
// process.
func (s *Service) getCommitGraph(ctx context.Context, repositoryID int) (*gitdomain.CommitGraph, error) {
commitDate, ok, err := s.store.GetOldestCommitDate(ctx, repositoryID)
if err != nil {
return nil, err
}
if !ok {
// No uploads exist for this repository
return gitdomain.ParseCommitGraph(nil), nil
}
// The --since flag for git log is exclusive, but we want to include the commit where the
// oldest dump is defined. This flag only has second resolution, so we shouldn't be pulling
// back any more data than we want.
commitDate = commitDate.Add(-time.Second)
commitGraph, err := s.gitserverClient.CommitGraph(ctx, repositoryID, gitserverOptions.CommitGraphOptions{
AllRefs: true,
Since: &commitDate,
})
if err != nil {
return nil, errors.Wrap(err, "gitserver.CommitGraph")
}
return commitGraph, nil
}

View File

@ -9,29 +9,20 @@ import (
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/authz"
sharedIndexes "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/shared"
policies "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
policiesshared "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/shared"
codeintelgitserver "github.com/sourcegraph/sourcegraph/internal/codeintel/shared/gitserver"
codeinteltypes "github.com/sourcegraph/sourcegraph/internal/codeintel/shared/types"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/shared"
sharedUploads "github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/shared"
"github.com/sourcegraph/sourcegraph/internal/database/locker"
"github.com/sourcegraph/sourcegraph/internal/gitserver"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
"github.com/sourcegraph/sourcegraph/internal/gitserver/protocol"
"github.com/sourcegraph/sourcegraph/internal/types"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)
type Locker interface {
Lock(ctx context.Context, key int32, blocking bool) (bool, locker.UnlockFunc, error)
}
type CommitCache interface {
ExistsBatch(ctx context.Context, commits []codeintelgitserver.RepositoryCommit) ([]bool, error)
}
type GitserverClient interface {
CommitGraph(ctx context.Context, repositoryID int, opts gitserver.CommitGraphOptions) (_ *gitdomain.CommitGraph, err error)
RefDescriptions(ctx context.Context, repositoryID int, pointedAt ...string) (_ map[string][]gitdomain.RefDescription, err error)
@ -58,19 +49,6 @@ type RepoStore interface {
ResolveRev(ctx context.Context, repo *types.Repo, rev string) (api.CommitID, error)
}
type UploadServiceForExpiration interface {
// Uploads
GetUploads(ctx context.Context, opts shared.GetUploadsOptions) (uploads []codeinteltypes.Upload, totalCount int, err error)
UpdateUploadRetention(ctx context.Context, protectedIDs, expiredIDs []int) (err error)
BackfillReferenceCountBatch(ctx context.Context, batchSize int) error
// Commits
GetCommitsVisibleToUpload(ctx context.Context, uploadID, limit int, token *string) (_ []string, nextToken *string, err error)
// Repositories
SetRepositoriesForRetentionScan(ctx context.Context, processDelay time.Duration, limit int) (_ []int, err error)
}
type PolicyService interface {
GetConfigurationPolicies(ctx context.Context, opts policiesshared.GetConfigurationPoliciesOptions) ([]codeinteltypes.ConfigurationPolicy, int, error)
}
@ -78,31 +56,3 @@ type PolicyService interface {
type PolicyMatcher interface {
CommitsDescribedByPolicy(ctx context.Context, repositoryID int, policies []codeinteltypes.ConfigurationPolicy, now time.Time, filterCommits ...string) (map[string][]policies.PolicyMatch, error)
}
type UploadServiceForCleanup interface {
GetStaleSourcedCommits(ctx context.Context, threshold time.Duration, limit int, now time.Time) ([]sharedUploads.SourcedCommits, error)
DeleteSourcedCommits(ctx context.Context, repositoryID int, commit string, maximumCommitLag time.Duration, now time.Time) (int, int, error)
UpdateSourcedCommits(ctx context.Context, repositoryID int, commit string, now time.Time) (int, error)
DeleteUploadsWithoutRepository(ctx context.Context, now time.Time) (map[int]int, error)
DeleteUploadsStuckUploading(ctx context.Context, uploadedBefore time.Time) (int, error)
SoftDeleteExpiredUploads(ctx context.Context) (int, error)
HardDeleteExpiredUploads(ctx context.Context) (int, error)
DeleteOldAuditLogs(ctx context.Context, maxAge time.Duration, now time.Time) (int, error)
// Workerutil
WorkerutilStore() dbworkerstore.Store
}
type AutoIndexingService interface {
GetStaleSourcedCommits(ctx context.Context, threshold time.Duration, limit int, now time.Time) ([]sharedIndexes.SourcedCommits, error)
DeleteSourcedCommits(ctx context.Context, repositoryID int, commit string, maximumCommitLag time.Duration, now time.Time) (int, error)
UpdateSourcedCommits(ctx context.Context, repositoryID int, commit string, now time.Time) (int, error)
DeleteIndexesWithoutRepository(ctx context.Context, now time.Time) (map[int]int, error)
}
type RepoUpdaterClient interface {
EnqueueRepoUpdate(ctx context.Context, repo api.RepoName) (*protocol.RepoUpdateResponse, error)
}

View File

@ -5,6 +5,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/codeintel/policies"
policiesEnterprise "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
codeintelshared "github.com/sourcegraph/sourcegraph/internal/codeintel/shared"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/background"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/lsifstore"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/store"
"github.com/sourcegraph/sourcegraph/internal/database"
@ -43,6 +44,7 @@ var initServiceMemo = memo.NewMemoizedConstructorWithArg(func(deps serviceDepend
policyMatcher := policiesEnterprise.NewMatcher(deps.gsc, policiesEnterprise.RetentionExtractor, true, false)
locker := locker.NewWith(deps.db, "codeintel")
backgroundJobs := background.New(deps.db, deps.gsc, scopedContext("background"))
svc := newService(
store,
repoStore,
@ -51,10 +53,12 @@ var initServiceMemo = memo.NewMemoizedConstructorWithArg(func(deps serviceDepend
nil, // written in circular fashion
policyMatcher,
locker,
backgroundJobs,
scopedContext("service"),
)
svc.policySvc = policies.GetService(deps.db, svc, deps.gsc)
backgroundJobs.SetUploadsService(svc)
return svc, nil
})
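The `nil, // written in circular fashion` placeholder together with the later `backgroundJobs.SetUploadsService(svc)` call is a two-phase construction that breaks the dependency cycle between the service and its background-job layer. A minimal, self-contained sketch of the pattern (simplified names, not the actual Sourcegraph types):
package main
// Service and BackgroundJob reference each other; constructing them in two
// phases avoids the chicken-and-egg problem at initialization time.
type Service struct {
background *BackgroundJob
}
type BackgroundJob struct {
svc *Service // nil until SetUploadsService closes the cycle
}
func (b *BackgroundJob) SetUploadsService(s *Service) { b.svc = s }
func main() {
bj := &BackgroundJob{} // phase 1: construct without the service
svc := &Service{background: bj} // the service references the job immediately
bj.SetUploadsService(svc) // phase 2: close the cycle
_ = svc
}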

View File

@ -0,0 +1,90 @@
package background
import (
"time"
"github.com/derision-test/glock"
logger "github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/uploadstore"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
)
type BackgroundJob interface {
NewCommittedAtBackfiller(interval time.Duration, batchSize int) goroutine.BackgroundRoutine
NewUploadResetter(interval time.Duration) *dbworker.Resetter
NewReferenceCountUpdater(interval time.Duration, batchSize int) goroutine.BackgroundRoutine
NewCommitGraphUpdater(
interval time.Duration,
maxAgeForNonStaleBranches time.Duration,
maxAgeForNonStaleTags time.Duration,
) goroutine.BackgroundRoutine
NewJanitor(
interval time.Duration,
uploadTimeout time.Duration,
auditLogMaxAge time.Duration,
minimumTimeSinceLastCheck time.Duration,
commitResolverBatchSize int,
commitResolverMaximumCommitLag time.Duration,
) goroutine.BackgroundRoutine
NewWorker(
uploadStore uploadstore.Store,
workerConcurrency int,
workerBudget int64,
workerPollInterval time.Duration,
maximumRuntimePerJob time.Duration,
) *workerutil.Worker
NewUploadExpirer(
interval time.Duration,
repositoryProcessDelay time.Duration,
repositoryBatchSize int,
uploadProcessDelay time.Duration,
uploadBatchSize int,
commitBatchSize int,
policyBatchSize int,
) goroutine.BackgroundRoutine
SetUploadsService(s UploadService)
SetMetricReporters(observationContext *observation.Context)
}
type backgroundJob struct {
uploadSvc UploadService
gitserverClient GitserverClient
janitorMetrics *janitorMetrics
resetterMetrics *resetterMetrics
workerMetrics workerutil.WorkerObservability
expirationMetrics *ExpirationMetrics
clock glock.Clock
logger logger.Logger
operations *operations
}
func New(db database.DB, gsc GitserverClient, observationContext *observation.Context) BackgroundJob {
return &backgroundJob{
gitserverClient: gsc,
janitorMetrics: newJanitorMetrics(observationContext),
resetterMetrics: newResetterMetrics(observationContext),
workerMetrics: workerutil.NewMetrics(observationContext, "codeintel_upload_processor", workerutil.WithSampler(func(job workerutil.Record) bool { return true })),
expirationMetrics: NewExpirationMetrics(observationContext),
clock: glock.NewRealClock(),
logger: observationContext.Logger,
operations: newOperations(observationContext),
}
}
func (b *backgroundJob) SetUploadsService(s UploadService) {
b.uploadSvc = s
}
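A hypothetical usage sketch (not from this commit) showing how a worker command might assemble routines from this layer; db, gsc, obsCtx, and svc stand for values initialized elsewhere, and every duration and batch size below is invented for illustration:
// Hypothetical wiring sketch; names and values are illustrative only.
func buildUploadRoutines(db database.DB, gsc GitserverClient, obsCtx *observation.Context, svc UploadService) []goroutine.BackgroundRoutine {
bj := New(db, gsc, obsCtx) // construct the background-job layer
bj.SetUploadsService(svc) // close the service <-> background-job cycle
return []goroutine.BackgroundRoutine{
// interval and batch size are made-up example values
bj.NewCommittedAtBackfiller(30*time.Second, 100),
// interval, repositoryProcessDelay, repositoryBatchSize, uploadProcessDelay,
// uploadBatchSize, commitBatchSize, policyBatchSize (example values)
bj.NewUploadExpirer(time.Minute, 24*time.Hour, 100, time.Hour, 100, 100, 100),
}
}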

View File

@ -0,0 +1,79 @@
package background
import (
"context"
"io"
"time"
"github.com/grafana/regexp"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/authz"
"github.com/sourcegraph/sourcegraph/internal/codeintel/shared/types"
codeinteltypes "github.com/sourcegraph/sourcegraph/internal/codeintel/shared/types"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/shared"
"github.com/sourcegraph/sourcegraph/internal/gitserver"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
"github.com/sourcegraph/sourcegraph/internal/gitserver/protocol"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/uploadstore"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)
type UploadService interface {
// Commits
GetStaleSourcedCommits(ctx context.Context, minimumTimeSinceLastCheck time.Duration, limit int, now time.Time) (_ []shared.SourcedCommits, err error)
DeleteSourcedCommits(ctx context.Context, repositoryID int, commit string, maximumCommitLag time.Duration, now time.Time) (uploadsUpdated int, uploadsDeleted int, err error)
UpdateSourcedCommits(ctx context.Context, repositoryID int, commit string, now time.Time) (uploadsUpdated int, err error)
BackfillCommittedAtBatch(ctx context.Context, batchSize int) (err error)
// Uploads
GetUploads(ctx context.Context, opts shared.GetUploadsOptions) (uploads []types.Upload, totalCount int, err error)
SoftDeleteExpiredUploads(ctx context.Context) (int, error)
DeleteUploadsWithoutRepository(ctx context.Context, now time.Time) (_ map[int]int, err error)
DeleteUploadsStuckUploading(ctx context.Context, uploadedBefore time.Time) (_ int, err error)
DeleteLsifDataByUploadIds(ctx context.Context, bundleIDs ...int) (err error)
HardDeleteUploadsByIDs(ctx context.Context, ids ...int) error
HandleRawUpload(ctx context.Context, logger log.Logger, upload codeinteltypes.Upload, uploadStore uploadstore.Store, trace observation.TraceLogger) (requeued bool, err error)
HandleExpiredUploadsBatch(ctx context.Context, metrics *ExpirationMetrics, cfg ExpirerConfig) (err error)
// Commitgraph
UpdateAllDirtyCommitGraphs(ctx context.Context, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) (err error)
// Repositories
GetDirtyRepositories(ctx context.Context) (_ map[int]int, err error)
GetRepositoriesMaxStaleAge(ctx context.Context) (_ time.Duration, err error)
SetRepositoriesForRetentionScan(ctx context.Context, processDelay time.Duration, limit int) (_ []int, err error)
// References
BackfillReferenceCountBatch(ctx context.Context, batchSize int) error
// Audit logs
DeleteOldAuditLogs(ctx context.Context, maxAge time.Duration, now time.Time) (count int, err error)
// Utils
GetWorkerutilStore() dbworkerstore.Store
}
type GitserverClient interface {
CommitGraph(ctx context.Context, repositoryID int, opts gitserver.CommitGraphOptions) (_ *gitdomain.CommitGraph, err error)
RefDescriptions(ctx context.Context, repositoryID int, pointedAt ...string) (_ map[string][]gitdomain.RefDescription, err error)
ListTags(ctx context.Context, repo api.RepoName, commitObjs ...string) (_ []*gitdomain.Tag, err error)
DirectoryChildren(ctx context.Context, repositoryID int, commit string, dirnames []string) (map[string][]string, error)
CommitDate(ctx context.Context, repositoryID int, commit string) (string, time.Time, bool, error)
ResolveRevision(ctx context.Context, repositoryID int, versionString string) (api.CommitID, error)
DefaultBranchContains(ctx context.Context, repositoryID int, commit string) (bool, error)
CommitsUniqueToBranch(ctx context.Context, repositoryID int, branchName string, isDefaultBranch bool, maxAge *time.Time) (map[string]time.Time, error)
Head(ctx context.Context, repositoryID int) (string, bool, error)
CommitExists(ctx context.Context, repositoryID int, commit string) (bool, error)
ListFiles(ctx context.Context, repositoryID int, commit string, pattern *regexp.Regexp) ([]string, error)
FileExists(ctx context.Context, repositoryID int, commit, file string) (bool, error)
RawContents(ctx context.Context, repositoryID int, commit, file string) ([]byte, error)
ArchiveReader(ctx context.Context, checker authz.SubRepoPermissionChecker, repo api.RepoName, options gitserver.ArchiveOptions) (io.ReadCloser, error)
RequestRepoUpdate(context.Context, api.RepoName, time.Duration) (*protocol.RepoUpdateResponse, error)
}

View File

@ -1,4 +1,4 @@
package uploads
package background
import (
"context"
@ -7,8 +7,8 @@ import (
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func (s *Service) NewReferenceCountUpdater(interval time.Duration, batchSize int) goroutine.BackgroundRoutine {
func (b backgroundJob) NewCommittedAtBackfiller(interval time.Duration, batchSize int) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.store.BackfillReferenceCountBatch(ctx, batchSize)
return b.uploadSvc.BackfillCommittedAtBatch(ctx, batchSize)
}))
}

View File

@ -1,4 +1,4 @@
package uploads
package background
import (
"context"
@ -22,7 +22,7 @@ type janitorConfig struct {
commitResolverMaximumCommitLag time.Duration
}
func (s *Service) NewJanitor(
func (b backgroundJob) NewJanitor(
interval time.Duration,
uploadTimeout time.Duration,
auditLogMaxAge time.Duration,
@ -31,7 +31,7 @@ func (s *Service) NewJanitor(
commitResolverMaximumCommitLag time.Duration,
) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.handleCleanup(ctx, janitorConfig{
return b.handleCleanup(ctx, janitorConfig{
uploadTimeout: uploadTimeout,
auditLogMaxAge: auditLogMaxAge,
minimumTimeSinceLastCheck: minimumTimeSinceLastCheck,
@ -41,46 +41,46 @@ func (s *Service) NewJanitor(
}))
}
func (s *Service) handleCleanup(ctx context.Context, cfg janitorConfig) (errs error) {
func (b backgroundJob) handleCleanup(ctx context.Context, cfg janitorConfig) (errs error) {
// Reconciliation and denormalization
if err := s.handleDeletedRepository(ctx); err != nil {
if err := b.handleDeletedRepository(ctx); err != nil {
errs = errors.Append(errs, err)
}
if err := s.handleUnknownCommit(ctx, cfg); err != nil {
if err := b.handleUnknownCommit(ctx, cfg); err != nil {
errs = errors.Append(errs, err)
}
// Expiration
if err := s.handleAbandonedUpload(ctx, cfg); err != nil {
if err := b.handleAbandonedUpload(ctx, cfg); err != nil {
errs = errors.Append(errs, err)
}
if err := s.handleExpiredUploadDeleter(ctx); err != nil {
if err := b.handleExpiredUploadDeleter(ctx); err != nil {
errs = errors.Append(errs, err)
}
if err := s.handleHardDeleter(ctx); err != nil {
if err := b.handleHardDeleter(ctx); err != nil {
errs = errors.Append(errs, err)
}
if err := s.handleAuditLog(ctx, cfg); err != nil {
if err := b.handleAuditLog(ctx, cfg); err != nil {
errs = errors.Append(errs, err)
}
return errs
}
func (s *Service) handleDeletedRepository(ctx context.Context) (err error) {
uploadsCounts, err := s.store.DeleteUploadsWithoutRepository(ctx, time.Now())
func (b backgroundJob) handleDeletedRepository(ctx context.Context) (err error) {
uploadsCounts, err := b.uploadSvc.DeleteUploadsWithoutRepository(ctx, time.Now())
if err != nil {
return errors.Wrap(err, "uploadSvc.DeleteUploadsWithoutRepository")
}
for _, counts := range gatherCounts(uploadsCounts) {
s.logger.Debug(
b.logger.Debug(
"Deleted codeintel records with a deleted repository",
log.Int("repository_id", counts.repoID),
log.Int("uploads_count", counts.uploadsCount),
)
s.janitorMetrics.numUploadRecordsRemoved.Add(float64(counts.uploadsCount))
b.janitorMetrics.numUploadRecordsRemoved.Add(float64(counts.uploadsCount))
}
return nil
@ -114,14 +114,14 @@ func gatherCounts(uploadsCounts map[int]int) []recordCount {
return recordCounts
}
func (s *Service) handleUnknownCommit(ctx context.Context, cfg janitorConfig) (err error) {
staleUploads, err := s.store.GetStaleSourcedCommits(ctx, cfg.minimumTimeSinceLastCheck, cfg.commitResolverBatchSize, s.clock.Now())
func (b backgroundJob) handleUnknownCommit(ctx context.Context, cfg janitorConfig) (err error) {
staleUploads, err := b.uploadSvc.GetStaleSourcedCommits(ctx, cfg.minimumTimeSinceLastCheck, cfg.commitResolverBatchSize, b.clock.Now())
if err != nil {
return errors.Wrap(err, "uploadSvc.StaleSourcedCommits")
}
for _, sourcedCommits := range staleUploads {
if err := s.handleSourcedCommits(ctx, sourcedCommits, cfg); err != nil {
if err := b.handleSourcedCommits(ctx, sourcedCommits, cfg); err != nil {
return err
}
}
@ -129,9 +129,9 @@ func (s *Service) handleUnknownCommit(ctx context.Context, cfg janitorConfig) (e
return nil
}
func (s *Service) handleSourcedCommits(ctx context.Context, sc shared.SourcedCommits, cfg janitorConfig) error {
func (b backgroundJob) handleSourcedCommits(ctx context.Context, sc shared.SourcedCommits, cfg janitorConfig) error {
for _, commit := range sc.Commits {
if err := s.handleCommit(ctx, sc.RepositoryID, sc.RepositoryName, commit, cfg); err != nil {
if err := b.handleCommit(ctx, sc.RepositoryID, sc.RepositoryName, commit, cfg); err != nil {
return err
}
}
@ -139,9 +139,9 @@ func (s *Service) handleSourcedCommits(ctx context.Context, sc shared.SourcedCom
return nil
}
func (s *Service) handleCommit(ctx context.Context, repositoryID int, repositoryName, commit string, cfg janitorConfig) error {
func (b backgroundJob) handleCommit(ctx context.Context, repositoryID int, repositoryName, commit string, cfg janitorConfig) error {
var shouldDelete bool
_, err := s.gitserverClient.ResolveRevision(ctx, repositoryID, commit)
_, err := b.gitserverClient.ResolveRevision(ctx, repositoryID, commit)
if err == nil {
// If we have no error then the commit is resolvable and we shouldn't touch it.
shouldDelete = false
@ -161,19 +161,18 @@ func (s *Service) handleCommit(ctx context.Context, repositoryID int, repository
}
if shouldDelete {
_, uploadsDeleted, err := s.store.DeleteSourcedCommits(ctx, repositoryID, commit, cfg.commitResolverMaximumCommitLag, s.clock.Now())
_, uploadsDeleted, err := b.uploadSvc.DeleteSourcedCommits(ctx, repositoryID, commit, cfg.commitResolverMaximumCommitLag, b.clock.Now())
if err != nil {
return errors.Wrap(err, "uploadSvc.DeleteSourcedCommits")
}
if uploadsDeleted > 0 {
// log.Debug("Deleted upload records with unresolvable commits", "count", uploadsDeleted)
s.janitorMetrics.numUploadRecordsRemoved.Add(float64(uploadsDeleted))
b.janitorMetrics.numUploadRecordsRemoved.Add(float64(uploadsDeleted))
}
return nil
}
if _, err := s.store.UpdateSourcedCommits(ctx, repositoryID, commit, s.clock.Now()); err != nil {
if _, err := b.uploadSvc.UpdateSourcedCommits(ctx, repositoryID, commit, b.clock.Now()); err != nil {
return errors.Wrap(err, "uploadSvc.UpdateSourcedCommits")
}
@ -181,43 +180,43 @@ func (s *Service) handleCommit(ctx context.Context, repositoryID int, repository
}
// handleAbandonedUpload removes upload records which have not left the uploading state within the given TTL.
func (s *Service) handleAbandonedUpload(ctx context.Context, cfg janitorConfig) error {
count, err := s.store.DeleteUploadsStuckUploading(ctx, time.Now().UTC().Add(-cfg.uploadTimeout))
func (b backgroundJob) handleAbandonedUpload(ctx context.Context, cfg janitorConfig) error {
count, err := b.uploadSvc.DeleteUploadsStuckUploading(ctx, time.Now().UTC().Add(-cfg.uploadTimeout))
if err != nil {
return errors.Wrap(err, "dbstore.DeleteUploadsStuckUploading")
return errors.Wrap(err, "uploadSvc.DeleteUploadsStuckUploading")
}
if count > 0 {
s.logger.Debug("Deleted abandoned upload records", log.Int("count", count))
s.janitorMetrics.numUploadRecordsRemoved.Add(float64(count))
b.logger.Debug("Deleted abandoned upload records", log.Int("count", count))
b.janitorMetrics.numUploadRecordsRemoved.Add(float64(count))
}
return nil
}
func (s *Service) handleExpiredUploadDeleter(ctx context.Context) error {
count, err := s.store.SoftDeleteExpiredUploads(ctx)
func (b backgroundJob) handleExpiredUploadDeleter(ctx context.Context) error {
count, err := b.uploadSvc.SoftDeleteExpiredUploads(ctx)
if err != nil {
return errors.Wrap(err, "SoftDeleteExpiredUploads")
}
if count > 0 {
s.logger.Info("Deleted expired codeintel uploads", log.Int("count", count))
s.janitorMetrics.numUploadRecordsRemoved.Add(float64(count))
b.logger.Info("Deleted expired codeintel uploads", log.Int("count", count))
b.janitorMetrics.numUploadRecordsRemoved.Add(float64(count))
}
return nil
}
func (s *Service) handleHardDeleter(ctx context.Context) error {
count, err := s.hardDeleteExpiredUploads(ctx)
func (b backgroundJob) handleHardDeleter(ctx context.Context) error {
count, err := b.hardDeleteExpiredUploads(ctx)
if err != nil {
return errors.Wrap(err, "uploadSvc.HardDeleteExpiredUploads")
}
s.janitorMetrics.numUploadsPurged.Add(float64(count))
b.janitorMetrics.numUploadsPurged.Add(float64(count))
return nil
}
func (s *Service) hardDeleteExpiredUploads(ctx context.Context) (count int, err error) {
func (b backgroundJob) hardDeleteExpiredUploads(ctx context.Context) (count int, err error) {
const uploadsBatchSize = 100
options := shared.GetUploadsOptions{
State: "deleted",
@ -231,17 +230,17 @@ func (s *Service) hardDeleteExpiredUploads(ctx context.Context) (count int, err
// the first iteration of the loop, then the previous iteration has
// deleted the records that composed the previous page, and the
// previous "second" page is now the first page.
uploads, totalCount, err := s.store.GetUploads(ctx, options)
uploads, totalCount, err := b.uploadSvc.GetUploads(ctx, options)
if err != nil {
return 0, errors.Wrap(err, "store.GetUploads")
}
ids := uploadIDs(uploads)
if err := s.lsifstore.DeleteLsifDataByUploadIds(ctx, ids...); err != nil {
if err := b.uploadSvc.DeleteLsifDataByUploadIds(ctx, ids...); err != nil {
return 0, errors.Wrap(err, "lsifstore.Clear")
}
if err := s.store.HardDeleteUploadsByIDs(ctx, ids...); err != nil {
if err := b.uploadSvc.HardDeleteUploadsByIDs(ctx, ids...); err != nil {
return 0, errors.Wrap(err, "store.HardDeleteUploadsByIDs")
}
@ -254,13 +253,13 @@ func (s *Service) hardDeleteExpiredUploads(ctx context.Context) (count int, err
return count, nil
}
func (s *Service) handleAuditLog(ctx context.Context, cfg janitorConfig) (err error) {
count, err := s.store.DeleteOldAuditLogs(ctx, cfg.auditLogMaxAge, time.Now())
func (b backgroundJob) handleAuditLog(ctx context.Context, cfg janitorConfig) (err error) {
count, err := b.uploadSvc.DeleteOldAuditLogs(ctx, cfg.auditLogMaxAge, time.Now())
if err != nil {
return errors.Wrap(err, "dbstore.DeleteOldAuditLogs")
return errors.Wrap(err, "uploadSvc.DeleteOldAuditLogs")
}
s.janitorMetrics.numAuditLogRecordsExpired.Add(float64(count))
b.janitorMetrics.numAuditLogRecordsExpired.Add(float64(count))
return nil
}


@ -1,4 +1,4 @@
package uploads
package background
import (
"context"
@ -98,16 +98,13 @@ func testUnknownCommitsJanitor(t *testing.T, resolveRevisionFunc func(commit str
return api.CommitID(spec), resolveRevisionFunc(spec)
})
store := NewMockStore()
lsifStore := NewMockLsifStore()
store.GetStaleSourcedCommitsFunc.SetDefaultReturn(testSourcedCommits, nil)
janitor := &Service{
store: store,
lsifstore: lsifStore,
mockUploadSvc := NewMockUploadService()
mockUploadSvc.GetStaleSourcedCommitsFunc.SetDefaultReturn(testSourcedCommits, nil)
janitor := &backgroundJob{
uploadSvc: mockUploadSvc,
gitserverClient: gitserverClient,
clock: glock.NewRealClock(),
logger: logtest.Scoped(t),
operations: newOperations(&observation.TestContext),
janitorMetrics: newJanitorMetrics(&observation.TestContext),
}
@ -123,14 +120,14 @@ func testUnknownCommitsJanitor(t *testing.T, resolveRevisionFunc func(commit str
}
var sanitizedCalls []updateInvocation
for _, call := range store.UpdateSourcedCommitsFunc.History() {
for _, call := range mockUploadSvc.UpdateSourcedCommitsFunc.History() {
sanitizedCalls = append(sanitizedCalls, updateInvocation{
RepositoryID: call.Arg1,
Commit: call.Arg2,
Delete: false,
})
}
for _, call := range store.DeleteSourcedCommitsFunc.History() {
for _, call := range mockUploadSvc.DeleteSourcedCommitsFunc.History() {
sanitizedCalls = append(sanitizedCalls, updateInvocation{
RepositoryID: call.Arg1,
Commit: call.Arg2,


@ -0,0 +1,14 @@
package background
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func (b backgroundJob) NewCommitGraphUpdater(interval time.Duration, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return b.uploadSvc.UpdateAllDirtyCommitGraphs(ctx, maxAgeForNonStaleBranches, maxAgeForNonStaleTags)
}))
}
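The updater is handed back as a `goroutine.BackgroundRoutine` rather than started eagerly, so the caller owns its lifecycle. A sketch of the assumed wiring (the `job` value and interval values are illustrative):

```go
// Sketch: start the periodic commit-graph updater and block until shutdown.
routine := job.NewCommitGraphUpdater(time.Minute, 2*time.Hour, 2*time.Hour)
goroutine.MonitorBackgroundRoutines(context.Background(), routine)
```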


@ -0,0 +1,38 @@
package background
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func (b backgroundJob) NewUploadExpirer(
interval time.Duration,
repositoryProcessDelay time.Duration,
repositoryBatchSize int,
uploadProcessDelay time.Duration,
uploadBatchSize int,
commitBatchSize int,
policyBatchSize int,
) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return b.uploadSvc.HandleExpiredUploadsBatch(ctx, b.expirationMetrics, ExpirerConfig{
RepositoryProcessDelay: repositoryProcessDelay,
RepositoryBatchSize: repositoryBatchSize,
UploadProcessDelay: uploadProcessDelay,
UploadBatchSize: uploadBatchSize,
CommitBatchSize: commitBatchSize,
PolicyBatchSize: policyBatchSize,
})
}))
}
type ExpirerConfig struct {
RepositoryProcessDelay time.Duration
RepositoryBatchSize int
UploadProcessDelay time.Duration
UploadBatchSize int
CommitBatchSize int
PolicyBatchSize int
}
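`ExpirerConfig` is exported so the batch sizes chosen here can be passed through to `HandleExpiredUploadsBatch` on the service side. A hedged example of constructing the expirer (every value below is illustrative, not a recommended default):

```go
// Sketch: wire up the upload expirer with explicit knobs.
expirer := job.NewUploadExpirer(
	time.Minute,  // interval between batches
	24*time.Hour, // repositoryProcessDelay
	100,          // repositoryBatchSize
	24*time.Hour, // uploadProcessDelay
	100,          // uploadBatchSize
	100,          // commitBatchSize
	100,          // policyBatchSize
)
```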


@ -0,0 +1,14 @@
package background
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
)
func (b backgroundJob) NewReferenceCountUpdater(interval time.Duration, batchSize int) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return b.uploadSvc.BackfillReferenceCountBatch(ctx, batchSize)
}))
}


@ -1,4 +1,4 @@
package uploads
package background
import (
"time"
@ -9,14 +9,14 @@ import (
// NewUploadResetter returns a background routine that periodically resets upload
// records that are marked as being processed but are no longer being processed
// by a worker.
func (s *Service) NewUploadResetter(interval time.Duration) *dbworker.Resetter {
return dbworker.NewResetter(s.logger, s.workerutilStore, dbworker.ResetterOptions{
func (b backgroundJob) NewUploadResetter(interval time.Duration) *dbworker.Resetter {
return dbworker.NewResetter(b.logger, b.uploadSvc.GetWorkerutilStore(), dbworker.ResetterOptions{
Name: "precise_code_intel_upload_worker_resetter",
Interval: interval,
Metrics: dbworker.ResetterMetrics{
RecordResets: s.resetterMetrics.numUploadResets,
RecordResetFailures: s.resetterMetrics.numUploadResetFailures,
Errors: s.resetterMetrics.numUploadResetErrors,
RecordResets: b.resetterMetrics.numUploadResets,
RecordResetFailures: b.resetterMetrics.numUploadResetFailures,
Errors: b.resetterMetrics.numUploadResetErrors,
},
})
}


@ -0,0 +1,163 @@
package background
import (
"context"
"sync/atomic"
"time"
"github.com/keegancsmith/sqlf"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/actor"
codeinteltypes "github.com/sourcegraph/sourcegraph/internal/codeintel/shared/types"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/uploadstore"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (b *backgroundJob) NewWorker(
uploadStore uploadstore.Store,
workerConcurrency int,
workerBudget int64,
workerPollInterval time.Duration,
maximumRuntimePerJob time.Duration,
) *workerutil.Worker {
rootContext := actor.WithInternalActor(context.Background())
handler := b.NewWorkerutilHandler(
uploadStore,
workerConcurrency,
workerBudget,
)
return dbworker.NewWorker(rootContext, b.uploadSvc.GetWorkerutilStore(), handler, workerutil.WorkerOptions{
Name: "precise_code_intel_upload_worker",
NumHandlers: workerConcurrency,
Interval: workerPollInterval,
HeartbeatInterval: time.Second,
Metrics: b.workerMetrics,
MaximumRuntimePerJob: maximumRuntimePerJob,
})
}
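`dbworker.NewWorker` returns a `*workerutil.Worker` with the usual Start/Stop lifecycle. A minimal sketch of running it (values assumed; `uploadStore` is presumed to already exist):

```go
// Sketch: run the upload worker until shutdown.
worker := b.NewWorker(uploadStore,
	4,              // workerConcurrency
	1<<30,          // workerBudget: ~1 GiB of in-flight upload bytes
	time.Second,    // workerPollInterval
	25*time.Minute, // maximumRuntimePerJob
)
go worker.Start()
defer worker.Stop()
```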
type handler struct {
uploadsSvc UploadService
uploadStore uploadstore.Store
handleOp *observation.Operation
budgetRemaining int64
enableBudget bool
// Map of upload ID to uncompressed size. Uploads are deleted before
// PostHandle, so we store it here.
// Should only contain entries for processing in-progress uploads.
uncompressedSizes map[int]uint64
uploadSizeGuage prometheus.Gauge
}
var (
_ workerutil.Handler = &handler{}
_ workerutil.WithPreDequeue = &handler{}
_ workerutil.WithHooks = &handler{}
)
func (b *backgroundJob) NewWorkerutilHandler(uploadStore uploadstore.Store, numProcessorRoutines int, budgetMax int64) workerutil.Handler {
return &handler{
uploadsSvc: b.uploadSvc,
uploadStore: uploadStore,
handleOp: b.operations.uploadProcessor,
budgetRemaining: budgetMax,
enableBudget: budgetMax > 0,
uncompressedSizes: make(map[int]uint64, numProcessorRoutines),
uploadSizeGuage: b.operations.uploadSizeGuage,
}
}
func (h *handler) Handle(ctx context.Context, logger log.Logger, record workerutil.Record) (err error) {
upload, ok := record.(codeinteltypes.Upload)
if !ok {
return errors.Newf("unexpected record type %T", record)
}
var requeued bool
ctx, otLogger, endObservation := h.handleOp.With(ctx, &err, observation.Args{})
defer func() {
endObservation(1, observation.Args{
LogFields: append(
createLogFields(upload),
otlog.Bool("requeued", requeued),
),
})
}()
requeued, err = h.uploadsSvc.HandleRawUpload(ctx, logger, upload, h.uploadStore, otLogger)
return err
}
func (h *handler) PreDequeue(ctx context.Context, logger log.Logger) (bool, any, error) {
if !h.enableBudget {
return true, nil, nil
}
budgetRemaining := atomic.LoadInt64(&h.budgetRemaining)
if budgetRemaining <= 0 {
return false, nil, nil
}
return true, []*sqlf.Query{sqlf.Sprintf("(upload_size IS NULL OR upload_size <= %s)", budgetRemaining)}, nil
}
func (h *handler) PreHandle(ctx context.Context, logger log.Logger, record workerutil.Record) {
upload, ok := record.(codeinteltypes.Upload)
if !ok {
return
}
uncompressedSize := h.getUploadSize(upload.UncompressedSize)
h.uploadSizeGuage.Add(float64(uncompressedSize))
gzipSize := h.getUploadSize(upload.UploadSize)
atomic.AddInt64(&h.budgetRemaining, -gzipSize)
}
func (h *handler) PostHandle(ctx context.Context, logger log.Logger, record workerutil.Record) {
upload, ok := record.(codeinteltypes.Upload)
if !ok {
return
}
uncompressedSize := h.getUploadSize(upload.UncompressedSize)
h.uploadSizeGuage.Sub(float64(uncompressedSize))
gzipSize := h.getUploadSize(upload.UploadSize)
atomic.AddInt64(&h.budgetRemaining, +gzipSize)
}
func (h *handler) getUploadSize(field *int64) int64 {
if field != nil {
return *field
}
return 0
}
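`PreDequeue`, `PreHandle`, and `PostHandle` cooperate on a single atomic byte budget: dequeueing is gated on the remaining budget, and each record's gzip size is debited on entry and credited back on exit. The same accounting in isolation (a sketch with the gate and the debit merged for brevity; not the worker's actual code):

```go
// Sketch of the budget accounting above; requires sync/atomic.
var budgetRemaining int64 = 1 << 30 // e.g. start with ~1 GiB

func acquire(size int64) bool {
	if atomic.LoadInt64(&budgetRemaining) <= 0 {
		return false // PreDequeue: stop taking work until in-flight bytes drain
	}
	atomic.AddInt64(&budgetRemaining, -size) // PreHandle: debit
	return true
}

func release(size int64) {
	atomic.AddInt64(&budgetRemaining, +size) // PostHandle: credit back
}
```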
func createLogFields(upload codeinteltypes.Upload) []otlog.Field {
fields := []otlog.Field{
otlog.Int("uploadID", upload.ID),
otlog.Int("repositoryID", upload.RepositoryID),
otlog.String("commit", upload.Commit),
otlog.String("root", upload.Root),
otlog.String("indexer", upload.Indexer),
otlog.Int("queueDuration", int(time.Since(upload.UploadedAt))),
}
if upload.UploadSize != nil {
fields = append(fields, otlog.Int64("uploadSize", *upload.UploadSize))
}
return fields
}


@ -1,4 +1,4 @@
package uploads
package background
import (
"context"
@ -11,15 +11,15 @@ import (
"github.com/sourcegraph/sourcegraph/internal/observation"
)
type expirationMetrics struct {
type ExpirationMetrics struct {
// Data retention metrics
numRepositoriesScanned prometheus.Counter
numUploadsExpired prometheus.Counter
numUploadsScanned prometheus.Counter
numCommitsScanned prometheus.Counter
NumRepositoriesScanned prometheus.Counter
NumUploadsExpired prometheus.Counter
NumUploadsScanned prometheus.Counter
NumCommitsScanned prometheus.Counter
}
func newExpirationMetrics(observationContext *observation.Context) *expirationMetrics {
func NewExpirationMetrics(observationContext *observation.Context) *ExpirationMetrics {
counter := func(name, help string) prometheus.Counter {
counter := prometheus.NewCounter(prometheus.CounterOpts{
Name: name,
@ -47,20 +47,20 @@ func newExpirationMetrics(observationContext *observation.Context) *expirationMe
"The number of codeintel upload records marked as expired.",
)
return &expirationMetrics{
numRepositoriesScanned: numRepositoriesScanned,
numUploadsScanned: numUploadsScanned,
numCommitsScanned: numCommitsScanned,
numUploadsExpired: numUploadsExpired,
return &ExpirationMetrics{
NumRepositoriesScanned: numRepositoriesScanned,
NumUploadsScanned: numUploadsScanned,
NumCommitsScanned: numCommitsScanned,
NumUploadsExpired: numUploadsExpired,
}
}
func (s *Service) MetricReporters(observationContext *observation.Context) {
func (b backgroundJob) SetMetricReporters(observationContext *observation.Context) {
observationContext.Registerer.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "src_codeintel_commit_graph_total",
Help: "Total number of repositories with stale commit graphs.",
}, func() float64 {
dirtyRepositories, err := s.store.GetDirtyRepositories(context.Background())
dirtyRepositories, err := b.uploadSvc.GetDirtyRepositories(context.Background())
if err != nil {
observationContext.Logger.Error("Failed to determine number of dirty repositories", log.Error(err))
}
@ -72,7 +72,7 @@ func (s *Service) MetricReporters(observationContext *observation.Context) {
Name: "src_codeintel_commit_graph_queued_duration_seconds_total",
Help: "The maximum amount of time a repository has had a stale commit graph.",
}, func() float64 {
age, err := s.store.GetRepositoriesMaxStaleAge(context.Background())
age, err := b.uploadSvc.GetRepositoriesMaxStaleAge(context.Background())
if err != nil {
observationContext.Logger.Error("Failed to determine stale commit graph age", log.Error(err))
return 0


@ -1,4 +1,4 @@
package uploads
package background
import (
"github.com/prometheus/client_golang/prometheus"


@ -1,4 +1,4 @@
package uploads
package background
import (
"github.com/prometheus/client_golang/prometheus"

File diff suppressed because it is too large


@ -0,0 +1,59 @@
package background
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/sourcegraph/internal/honey"
"github.com/sourcegraph/sourcegraph/internal/metrics"
"github.com/sourcegraph/sourcegraph/internal/observation"
)
type operations struct {
updateUploadsVisibleToCommits *observation.Operation
// Worker metrics
uploadProcessor *observation.Operation
uploadSizeGuage prometheus.Gauge
}
func newOperations(observationContext *observation.Context) *operations {
m := metrics.NewREDMetrics(
observationContext.Registerer,
"codeintel_uploads_background",
metrics.WithLabels("op"),
metrics.WithCountHelp("Total number of method invocations."),
)
op := func(name string) *observation.Operation {
return observationContext.Operation(observation.Op{
Name: fmt.Sprintf("codeintel.uploads.background.%s", name),
MetricLabelValues: []string{name},
Metrics: m,
})
}
honeyObservationContext := *observationContext
honeyObservationContext.HoneyDataset = &honey.Dataset{Name: "codeintel-worker"}
uploadProcessor := honeyObservationContext.Operation(observation.Op{
Name: "codeintel.uploadHandler",
ErrorFilter: func(err error) observation.ErrorFilterBehaviour {
return observation.EmitForTraces | observation.EmitForHoney
},
})
uploadSizeGuage := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "src_codeintel_upload_processor_upload_size",
Help: "The combined size of uploads being processed at this instant by this worker.",
})
observationContext.Registerer.MustRegister(uploadSizeGuage)
return &operations{
updateUploadsVisibleToCommits: op("UpdateUploadsVisibleToCommits"),
// Worker metrics
uploadProcessor: uploadProcessor,
uploadSizeGuage: uploadSizeGuage,
}
}

File diff suppressed because it is too large


@ -3,9 +3,6 @@ package uploads
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/sourcegraph/internal/honey"
"github.com/sourcegraph/sourcegraph/internal/metrics"
"github.com/sourcegraph/sourcegraph/internal/observation"
)
@ -14,6 +11,9 @@ type operations struct {
// Commits
getCommitsVisibleToUpload *observation.Operation
getCommitGraphMetadata *observation.Operation
getStaleSourcedCommits *observation.Operation
updateSourcedCommits *observation.Operation
deleteSourcedCommits *observation.Operation
// Repositories
getRepoName *observation.Operation
@ -21,6 +21,8 @@ type operations struct {
getDirtyRepositories *observation.Operation
getRecentUploadsSummary *observation.Operation
getLastUploadRetentionScanForRepository *observation.Operation
setRepositoriesForRetentionScan *observation.Operation
getRepositoriesMaxStaleAge *observation.Operation
// Uploads
getUploads *observation.Operation
@ -31,23 +33,26 @@ type operations struct {
updateUploadsVisibleToCommits *observation.Operation
deleteUploadByID *observation.Operation
inferClosestUploads *observation.Operation
deleteUploadsWithoutRepository *observation.Operation
deleteUploadsStuckUploading *observation.Operation
softDeleteExpiredUploads *observation.Operation
hardDeleteUploadsByIDs *observation.Operation
deleteLsifDataByUploadIds *observation.Operation
// Dumps
getDumpsWithDefinitionsForMonikers *observation.Operation
getDumpsByIDs *observation.Operation
// References
referencesForUpload *observation.Operation
referencesForUpload *observation.Operation
backfillReferenceCountBatch *observation.Operation
// Audit Logs
getAuditLogsForUpload *observation.Operation
deleteOldAuditLogs *observation.Operation
// Tags
getListTags *observation.Operation
// Worker metrics
uploadProcessor *observation.Operation
uploadSizeGuage prometheus.Gauge
}
func newOperations(observationContext *observation.Context) *operations {
@ -66,25 +71,13 @@ func newOperations(observationContext *observation.Context) *operations {
})
}
honeyObservationContext := *observationContext
honeyObservationContext.HoneyDataset = &honey.Dataset{Name: "codeintel-worker"}
uploadProcessor := honeyObservationContext.Operation(observation.Op{
Name: "codeintel.uploadHandler",
ErrorFilter: func(err error) observation.ErrorFilterBehaviour {
return observation.EmitForTraces | observation.EmitForHoney
},
})
uploadSizeGuage := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "src_codeintel_upload_processor_upload_size",
Help: "The combined size of uploads being processed at this instant by this worker.",
})
observationContext.Registerer.MustRegister(uploadSizeGuage)
return &operations{
// Commits
getCommitsVisibleToUpload: op("GetCommitsVisibleToUpload"),
getCommitGraphMetadata: op("GetCommitGraphMetadata"),
getStaleSourcedCommits: op("GetStaleSourcedCommits"),
updateSourcedCommits: op("UpdateSourcedCommits"),
deleteSourcedCommits: op("DeleteSourcedCommits"),
// Repositories
getRepoName: op("GetRepoName"),
@ -92,6 +85,8 @@ func newOperations(observationContext *observation.Context) *operations {
getDirtyRepositories: op("GetDirtyRepositories"),
getRecentUploadsSummary: op("GetRecentUploadsSummary"),
getLastUploadRetentionScanForRepository: op("GetLastUploadRetentionScanForRepository"),
setRepositoriesForRetentionScan: op("SetRepositoriesForRetentionScan"),
getRepositoriesMaxStaleAge: op("GetRepositoriesMaxStaleAge"),
// Uploads
getUploads: op("GetUploads"),
@ -102,22 +97,25 @@ func newOperations(observationContext *observation.Context) *operations {
updateUploadsVisibleToCommits: op("UpdateUploadsVisibleToCommits"),
deleteUploadByID: op("DeleteUploadByID"),
inferClosestUploads: op("InferClosestUploads"),
deleteUploadsWithoutRepository: op("DeleteUploadsWithoutRepository"),
deleteUploadsStuckUploading: op("DeleteUploadsStuckUploading"),
softDeleteExpiredUploads: op("SoftDeleteExpiredUploads"),
hardDeleteUploadsByIDs: op("HardDeleteUploadsByIDs"),
deleteLsifDataByUploadIds: op("DeleteLsifDataByUploadIds"),
// Dumps
getDumpsWithDefinitionsForMonikers: op("GetDumpsWithDefinitionsForMonikers"),
getDumpsByIDs: op("GetDumpsByIDs"),
// References
referencesForUpload: op("ReferencesForUpload"),
referencesForUpload: op("ReferencesForUpload"),
backfillReferenceCountBatch: op("BackfillReferenceCountBatch"),
// Audit Logs
getAuditLogsForUpload: op("GetAuditLogsForUpload"),
deleteOldAuditLogs: op("DeleteOldAuditLogs"),
// Tags
getListTags: op("GetListTags"),
// Worker metrics
uploadProcessor: uploadProcessor,
uploadSizeGuage: uploadSizeGuage,
}
}


@ -10,14 +10,17 @@ import (
logger "github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/api"
policiesEnterprise "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
policiesshared "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/shared"
"github.com/sourcegraph/sourcegraph/internal/codeintel/shared/types"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/background"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/lsifstore"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/store"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/shared"
"github.com/sourcegraph/sourcegraph/internal/gitserver"
gitserverOptions "github.com/sourcegraph/sourcegraph/internal/gitserver"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
"github.com/sourcegraph/sourcegraph/lib/codeintel/precise"
@ -25,21 +28,18 @@ import (
)
type Service struct {
store store.Store
repoStore RepoStore
workerutilStore dbworkerstore.Store
lsifstore lsifstore.LsifStore
gitserverClient GitserverClient
policySvc PolicyService
expirationMetrics *expirationMetrics
resetterMetrics *resetterMetrics
janitorMetrics *janitorMetrics
workerMetrics workerutil.WorkerObservability
policyMatcher PolicyMatcher
locker Locker
logger logger.Logger
operations *operations
clock glock.Clock
store store.Store
repoStore RepoStore
workerutilStore dbworkerstore.Store
lsifstore lsifstore.LsifStore
gitserverClient GitserverClient
policySvc PolicyService
policyMatcher PolicyMatcher
locker Locker
backgroundJob background.BackgroundJob
logger logger.Logger
operations *operations
clock glock.Clock
}
func newService(
@ -50,6 +50,7 @@ func newService(
policySvc PolicyService,
policyMatcher PolicyMatcher,
locker Locker,
backgroundJob background.BackgroundJob,
observationContext *observation.Context,
) *Service {
workerutilStore := store.WorkerutilStore(observationContext)
@ -58,24 +59,29 @@ func newService(
dbworker.InitPrometheusMetric(observationContext, workerutilStore, "codeintel", "upload", nil)
return &Service{
store: store,
repoStore: repoStore,
workerutilStore: workerutilStore,
lsifstore: lsifstore,
gitserverClient: gsc,
policySvc: policySvc,
expirationMetrics: newExpirationMetrics(observationContext),
resetterMetrics: newResetterMetrics(observationContext),
janitorMetrics: newJanitorMetrics(observationContext),
workerMetrics: workerutil.NewMetrics(observationContext, "codeintel_upload_processor", workerutil.WithSampler(func(job workerutil.Record) bool { return true })),
policyMatcher: policyMatcher,
locker: locker,
logger: observationContext.Logger,
operations: newOperations(observationContext),
clock: glock.NewRealClock(),
store: store,
repoStore: repoStore,
workerutilStore: workerutilStore,
lsifstore: lsifstore,
gitserverClient: gsc,
policySvc: policySvc,
backgroundJob: backgroundJob,
policyMatcher: policyMatcher,
locker: locker,
logger: observationContext.Logger,
operations: newOperations(observationContext),
clock: glock.NewRealClock(),
}
}
func GetBackgroundJob(s *Service) background.BackgroundJob {
return s.backgroundJob
}
func (s *Service) GetWorkerutilStore() dbworkerstore.Store {
return s.workerutilStore
}
func (s *Service) GetCommitsVisibleToUpload(ctx context.Context, uploadID, limit int, token *string) (_ []string, nextToken *string, err error) {
ctx, _, endObservation := s.operations.getCommitsVisibleToUpload.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})
@ -175,6 +181,64 @@ func (s *Service) DeleteUploads(ctx context.Context, opts shared.DeleteUploadsOp
return s.store.DeleteUploads(ctx, opts)
}
func (s *Service) SetRepositoriesForRetentionScan(ctx context.Context, processDelay time.Duration, limit int) (_ []int, err error) {
ctx, _, endObservation := s.operations.setRepositoriesForRetentionScan.With(ctx, &err, observation.Args{
LogFields: []log.Field{
log.Int("processDelayInMs", int(processDelay.Milliseconds())),
log.Int("limit", limit),
},
})
defer endObservation(1, observation.Args{})
return s.store.SetRepositoriesForRetentionScan(ctx, processDelay, limit)
}
func (s *Service) GetRepositoriesMaxStaleAge(ctx context.Context) (_ time.Duration, err error) {
ctx, _, endObservation := s.operations.getRepositoriesMaxStaleAge.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})
return s.store.GetRepositoriesMaxStaleAge(ctx)
}
// BuildCommitMap iterates the complete set of configuration policies that apply to a particular
// repository and builds a map from commits to the policies that apply to them.
func (s *Service) BuildCommitMap(ctx context.Context, repositoryID int, cfg background.ExpirerConfig, now time.Time) (map[string][]policiesEnterprise.PolicyMatch, error) {
var (
offset int
policies []types.ConfigurationPolicy
)
for {
// Retrieve the complete set of configuration policies that affect data retention for this repository
policyBatch, totalCount, err := s.policySvc.GetConfigurationPolicies(ctx, policiesshared.GetConfigurationPoliciesOptions{
RepositoryID: repositoryID,
ForDataRetention: true,
Limit: cfg.PolicyBatchSize,
Offset: offset,
})
if err != nil {
return nil, errors.Wrap(err, "policySvc.GetConfigurationPolicies")
}
offset += len(policyBatch)
policies = append(policies, policyBatch...)
if len(policyBatch) == 0 || offset >= totalCount {
break
}
}
// Get the set of commits within this repository that match a data retention policy
return s.policyMatcher.CommitsDescribedByPolicy(ctx, repositoryID, policies, now)
}
func (s *Service) BackfillReferenceCountBatch(ctx context.Context, batchSize int) error {
ctx, _, endObservation := s.operations.backfillReferenceCountBatch.With(ctx, nil, observation.Args{LogFields: []log.Field{log.Int("batchSize", batchSize)}})
defer endObservation(1, observation.Args{})
return s.store.BackfillReferenceCountBatch(ctx, batchSize)
}
// numAncestors is the number of ancestors to query from gitserver when trying to find the closest
// ancestor we have data for. Setting this value too low (relative to a repository's commit rate)
// will cause requests for an unknown commit to return too few results; setting this value too high
@ -320,3 +384,247 @@ func (s *Service) GetListTags(ctx context.Context, repo api.RepoName, commitObjs
return s.gitserverClient.ListTags(ctx, repo, commitObjs...)
}
func (s *Service) DeleteOldAuditLogs(ctx context.Context, maxAge time.Duration, now time.Time) (count int, err error) {
ctx, _, endObservation := s.operations.deleteOldAuditLogs.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})
return s.store.DeleteOldAuditLogs(ctx, maxAge, now)
}
func (s *Service) HardDeleteUploadsByIDs(ctx context.Context, ids ...int) error {
ctx, _, endObservation := s.operations.hardDeleteUploadsByIDs.With(ctx, nil, observation.Args{})
defer endObservation(1, observation.Args{})
return s.store.HardDeleteUploadsByIDs(ctx, ids...)
}
func (s *Service) DeleteLsifDataByUploadIds(ctx context.Context, bundleIDs ...int) (err error) {
ctx, _, endObservation := s.operations.deleteLsifDataByUploadIds.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("bundleIDs", len(bundleIDs))},
})
defer endObservation(1, observation.Args{})
return s.lsifstore.DeleteLsifDataByUploadIds(ctx, bundleIDs...)
}
func (s *Service) GetStaleSourcedCommits(ctx context.Context, minimumTimeSinceLastCheck time.Duration, limit int, now time.Time) (_ []shared.SourcedCommits, err error) {
ctx, _, endObservation := s.operations.getStaleSourcedCommits.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.String("minimumTimeSinceLastCheck", minimumTimeSinceLastCheck.String()), log.Int("limit", limit), log.String("now", now.String())},
})
defer endObservation(1, observation.Args{})
return s.store.GetStaleSourcedCommits(ctx, minimumTimeSinceLastCheck, limit, now)
}
func (s *Service) DeleteSourcedCommits(ctx context.Context, repositoryID int, commit string, maximumCommitLag time.Duration, now time.Time) (uploadsUpdated int, uploadsDeleted int, err error) {
ctx, _, endObservation := s.operations.deleteSourcedCommits.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", repositoryID), log.String("commit", commit), log.String("maximumCommitLag", maximumCommitLag.String()), log.String("now", now.String())},
})
defer endObservation(1, observation.Args{})
return s.store.DeleteSourcedCommits(ctx, repositoryID, commit, maximumCommitLag, now)
}
func (s *Service) UpdateSourcedCommits(ctx context.Context, repositoryID int, commit string, now time.Time) (uploadsUpdated int, err error) {
ctx, _, endObservation := s.operations.updateSourcedCommits.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.Int("repositoryID", repositoryID), log.String("commit", commit), log.String("now", now.String())},
})
defer endObservation(1, observation.Args{})
return s.store.UpdateSourcedCommits(ctx, repositoryID, commit, now)
}
func (s *Service) DeleteUploadsStuckUploading(ctx context.Context, uploadedBefore time.Time) (_ int, err error) {
ctx, _, endObservation := s.operations.deleteUploadsStuckUploading.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.String("uploadedBefore", uploadedBefore.String())},
})
defer endObservation(1, observation.Args{})
return s.store.DeleteUploadsStuckUploading(ctx, uploadedBefore)
}
func (s *Service) SoftDeleteExpiredUploads(ctx context.Context) (int, error) {
ctx, _, endObservation := s.operations.softDeleteExpiredUploads.With(ctx, nil, observation.Args{})
defer endObservation(1, observation.Args{})
return s.store.SoftDeleteExpiredUploads(ctx)
}
// BackfillCommittedAtBatch calculates the committed_at value for a batch of upload records that do not have
// this value set. This method is used to backfill old upload records prior to this value being reliably set
// during processing.
func (s *Service) BackfillCommittedAtBatch(ctx context.Context, batchSize int) (err error) {
tx, err := s.store.Transact(ctx)
if err != nil {
return err
}
defer func() {
err = tx.Done(err)
}()
batch, err := tx.SourcedCommitsWithoutCommittedAt(ctx, batchSize)
if err != nil {
return errors.Wrap(err, "store.SourcedCommitsWithoutCommittedAt")
}
for _, sourcedCommits := range batch {
for _, commit := range sourcedCommits.Commits {
commitDateString, err := s.getCommitDate(ctx, sourcedCommits.RepositoryID, commit)
if err != nil {
return err
}
// Update commit date of all uploads attached to this repository and commit
if err := tx.UpdateCommittedAt(ctx, sourcedCommits.RepositoryID, commit, commitDateString); err != nil {
return errors.Wrap(err, "store.UpdateCommittedAt")
}
}
// Mark repository as dirty so the commit graph is recalculated with fresh data
if err := tx.SetRepositoryAsDirty(ctx, sourcedCommits.RepositoryID); err != nil {
return errors.Wrap(err, "store.SetRepositoryAsDirty")
}
}
return nil
}
func (s *Service) DeleteUploadsWithoutRepository(ctx context.Context, now time.Time) (_ map[int]int, err error) {
ctx, _, endObservation := s.operations.deleteUploadsWithoutRepository.With(ctx, &err, observation.Args{
LogFields: []log.Field{log.String("now", now.Format(time.RFC3339))},
})
defer endObservation(1, observation.Args{})
return s.store.DeleteUploadsWithoutRepository(ctx, now)
}
func (s *Service) getCommitDate(ctx context.Context, repositoryID int, commit string) (string, error) {
_, commitDate, revisionExists, err := s.gitserverClient.CommitDate(ctx, repositoryID, commit)
if err != nil {
return "", errors.Wrap(err, "gitserver.CommitDate")
}
var commitDateString string
if revisionExists {
commitDateString = commitDate.Format(time.RFC3339)
} else {
// Set a value here that we'll filter out on the query side so that we don't
// reprocess the same failing batch infinitely. We could alternatively soft
// delete the record, but it would be better to keep record deletion behavior
// together in the same place (so we have unified metrics on that event).
commitDateString = "-infinity"
}
return commitDateString, nil
}
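The `-infinity` sentinel works because it is a valid PostgreSQL timestamp literal that sorts before every real timestamp and is easy to exclude on the query side, so a commit whose date can never be resolved is written once and never retried. Restated as a standalone helper (a sketch; the helper and constant names are invented):

```go
// Sketch: pick a committed-at value that the backfill query can filter out.
const committedAtSentinel = "-infinity" // PostgreSQL's minimal timestamp literal

func commitDateOrSentinel(date time.Time, revisionExists bool) string {
	if !revisionExists {
		return committedAtSentinel // excluded by the query side; never reprocessed
	}
	return date.Format(time.RFC3339)
}
```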
// UpdateAllDirtyCommitGraphs periodically re-calculates the commit and upload visibility graph for repositories
// that are marked as dirty by the worker process. This is done out-of-band from the rest of
// the upload processing as it is likely that we are processing multiple uploads concurrently
// for the same repository and should not repeat the work since the last calculation performed
// will always be the one we want.
func (s *Service) UpdateAllDirtyCommitGraphs(ctx context.Context, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) (err error) {
repositoryIDs, err := s.GetDirtyRepositories(ctx)
if err != nil {
return errors.Wrap(err, "uploadSvc.DirtyRepositories")
}
var updateErr error
for repositoryID, dirtyFlag := range repositoryIDs {
if err := s.lockAndUpdateUploadsVisibleToCommits(ctx, repositoryID, dirtyFlag, maxAgeForNonStaleBranches, maxAgeForNonStaleTags); err != nil {
if updateErr == nil {
updateErr = err
} else {
updateErr = errors.Append(updateErr, err)
}
}
}
return updateErr
}
// lockAndUpdateUploadsVisibleToCommits will call UpdateUploadsVisibleToCommits while holding an advisory lock to give exclusive access to the
// update procedure for this repository. If the lock is already held, this method will simply do nothing.
func (s *Service) lockAndUpdateUploadsVisibleToCommits(ctx context.Context, repositoryID, dirtyToken int, maxAgeForNonStaleBranches time.Duration, maxAgeForNonStaleTags time.Duration) (err error) {
ctx, trace, endObservation := s.operations.updateUploadsVisibleToCommits.With(ctx, &err, observation.Args{
LogFields: []log.Field{
log.Int("repositoryID", repositoryID),
log.Int("dirtyToken", dirtyToken),
},
})
defer endObservation(1, observation.Args{})
ok, unlock, err := s.locker.Lock(ctx, int32(repositoryID), false)
if err != nil || !ok {
return errors.Wrap(err, "locker.Lock")
}
defer func() {
err = unlock(err)
}()
// The following process pulls the commit graph for the given repository from gitserver, pulls the set of LSIF
// upload objects for the given repository from Postgres, and correlates them into a visibility
// graph. This graph is then upserted back into Postgres for use by find closest dumps queries.
//
// The user should supply a dirty token that is associated with the given repository so that
// the repository can be unmarked as long as the repository is not marked as dirty again before
// the update completes.
// Construct a view of the git graph that we will later decorate with upload information.
commitGraph, err := s.getCommitGraph(ctx, repositoryID)
if err != nil {
return err
}
trace.Log(log.Int("numCommitGraphKeys", len(commitGraph.Order())))
refDescriptions, err := s.gitserverClient.RefDescriptions(ctx, repositoryID)
if err != nil {
return errors.Wrap(err, "gitserver.RefDescriptions")
}
trace.Log(log.Int("numRefDescriptions", len(refDescriptions)))
// Decorate the commit graph with the set of processed uploads visible from each commit,
// then bulk update the denormalized view in Postgres. We call this with an empty graph as well
// so that we end up clearing the stale data and bulk inserting nothing.
if err := s.store.UpdateUploadsVisibleToCommits(ctx, repositoryID, commitGraph, refDescriptions, maxAgeForNonStaleBranches, maxAgeForNonStaleTags, dirtyToken, time.Time{}); err != nil {
return errors.Wrap(err, "uploadSvc.UpdateUploadsVisibleToCommits")
}
return nil
}
// getCommitGraph builds a partial commit graph that includes the most recent commits on each branch
// extending back as far as the date of the oldest commit for which we have a processed upload for this
// repository.
//
// This optimization is necessary as decorating the commit graph is an operation that scales with
// the size of both the git graph and the number of uploads (multiplicatively). For repositories with
// a very large number of commits or distinct roots (most monorepos), this optimization is essential.
//
// The number of commits pulled back here should not grow over time unless the repo is growing at an
// accelerating rate, as we routinely expire old information for active repositories in a janitor
// process.
func (s *Service) getCommitGraph(ctx context.Context, repositoryID int) (*gitdomain.CommitGraph, error) {
commitDate, ok, err := s.store.GetOldestCommitDate(ctx, repositoryID)
if err != nil {
return nil, err
}
if !ok {
// No uploads exist for this repository
return gitdomain.ParseCommitGraph(nil), nil
}
// The --since flag for git log is exclusive, but we want to include the commit where the
// oldest dump is defined. This flag only has second resolution, so we shouldn't be pulling
// back any more data than we wanted.
commitDate = commitDate.Add(-time.Second)
commitGraph, err := s.gitserverClient.CommitGraph(ctx, repositoryID, gitserverOptions.CommitGraphOptions{
AllRefs: true,
Since: &commitDate,
})
if err != nil {
return nil, errors.Wrap(err, "gitserver.CommitGraph")
}
return commitGraph, nil
}


@ -16,7 +16,17 @@ func TestBackfillCommittedAtBatch(t *testing.T) {
ctx := context.Background()
store := NewMockStore()
gitserverClient := NewMockGitserverClient()
svc := newService(store, nil, nil, gitserverClient, nil, nil, nil, &observation.TestContext)
svc := newService(
store,
nil, // repoStore
nil, // lsifstore
gitserverClient,
nil, // policySvc
nil, // policyMatcher
nil, // locker
nil, // backgroundJobs
&observation.TestContext,
)
// Return self for txn
store.TransactFunc.SetDefaultReturn(store, nil)
@ -54,7 +64,7 @@ func TestBackfillCommittedAtBatch(t *testing.T) {
}
for i := 0; i < n/pageSize; i++ {
if err := svc.backfillCommittedAtBatch(ctx, pageSize); err != nil {
if err := svc.BackfillCommittedAtBatch(ctx, pageSize); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
@ -88,7 +98,17 @@ func TestBackfillCommittedAtBatchUnknownCommits(t *testing.T) {
ctx := context.Background()
store := NewMockStore()
gitserverClient := NewMockGitserverClient()
svc := newService(store, nil, nil, gitserverClient, nil, nil, nil, &observation.TestContext)
svc := newService(
store,
nil, // repoStore
nil, // lsifstore
gitserverClient,
nil, // policySvc
nil, // policyMatcher
nil, // locker
nil, // backgroundJobs
&observation.TestContext,
)
// Return self for txn
store.TransactFunc.SetDefaultReturn(store, nil)
@ -131,7 +151,7 @@ func TestBackfillCommittedAtBatchUnknownCommits(t *testing.T) {
}
for i := 0; i < n/pageSize; i++ {
if err := svc.backfillCommittedAtBatch(ctx, pageSize); err != nil {
if err := svc.BackfillCommittedAtBatch(ctx, pageSize); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}


@ -9,55 +9,25 @@ import (
policiesEnterprise "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
policiesshared "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/shared"
"github.com/sourcegraph/sourcegraph/internal/codeintel/shared/types"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/background"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/shared"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (s *Service) NewUploadExpirer(
interval time.Duration,
repositoryProcessDelay time.Duration,
repositoryBatchSize int,
uploadProcessDelay time.Duration,
uploadBatchSize int,
commitBatchSize int,
policyBatchSize int,
) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), interval, goroutine.HandlerFunc(func(ctx context.Context) error {
return s.handleExpiredUploadsBatch(ctx, expirerConfig{
repositoryProcessDelay: repositoryProcessDelay,
repositoryBatchSize: repositoryBatchSize,
uploadProcessDelay: uploadProcessDelay,
uploadBatchSize: uploadBatchSize,
commitBatchSize: commitBatchSize,
policyBatchSize: policyBatchSize,
})
}))
}
type expirerConfig struct {
repositoryProcessDelay time.Duration
repositoryBatchSize int
uploadProcessDelay time.Duration
uploadBatchSize int
commitBatchSize int
policyBatchSize int
}
// handleExpiredUploadsBatch compares the age of upload records against the age of uploads
// protected by global and repository specific data retention policies.
//
// Uploads that are older than the protected retention age are marked as expired. Expired records with
// no dependents will be removed by the expiredUploadDeleter.
func (e *Service) handleExpiredUploadsBatch(ctx context.Context, cfg expirerConfig) (err error) {
func (s *Service) HandleExpiredUploadsBatch(ctx context.Context, metrics *background.ExpirationMetrics, cfg background.ExpirerConfig) (err error) {
// Get the batch of repositories that we'll handle in this invocation of the periodic goroutine. This
// set should contain repositories that have yet to be updated, or that have been updated least recently.
// This allows us to update every repository reliably, even if it takes a long time to process through
// the backlog. Note that this set of repositories require a fresh commit graph, so we're not trying to
// process records that have been uploaded but the commits from which they are visible have yet to be
// determined (and appearing as if they are visible to no commit).
repositories, err := e.store.SetRepositoriesForRetentionScan(ctx, cfg.repositoryProcessDelay, cfg.repositoryBatchSize)
repositories, err := s.SetRepositoriesForRetentionScan(ctx, cfg.RepositoryProcessDelay, cfg.RepositoryBatchSize)
if err != nil {
return errors.Wrap(err, "uploadSvc.SelectRepositoriesForRetentionScan")
}
@ -69,7 +39,7 @@ func (e *Service) handleExpiredUploadsBatch(ctx context.Context, cfg expirerConf
now := timeutil.Now()
for _, repositoryID := range repositories {
if repositoryErr := e.handleRepository(ctx, repositoryID, cfg, now); repositoryErr != nil {
if repositoryErr := s.handleRepository(ctx, repositoryID, cfg, now, metrics); repositoryErr != nil {
if err == nil {
err = repositoryErr
} else {
@ -81,14 +51,14 @@ func (e *Service) handleExpiredUploadsBatch(ctx context.Context, cfg expirerConf
return err
}
func (e *Service) handleRepository(ctx context.Context, repositoryID int, cfg expirerConfig, now time.Time) error {
e.expirationMetrics.numRepositoriesScanned.Inc()
func (s *Service) handleRepository(ctx context.Context, repositoryID int, cfg background.ExpirerConfig, now time.Time, metrics *background.ExpirationMetrics) error {
metrics.NumRepositoriesScanned.Inc()
// Build a map from commits to the set of policies that affect them. Note that this map should
// never be empty as we have multiple protected data retention policies on the global scope so
// that all data visible from a tag or branch tip is protected for at least a short amount of
// time after upload.
commitMap, err := e.buildCommitMap(ctx, repositoryID, cfg, now)
commitMap, err := s.buildCommitMap(ctx, repositoryID, cfg, now)
if err != nil {
return err
}
@ -101,7 +71,7 @@ func (e *Service) handleRepository(ctx context.Context, repositoryID int, cfg ex
// upload process delay is shorter than the time it takes to process one batch of uploads. This
// is obviously a mis-configuration, but one we can make a bit less catastrophic by not updating
// this value in the loop.
lastRetentionScanBefore := now.Add(-cfg.uploadProcessDelay)
lastRetentionScanBefore := now.Add(-cfg.UploadProcessDelay)
for {
// Each record pulled back by this query will either have its expired flag or its last
@ -114,12 +84,12 @@ func (e *Service) handleRepository(ctx context.Context, repositoryID int, cfg ex
// out new uploads that would happen to be visible to no commits since they were never
// installed into the commit graph.
uploads, _, err := e.GetUploads(ctx, shared.GetUploadsOptions{
uploads, _, err := s.GetUploads(ctx, shared.GetUploadsOptions{
State: "completed",
RepositoryID: repositoryID,
AllowExpired: false,
OldestFirst: true,
Limit: cfg.uploadBatchSize,
Limit: cfg.UploadBatchSize,
LastRetentionScanBefore: &lastRetentionScanBefore,
InCommitGraph: true,
})
@ -127,7 +97,7 @@ func (e *Service) handleRepository(ctx context.Context, repositoryID int, cfg ex
return err
}
if err := e.handleUploads(ctx, commitMap, uploads, cfg, now); err != nil {
if err := s.handleUploads(ctx, commitMap, uploads, cfg, metrics, now); err != nil {
// Note that we collect errors in the loop of the handleUploads call, but we will still terminate
// this loop on any non-nil error from that function. This is required to prevent us from pulling
// back the same set of failing records from the database in a tight loop.
@ -138,7 +108,7 @@ func (e *Service) handleRepository(ctx context.Context, repositoryID int, cfg ex
// buildCommitMap will iterate the complete set of configuration policies that apply to a particular
// repository and build a map from commits to the policies that apply to them.
func (e *Service) buildCommitMap(ctx context.Context, repositoryID int, cfg expirerConfig, now time.Time) (map[string][]policiesEnterprise.PolicyMatch, error) {
func (s *Service) buildCommitMap(ctx context.Context, repositoryID int, cfg background.ExpirerConfig, now time.Time) (map[string][]policiesEnterprise.PolicyMatch, error) {
var (
offset int
policies []types.ConfigurationPolicy
@ -146,10 +116,10 @@ func (e *Service) buildCommitMap(ctx context.Context, repositoryID int, cfg expi
for {
// Retrieve the complete set of configuration policies that affect data retention for this repository
policyBatch, totalCount, err := e.policySvc.GetConfigurationPolicies(ctx, policiesshared.GetConfigurationPoliciesOptions{
policyBatch, totalCount, err := s.policySvc.GetConfigurationPolicies(ctx, policiesshared.GetConfigurationPoliciesOptions{
RepositoryID: repositoryID,
ForDataRetention: true,
Limit: cfg.policyBatchSize,
Limit: cfg.PolicyBatchSize,
Offset: offset,
})
if err != nil {
@ -165,14 +135,15 @@ func (e *Service) buildCommitMap(ctx context.Context, repositoryID int, cfg expi
}
// Get the set of commits within this repository that match a data retention policy
return e.policyMatcher.CommitsDescribedByPolicy(ctx, repositoryID, policies, now)
return s.policyMatcher.CommitsDescribedByPolicy(ctx, repositoryID, policies, now)
}
func (e *Service) handleUploads(
func (s *Service) handleUploads(
ctx context.Context,
commitMap map[string][]policiesEnterprise.PolicyMatch,
uploads []types.Upload,
cfg expirerConfig,
cfg background.ExpirerConfig,
metrics *background.ExpirationMetrics,
now time.Time,
) (err error) {
// Categorize each upload as protected or expired
@ -182,7 +153,7 @@ func (e *Service) handleUploads(
)
for _, upload := range uploads {
protected, checkErr := e.isUploadProtectedByPolicy(ctx, commitMap, upload, cfg, now)
protected, checkErr := s.isUploadProtectedByPolicy(ctx, commitMap, upload, cfg, metrics, now)
if checkErr != nil {
if err == nil {
err = checkErr
@ -207,7 +178,7 @@ func (e *Service) handleUploads(
// records with the given expired identifiers so that the expiredUploadDeleter process can remove them once
// they are no longer referenced.
if updateErr := e.store.UpdateUploadRetention(ctx, protectedUploadIDs, expiredUploadIDs); updateErr != nil {
if updateErr := s.store.UpdateUploadRetention(ctx, protectedUploadIDs, expiredUploadIDs); updateErr != nil {
if wrappedErr := errors.Wrap(updateErr, "uploadSvc.UpdateUploadRetention"); err == nil {
err = wrappedErr
} else {
@ -216,21 +187,22 @@ func (e *Service) handleUploads(
}
if count := len(expiredUploadIDs); count > 0 {
e.logger.Info("Expiring codeintel uploads", log.Int("count", count))
e.expirationMetrics.numUploadsExpired.Add(float64(count))
s.logger.Info("Expiring codeintel uploads", log.Int("count", count))
metrics.NumUploadsExpired.Add(float64(count))
}
return err
}
func (e *Service) isUploadProtectedByPolicy(
func (s *Service) isUploadProtectedByPolicy(
ctx context.Context,
commitMap map[string][]policiesEnterprise.PolicyMatch,
upload types.Upload,
cfg expirerConfig,
cfg background.ExpirerConfig,
metrics *background.ExpirationMetrics,
now time.Time,
) (bool, error) {
e.expirationMetrics.numUploadsScanned.Inc()
metrics.NumUploadsScanned.Inc()
var token *string
@ -245,13 +217,13 @@ func (e *Service) isUploadProtectedByPolicy(
//
// We check the set of commits visible to an upload in batches as in some cases it can be very large; for
// example, a single historic commit providing code intelligence for all descendants.
commits, nextToken, err := e.GetCommitsVisibleToUpload(ctx, upload.ID, cfg.commitBatchSize, token)
commits, nextToken, err := s.GetCommitsVisibleToUpload(ctx, upload.ID, cfg.CommitBatchSize, token)
if err != nil {
return false, errors.Wrap(err, "uploadSvc.CommitsVisibleToUpload")
}
token = nextToken
e.expirationMetrics.numCommitsScanned.Add(float64(len(commits)))
metrics.NumCommitsScanned.Add(float64(len(commits)))
for _, commit := range commits {
if policyMatches, ok := commitMap[commit]; ok {


@ -13,6 +13,7 @@ import (
policiesEnterprise "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/enterprise"
policiesshared "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/shared"
"github.com/sourcegraph/sourcegraph/internal/codeintel/shared/types"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/background"
uploadsshared "github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/shared"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
@ -25,23 +26,23 @@ func TestUploadExpirer(t *testing.T) {
uploadSvc := setupMockUploadService(now)
policySvc := setupMockPolicyService()
policyMatcher := testUploadExpirerMockPolicyMatcher()
expirationMetrics := background.NewExpirationMetrics(&observation.TestContext)
uploadExpirer := &Service{
store: uploadSvc,
policySvc: policySvc,
policyMatcher: policyMatcher,
expirationMetrics: newExpirationMetrics(&observation.TestContext),
logger: logtest.Scoped(t),
operations: newOperations(&observation.TestContext),
clock: clock,
store: uploadSvc,
policySvc: policySvc,
policyMatcher: policyMatcher,
logger: logtest.Scoped(t),
operations: newOperations(&observation.TestContext),
clock: clock,
}
if err := uploadExpirer.handleExpiredUploadsBatch(context.Background(), expirerConfig{
repositoryProcessDelay: 24 * time.Hour,
repositoryBatchSize: 100,
uploadProcessDelay: 24 * time.Hour,
uploadBatchSize: 100,
commitBatchSize: 100,
if err := uploadExpirer.HandleExpiredUploadsBatch(context.Background(), expirationMetrics, background.ExpirerConfig{
RepositoryProcessDelay: 24 * time.Hour,
RepositoryBatchSize: 100,
UploadProcessDelay: 24 * time.Hour,
UploadBatchSize: 100,
CommitBatchSize: 100,
}); err != nil {
t.Fatalf("unexpected error from handle: %s", err)
}


@ -5,16 +5,12 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"time"
"github.com/jackc/pgconn"
"github.com/keegancsmith/sqlf"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/api"
codeinteltypes "github.com/sourcegraph/sourcegraph/internal/codeintel/shared/types"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/lsifstore"
@ -24,172 +20,26 @@ import (
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/types"
"github.com/sourcegraph/sourcegraph/internal/uploadstore"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
"github.com/sourcegraph/sourcegraph/lib/codeintel/lsif/conversion"
"github.com/sourcegraph/sourcegraph/lib/codeintel/precise"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (s *Service) NewWorker(
uploadStore uploadstore.Store,
workerConcurrency int,
workerBudget int64,
workerPollInterval time.Duration,
maximumRuntimePerJob time.Duration,
) *workerutil.Worker {
rootContext := actor.WithInternalActor(context.Background())
handler := s.WorkerutilHandler(
uploadStore,
workerConcurrency,
workerBudget,
)
return dbworker.NewWorker(rootContext, s.WorkerutilStore(), handler, workerutil.WorkerOptions{
Name: "precise_code_intel_upload_worker",
NumHandlers: workerConcurrency,
Interval: workerPollInterval,
HeartbeatInterval: time.Second,
Metrics: s.workerMetrics,
MaximumRuntimePerJob: maximumRuntimePerJob,
})
}
func (s *Service) WorkerutilStore() dbworkerstore.Store {
return s.workerutilStore
}
func (s *Service) WorkerutilHandler(
uploadStore uploadstore.Store,
numProcessorRoutines int,
budgetMax int64,
) workerutil.Handler {
return &handler{
dbStore: s.store,
repoStore: s.repoStore,
workerStore: s.workerutilStore,
lsifStore: s.lsifstore,
uploadStore: uploadStore,
gitserverClient: s.gitserverClient,
handleOp: s.operations.uploadProcessor,
budgetRemaining: budgetMax,
enableBudget: budgetMax > 0,
uncompressedSizes: make(map[int]uint64, numProcessorRoutines),
uploadSizeGuage: s.operations.uploadSizeGuage,
}
}
type handler struct {
dbStore store.Store
repoStore RepoStore
workerStore dbworkerstore.Store
lsifStore lsifstore.LsifStore
uploadStore uploadstore.Store
gitserverClient GitserverClient
handleOp *observation.Operation
budgetRemaining int64
enableBudget bool
// Map of upload ID to uncompressed size. Uploads are deleted before
// PostHandle, so we store it here.
// Should only contain entries for processing in-progress uploads.
uncompressedSizes map[int]uint64
uploadSizeGuage prometheus.Gauge
}
var (
_ workerutil.Handler = &handler{}
_ workerutil.WithPreDequeue = &handler{}
_ workerutil.WithHooks = &handler{}
)
// errCommitDoesNotExist occurs when gitserver does not recognize the commit attached to the upload.
var errCommitDoesNotExist = errors.Errorf("commit does not exist")
func (h *handler) Handle(ctx context.Context, logger log.Logger, record workerutil.Record) (err error) {
upload, ok := record.(codeinteltypes.Upload)
if !ok {
return errors.Newf("unexpected record type %T", record)
}
var requeued bool
ctx, otLogger, endObservation := h.handleOp.With(ctx, &err, observation.Args{})
defer func() {
endObservation(1, observation.Args{
LogFields: append(
createLogFields(upload),
otlog.Bool("requeued", requeued),
),
})
}()
requeued, err = h.handle(ctx, logger, upload, otLogger)
return err
}
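The With/endObservation pairing is the observation package's standard idiom; a hedged skeleton of the pattern, with the count argument called out (field names are taken from the code above):

```go
// Skeleton of the tracing idiom used by Handle; not new behavior.
func (h *handler) instrumented(ctx context.Context) (err error) {
	// With opens a traced, metered operation; err is captured by pointer so
	// the deferred endObservation can record the final outcome.
	ctx, trace, endObservation := h.handleOp.With(ctx, &err, observation.Args{})
	defer endObservation(1, observation.Args{}) // 1 = operation count to record
	_, _ = ctx, trace
	return nil
}
```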
func (h *handler) PreDequeue(ctx context.Context, logger log.Logger) (bool, any, error) {
if !h.enableBudget {
return true, nil, nil
}
budgetRemaining := atomic.LoadInt64(&h.budgetRemaining)
if budgetRemaining <= 0 {
return false, nil, nil
}
return true, []*sqlf.Query{sqlf.Sprintf("(upload_size IS NULL OR upload_size <= %s)", budgetRemaining)}, nil
}
func (h *handler) PreHandle(ctx context.Context, logger log.Logger, record workerutil.Record) {
upload, ok := record.(codeinteltypes.Upload)
if !ok {
return
}
uncompressedSize := h.getUploadSize(upload.UncompressedSize)
h.uploadSizeGuage.Add(float64(uncompressedSize))
gzipSize := h.getUploadSize(upload.UploadSize)
atomic.AddInt64(&h.budgetRemaining, -gzipSize)
}
func (h *handler) PostHandle(ctx context.Context, logger log.Logger, record workerutil.Record) {
upload, ok := record.(codeinteltypes.Upload)
if !ok {
return
}
uncompressedSize := h.getUploadSize(upload.UncompressedSize)
h.uploadSizeGuage.Sub(float64(uncompressedSize))
gzipSize := h.getUploadSize(upload.UploadSize)
atomic.AddInt64(&h.budgetRemaining, +gzipSize)
}
func (h *handler) getUploadSize(field *int64) int64 {
if field != nil {
return *field
}
return 0
}
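Taken together, PreDequeue, PreHandle, and PostHandle implement admission control over a shared byte budget: the dequeue gate only opens while budget remains (and pushes a size predicate into the dequeue SQL), PreHandle debits the record's gzip size, and PostHandle credits it back. A standalone model of that accounting, simplified in that the size comparison happens in Go rather than in SQL:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// budget mimics the handler's gzip-size accounting: consulted before
// dequeueing, debited in PreHandle, credited back in PostHandle.
type budget struct{ remaining int64 }

func (b *budget) open() bool       { return atomic.LoadInt64(&b.remaining) > 0 } // PreDequeue gate
func (b *budget) debit(size int64)  { atomic.AddInt64(&b.remaining, -size) }     // PreHandle
func (b *budget) credit(size int64) { atomic.AddInt64(&b.remaining, +size) }     // PostHandle

func main() {
	b := &budget{remaining: 100}
	if b.open() {
		b.debit(60)
		fmt.Println(atomic.LoadInt64(&b.remaining)) // 40: headroom left for concurrent uploads
		b.credit(60)
	}
}
```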
// handle converts a raw upload into a dump within the given transaction context. Returns true if the
// upload record was requeued and false otherwise.
func (h *handler) handle(ctx context.Context, logger log.Logger, upload codeinteltypes.Upload, trace observation.TraceLogger) (requeued bool, err error) {
repo, err := h.repoStore.Get(ctx, api.RepoID(upload.RepositoryID))
func (s *Service) HandleRawUpload(ctx context.Context, logger log.Logger, upload codeinteltypes.Upload, uploadStore uploadstore.Store, trace observation.TraceLogger) (requeued bool, err error) {
repo, err := s.repoStore.Get(ctx, api.RepoID(upload.RepositoryID))
if err != nil {
return false, errors.Wrap(err, "Repos.Get")
}
if requeued, err := requeueIfCloningOrCommitUnknown(ctx, logger, h.repoStore, h.workerStore, upload, repo); err != nil || requeued {
if requeued, err := requeueIfCloningOrCommitUnknown(ctx, logger, s.repoStore, s.workerutilStore, upload, repo); err != nil || requeued {
return requeued, err
}
// Determine if the upload is for the default Git branch.
isDefaultBranch, err := h.gitserverClient.DefaultBranchContains(ctx, upload.RepositoryID, upload.Commit)
isDefaultBranch, err := s.gitserverClient.DefaultBranchContains(ctx, upload.RepositoryID, upload.Commit)
if err != nil {
return false, errors.Wrap(err, "gitserver.DefaultBranchContains")
}
@ -197,14 +47,14 @@ func (h *handler) handle(ctx context.Context, logger log.Logger, upload codeinte
trace.Log(otlog.Bool("defaultBranch", isDefaultBranch))
getChildren := func(ctx context.Context, dirnames []string) (map[string][]string, error) {
directoryChildren, err := h.gitserverClient.DirectoryChildren(ctx, upload.RepositoryID, upload.Commit, dirnames)
directoryChildren, err := s.gitserverClient.DirectoryChildren(ctx, upload.RepositoryID, upload.Commit, dirnames)
if err != nil {
return nil, errors.Wrap(err, "gitserverClient.DirectoryChildren")
}
return directoryChildren, nil
}
return false, withUploadData(ctx, logger, h.uploadStore, upload.ID, trace, func(r io.Reader) (err error) {
return false, withUploadData(ctx, logger, uploadStore, upload.ID, trace, func(r io.Reader) (err error) {
groupedBundleData, err := conversion.Correlate(ctx, r, upload.Root, getChildren)
if err != nil {
return errors.Wrap(err, "conversion.Correlate")
@ -212,7 +62,7 @@ func (h *handler) handle(ctx context.Context, logger log.Logger, upload codeinte
// Note: this is writing to a different database than the block below, so we need to use a
// different transaction context (managed by the writeData function).
if err := writeData(ctx, h.lsifStore, upload, repo, isDefaultBranch, groupedBundleData, trace); err != nil {
if err := writeData(ctx, s.lsifstore, upload, repo, isDefaultBranch, groupedBundleData, trace); err != nil {
if isUniqueConstraintViolation(err) {
// If this is a unique constraint violation, then we've previously processed this same
// upload record up to this point, but failed to perform the transaction below. We can
@ -229,7 +79,7 @@ func (h *handler) handle(ctx context.Context, logger log.Logger, upload codeinte
// point fails, we want to update the upload record with an error message but do not want to
// alter any other data in the database. Rolling back to this savepoint will allow us to discard
// any other changes but still commit the transaction as a whole.
return inTransaction(ctx, h.dbStore, func(tx store.Store) error {
return inTransaction(ctx, s.store, func(tx store.Store) error {
// Before we mark the upload as complete, we need to delete any existing completed uploads
// that have the same repository_id, commit, root, and indexer values. Otherwise the transaction
// will fail as these values form a unique constraint.
@ -240,7 +90,7 @@ func (h *handler) handle(ctx context.Context, logger log.Logger, upload codeinte
// Find the date of the commit and store that in the upload record. We do this now as we
// will need to find the _oldest_ commit with code intelligence data to efficiently update
// the commit graph for the repository.
_, commitDate, revisionExists, err := h.gitserverClient.CommitDate(ctx, upload.RepositoryID, upload.Commit)
_, commitDate, revisionExists, err := s.gitserverClient.CommitDate(ctx, upload.RepositoryID, upload.Commit)
if err != nil {
return errors.Wrap(err, "gitserverClient.CommitDate")
}
@ -424,19 +274,5 @@ func isUniqueConstraintViolation(err error) bool {
return errors.As(err, &e) && e.Code == "23505"
}
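The hunk elides the declaration of `e`; given the pgconn import shown earlier, the full predicate presumably reads:

```go
// Reconstructed for readability; the hunk cuts off the var declaration.
func isUniqueConstraintViolation(err error) bool {
	var e *pgconn.PgError
	return errors.As(err, &e) && e.Code == "23505" // Postgres unique_violation
}
```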
func createLogFields(upload codeinteltypes.Upload) []otlog.Field {
fields := []otlog.Field{
otlog.Int("uploadID", upload.ID),
otlog.Int("repositoryID", upload.RepositoryID),
otlog.String("commit", upload.Commit),
otlog.String("root", upload.Root),
otlog.String("indexer", upload.Indexer),
otlog.Int("queueDuration", int(time.Since(upload.UploadedAt))),
}
if upload.UploadSize != nil {
fields = append(fields, otlog.Int64("uploadSize", *upload.UploadSize))
}
return fields
}
// errCommitDoesNotExist occurs when gitserver does not recognize the commit attached to the upload.
var errCommitDoesNotExist = errors.Errorf("commit does not exist")

View File

@ -61,16 +61,15 @@ func TestHandle(t *testing.T) {
expectedCommitDateStr := expectedCommitDate.Format(time.RFC3339)
gitserverClient.CommitDateFunc.SetDefaultReturn("deadbeef", expectedCommitDate, true, nil)
handler := &handler{
dbStore: mockDBStore,
svc := &Service{
store: mockDBStore,
repoStore: mockRepoStore,
workerStore: mockWorkerStore,
lsifStore: mockLSIFStore,
uploadStore: mockUploadStore,
workerutilStore: mockWorkerStore,
lsifstore: mockLSIFStore,
gitserverClient: gitserverClient,
}
requeued, err := handler.handle(context.Background(), logtest.Scoped(t), upload, observation.TestTraceLogger(logtest.Scoped(t)))
requeued, err := svc.HandleRawUpload(context.Background(), logtest.Scoped(t), upload, mockUploadStore, observation.TestTraceLogger(logtest.Scoped(t)))
if err != nil {
t.Fatalf("unexpected error handling upload: %s", err)
} else if requeued {
@ -194,16 +193,16 @@ func TestHandleError(t *testing.T) {
// Set a different tip commit
mockDBStore.SetRepositoryAsDirtyFunc.SetDefaultReturn(errors.Errorf("uh-oh!"))
handler := &handler{
dbStore: mockDBStore,
svc := &Service{
store: mockDBStore,
repoStore: mockRepoStore,
workerStore: mockWorkerStore,
lsifStore: mockLSIFStore,
uploadStore: mockUploadStore,
workerutilStore: mockWorkerStore,
lsifstore: mockLSIFStore,
gitserverClient: gitserverClient,
}
requeued, err := handler.handle(context.Background(), logtest.Scoped(t), upload, observation.TestTraceLogger(logtest.Scoped(t)))
requeued, err := svc.HandleRawUpload(context.Background(), logtest.Scoped(t), upload, mockUploadStore, observation.TestTraceLogger(logtest.Scoped(t)))
if err == nil {
t.Fatalf("unexpected nil error handling upload")
} else if !strings.Contains(err.Error(), "uh-oh!") {
@ -246,15 +245,14 @@ func TestHandleCloneInProgress(t *testing.T) {
return "", &gitdomain.RepoNotExistError{Repo: repo.Name, CloneInProgress: true}
})
handler := &handler{
dbStore: mockDBStore,
svc := &Service{
store: mockDBStore,
repoStore: mockRepoStore,
workerStore: mockWorkerStore,
uploadStore: mockUploadStore,
workerutilStore: mockWorkerStore,
gitserverClient: gitserverClient,
}
requeued, err := handler.handle(context.Background(), logtest.Scoped(t), upload, observation.TestTraceLogger(logtest.Scoped(t)))
requeued, err := svc.HandleRawUpload(context.Background(), logtest.Scoped(t), upload, mockUploadStore, observation.TestTraceLogger(logtest.Scoped(t)))
if err != nil {
t.Fatalf("unexpected error handling upload: %s", err)
} else if !requeued {

View File

@ -150,11 +150,15 @@
- path: github.com/sourcegraph/sourcegraph/internal/codeintel/uploads
interfaces:
- GitserverClient
- UploadServiceForExpiration
- UploadServiceForCleanup
- PolicyService
- PolicyMatcher
- AutoIndexingService
- filename: internal/codeintel/uploads/internal/background/mocks_test.go
sources:
- path: github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/background
interfaces:
- UploadService
- GitserverClient
# TODO - cannot merge with the entry above because 'Store' matches interfaces in multiple packages.
# This is a known go-mockgen limitation, filed as https://github.com/derision-test/go-mockgen/issues/36.
- filename: internal/codeintel/uploads/mocks2_test.go
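For readers unfamiliar with go-mockgen output: each interface method becomes a `<Method>Func` field with stubbing helpers, which is the style the tests above rely on. A hedged illustration, assuming go-mockgen's usual `NewMock<Interface>` constructor naming:

```go
// Assumed generated API; mirrors the usage already visible in the tests.
gitserverClient := NewMockGitserverClient()
gitserverClient.CommitDateFunc.SetDefaultReturn("deadbeef", expectedCommitDate, true, nil)
```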

View File

@ -14,6 +14,7 @@ func CodeIntelUploads() *monitoring.Dashboard {
Groups: []monitoring.Group{
shared.CodeIntelligence.NewUploadsServiceGroup(""),
shared.CodeIntelligence.NewUploadsStoreGroup(""),
shared.CodeIntelligence.NewUploadsBackgroundGroup(""),
shared.CodeIntelligence.NewUploadsGraphQLTransportGroup(""),
shared.CodeIntelligence.NewUploadsHTTPTransportGroup(""),
shared.CodeIntelligence.NewUploadsCleanupTaskGroup(""),

View File

@ -41,7 +41,7 @@ func (codeIntelligence) NewUploadsStoreGroup(containerName string) monitoring.Gr
return Observation.NewGroup(containerName, monitoring.ObservableOwnerCodeIntel, ObservationGroupOptions{
GroupConstructorOptions: GroupConstructorOptions{
Namespace: "codeintel",
DescriptionRoot: "Uploads > Store",
DescriptionRoot: "Uploads > Store (internal)",
Hidden: false,
ObservableConstructorOptions: ObservableConstructorOptions{
@ -130,6 +130,38 @@ func (codeIntelligence) NewUploadsHTTPTransportGroup(containerName string) monit
})
}
// src_codeintel_uploads_background_total
// src_codeintel_uploads_background_duration_seconds_bucket
// src_codeintel_uploads_background_errors_total
func (codeIntelligence) NewUploadsBackgroundGroup(containerName string) monitoring.Group {
return Observation.NewGroup(containerName, monitoring.ObservableOwnerCodeIntel, ObservationGroupOptions{
GroupConstructorOptions: GroupConstructorOptions{
Namespace: "codeintel",
DescriptionRoot: "Uploads > Background (internal)",
Hidden: false,
ObservableConstructorOptions: ObservableConstructorOptions{
MetricNameRoot: "codeintel_uploads_background",
MetricDescriptionRoot: "background",
By: []string{"op"},
},
},
SharedObservationGroupOptions: SharedObservationGroupOptions{
Total: NoAlertsOption("none"),
Duration: NoAlertsOption("none"),
Errors: NoAlertsOption("none"),
ErrorRate: NoAlertsOption("none"),
},
Aggregate: &SharedObservationGroupOptions{
Total: NoAlertsOption("none"),
Duration: NoAlertsOption("none"),
Errors: NoAlertsOption("none"),
ErrorRate: NoAlertsOption("none"),
},
})
}
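The MetricNameRoot above means the service layer must emit series with matching names for these panels to light up. A minimal sketch of compatible instrumentation using client_golang directly; the real code presumably goes through the observation package, so treat the shape (not the metric name or the op label) as an assumption:

```go
package background

import "github.com/prometheus/client_golang/prometheus"

// Sketch only: emits the counter the new dashboard group queries.
var backgroundTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
	Name: "src_codeintel_uploads_background_total",
	Help: "Total number of codeintel uploads background operations.",
}, []string{"op"})

func init() { prometheus.MustRegister(backgroundTotal) }

// recordBackgroundOp increments the per-operation counter, e.g.
// recordBackgroundOp("HandleExpiredUploadsBatch").
func recordBackgroundOp(op string) { backgroundTotal.WithLabelValues(op).Inc() }
```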
// src_codeintel_background_upload_records_removed_total
// src_codeintel_background_index_records_removed_total
// src_codeintel_background_uploads_purged_total