Embeddings: minimal local qdrant ingestion (#55559)

This is the minimal change to start ingesting into qdrant locally. Just
`sg start qdrant` and all your background embeddings jobs will slowly
fill up your qdrant db. Note that, right now, repos will only be
ingested if they have changed (we do not schedule any jobs for repos
that have not changed). I'll likely add some sort of "add a job for all
embedded repos" lever soon.

This does not yet use qdrant for the search side of things.
This commit is contained in:
Camden Cheek 2023-08-08 09:42:32 -06:00 committed by GitHub
parent 1226c2b10b
commit 5ba82c2c7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 66 additions and 6 deletions

View File

@ -598,6 +598,7 @@ func serviceConnections(logger log.Logger) conftypes.ServiceConnections {
Searchers: searcherAddrs,
Symbols: symbolsAddrs,
Embeddings: embeddingsAddrs,
Qdrant: qdrantAddr,
Zoekts: zoektAddrs,
ZoektListTTL: indexedListTTL,
}
@ -616,6 +617,8 @@ var (
embeddingsURLsOnce sync.Once
embeddingsURLs *endpoint.Map
qdrantAddr = os.Getenv("QDRANT_ENDPOINT")
indexedListTTL = func() time.Duration {
ttl, _ := time.ParseDuration(env.Get("SRC_INDEXED_SEARCH_LIST_CACHE_TTL", "", "Indexed search list cache TTL"))
if ttl == 0 {

View File

@ -50,6 +50,7 @@ Available comamndsets in `sg.config.yaml`:
* monitoring
* monitoring-alerts
* otel
* qdrant
* web-standalone
* web-standalone-prod
@ -124,6 +125,7 @@ Available commands in `sg.config.yaml`:
* otel-collector: OpenTelemetry collector
* postgres_exporter
* prometheus
* qdrant
* redis-postgres: Dockerized version of redis and postgres
* repo-updater
* searcher

View File

@ -33,6 +33,7 @@ go_library(
"//internal/featureflag",
"//internal/gitserver",
"//internal/goroutine",
"//internal/grpc/defaults",
"//internal/httpcli",
"//internal/observation",
"//internal/paths",

View File

@ -28,6 +28,7 @@ type handler struct {
db database.DB
uploadStore uploadstore.Store
gitserverClient gitserver.Client
qdrantInserter db.VectorInserter
contextService embed.ContextService
repoEmbeddingJobsStore bgrepo.RepoEmbeddingJobsStore
}
@ -102,8 +103,7 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo.
return err
}
qdrantInserter := db.NewNoopInserter()
err = qdrantInserter.PrepareUpdate(ctx, modelID, uint64(modelDims))
err = h.qdrantInserter.PrepareUpdate(ctx, modelID, uint64(modelDims))
if err != nil {
return err
}
@ -139,13 +139,13 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo.
logger.Info("found previous embeddings index. Attempting incremental update", log.String("old_revision", string(previousIndex.Revision)))
opts.IndexedRevision = previousIndex.Revision
hasPreviousIndex, err := qdrantInserter.HasIndex(ctx, modelID, repo.ID, previousIndex.Revision)
hasPreviousIndex, err := h.qdrantInserter.HasIndex(ctx, modelID, repo.ID, previousIndex.Revision)
if err != nil {
return err
}
if !hasPreviousIndex {
err = uploadPreviousIndex(ctx, modelID, qdrantInserter, repo.ID, previousIndex)
err = uploadPreviousIndex(ctx, modelID, h.qdrantInserter, repo.ID, previousIndex)
if err != nil {
return err
}
@ -166,7 +166,7 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo.
repoEmbeddingIndex, toRemove, stats, err := embed.EmbedRepo(
ctx,
embeddingsClient,
qdrantInserter,
h.qdrantInserter,
h.contextService,
fetcher,
repo.IDName(),
@ -179,7 +179,7 @@ func (h *handler) Handle(ctx context.Context, logger log.Logger, record *bgrepo.
return err
}
err = qdrantInserter.FinalizeUpdate(ctx, db.FinalizeUpdateParams{
err = h.qdrantInserter.FinalizeUpdate(ctx, db.FinalizeUpdateParams{
ModelID: modelID,
RepoID: repo.ID,
Revision: record.Revision,

View File

@ -8,13 +8,16 @@ import (
"github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/codeintel"
workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/embeddings"
repoembeddingsbg "github.com/sourcegraph/sourcegraph/internal/embeddings/background/repo"
vdb "github.com/sourcegraph/sourcegraph/internal/embeddings/db"
"github.com/sourcegraph/sourcegraph/internal/embeddings/embed"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/gitserver"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/grpc/defaults"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/uploadstore"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
@ -52,6 +55,15 @@ func (s *repoEmbeddingJob) Routines(_ context.Context, observationCtx *observati
return nil, err
}
qdrantInserter := vdb.NewNoopInserter()
if qdrantAddr := conf.Get().ServiceConnections().Qdrant; qdrantAddr != "" {
conn, err := defaults.Dial(qdrantAddr, observationCtx.Logger)
if err != nil {
return nil, err
}
qdrantInserter = vdb.NewQdrantDBFromConn(conn)
}
workCtx := actor.WithInternalActor(context.Background())
return []goroutine.BackgroundRoutine{
newRepoEmbeddingJobWorker(
@ -61,6 +73,7 @@ func (s *repoEmbeddingJob) Routines(_ context.Context, observationCtx *observati
db,
uploadStore,
gitserver.NewClient(db),
qdrantInserter,
services.ContextService,
repoembeddingsbg.NewRepoEmbeddingJobsStore(db),
),
@ -74,6 +87,7 @@ func newRepoEmbeddingJobWorker(
db database.DB,
uploadStore uploadstore.Store,
gitserverClient gitserver.Client,
qdrantInserter vdb.VectorInserter,
contextService embed.ContextService,
repoEmbeddingJobsStore repoembeddingsbg.RepoEmbeddingJobsStore,
) *workerutil.Worker[*repoembeddingsbg.RepoEmbeddingJob] {
@ -81,6 +95,7 @@ func newRepoEmbeddingJobWorker(
db: db,
uploadStore: uploadStore,
gitserverClient: gitserverClient,
qdrantInserter: qdrantInserter,
contextService: contextService,
repoEmbeddingJobsStore: repoEmbeddingJobsStore,
}

View File

@ -34,6 +34,8 @@ type ServiceConnections struct {
Symbols []string `json:"symbols"`
// Embeddings is the addresses of embeddings instances that should be talked to.
Embeddings []string `json:"embeddings"`
// Qdrant is the address of the Qdrant instance (or empty if disabled)
Qdrant string `json:"qdrant"`
// Zoekts is the addresses of Zoekt instances to talk to.
Zoekts []string `json:"zoekts"`
// ZoektListTTL is the TTL of the internal cache that Zoekt clients use to

View File

@ -18,6 +18,7 @@ go_library(
"//lib/pointers",
"@com_github_google_uuid//:uuid",
"@com_github_qdrant_go_client//qdrant",
"@org_golang_google_grpc//:go_default_library",
],
)

View File

@ -4,11 +4,16 @@ import (
"context"
qdrant "github.com/qdrant/go-client/qdrant"
"google.golang.org/grpc"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/lib/pointers"
)
func NewQdrantDBFromConn(conn *grpc.ClientConn) VectorDB {
return NewQdrantDB(qdrant.NewPointsClient(conn), qdrant.NewCollectionsClient(conn))
}
func NewQdrantDB(pointsClient qdrant.PointsClient, collectionsClient qdrant.CollectionsClient) VectorDB {
return &qdrantDB{
pointsClient: pointsClient,

View File

@ -271,6 +271,12 @@ commands:
- internal
- enterprise/cmd/embeddings
- internal/embeddings
qdrant:
cmd: |
docker run -p 6333:6333 -p 6334:6334 \
-v $HOME/.sourcegraph-dev/data/qdrant_data:/qdrant/storage \
-e QDRANT__SERVICE__GRPC_PORT="6334" \
qdrant/qdrant
worker:
cmd: |
export SOURCEGRAPH_LICENSE_GENERATION_KEY=$(cat ../dev-private/enterprise/dev/test-license-generation-key.pem)
@ -1425,6 +1431,31 @@ commandsets:
commands:
- cody-gateway
qdrant:
commands:
- qdrant
- frontend
- worker
- repo-updater
- web
- gitserver-0
- gitserver-1
- searcher
- caddy
- symbols
- docsite
- syntax-highlighter
- github-proxy
- zoekt-index-0
- zoekt-index-1
- zoekt-web-0
- zoekt-web-1
- blobstore
- embeddings
env:
QDRANT_ENDPOINT: 'localhost:6334'
tests:
# These can be run with `sg test [name]`
backend: