From d3528061cda2c1db06b7a67217798b5ab30b231d Mon Sep 17 00:00:00 2001 From: Stefan Hengl Date: Wed, 24 Jul 2024 10:03:33 +0200 Subject: [PATCH] chore(worker): move llm token counter to worker (#64008) This moves the LLM token counter from `frontend` to the `worker`. Test plan: new unit test --- cmd/frontend/internal/bg/BUILD.bazel | 4 -- cmd/frontend/internal/cli/serve_cmd.go | 1 - cmd/worker/internal/completions/BUILD.bazel | 39 +++++++++++ cmd/worker/internal/completions/job.go | 67 +++++++++++++++++++ .../internal/completions/tokens.go} | 22 +----- .../internal/completions/tokens_test.go | 40 +++++++++++ cmd/worker/internal/eventlogs/job.go | 8 +-- cmd/worker/shared/BUILD.bazel | 1 + cmd/worker/shared/main.go | 2 + internal/completions/tokenusage/BUILD.bazel | 1 + internal/completions/tokenusage/tokenusage.go | 6 ++ lib/telemetrygateway/v1/BUILD.bazel | 1 + 12 files changed, 163 insertions(+), 29 deletions(-) create mode 100644 cmd/worker/internal/completions/BUILD.bazel create mode 100644 cmd/worker/internal/completions/job.go rename cmd/{frontend/internal/bg/store_token_usage_in_postgres.go => worker/internal/completions/tokens.go} (56%) create mode 100644 cmd/worker/internal/completions/tokens_test.go diff --git a/cmd/frontend/internal/bg/BUILD.bazel b/cmd/frontend/internal/bg/BUILD.bazel index c6dce78e52e..9ceb89415b9 100644 --- a/cmd/frontend/internal/bg/BUILD.bazel +++ b/cmd/frontend/internal/bg/BUILD.bazel @@ -6,20 +6,16 @@ go_library( srcs = [ "check_redis_cache_eviction_policy.go", "doc.go", - "store_token_usage_in_postgres.go", "update_permissions.go", ], importpath = "github.com/sourcegraph/sourcegraph/cmd/frontend/internal/bg", visibility = ["//cmd/frontend:__subpackages__"], deps = [ "//internal/collections", - "//internal/completions/tokenusage", "//internal/database", "//internal/rbac", "//internal/rbac/types", "//internal/redispool", - "//internal/telemetry", - "//internal/telemetry/telemetryrecorder", "//internal/types", "//lib/errors", "@com_github_gomodule_redigo//redis", diff --git a/cmd/frontend/internal/cli/serve_cmd.go b/cmd/frontend/internal/cli/serve_cmd.go index 5c8d01f7f67..845409aad64 100644 --- a/cmd/frontend/internal/cli/serve_cmd.go +++ b/cmd/frontend/internal/cli/serve_cmd.go @@ -213,7 +213,6 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic goroutine.Go(func() { bg.UpdatePermissions(ctx, logger, db) }) // Recurring - goroutine.Go(func() { bg.ScheduleStoreTokenUsage(ctx, db) }) goroutine.Go(func() { updatecheck.Start(logger, db) }) goroutine.Go(func() { adminanalytics.StartAnalyticsCacheRefresh(context.Background(), db) }) goroutine.Go(func() { users.StartUpdateAggregatedUsersStatisticsTable(context.Background(), db) }) diff --git a/cmd/worker/internal/completions/BUILD.bazel b/cmd/worker/internal/completions/BUILD.bazel new file mode 100644 index 00000000000..0232b2091a6 --- /dev/null +++ b/cmd/worker/internal/completions/BUILD.bazel @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("//dev:go_defs.bzl", "go_test") + +go_library( + name = "completions", + srcs = [ + "job.go", + "tokens.go", + ], + importpath = "github.com/sourcegraph/sourcegraph/cmd/worker/internal/completions", + visibility = ["//cmd/worker:__subpackages__"], + deps = [ + "//cmd/worker/job", + "//cmd/worker/shared/init/db", + "//internal/completions/tokenusage", + "//internal/database", + "//internal/env", + "//internal/goroutine", + "//internal/metrics", + "//internal/observation", + "//internal/telemetry", + "//internal/telemetry/telemetryrecorder", + ], +) + +go_test( + name = "completions_test", + srcs = ["tokens_test.go"], + embed = [":completions"], + tags = ["requires-network"], + deps = [ + "//internal/completions/tokenusage", + "//internal/rcache", + "//internal/telemetry", + "//internal/telemetry/telemetrytest", + "//lib/telemetrygateway/v1:telemetrygateway", + "@com_github_stretchr_testify//require", + ], +) diff --git a/cmd/worker/internal/completions/job.go b/cmd/worker/internal/completions/job.go new file mode 100644 index 00000000000..4c979935179 --- /dev/null +++ b/cmd/worker/internal/completions/job.go @@ -0,0 +1,67 @@ +package completions + +import ( + "context" + "time" + + "github.com/sourcegraph/sourcegraph/cmd/worker/job" + "github.com/sourcegraph/sourcegraph/internal/completions/tokenusage" + "github.com/sourcegraph/sourcegraph/internal/metrics" + "github.com/sourcegraph/sourcegraph/internal/telemetry/telemetryrecorder" + + workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/env" + "github.com/sourcegraph/sourcegraph/internal/goroutine" + "github.com/sourcegraph/sourcegraph/internal/observation" +) + +type tokenUsageJob struct{} + +func NewTokenUsageJob() job.Job { + return &tokenUsageJob{} +} + +func (e tokenUsageJob) Description() string { + return "stores LLM token usage in DB" +} + +func (e tokenUsageJob) Config() []env.Config { + return nil +} + +func (e tokenUsageJob) Routines(_ context.Context, observationCtx *observation.Context) ([]goroutine.BackgroundRoutine, error) { + db, err := workerdb.InitDB(observationCtx) + if err != nil { + return nil, err + } + + return []goroutine.BackgroundRoutine{ + newTokenUsageJob(observationCtx, db), + }, + nil +} + +func newTokenUsageJob(observationCtx *observation.Context, db database.DB) goroutine.BackgroundRoutine { + handler := goroutine.HandlerFunc(func(ctx context.Context) error { + return recordTokenUsage(ctx, tokenusage.NewManager(), telemetryrecorder.New(db)) + }) + + operation := observationCtx.Operation(observation.Op{ + Name: "cody.llmTokenCounter.record", + Metrics: metrics.NewREDMetrics( + observationCtx.Registerer, + "cody_llm_token_counter", + metrics.WithCountHelp("Total number of cody_llm_token_counter executions"), + ), + }) + + return goroutine.NewPeriodicGoroutine( + context.Background(), + handler, + goroutine.WithName("cody_llm_token_counter"), + goroutine.WithDescription("Stores LLM token usage in DB"), + goroutine.WithInterval(5*time.Minute), + goroutine.WithOperation(operation), + ) +} diff --git a/cmd/frontend/internal/bg/store_token_usage_in_postgres.go b/cmd/worker/internal/completions/tokens.go similarity index 56% rename from cmd/frontend/internal/bg/store_token_usage_in_postgres.go rename to cmd/worker/internal/completions/tokens.go index d9831749414..f781fa1b496 100644 --- a/cmd/frontend/internal/bg/store_token_usage_in_postgres.go +++ b/cmd/worker/internal/completions/tokens.go @@ -1,31 +1,13 @@ -package bg +package completions import ( "context" - "fmt" - "time" "github.com/sourcegraph/sourcegraph/internal/completions/tokenusage" - "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/telemetry" - "github.com/sourcegraph/sourcegraph/internal/telemetry/telemetryrecorder" ) -func ScheduleStoreTokenUsage(ctx context.Context, db database.DB) { - for { - err := storeTokenUsageinDb(ctx, db) - if err != nil { - fmt.Printf("Error storing token usage: %v\n", err) - } - - // Wait for 5 minutes before the next execution - time.Sleep(5 * time.Minute) - } -} - -func storeTokenUsageinDb(ctx context.Context, db database.DB) error { - recorder := telemetryrecorder.New(db) - tokenManager := tokenusage.NewManager() +func recordTokenUsage(ctx context.Context, tokenManager *tokenusage.Manager, recorder *telemetry.EventRecorder) error { tokenUsageData, err := tokenManager.FetchTokenUsageDataForAnalysis() if err != nil { return err diff --git a/cmd/worker/internal/completions/tokens_test.go b/cmd/worker/internal/completions/tokens_test.go new file mode 100644 index 00000000000..ecc66614fa1 --- /dev/null +++ b/cmd/worker/internal/completions/tokens_test.go @@ -0,0 +1,40 @@ +package completions + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sourcegraph/sourcegraph/internal/completions/tokenusage" + "github.com/sourcegraph/sourcegraph/internal/rcache" + "github.com/sourcegraph/sourcegraph/internal/telemetry" + "github.com/sourcegraph/sourcegraph/internal/telemetry/telemetrytest" + v1 "github.com/sourcegraph/sourcegraph/lib/telemetrygateway/v1" +) + +func TestStoreTokenUsageInDB(t *testing.T) { + kv := rcache.SetupForTest(t) + cache := rcache.NewWithTTL(kv, "LLMUsage", 1800) + cache.SetInt("LLMUsage:model1:feature1:stream:input", 10) + cache.SetInt("LLMUsage:model1:feature1:stream:output", 20) + manager := tokenusage.NewManagerWithCache(cache) + + mockEventStore := telemetrytest.NewMockEventsStore() + var sentEvent []*v1.Event + mockEventStore.StoreEventsFunc.SetDefaultHook(func(ctx context.Context, event []*v1.Event) error { + sentEvent = event + return nil + }) + recorder := telemetry.NewEventRecorder(mockEventStore) + + err := recordTokenUsage(context.Background(), manager, recorder) + require.NoError(t, err) + require.Equal(t, len(sentEvent), 1) + require.Equal(t, sentEvent[0].Feature, "cody.llmTokenCounter") + require.Equal(t, map[string]float64{ + "LLMUsage:model1:feature1:stream:input": 10, + "LLMUsage:model1:feature1:stream:output": 20, + "FinalFetchAndSync": 0.0, + }, sentEvent[0].Parameters.Metadata) +} diff --git a/cmd/worker/internal/eventlogs/job.go b/cmd/worker/internal/eventlogs/job.go index b1a425cb261..edf1942825a 100644 --- a/cmd/worker/internal/eventlogs/job.go +++ b/cmd/worker/internal/eventlogs/job.go @@ -35,13 +35,13 @@ func (e eventLogsJob) Routines(_ context.Context, observationCtx *observation.Co } return []goroutine.BackgroundRoutine{ - NewEventLogsJob(observationCtx, db), - NewSecurityEventLogsJob(observationCtx, db), + newEventLogsJob(observationCtx, db), + newSecurityEventLogsJob(observationCtx, db), }, nil } -func NewEventLogsJob(observationCtx *observation.Context, db database.DB) goroutine.BackgroundRoutine { +func newEventLogsJob(observationCtx *observation.Context, db database.DB) goroutine.BackgroundRoutine { handler := goroutine.HandlerFunc(func(ctx context.Context) error { return deleteOldEventLogsInPostgres(ctx, db) }) @@ -65,7 +65,7 @@ func NewEventLogsJob(observationCtx *observation.Context, db database.DB) gorout ) } -func NewSecurityEventLogsJob(observationCtx *observation.Context, db database.DB) goroutine.BackgroundRoutine { +func newSecurityEventLogsJob(observationCtx *observation.Context, db database.DB) goroutine.BackgroundRoutine { handler := goroutine.HandlerFunc(func(ctx context.Context) error { return deleteOldSecurityEventLogsInPostgres(ctx, db) }) diff --git a/cmd/worker/shared/BUILD.bazel b/cmd/worker/shared/BUILD.bazel index 861630b5051..30ae57ed640 100644 --- a/cmd/worker/shared/BUILD.bazel +++ b/cmd/worker/shared/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//cmd/worker/internal/codeintel", "//cmd/worker/internal/codemonitors", "//cmd/worker/internal/codygateway", + "//cmd/worker/internal/completions", "//cmd/worker/internal/embeddings/repo", "//cmd/worker/internal/encryption", "//cmd/worker/internal/eventlogs", diff --git a/cmd/worker/shared/main.go b/cmd/worker/shared/main.go index eea086bfdfa..b54bc0633dc 100644 --- a/cmd/worker/shared/main.go +++ b/cmd/worker/shared/main.go @@ -17,6 +17,7 @@ import ( "github.com/sourcegraph/sourcegraph/cmd/worker/internal/codeintel" "github.com/sourcegraph/sourcegraph/cmd/worker/internal/codemonitors" "github.com/sourcegraph/sourcegraph/cmd/worker/internal/codygateway" + "github.com/sourcegraph/sourcegraph/cmd/worker/internal/completions" repoembeddings "github.com/sourcegraph/sourcegraph/cmd/worker/internal/embeddings/repo" "github.com/sourcegraph/sourcegraph/cmd/worker/internal/encryption" "github.com/sourcegraph/sourcegraph/cmd/worker/internal/eventlogs" @@ -106,6 +107,7 @@ func LoadConfig(registerEnterpriseMigrators oobmigration.RegisterMigratorsFunc) "export-usage-telemetry": telemetry.NewTelemetryJob(), "telemetrygateway-exporter": telemetrygatewayexporter.NewJob(), "event-logs-janitor": eventlogs.NewEventLogsJanitorJob(), + "cody-llm-token-counter": completions.NewTokenUsageJob(), "codeintel-policies-repository-matcher": codeintel.NewPoliciesRepositoryMatcherJob(), "codeintel-autoindexing-summary-builder": codeintel.NewAutoindexingSummaryBuilder(), diff --git a/internal/completions/tokenusage/BUILD.bazel b/internal/completions/tokenusage/BUILD.bazel index dea5064dcb9..669845e5d42 100644 --- a/internal/completions/tokenusage/BUILD.bazel +++ b/internal/completions/tokenusage/BUILD.bazel @@ -9,6 +9,7 @@ go_library( visibility = [ "//cmd/cody-gateway:__subpackages__", "//cmd/frontend/internal/bg:__pkg__", + "//cmd/worker/internal/completions:__pkg__", "//internal/completions/client:__pkg__", "//internal/completions/client/anthropic:__pkg__", "//internal/completions/client/awsbedrock:__pkg__", diff --git a/internal/completions/tokenusage/tokenusage.go b/internal/completions/tokenusage/tokenusage.go index 9750fec36ee..be31df0af88 100644 --- a/internal/completions/tokenusage/tokenusage.go +++ b/internal/completions/tokenusage/tokenusage.go @@ -24,6 +24,12 @@ func NewManager() *Manager { } } +func NewManagerWithCache(cache *rcache.Cache) *Manager { + return &Manager{ + cache: cache, + } +} + type Provider string const ( diff --git a/lib/telemetrygateway/v1/BUILD.bazel b/lib/telemetrygateway/v1/BUILD.bazel index 44168b30401..dbcdf3ac6f9 100644 --- a/lib/telemetrygateway/v1/BUILD.bazel +++ b/lib/telemetrygateway/v1/BUILD.bazel @@ -44,6 +44,7 @@ go_library( "//cmd/telemetry-gateway/shared:__pkg__", "//cmd/telemetrygateway/server:__pkg__", "//cmd/telemetrygateway/shared:__pkg__", + "//cmd/worker/internal/completions:__pkg__", "//internal/api:__pkg__", "//internal/database:__pkg__", "//internal/database/dbmocks:__pkg__",