codeintel: Sink construction of pci-worker into service (#42666)

This commit is contained in:
Eric Fritz 2022-10-06 22:21:24 -05:00 committed by GitHub
parent 9ada68c16c
commit 73ccd3a26f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 26 deletions

View File

@ -12,7 +12,6 @@ import (
"go.opentelemetry.io/otel"
eiauthz "github.com/sourcegraph/sourcegraph/enterprise/internal/authz"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/authz"
"github.com/sourcegraph/sourcegraph/internal/codeintel"
"github.com/sourcegraph/sourcegraph/internal/codeintel/stores"
@ -35,8 +34,6 @@ import (
"github.com/sourcegraph/sourcegraph/internal/tracer"
"github.com/sourcegraph/sourcegraph/internal/uploadstore"
"github.com/sourcegraph/sourcegraph/internal/version"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
@ -94,16 +91,8 @@ func main() {
// be ready for traffic.
close(ready)
// Initialize stores
uploadStore, err := lsifuploadstore.New(context.Background(), config.LSIFUploadStoreConfig, observationContext)
if err != nil {
logger.Fatal("Failed to create upload store", log.Error(err))
}
if err := initializeUploadStore(context.Background(), uploadStore); err != nil {
logger.Fatal("Failed to initialize upload store", log.Error(err))
}
// Initialize sub-repo permissions client
var err error
authz.DefaultSubRepoPermsChecker, err = authz.NewSubRepoPermsClient(db.SubRepoPerms())
if err != nil {
logger.Fatal("Failed to create sub-repo client", log.Error(err))
@ -117,21 +106,23 @@ func main() {
logger.Fatal("Failed to create codeintel services", log.Error(err))
}
// Initialize stores
uploadStore, err := lsifuploadstore.New(context.Background(), config.LSIFUploadStoreConfig, observationContext)
if err != nil {
logger.Fatal("Failed to create upload store", log.Error(err))
}
if err := initializeUploadStore(context.Background(), uploadStore); err != nil {
logger.Fatal("Failed to initialize upload store", log.Error(err))
}
// Initialize worker
rootContext := actor.WithInternalActor(context.Background())
handler := services.UploadsService.WorkerutilHandler(
worker := services.UploadsService.NewWorker(
uploadStore,
config.WorkerConcurrency,
config.WorkerBudget,
config.WorkerPollInterval,
config.MaximumRuntimePerJob,
)
worker := dbworker.NewWorker(rootContext, services.UploadsService.WorkerutilStore(), handler, workerutil.WorkerOptions{
Name: "precise_code_intel_upload_worker",
NumHandlers: config.WorkerConcurrency,
Interval: config.WorkerPollInterval,
HeartbeatInterval: time.Second,
Metrics: makeWorkerMetrics(observationContext),
MaximumRuntimePerJob: config.MaximumRuntimePerJob,
})
// Initialize health server
server := httpserver.NewFromAddr(addr, &http.Server{
@ -194,10 +185,6 @@ func makeObservationContext(observationContext *observation.Context, withHoney b
return &ctx
}
func makeWorkerMetrics(observationContext *observation.Context) workerutil.WorkerMetrics {
return workerutil.NewMetrics(observationContext, "codeintel_upload_processor")
}
func initializeUploadStore(ctx context.Context, uploadStore uploadstore.Store) error {
for {
if err := uploadStore.Init(ctx); err == nil || !isRequestError(err) {

View File

@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/api"
codeinteltypes "github.com/sourcegraph/sourcegraph/internal/codeintel/types"
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/lsifstore"
@ -24,12 +25,37 @@ import (
"github.com/sourcegraph/sourcegraph/internal/types"
"github.com/sourcegraph/sourcegraph/internal/uploadstore"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
"github.com/sourcegraph/sourcegraph/lib/codeintel/lsif/conversion"
"github.com/sourcegraph/sourcegraph/lib/codeintel/precise"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func (s *Service) NewWorker(
uploadStore uploadstore.Store,
workerConcurrency int,
workerBudget int64,
workerPollInterval time.Duration,
maximumRuntimePerJob time.Duration,
) *workerutil.Worker {
rootContext := actor.WithInternalActor(context.Background())
handler := s.WorkerutilHandler(
uploadStore,
workerConcurrency,
workerBudget,
)
return dbworker.NewWorker(rootContext, s.WorkerutilStore(), handler, workerutil.WorkerOptions{
Name: "precise_code_intel_upload_worker",
NumHandlers: workerConcurrency,
Interval: workerPollInterval,
HeartbeatInterval: time.Second,
Metrics: s.workerMetrics,
MaximumRuntimePerJob: maximumRuntimePerJob,
})
}
func (s *Service) WorkerutilStore() dbworkerstore.Store {
return s.workerutilStore
}

View File

@ -18,6 +18,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/gitserver"
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
"github.com/sourcegraph/sourcegraph/lib/codeintel/precise"
@ -93,6 +94,7 @@ type Service struct {
expirationMetrics *expirationMetrics
resetterMetrics *resetterMetrics
janitorMetrics *janitorMetrics
workerMetrics workerutil.WorkerMetrics
policyMatcher PolicyMatcher
locker Locker
logger logger.Logger
@ -127,6 +129,7 @@ func newService(
expirationMetrics: newExpirationMetrics(observationContext),
resetterMetrics: newResetterMetrics(observationContext),
janitorMetrics: newJanitorMetrics(observationContext),
workerMetrics: workerutil.NewMetrics(observationContext, "codeintel_upload_processor"),
policyMatcher: policyMatcher,
locker: locker,
logger: observationContext.Logger,