From 5ba82c2c7d6fb2100f7fc5a6becc116403d80530 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Tue, 8 Aug 2023 09:42:32 -0600 Subject: [PATCH] Embeddings: minimal local qdrant ingestion (#55559) This is the minimal change to start ingesting into qdrant locally. Just `sg start qdrant` and all your background embeddings jobs will slowly fill up your qdrant db. Note that, right now, repos will only be ingested if they have changed (we do not schedule any jobs for repos that have not changed). I'll likely add some sort of "add a job for all embedded repos" lever soon. This does not yet use qdrant for the search side of things. --- cmd/frontend/internal/cli/config.go | 3 ++ .../background-information/sg/reference.md | 2 ++ .../internal/embeddings/repo/BUILD.bazel | 1 + .../internal/embeddings/repo/handler.go | 12 +++---- .../worker/internal/embeddings/repo/worker.go | 15 +++++++++ internal/conf/conftypes/conftypes.go | 2 ++ internal/embeddings/db/BUILD.bazel | 1 + internal/embeddings/db/qdrant.go | 5 +++ sg.config.yaml | 31 +++++++++++++++++++ 9 files changed, 66 insertions(+), 6 deletions(-) diff --git a/cmd/frontend/internal/cli/config.go b/cmd/frontend/internal/cli/config.go index 5f6d54bb140..ab9cac03c13 100644 --- a/cmd/frontend/internal/cli/config.go +++ b/cmd/frontend/internal/cli/config.go @@ -598,6 +598,7 @@ func serviceConnections(logger log.Logger) conftypes.ServiceConnections { Searchers: searcherAddrs, Symbols: symbolsAddrs, Embeddings: embeddingsAddrs, + Qdrant: qdrantAddr, Zoekts: zoektAddrs, ZoektListTTL: indexedListTTL, } @@ -616,6 +617,8 @@ var ( embeddingsURLsOnce sync.Once embeddingsURLs *endpoint.Map + qdrantAddr = os.Getenv("QDRANT_ENDPOINT") + indexedListTTL = func() time.Duration { ttl, _ := time.ParseDuration(env.Get("SRC_INDEXED_SEARCH_LIST_CACHE_TTL", "", "Indexed search list cache TTL")) if ttl == 0 { diff --git a/doc/dev/background-information/sg/reference.md b/doc/dev/background-information/sg/reference.md index a8ab3e2d434..befe4707492 100644 --- a/doc/dev/background-information/sg/reference.md +++ b/doc/dev/background-information/sg/reference.md @@ -50,6 +50,7 @@ Available comamndsets in `sg.config.yaml`: * monitoring * monitoring-alerts * otel +* qdrant * web-standalone * web-standalone-prod @@ -124,6 +125,7 @@ Available commands in `sg.config.yaml`: * otel-collector: OpenTelemetry collector * postgres_exporter * prometheus +* qdrant * redis-postgres: Dockerized version of redis and postgres * repo-updater * searcher diff --git a/enterprise/cmd/worker/internal/embeddings/repo/BUILD.bazel b/enterprise/cmd/worker/internal/embeddings/repo/BUILD.bazel index 431cac1dfae..20eedbf3e5b 100644 --- a/enterprise/cmd/worker/internal/embeddings/repo/BUILD.bazel +++ b/enterprise/cmd/worker/internal/embeddings/repo/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "//internal/featureflag", "//internal/gitserver", "//internal/goroutine", + "//internal/grpc/defaults", "//internal/httpcli", "//internal/observation", "//internal/paths", diff --git a/enterprise/cmd/worker/internal/embeddings/repo/handler.go b/enterprise/cmd/worker/internal/embeddings/repo/handler.go index 88a615b0b16..23d8a0e7fc0 100644 --- a/enterprise/cmd/worker/internal/embeddings/repo/handler.go +++ b/enterprise/cmd/worker/internal/embeddings/repo/handler.go @@ -28,6 +28,7 @@ type handler struct { db database.DB uploadStore uploadstore.Store gitserverClient gitserver.Client + qdrantInserter db.VectorInserter contextService embed.ContextService repoEmbeddingJobsStore bgrepo.RepoEmbeddingJobsStore } @@ -102,8 +103,7 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo. return err } - qdrantInserter := db.NewNoopInserter() - err = qdrantInserter.PrepareUpdate(ctx, modelID, uint64(modelDims)) + err = h.qdrantInserter.PrepareUpdate(ctx, modelID, uint64(modelDims)) if err != nil { return err } @@ -139,13 +139,13 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo. logger.Info("found previous embeddings index. Attempting incremental update", log.String("old_revision", string(previousIndex.Revision))) opts.IndexedRevision = previousIndex.Revision - hasPreviousIndex, err := qdrantInserter.HasIndex(ctx, modelID, repo.ID, previousIndex.Revision) + hasPreviousIndex, err := h.qdrantInserter.HasIndex(ctx, modelID, repo.ID, previousIndex.Revision) if err != nil { return err } if !hasPreviousIndex { - err = uploadPreviousIndex(ctx, modelID, qdrantInserter, repo.ID, previousIndex) + err = uploadPreviousIndex(ctx, modelID, h.qdrantInserter, repo.ID, previousIndex) if err != nil { return err } @@ -166,7 +166,7 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo. repoEmbeddingIndex, toRemove, stats, err := embed.EmbedRepo( ctx, embeddingsClient, - qdrantInserter, + h.qdrantInserter, h.contextService, fetcher, repo.IDName(), @@ -179,7 +179,7 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo. return err } - err = qdrantInserter.FinalizeUpdate(ctx, db.FinalizeUpdateParams{ + err = h.qdrantInserter.FinalizeUpdate(ctx, db.FinalizeUpdateParams{ ModelID: modelID, RepoID: repo.ID, Revision: record.Revision, diff --git a/enterprise/cmd/worker/internal/embeddings/repo/worker.go b/enterprise/cmd/worker/internal/embeddings/repo/worker.go index ee12c51c74f..8d3d9db4daf 100644 --- a/enterprise/cmd/worker/internal/embeddings/repo/worker.go +++ b/enterprise/cmd/worker/internal/embeddings/repo/worker.go @@ -8,13 +8,16 @@ import ( "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/codeintel" workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db" "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/embeddings" repoembeddingsbg "github.com/sourcegraph/sourcegraph/internal/embeddings/background/repo" + vdb "github.com/sourcegraph/sourcegraph/internal/embeddings/db" "github.com/sourcegraph/sourcegraph/internal/embeddings/embed" "github.com/sourcegraph/sourcegraph/internal/env" "github.com/sourcegraph/sourcegraph/internal/gitserver" "github.com/sourcegraph/sourcegraph/internal/goroutine" + "github.com/sourcegraph/sourcegraph/internal/grpc/defaults" "github.com/sourcegraph/sourcegraph/internal/observation" "github.com/sourcegraph/sourcegraph/internal/uploadstore" "github.com/sourcegraph/sourcegraph/internal/workerutil" @@ -52,6 +55,15 @@ func (s *repoEmbeddingJob) Routines(_ context.Context, observationCtx *observati return nil, err } + qdrantInserter := vdb.NewNoopInserter() + if qdrantAddr := conf.Get().ServiceConnections().Qdrant; qdrantAddr != "" { + conn, err := defaults.Dial(qdrantAddr, observationCtx.Logger) + if err != nil { + return nil, err + } + qdrantInserter = vdb.NewQdrantDBFromConn(conn) + } + workCtx := actor.WithInternalActor(context.Background()) return []goroutine.BackgroundRoutine{ newRepoEmbeddingJobWorker( @@ -61,6 +73,7 @@ func (s *repoEmbeddingJob) Routines(_ context.Context, observationCtx *observati db, uploadStore, gitserver.NewClient(db), + qdrantInserter, services.ContextService, repoembeddingsbg.NewRepoEmbeddingJobsStore(db), ), @@ -74,6 +87,7 @@ func newRepoEmbeddingJobWorker( db database.DB, uploadStore uploadstore.Store, gitserverClient gitserver.Client, + qdrantInserter vdb.VectorInserter, contextService embed.ContextService, repoEmbeddingJobsStore repoembeddingsbg.RepoEmbeddingJobsStore, ) *workerutil.Worker[*repoembeddingsbg.RepoEmbeddingJob] { @@ -81,6 +95,7 @@ func newRepoEmbeddingJobWorker( db: db, uploadStore: uploadStore, gitserverClient: gitserverClient, + qdrantInserter: qdrantInserter, contextService: contextService, repoEmbeddingJobsStore: repoEmbeddingJobsStore, } diff --git a/internal/conf/conftypes/conftypes.go b/internal/conf/conftypes/conftypes.go index 6a3b181f2a8..43e0f72bbcc 100644 --- a/internal/conf/conftypes/conftypes.go +++ b/internal/conf/conftypes/conftypes.go @@ -34,6 +34,8 @@ type ServiceConnections struct { Symbols []string `json:"symbols"` // Embeddings is the addresses of embeddings instances that should be talked to. Embeddings []string `json:"embeddings"` + // Qdrant is the address of the Qdrant instance (or empty if disabled) + Qdrant string `json:"qdrant"` // Zoekts is the addresses of Zoekt instances to talk to. Zoekts []string `json:"zoekts"` // ZoektListTTL is the TTL of the internal cache that Zoekt clients use to diff --git a/internal/embeddings/db/BUILD.bazel b/internal/embeddings/db/BUILD.bazel index a0aa80997e4..80f91f04817 100644 --- a/internal/embeddings/db/BUILD.bazel +++ b/internal/embeddings/db/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//lib/pointers", "@com_github_google_uuid//:uuid", "@com_github_qdrant_go_client//qdrant", + "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/internal/embeddings/db/qdrant.go b/internal/embeddings/db/qdrant.go index 97c6bab5d95..e866c4f88b8 100644 --- a/internal/embeddings/db/qdrant.go +++ b/internal/embeddings/db/qdrant.go @@ -4,11 +4,16 @@ import ( "context" qdrant "github.com/qdrant/go-client/qdrant" + "google.golang.org/grpc" "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/lib/pointers" ) +func NewQdrantDBFromConn(conn *grpc.ClientConn) VectorDB { + return NewQdrantDB(qdrant.NewPointsClient(conn), qdrant.NewCollectionsClient(conn)) +} + func NewQdrantDB(pointsClient qdrant.PointsClient, collectionsClient qdrant.CollectionsClient) VectorDB { return &qdrantDB{ pointsClient: pointsClient, diff --git a/sg.config.yaml b/sg.config.yaml index 11d93145765..bcbbc886210 100644 --- a/sg.config.yaml +++ b/sg.config.yaml @@ -271,6 +271,12 @@ commands: - internal - enterprise/cmd/embeddings - internal/embeddings + qdrant: + cmd: | + docker run -p 6333:6333 -p 6334:6334 \ + -v $HOME/.sourcegraph-dev/data/qdrant_data:/qdrant/storage \ + -e QDRANT__SERVICE__GRPC_PORT="6334" \ + qdrant/qdrant worker: cmd: | export SOURCEGRAPH_LICENSE_GENERATION_KEY=$(cat ../dev-private/enterprise/dev/test-license-generation-key.pem) @@ -1425,6 +1431,31 @@ commandsets: commands: - cody-gateway + qdrant: + commands: + - qdrant + - frontend + - worker + - repo-updater + - web + - gitserver-0 + - gitserver-1 + - searcher + - caddy + - symbols + - docsite + - syntax-highlighter + - github-proxy + - zoekt-index-0 + - zoekt-index-1 + - zoekt-web-0 + - zoekt-web-1 + - blobstore + - embeddings + env: + QDRANT_ENDPOINT: 'localhost:6334' + + tests: # These can be run with `sg test [name]` backend: