search jobs: add blobstore (#56463)

This adds a blobstore for search job results to the worker and frontend.
The worker needs the blobstore to upload results, and the frontend needs it to aggregate the CSV.
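
For context, a minimal sketch of the intended call shape (the `uploadResult` helper, blob key, and CSV payload are made up for illustration; it assumes the `Store` interface from the shared `internal/uploadstore` package, where `Upload` takes an `io.Reader`):

```go
package worker

import (
	"context"
	"strings"

	"github.com/sourcegraph/sourcegraph/internal/observation"
	"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/uploadstore"
)

// uploadResult is a hypothetical helper, not part of this change: it creates
// the store from the package-level config and uploads one CSV blob under the
// given key.
func uploadResult(ctx context.Context, observationCtx *observation.Context, key, csv string) error {
	store, err := uploadstore.New(ctx, observationCtx, uploadstore.ConfigInst)
	if err != nil {
		return err
	}
	_, err = store.Upload(ctx, key, strings.NewReader(csv))
	return err
}
```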

This is mostly copy&paste from embeddings and codeintel.

Note:
- We should think about a central blobstore config instead of copy&pasting like I did here. For now the copy&paste is OK to keep up speed for the first release, but it feels wrong to carry it forward; one possible shape is sketched below.
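
Purely as a sketch of such a central config (the `NewConfig` constructor and prefix-derived variable names are hypothetical, not part of this change):

```go
package uploadstore

import (
	"strings"

	"github.com/sourcegraph/sourcegraph/internal/env"
)

// Config is a hypothetical shared config parameterized by an env var prefix
// (e.g. "SEARCH_JOBS_UPLOAD" or "EMBEDDINGS_UPLOAD"), so each consumer stops
// duplicating the Load logic.
type Config struct {
	env.BaseConfig

	prefix  string
	Backend string
	Bucket  string
}

func NewConfig(prefix, defaultBucket string) *Config {
	return &Config{prefix: prefix, Bucket: defaultBucket}
}

func (c *Config) Load() {
	// The prefix expands into concrete variables, e.g. SEARCH_JOBS_UPLOAD_BACKEND;
	// the S3/GCS settings would follow the same pattern.
	c.Backend = strings.ToLower(c.Get(c.prefix+"_BACKEND", "blobstore", "The target file service."))
	c.Bucket = c.Get(c.prefix+"_BUCKET", c.Bucket, "The name of the bucket to store results in.")
}
```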

## Test plan
- CI
- We are not using the blobstore anywhere yet, so this change should be safe. Locally I confirmed that I can upload a blob (see the sketch below).
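
A rough sketch of what that local check could look like (not part of this change; it assumes a local blobstore is reachable at the default endpoint, and that `observation.TestContextTB` and the shared `Store.Upload` signature are available):

```go
package uploadstore

import (
	"context"
	"strings"
	"testing"

	"github.com/sourcegraph/sourcegraph/internal/observation"
)

// TestUpload is a manual smoke test: load the config from the environment,
// create the store lazily, and upload a single blob.
func TestUpload(t *testing.T) {
	ctx := context.Background()
	ConfigInst.Load()

	store, err := New(ctx, observation.TestContextTB(t), ConfigInst)
	if err != nil {
		t.Fatal(err)
	}
	if _, err := store.Upload(ctx, "test.csv", strings.NewReader("hello")); err != nil {
		t.Fatal(err)
	}
}
```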
Stefan Hengl 2023-09-08 15:01:03 +02:00 committed by GitHub
parent dbd4852ab4
commit 87ce692fe5
8 changed files with 111 additions and 1 deletion

View File

@@ -15,5 +15,6 @@ go_library(
         "//internal/observation",
         "//internal/search/exhaustive/service",
         "//internal/search/exhaustive/store",
+        "//internal/search/exhaustive/uploadstore",
     ],
 )

View File

@@ -12,8 +12,13 @@ import (
     "github.com/sourcegraph/sourcegraph/internal/observation"
     "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/service"
     "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store"
+    uploadstore "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/uploadstore"
 )
 
+func LoadConfig() {
+    uploadstore.ConfigInst.Load()
+}
+
 // Init initializes the given enterpriseServices to include the required resolvers for search.
 func Init(
     ctx context.Context,

View File

@@ -5,6 +5,7 @@ import (
     frontend_shared "github.com/sourcegraph/sourcegraph/cmd/frontend/shared"
     "github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/internal/codeintel"
+    "github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/internal/search"
     "github.com/sourcegraph/sourcegraph/internal/debugserver"
     "github.com/sourcegraph/sourcegraph/internal/env"
     "github.com/sourcegraph/sourcegraph/internal/observation"
@@ -22,6 +23,7 @@ func (svc) Name() string { return "frontend" }
 
 func (svc) Configure() (env.Config, []debugserver.Endpoint) {
     frontend_shared.CLILoadConfig()
     codeintel.LoadConfig()
+    search.LoadConfig()
     return nil, frontend_shared.GRPCWebUIDebugEndpoints()
 }

View File

@@ -22,6 +22,7 @@ go_library(
         "//internal/search/exhaustive/service",
         "//internal/search/exhaustive/store",
         "//internal/search/exhaustive/types",
+        "//internal/search/exhaustive/uploadstore",
         "//internal/workerutil",
         "//internal/workerutil/dbworker",
         "//internal/workerutil/dbworker/store",

View File

@@ -14,6 +14,7 @@ import (
     "github.com/sourcegraph/sourcegraph/internal/observation"
     "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/service"
     "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store"
+    "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/uploadstore"
 )
 
 // config stores shared config we can override in each worker. We don't expose
@@ -51,7 +52,7 @@ func (j *searchJob) Description() string {
 }
 
 func (j *searchJob) Config() []env.Config {
-    return nil
+    return []env.Config{uploadstore.ConfigInst}
 }
 
 func (j *searchJob) Routines(_ context.Context, observationCtx *observation.Context) ([]goroutine.BackgroundRoutine, error) {

View File

@@ -0,0 +1,15 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "uploadstore",
+    srcs = ["store.go"],
+    importpath = "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/uploadstore",
+    visibility = ["//:__subpackages__"],
+    deps = [
+        "//internal/conf/deploy",
+        "//internal/env",
+        "//internal/observation",
+        "//internal/uploadstore",
+        "//lib/errors",
+    ],
+)

View File

@@ -0,0 +1,82 @@
+package uploadstore
+
+import (
+    "context"
+    "strings"
+
+    "github.com/sourcegraph/sourcegraph/internal/conf/deploy"
+    "github.com/sourcegraph/sourcegraph/internal/env"
+    "github.com/sourcegraph/sourcegraph/internal/observation"
+    "github.com/sourcegraph/sourcegraph/internal/uploadstore"
+    "github.com/sourcegraph/sourcegraph/lib/errors"
+)
+
+type Config struct {
+    env.BaseConfig
+
+    Backend      string
+    ManageBucket bool
+    Bucket       string
+    S3Region          string
+    S3Endpoint        string
+    S3UsePathStyle    bool
+    S3AccessKeyID     string
+    S3SecretAccessKey string
+    S3SessionToken    string
+
+    GCSProjectID               string
+    GCSCredentialsFile         string
+    GCSCredentialsFileContents string
+}
+
+func (c *Config) Load() {
+    c.Backend = strings.ToLower(c.Get("SEARCH_JOBS_UPLOAD_BACKEND", "blobstore", "The target file service for search jobs. S3, GCS, and Blobstore are supported."))
+    c.ManageBucket = c.GetBool("SEARCH_JOBS_UPLOAD_MANAGE_BUCKET", "false", "Whether or not the client should manage the target bucket configuration.")
+    c.Bucket = c.Get("SEARCH_JOBS_UPLOAD_BUCKET", "search-jobs", "The name of the bucket to store search job results in.")
+
+    if c.Backend != "blobstore" && c.Backend != "s3" && c.Backend != "gcs" {
+        c.AddError(errors.Errorf("invalid backend %q for SEARCH_JOBS_UPLOAD_BACKEND: must be S3, GCS, or Blobstore", c.Backend))
+    }
+
+    if c.Backend == "blobstore" || c.Backend == "s3" {
+        c.S3Region = c.Get("SEARCH_JOBS_UPLOAD_AWS_REGION", "us-east-1", "The target AWS region.")
+        c.S3Endpoint = c.Get("SEARCH_JOBS_UPLOAD_AWS_ENDPOINT", deploy.BlobstoreDefaultEndpoint(), "The target AWS endpoint.")
+        c.S3UsePathStyle = c.GetBool("SEARCH_JOBS_UPLOAD_AWS_USE_PATH_STYLE", "false", "Whether to use path calling (vs subdomain calling).")
+        ec2RoleCredentials := c.GetBool("SEARCH_JOBS_UPLOAD_AWS_USE_EC2_ROLE_CREDENTIALS", "false", "Whether to use the EC2 metadata API, or use the provided static credentials.")
+
+        if !ec2RoleCredentials {
+            c.S3AccessKeyID = c.Get("SEARCH_JOBS_UPLOAD_AWS_ACCESS_KEY_ID", "AKIAIOSFODNN7EXAMPLE", "An AWS access key associated with a user with access to S3.")
+            c.S3SecretAccessKey = c.Get("SEARCH_JOBS_UPLOAD_AWS_SECRET_ACCESS_KEY", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "An AWS secret key associated with a user with access to S3.")
+            c.S3SessionToken = c.GetOptional("SEARCH_JOBS_UPLOAD_AWS_SESSION_TOKEN", "An optional AWS session token associated with a user with access to S3.")
+        }
+    } else if c.Backend == "gcs" {
+        c.GCSProjectID = c.Get("SEARCH_JOBS_UPLOAD_GCP_PROJECT_ID", "", "The project containing the GCS bucket.")
+        c.GCSCredentialsFile = c.GetOptional("SEARCH_JOBS_UPLOAD_GOOGLE_APPLICATION_CREDENTIALS_FILE", "The path to a service account key file with access to GCS.")
+        c.GCSCredentialsFileContents = c.GetOptional("SEARCH_JOBS_UPLOAD_GOOGLE_APPLICATION_CREDENTIALS_FILE_CONTENT", "The contents of a service account key file with access to GCS.")
+    }
+}
+
+var ConfigInst = &Config{}
+
+func New(ctx context.Context, observationCtx *observation.Context, conf *Config) (uploadstore.Store, error) {
+    c := uploadstore.Config{
+        Backend:      conf.Backend,
+        ManageBucket: conf.ManageBucket,
+        Bucket:       conf.Bucket,
+        S3: uploadstore.S3Config{
+            Region:          conf.S3Region,
+            Endpoint:        conf.S3Endpoint,
+            UsePathStyle:    conf.S3UsePathStyle,
+            AccessKeyID:     conf.S3AccessKeyID,
+            SecretAccessKey: conf.S3SecretAccessKey,
+            SessionToken:    conf.S3SessionToken,
+        },
+        GCS: uploadstore.GCSConfig{
+            ProjectID:               conf.GCSProjectID,
+            CredentialsFile:         conf.GCSCredentialsFile,
+            CredentialsFileContents: conf.GCSCredentialsFileContents,
+        },
+    }
+
+    return uploadstore.CreateLazy(ctx, c, uploadstore.NewOperations(observationCtx, "search_jobs", "uploadstore"))
+}

View File

@@ -97,6 +97,9 @@ env:
   # Required for embeddings job upload
   EMBEDDINGS_UPLOAD_AWS_ENDPOINT: http://localhost:9000
 
+  # Required for upload of search job results
+  SEARCH_JOBS_UPLOAD_AWS_ENDPOINT: http://localhost:9000
+
   # Disable auto-indexing the CNCF repo group (this only works in Cloud)
   # This setting will be going away soon
   DISABLE_CNCF: notonmybox