diff --git a/cmd/frontend/internal/cli/config.go b/cmd/frontend/internal/cli/config.go index 5f6d54bb140..ab9cac03c13 100644 --- a/cmd/frontend/internal/cli/config.go +++ b/cmd/frontend/internal/cli/config.go @@ -598,6 +598,7 @@ func serviceConnections(logger log.Logger) conftypes.ServiceConnections { Searchers: searcherAddrs, Symbols: symbolsAddrs, Embeddings: embeddingsAddrs, + Qdrant: qdrantAddr, Zoekts: zoektAddrs, ZoektListTTL: indexedListTTL, } @@ -616,6 +617,8 @@ var ( embeddingsURLsOnce sync.Once embeddingsURLs *endpoint.Map + qdrantAddr = os.Getenv("QDRANT_ENDPOINT") + indexedListTTL = func() time.Duration { ttl, _ := time.ParseDuration(env.Get("SRC_INDEXED_SEARCH_LIST_CACHE_TTL", "", "Indexed search list cache TTL")) if ttl == 0 { diff --git a/doc/dev/background-information/sg/reference.md b/doc/dev/background-information/sg/reference.md index a8ab3e2d434..befe4707492 100644 --- a/doc/dev/background-information/sg/reference.md +++ b/doc/dev/background-information/sg/reference.md @@ -50,6 +50,7 @@ Available comamndsets in `sg.config.yaml`: * monitoring * monitoring-alerts * otel +* qdrant * web-standalone * web-standalone-prod @@ -124,6 +125,7 @@ Available commands in `sg.config.yaml`: * otel-collector: OpenTelemetry collector * postgres_exporter * prometheus +* qdrant * redis-postgres: Dockerized version of redis and postgres * repo-updater * searcher diff --git a/enterprise/cmd/worker/internal/embeddings/repo/BUILD.bazel b/enterprise/cmd/worker/internal/embeddings/repo/BUILD.bazel index 431cac1dfae..20eedbf3e5b 100644 --- a/enterprise/cmd/worker/internal/embeddings/repo/BUILD.bazel +++ b/enterprise/cmd/worker/internal/embeddings/repo/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "//internal/featureflag", "//internal/gitserver", "//internal/goroutine", + "//internal/grpc/defaults", "//internal/httpcli", "//internal/observation", "//internal/paths", diff --git a/enterprise/cmd/worker/internal/embeddings/repo/handler.go b/enterprise/cmd/worker/internal/embeddings/repo/handler.go index 88a615b0b16..23d8a0e7fc0 100644 --- a/enterprise/cmd/worker/internal/embeddings/repo/handler.go +++ b/enterprise/cmd/worker/internal/embeddings/repo/handler.go @@ -28,6 +28,7 @@ type handler struct { db database.DB uploadStore uploadstore.Store gitserverClient gitserver.Client + qdrantInserter db.VectorInserter contextService embed.ContextService repoEmbeddingJobsStore bgrepo.RepoEmbeddingJobsStore } @@ -102,8 +103,7 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo. return err } - qdrantInserter := db.NewNoopInserter() - err = qdrantInserter.PrepareUpdate(ctx, modelID, uint64(modelDims)) + err = h.qdrantInserter.PrepareUpdate(ctx, modelID, uint64(modelDims)) if err != nil { return err } @@ -139,13 +139,13 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo. logger.Info("found previous embeddings index. Attempting incremental update", log.String("old_revision", string(previousIndex.Revision))) opts.IndexedRevision = previousIndex.Revision - hasPreviousIndex, err := qdrantInserter.HasIndex(ctx, modelID, repo.ID, previousIndex.Revision) + hasPreviousIndex, err := h.qdrantInserter.HasIndex(ctx, modelID, repo.ID, previousIndex.Revision) if err != nil { return err } if !hasPreviousIndex { - err = uploadPreviousIndex(ctx, modelID, qdrantInserter, repo.ID, previousIndex) + err = uploadPreviousIndex(ctx, modelID, h.qdrantInserter, repo.ID, previousIndex) if err != nil { return err } @@ -166,7 +166,7 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo. repoEmbeddingIndex, toRemove, stats, err := embed.EmbedRepo( ctx, embeddingsClient, - qdrantInserter, + h.qdrantInserter, h.contextService, fetcher, repo.IDName(), @@ -179,7 +179,7 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo. return err } - err = qdrantInserter.FinalizeUpdate(ctx, db.FinalizeUpdateParams{ + err = h.qdrantInserter.FinalizeUpdate(ctx, db.FinalizeUpdateParams{ ModelID: modelID, RepoID: repo.ID, Revision: record.Revision, diff --git a/enterprise/cmd/worker/internal/embeddings/repo/worker.go b/enterprise/cmd/worker/internal/embeddings/repo/worker.go index ee12c51c74f..8d3d9db4daf 100644 --- a/enterprise/cmd/worker/internal/embeddings/repo/worker.go +++ b/enterprise/cmd/worker/internal/embeddings/repo/worker.go @@ -8,13 +8,16 @@ import ( "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/codeintel" workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db" "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/embeddings" repoembeddingsbg "github.com/sourcegraph/sourcegraph/internal/embeddings/background/repo" + vdb "github.com/sourcegraph/sourcegraph/internal/embeddings/db" "github.com/sourcegraph/sourcegraph/internal/embeddings/embed" "github.com/sourcegraph/sourcegraph/internal/env" "github.com/sourcegraph/sourcegraph/internal/gitserver" "github.com/sourcegraph/sourcegraph/internal/goroutine" + "github.com/sourcegraph/sourcegraph/internal/grpc/defaults" "github.com/sourcegraph/sourcegraph/internal/observation" "github.com/sourcegraph/sourcegraph/internal/uploadstore" "github.com/sourcegraph/sourcegraph/internal/workerutil" @@ -52,6 +55,15 @@ func (s *repoEmbeddingJob) Routines(_ context.Context, observationCtx *observati return nil, err } + qdrantInserter := vdb.NewNoopInserter() + if qdrantAddr := conf.Get().ServiceConnections().Qdrant; qdrantAddr != "" { + conn, err := defaults.Dial(qdrantAddr, observationCtx.Logger) + if err != nil { + return nil, err + } + qdrantInserter = vdb.NewQdrantDBFromConn(conn) + } + workCtx := actor.WithInternalActor(context.Background()) return []goroutine.BackgroundRoutine{ newRepoEmbeddingJobWorker( @@ -61,6 +73,7 @@ func (s *repoEmbeddingJob) Routines(_ context.Context, observationCtx *observati db, uploadStore, gitserver.NewClient(db), + qdrantInserter, services.ContextService, repoembeddingsbg.NewRepoEmbeddingJobsStore(db), ), @@ -74,6 +87,7 @@ func newRepoEmbeddingJobWorker( db database.DB, uploadStore uploadstore.Store, gitserverClient gitserver.Client, + qdrantInserter vdb.VectorInserter, contextService embed.ContextService, repoEmbeddingJobsStore repoembeddingsbg.RepoEmbeddingJobsStore, ) *workerutil.Worker[*repoembeddingsbg.RepoEmbeddingJob] { @@ -81,6 +95,7 @@ func newRepoEmbeddingJobWorker( db: db, uploadStore: uploadStore, gitserverClient: gitserverClient, + qdrantInserter: qdrantInserter, contextService: contextService, repoEmbeddingJobsStore: repoEmbeddingJobsStore, } diff --git a/internal/conf/conftypes/conftypes.go b/internal/conf/conftypes/conftypes.go index 6a3b181f2a8..43e0f72bbcc 100644 --- a/internal/conf/conftypes/conftypes.go +++ b/internal/conf/conftypes/conftypes.go @@ -34,6 +34,8 @@ type ServiceConnections struct { Symbols []string `json:"symbols"` // Embeddings is the addresses of embeddings instances that should be talked to. Embeddings []string `json:"embeddings"` + // Qdrant is the address of the Qdrant instance (or empty if disabled) + Qdrant string `json:"qdrant"` // Zoekts is the addresses of Zoekt instances to talk to. Zoekts []string `json:"zoekts"` // ZoektListTTL is the TTL of the internal cache that Zoekt clients use to diff --git a/internal/embeddings/db/BUILD.bazel b/internal/embeddings/db/BUILD.bazel index a0aa80997e4..80f91f04817 100644 --- a/internal/embeddings/db/BUILD.bazel +++ b/internal/embeddings/db/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//lib/pointers", "@com_github_google_uuid//:uuid", "@com_github_qdrant_go_client//qdrant", + "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/internal/embeddings/db/qdrant.go b/internal/embeddings/db/qdrant.go index 97c6bab5d95..e866c4f88b8 100644 --- a/internal/embeddings/db/qdrant.go +++ b/internal/embeddings/db/qdrant.go @@ -4,11 +4,16 @@ import ( "context" qdrant "github.com/qdrant/go-client/qdrant" + "google.golang.org/grpc" "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/lib/pointers" ) +func NewQdrantDBFromConn(conn *grpc.ClientConn) VectorDB { + return NewQdrantDB(qdrant.NewPointsClient(conn), qdrant.NewCollectionsClient(conn)) +} + func NewQdrantDB(pointsClient qdrant.PointsClient, collectionsClient qdrant.CollectionsClient) VectorDB { return &qdrantDB{ pointsClient: pointsClient, diff --git a/sg.config.yaml b/sg.config.yaml index 11d93145765..bcbbc886210 100644 --- a/sg.config.yaml +++ b/sg.config.yaml @@ -271,6 +271,12 @@ commands: - internal - enterprise/cmd/embeddings - internal/embeddings + qdrant: + cmd: | + docker run -p 6333:6333 -p 6334:6334 \ + -v $HOME/.sourcegraph-dev/data/qdrant_data:/qdrant/storage \ + -e QDRANT__SERVICE__GRPC_PORT="6334" \ + qdrant/qdrant worker: cmd: | export SOURCEGRAPH_LICENSE_GENERATION_KEY=$(cat ../dev-private/enterprise/dev/test-license-generation-key.pem) @@ -1425,6 +1431,31 @@ commandsets: commands: - cody-gateway + qdrant: + commands: + - qdrant + - frontend + - worker + - repo-updater + - web + - gitserver-0 + - gitserver-1 + - searcher + - caddy + - symbols + - docsite + - syntax-highlighter + - github-proxy + - zoekt-index-0 + - zoekt-index-1 + - zoekt-web-0 + - zoekt-web-1 + - blobstore + - embeddings + env: + QDRANT_ENDPOINT: 'localhost:6334' + + tests: # These can be run with `sg test [name]` backend: