diff --git a/cmd/frontend/internal/telemetry/resolvers/BUILD.bazel b/cmd/frontend/internal/telemetry/resolvers/BUILD.bazel index b7566efe253..eed1b1cfe63 100644 --- a/cmd/frontend/internal/telemetry/resolvers/BUILD.bazel +++ b/cmd/frontend/internal/telemetry/resolvers/BUILD.bazel @@ -1,14 +1,36 @@ +load("//dev:go_defs.bzl", "go_test") load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "resolvers", - srcs = ["resolvers.go"], + srcs = [ + "resolvers.go", + "telemetrygateway.go", + ], importpath = "github.com/sourcegraph/sourcegraph/cmd/frontend/internal/telemetry/resolvers", visibility = ["//cmd/frontend:__subpackages__"], deps = [ "//cmd/frontend/graphqlbackend", "//internal/database", + "//internal/telemetry/teestore", + "//internal/telemetrygateway/v1:telemetrygateway", + "//internal/version", "//lib/errors", "@com_github_sourcegraph_log//:log", + "@org_golang_google_protobuf//types/known/structpb", + ], +) + +go_test( + name = "resolvers_test", + srcs = ["telemetrygateway_test.go"], + embed = [":resolvers"], + deps = [ + "//cmd/frontend/graphqlbackend", + "//internal/actor", + "//lib/pointers", + "@com_github_hexops_autogold_v2//:autogold", + "@com_github_stretchr_testify//require", + "@org_golang_google_protobuf//encoding/protojson", ], ) diff --git a/cmd/frontend/internal/telemetry/resolvers/resolvers.go b/cmd/frontend/internal/telemetry/resolvers/resolvers.go index c5ea26ef3eb..adb77d840b2 100644 --- a/cmd/frontend/internal/telemetry/resolvers/resolvers.go +++ b/cmd/frontend/internal/telemetry/resolvers/resolvers.go @@ -2,27 +2,47 @@ package resolvers import ( "context" + "encoding/json" + "time" "github.com/sourcegraph/log" "github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend" "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/telemetry/teestore" + telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1" "github.com/sourcegraph/sourcegraph/lib/errors" ) // Resolver is the GraphQL resolver of all things related to telemetry V2. type Resolver struct { - logger log.Logger - db database.DB + logger log.Logger + teestore teestore.Store } // New returns a new Resolver whose store uses the given database func New(logger log.Logger, db database.DB) graphqlbackend.TelemetryResolver { - return &Resolver{logger: logger, db: db} + return &Resolver{logger: logger, teestore: teestore.NewStore(db.TelemetryEventsExportQueue(), db.EventLogs())} } var _ graphqlbackend.TelemetryResolver = &Resolver{} func (r *Resolver) RecordEvents(ctx context.Context, args *graphqlbackend.RecordEventsArgs) (*graphqlbackend.EmptyResponse, error) { - return nil, errors.New("not implemented") + if args == nil || len(args.Events) == 0 { + return nil, errors.New("no events provided") + } + gatewayEvents, err := newTelemetryGatewayEvents(ctx, time.Now(), telemetrygatewayv1.DefaultEventIDFunc, args.Events) + if err != nil { + // This is an important failure, make sure we surface it, as it could be + // an implementation error. + data, _ := json.Marshal(args.Events) + r.logger.Error("failed to convert telemetry events to internal format", + log.Error(err), + log.String("eventData", string(data))) + return nil, errors.Wrap(err, "invalid events provided") + } + if err := r.teestore.StoreEvents(ctx, gatewayEvents); err != nil { + return nil, errors.Wrap(err, "error storing events") + } + return &graphqlbackend.EmptyResponse{}, nil } diff --git a/cmd/frontend/internal/telemetry/resolvers/telemetrygateway.go b/cmd/frontend/internal/telemetry/resolvers/telemetrygateway.go new file mode 100644 index 00000000000..e129a38eb81 --- /dev/null +++ b/cmd/frontend/internal/telemetry/resolvers/telemetrygateway.go @@ -0,0 +1,97 @@ +package resolvers + +import ( + "context" + "encoding/json" + "time" + + "google.golang.org/protobuf/types/known/structpb" + + "github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend" + telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1" + "github.com/sourcegraph/sourcegraph/internal/version" + "github.com/sourcegraph/sourcegraph/lib/errors" +) + +func newTelemetryGatewayEvents( + ctx context.Context, + now time.Time, + newUUID func() string, + events []graphqlbackend.TelemetryEventInput, +) ([]*telemetrygatewayv1.Event, error) { + gatewayEvents := make([]*telemetrygatewayv1.Event, len(events)) + for i, e := range events { + event := telemetrygatewayv1.NewEventWithDefaults(ctx, now, newUUID) + + event.Feature = e.Feature + event.Action = e.Action + + // Parse private metadata + var privateMetadata *structpb.Struct + if e.Parameters.PrivateMetadata != nil && len(*e.Parameters.PrivateMetadata) > 0 { + data, err := e.Parameters.PrivateMetadata.MarshalJSON() + if err != nil { + return nil, errors.Wrapf(err, "error marshaling privateMetadata for event %d", i) + } + var privateData map[string]any + if err := json.Unmarshal(data, &privateData); err != nil { + return nil, errors.Wrapf(err, "error unmarshaling privateMetadata for event %d", i) + } + privateMetadata, err = structpb.NewStruct(privateData) + if err != nil { + return nil, errors.Wrapf(err, "error converting privateMetadata to protobuf for event %d", i) + } + } + + // Configure parameters + event.Parameters = &telemetrygatewayv1.EventParameters{ + Version: e.Parameters.Version, + Metadata: func() map[string]int64 { + if e.Parameters.Metadata == nil || len(*e.Parameters.Metadata) == 0 { + return nil + } + metadata := make(map[string]int64, len(*e.Parameters.Metadata)) + for _, kv := range *e.Parameters.Metadata { + metadata[kv.Key] = int64(kv.Value) + } + return metadata + }(), + PrivateMetadata: privateMetadata, + BillingMetadata: func() *telemetrygatewayv1.EventBillingMetadata { + if e.Parameters.BillingMetadata == nil { + return nil + } + return &telemetrygatewayv1.EventBillingMetadata{ + Product: e.Parameters.BillingMetadata.Product, + Category: e.Parameters.BillingMetadata.Category, + } + }(), + } + event.Source = &telemetrygatewayv1.EventSource{ + Server: &telemetrygatewayv1.EventSource_Server{ + Version: version.Version(), + }, + Client: &telemetrygatewayv1.EventSource_Client{ + Name: e.Source.Client, + Version: e.Source.ClientVersion, + }, + } + + if e.MarketingTracking != nil { + event.MarketingTracking = &telemetrygatewayv1.EventMarketingTracking{ + Url: e.MarketingTracking.Url, + FirstSourceUrl: e.MarketingTracking.FirstSourceURL, + CohortId: e.MarketingTracking.CohortID, + Referrer: e.MarketingTracking.Referrer, + LastSourceUrl: e.MarketingTracking.LastSourceURL, + DeviceSessionId: e.MarketingTracking.DeviceSessionID, + SessionReferrer: e.MarketingTracking.SessionReferrer, + SessionFirstUrl: e.MarketingTracking.SessionFirstURL, + } + } + + // Done! + gatewayEvents[i] = event + } + return gatewayEvents, nil +} diff --git a/cmd/frontend/internal/telemetry/resolvers/telemetrygateway_test.go b/cmd/frontend/internal/telemetry/resolvers/telemetrygateway_test.go new file mode 100644 index 00000000000..c83c1bf965f --- /dev/null +++ b/cmd/frontend/internal/telemetry/resolvers/telemetrygateway_test.go @@ -0,0 +1,167 @@ +package resolvers + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/hexops/autogold/v2" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend" + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/lib/pointers" +) + +func TestNewTelemetryGatewayEvents(t *testing.T) { + staticTime, err := time.Parse(time.RFC3339, "2023-02-24T14:48:30Z") + require.NoError(t, err) + + for _, tc := range []struct { + name string + ctx context.Context + event graphqlbackend.TelemetryEventInput + expect autogold.Value + }{ + { + name: "basic", + ctx: context.Background(), + event: graphqlbackend.TelemetryEventInput{ + Feature: "Feature", + Action: "Example", + }, + expect: autogold.Expect(`{ + "action": "Example", + "feature": "Feature", + "id": "basic", + "parameters": {}, + "source": { + "client": {}, + "server": { + "version": "0.0.0+dev" + } + }, + "timestamp": "2023-02-24T14:48:30Z" +}`), + }, + { + name: "with anonymous user", + ctx: actor.WithActor(context.Background(), actor.FromAnonymousUser("1234")), + event: graphqlbackend.TelemetryEventInput{ + Feature: "Feature", + Action: "Example", + }, + expect: autogold.Expect(`{ + "action": "Example", + "feature": "Feature", + "id": "with anonymous user", + "parameters": {}, + "source": { + "client": {}, + "server": { + "version": "0.0.0+dev" + } + }, + "timestamp": "2023-02-24T14:48:30Z", + "user": { + "anonymousUserId": "1234" + } +}`), + }, + { + name: "with authenticated user", + ctx: actor.WithActor(context.Background(), actor.FromMockUser(1234)), + event: graphqlbackend.TelemetryEventInput{ + Feature: "Feature", + Action: "Example", + }, + expect: autogold.Expect(`{ + "action": "Example", + "feature": "Feature", + "id": "with authenticated user", + "parameters": {}, + "source": { + "client": {}, + "server": { + "version": "0.0.0+dev" + } + }, + "timestamp": "2023-02-24T14:48:30Z", + "user": { + "userId": "1234" + } +}`), + }, + { + name: "with parameters", + ctx: context.Background(), + event: graphqlbackend.TelemetryEventInput{ + Feature: "Feature", + Action: "Example", + Parameters: graphqlbackend.TelemetryEventParametersInput{ + Version: 0, + Metadata: &[]graphqlbackend.TelemetryEventMetadataInput{ + { + Key: "metadata", + Value: 123, + }, + }, + PrivateMetadata: pointers.Ptr(json.RawMessage(`{"private": "super-sensitive"}`)), + BillingMetadata: &graphqlbackend.TelemetryEventBillingMetadataInput{ + Product: "Product", + Category: "Category", + }, + }, + }, + expect: autogold.Expect(`{ + "action": "Example", + "feature": "Feature", + "id": "with parameters", + "parameters": { + "billingMetadata": { + "category": "Category", + "product": "Product" + }, + "metadata": { + "metadata": "123" + }, + "privateMetadata": { + "private": "super-sensitive" + } + }, + "source": { + "client": {}, + "server": { + "version": "0.0.0+dev" + } + }, + "timestamp": "2023-02-24T14:48:30Z" +}`), + }, + } { + t.Run(tc.name, func(t *testing.T) { + got, err := newTelemetryGatewayEvents(tc.ctx, + staticTime, + func() string { return tc.name }, + []graphqlbackend.TelemetryEventInput{ + tc.event, + }) + require.NoError(t, err) + require.Len(t, got, 1) + + protodata, err := protojson.Marshal(got[0]) + require.NoError(t, err) + + // Protojson output isn't stable by injecting randomized whitespace, + // so we re-marshal it to stabilize the output for golden tests. + // https://github.com/golang/protobuf/issues/1082 + var gotJSON map[string]any + require.NoError(t, json.Unmarshal(protodata, &gotJSON)) + jsondata, err := json.MarshalIndent(gotJSON, "", " ") + require.NoError(t, err) + tc.expect.Equal(t, string(jsondata)) + }) + } +} diff --git a/internal/database/BUILD.bazel b/internal/database/BUILD.bazel index 9f8d5cd4f32..828a05d8b87 100644 --- a/internal/database/BUILD.bazel +++ b/internal/database/BUILD.bazel @@ -80,6 +80,7 @@ go_library( "sub_repo_perms_store.go", "survey_responses.go", "teams.go", + "telemetry_export_store.go", "temporary_settings.go", "user_credentials.go", "user_emails.go", @@ -141,6 +142,7 @@ go_library( "//internal/rbac/types", "//internal/search/result", "//internal/security", + "//internal/telemetrygateway/v1:telemetrygateway", "//internal/temporarysettings", "//internal/timeutil", "//internal/trace", @@ -249,6 +251,7 @@ go_test( "sub_repo_perms_store_test.go", "survey_responses_test.go", "teams_test.go", + "telemetry_export_store_test.go", "temporary_settings_test.go", "user_credentials_test.go", "user_emails_test.go", @@ -295,6 +298,7 @@ go_test( "//internal/perforce", "//internal/rbac/types", "//internal/search/result", + "//internal/telemetrygateway/v1:telemetrygateway", "//internal/temporarysettings", "//internal/timeutil", "//internal/trace", @@ -322,7 +326,10 @@ go_test( "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_tidwall_gjson//:gjson", + "@org_golang_google_protobuf//proto", "@org_golang_google_protobuf//testing/protocmp", + "@org_golang_google_protobuf//types/known/structpb", + "@org_golang_google_protobuf//types/known/timestamppb", "@org_golang_x_exp//maps", "@org_golang_x_exp//slices", "@org_golang_x_sync//errgroup", diff --git a/internal/database/database.go b/internal/database/database.go index 635e6ebb477..ea791698f1d 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -64,6 +64,7 @@ type DB interface { Settings() SettingsStore SubRepoPerms() SubRepoPermsStore TemporarySettings() TemporarySettingsStore + TelemetryEventsExportQueue() TelemetryEventsExportQueueStore UserCredentials(encryption.Key) UserCredentialsStore UserEmails() UserEmailsStore UserExternalAccounts() UserExternalAccountsStore @@ -301,6 +302,13 @@ func (d *db) TemporarySettings() TemporarySettingsStore { return TemporarySettingsWith(d.Store) } +func (d *db) TelemetryEventsExportQueue() TelemetryEventsExportQueueStore { + return TelemetryEventsExportQueueWith( + d.logger.Scoped("telemetry_events", "telemetry events export queue store"), + d.Store, + ) +} + func (d *db) UserCredentials(key encryption.Key) UserCredentialsStore { return UserCredentialsWith(d.logger, d.Store, key) } diff --git a/internal/database/dbmocks/mocks_temp.go b/internal/database/dbmocks/mocks_temp.go index 7f07a267e49..c44d881d550 100644 --- a/internal/database/dbmocks/mocks_temp.go +++ b/internal/database/dbmocks/mocks_temp.go @@ -15061,6 +15061,10 @@ type MockDB struct { // TeamsFunc is an instance of a mock function object controlling the // behavior of the method Teams. TeamsFunc *DBTeamsFunc + // TelemetryEventsExportQueueFunc is an instance of a mock function + // object controlling the behavior of the method + // TelemetryEventsExportQueue. + TelemetryEventsExportQueueFunc *DBTelemetryEventsExportQueueFunc // TemporarySettingsFunc is an instance of a mock function object // controlling the behavior of the method TemporarySettings. TemporarySettingsFunc *DBTemporarySettingsFunc @@ -15372,6 +15376,11 @@ func NewMockDB() *MockDB { return }, }, + TelemetryEventsExportQueueFunc: &DBTelemetryEventsExportQueueFunc{ + defaultHook: func() (r0 database.TelemetryEventsExportQueueStore) { + return + }, + }, TemporarySettingsFunc: &DBTemporarySettingsFunc{ defaultHook: func() (r0 database.TemporarySettingsStore) { return @@ -15704,6 +15713,11 @@ func NewStrictMockDB() *MockDB { panic("unexpected invocation of MockDB.Teams") }, }, + TelemetryEventsExportQueueFunc: &DBTelemetryEventsExportQueueFunc{ + defaultHook: func() database.TelemetryEventsExportQueueStore { + panic("unexpected invocation of MockDB.TelemetryEventsExportQueue") + }, + }, TemporarySettingsFunc: &DBTemporarySettingsFunc{ defaultHook: func() database.TemporarySettingsStore { panic("unexpected invocation of MockDB.TemporarySettings") @@ -15926,6 +15940,9 @@ func NewMockDBFrom(i database.DB) *MockDB { TeamsFunc: &DBTeamsFunc{ defaultHook: i.Teams, }, + TelemetryEventsExportQueueFunc: &DBTelemetryEventsExportQueueFunc{ + defaultHook: i.TelemetryEventsExportQueue, + }, TemporarySettingsFunc: &DBTemporarySettingsFunc{ defaultHook: i.TemporarySettings, }, @@ -21443,6 +21460,106 @@ func (c DBTeamsFuncCall) Results() []interface{} { return []interface{}{c.Result0} } +// DBTelemetryEventsExportQueueFunc describes the behavior when the +// TelemetryEventsExportQueue method of the parent MockDB instance is +// invoked. +type DBTelemetryEventsExportQueueFunc struct { + defaultHook func() database.TelemetryEventsExportQueueStore + hooks []func() database.TelemetryEventsExportQueueStore + history []DBTelemetryEventsExportQueueFuncCall + mutex sync.Mutex +} + +// TelemetryEventsExportQueue delegates to the next hook function in the +// queue and stores the parameter and result values of this invocation. +func (m *MockDB) TelemetryEventsExportQueue() database.TelemetryEventsExportQueueStore { + r0 := m.TelemetryEventsExportQueueFunc.nextHook()() + m.TelemetryEventsExportQueueFunc.appendCall(DBTelemetryEventsExportQueueFuncCall{r0}) + return r0 +} + +// SetDefaultHook sets function that is called when the +// TelemetryEventsExportQueue method of the parent MockDB instance is +// invoked and the hook queue is empty. +func (f *DBTelemetryEventsExportQueueFunc) SetDefaultHook(hook func() database.TelemetryEventsExportQueueStore) { + f.defaultHook = hook +} + +// PushHook adds a function to the end of hook queue. Each invocation of the +// TelemetryEventsExportQueue method of the parent MockDB instance invokes +// the hook at the front of the queue and discards it. After the queue is +// empty, the default hook function is invoked for any future action. +func (f *DBTelemetryEventsExportQueueFunc) PushHook(hook func() database.TelemetryEventsExportQueueStore) { + f.mutex.Lock() + f.hooks = append(f.hooks, hook) + f.mutex.Unlock() +} + +// SetDefaultReturn calls SetDefaultHook with a function that returns the +// given values. +func (f *DBTelemetryEventsExportQueueFunc) SetDefaultReturn(r0 database.TelemetryEventsExportQueueStore) { + f.SetDefaultHook(func() database.TelemetryEventsExportQueueStore { + return r0 + }) +} + +// PushReturn calls PushHook with a function that returns the given values. +func (f *DBTelemetryEventsExportQueueFunc) PushReturn(r0 database.TelemetryEventsExportQueueStore) { + f.PushHook(func() database.TelemetryEventsExportQueueStore { + return r0 + }) +} + +func (f *DBTelemetryEventsExportQueueFunc) nextHook() func() database.TelemetryEventsExportQueueStore { + f.mutex.Lock() + defer f.mutex.Unlock() + + if len(f.hooks) == 0 { + return f.defaultHook + } + + hook := f.hooks[0] + f.hooks = f.hooks[1:] + return hook +} + +func (f *DBTelemetryEventsExportQueueFunc) appendCall(r0 DBTelemetryEventsExportQueueFuncCall) { + f.mutex.Lock() + f.history = append(f.history, r0) + f.mutex.Unlock() +} + +// History returns a sequence of DBTelemetryEventsExportQueueFuncCall +// objects describing the invocations of this function. +func (f *DBTelemetryEventsExportQueueFunc) History() []DBTelemetryEventsExportQueueFuncCall { + f.mutex.Lock() + history := make([]DBTelemetryEventsExportQueueFuncCall, len(f.history)) + copy(history, f.history) + f.mutex.Unlock() + + return history +} + +// DBTelemetryEventsExportQueueFuncCall is an object that describes an +// invocation of method TelemetryEventsExportQueue on an instance of MockDB. +type DBTelemetryEventsExportQueueFuncCall struct { + // Result0 is the value of the 1st result returned from this method + // invocation. + Result0 database.TelemetryEventsExportQueueStore +} + +// Args returns an interface slice containing the arguments of this +// invocation. +func (c DBTelemetryEventsExportQueueFuncCall) Args() []interface{} { + return []interface{}{} +} + +// Results returns an interface slice containing the results of this +// invocation. +func (c DBTelemetryEventsExportQueueFuncCall) Results() []interface{} { + return []interface{}{c.Result0} +} + // DBTemporarySettingsFunc describes the behavior when the TemporarySettings // method of the parent MockDB instance is invoked. type DBTemporarySettingsFunc struct { diff --git a/internal/database/event_logs.go b/internal/database/event_logs.go index cc0229b67d9..76f84e03985 100644 --- a/internal/database/event_logs.go +++ b/internal/database/event_logs.go @@ -12,6 +12,7 @@ import ( "github.com/gofrs/uuid" "github.com/keegancsmith/sqlf" "github.com/lib/pq" + "go.opentelemetry.io/otel/attribute" sgactor "github.com/sourcegraph/sourcegraph/internal/actor" "github.com/sourcegraph/sourcegraph/internal/conf" @@ -22,6 +23,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/featureflag" "github.com/sourcegraph/sourcegraph/internal/jsonc" "github.com/sourcegraph/sourcegraph/internal/timeutil" + "github.com/sourcegraph/sourcegraph/internal/trace" "github.com/sourcegraph/sourcegraph/internal/types" "github.com/sourcegraph/sourcegraph/internal/version" "github.com/sourcegraph/sourcegraph/lib/errors" @@ -216,6 +218,11 @@ func (l *eventLogStore) Insert(ctx context.Context, e *Event) error { const EventLogsSourcegraphOperatorKey = "sourcegraph_operator" func (l *eventLogStore) BulkInsert(ctx context.Context, events []*Event) error { + var tr trace.Trace + tr, ctx = trace.New(ctx, "eventLogs.BulkInsert", + attribute.Int("events", len(events))) + defer tr.End() + coalesce := func(v json.RawMessage) json.RawMessage { if v != nil { return v diff --git a/internal/database/schema.json b/internal/database/schema.json index 1e97020d55e..2f35679840e 100755 --- a/internal/database/schema.json +++ b/internal/database/schema.json @@ -26567,6 +26567,78 @@ ], "Triggers": [] }, + { + "Name": "telemetry_events_export_queue", + "Comment": "", + "Columns": [ + { + "Name": "exported_at", + "Index": 4, + "TypeName": "timestamp with time zone", + "IsNullable": true, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "id", + "Index": 1, + "TypeName": "text", + "IsNullable": false, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "payload_pb", + "Index": 3, + "TypeName": "bytea", + "IsNullable": false, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "timestamp", + "Index": 2, + "TypeName": "timestamp with time zone", + "IsNullable": false, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + } + ], + "Indexes": [ + { + "Name": "telemetry_events_export_queue_pkey", + "IsPrimaryKey": true, + "IsUnique": true, + "IsExclusion": false, + "IsDeferrable": false, + "IndexDefinition": "CREATE UNIQUE INDEX telemetry_events_export_queue_pkey ON telemetry_events_export_queue USING btree (id)", + "ConstraintType": "p", + "ConstraintDefinition": "PRIMARY KEY (id)" + } + ], + "Constraints": null, + "Triggers": [] + }, { "Name": "temporary_settings", "Comment": "Stores per-user temporary settings used in the UI, for example, which modals have been dimissed or what theme is preferred.", diff --git a/internal/database/schema.md b/internal/database/schema.md index f03402246ed..1e4e096a99d 100755 --- a/internal/database/schema.md +++ b/internal/database/schema.md @@ -4064,6 +4064,19 @@ Referenced by: ``` +# Table "public.telemetry_events_export_queue" +``` + Column | Type | Collation | Nullable | Default +-------------+--------------------------+-----------+----------+--------- + id | text | | not null | + timestamp | timestamp with time zone | | not null | + payload_pb | bytea | | not null | + exported_at | timestamp with time zone | | | +Indexes: + "telemetry_events_export_queue_pkey" PRIMARY KEY, btree (id) + +``` + # Table "public.temporary_settings" ``` Column | Type | Collation | Nullable | Default diff --git a/internal/database/telemetry_export_store.go b/internal/database/telemetry_export_store.go new file mode 100644 index 00000000000..b37fc51bf6d --- /dev/null +++ b/internal/database/telemetry_export_store.go @@ -0,0 +1,190 @@ +package database + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "google.golang.org/protobuf/proto" + + "github.com/lib/pq" + "github.com/sourcegraph/log" + + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/database/batch" + "github.com/sourcegraph/sourcegraph/internal/featureflag" + telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1" + "github.com/sourcegraph/sourcegraph/internal/trace" + "github.com/sourcegraph/sourcegraph/lib/errors" +) + +// FeatureFlagTelemetryExport enables telemetry export by allowing events to be +// queued for export via (TelemetryEventsExportQueueStore).QueueForExport +const FeatureFlagTelemetryExport = "telemetry-export" + +type TelemetryEventsExportQueueStore interface { + basestore.ShareableStore + + // QueueForExport caches a set of events for later export. It is currently + // feature-flagged, such that if the flag is not enabled for the given + // context, we do not cache the event for export. + QueueForExport(context.Context, []*telemetrygatewayv1.Event) error + + // ListForExport returns the cached events that should be exported next. All + // events returned should be exported. + // + // 🚨 SECURITY: Potentially sensitive parts of the payload are retained at + // this stage. The caller is responsible for ensuring sensitive data is + // stripped. + ListForExport(ctx context.Context, limit int) ([]*telemetrygatewayv1.Event, error) + + // MarkAsExported marks all events in the set of IDs as exported. + MarkAsExported(ctx context.Context, eventIDs []string) error + + // DeletedExported deletes all events exported before the given timestamp, + // returning the number of affected events. + DeletedExported(ctx context.Context, before time.Time) (int64, error) +} + +func TelemetryEventsExportQueueWith(logger log.Logger, other basestore.ShareableStore) TelemetryEventsExportQueueStore { + return &telemetryEventsExportQueueStore{ + logger: logger, + ShareableStore: other, + } +} + +type telemetryEventsExportQueueStore struct { + logger log.Logger + basestore.ShareableStore +} + +// See interface docstring. +func (s *telemetryEventsExportQueueStore) QueueForExport(ctx context.Context, events []*telemetrygatewayv1.Event) error { + var tr trace.Trace + tr, ctx = trace.New(ctx, "telemetryevents.QueueForExport", + attribute.Int("events", len(events))) + defer tr.End() + + logger := trace.Logger(ctx, s.logger) + + if flags := featureflag.FromContext(ctx); flags == nil || !flags.GetBoolOr(FeatureFlagTelemetryExport, false) { + tr.SetAttributes(attribute.Bool("enabled", false)) + return nil // no-op + } else { + tr.SetAttributes(attribute.Bool("enabled", true)) + } + + if len(events) == 0 { + return nil + } + return batch.InsertValues(ctx, + s.Handle(), + "telemetry_events_export_queue", + batch.MaxNumPostgresParameters, + []string{ + "id", + "timestamp", + "payload_pb", + }, + insertChannel(logger, events)) +} + +func insertChannel(logger log.Logger, events []*telemetrygatewayv1.Event) <-chan []any { + ch := make(chan []any, len(events)) + + go func() { + defer close(ch) + + for _, ev := range events { + payloadPB, err := proto.Marshal(ev) + if err != nil { + logger.Error("failed to marshal telemetry event", + log.String("event.feature", ev.GetFeature()), + log.String("event.action", ev.GetAction()), + log.String("event.source.client.name", ev.GetSource().GetClient().GetName()), + log.String("event.source.client.version", ev.GetSource().GetClient().GetVersion()), + log.Error(err)) + continue + } + ch <- []any{ + ev.Id, // id + ev.Timestamp.AsTime(), // timestamp + payloadPB, // payload_pb + } + } + }() + + return ch +} + +// See interface docstring. +func (s *telemetryEventsExportQueueStore) ListForExport(ctx context.Context, limit int) ([]*telemetrygatewayv1.Event, error) { + var tr trace.Trace + tr, ctx = trace.New(ctx, "telemetryevents.ListForExport", + attribute.Int("limit", limit)) + defer tr.End() + + logger := trace.Logger(ctx, s.logger) + + rows, err := s.ShareableStore.Handle().QueryContext(ctx, ` + SELECT id, payload_pb + FROM telemetry_events_export_queue + WHERE exported_at IS NULL + ORDER BY timestamp ASC + LIMIT $1`, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + events := make([]*telemetrygatewayv1.Event, 0, limit) + for rows.Next() { + var id string + var payloadPB []byte + err := rows.Scan(&id, &payloadPB) + if err != nil { + return nil, err + } + var event telemetrygatewayv1.Event + if err := proto.Unmarshal(payloadPB, &event); err != nil { + tr.RecordError(err) + logger.Error("failed to unmarshal telemetry event payload", + log.String("id", id), + log.Error(err)) + // Not fatal, just ignore this event for now, leaving it in DB for + // investigation. + continue + } + events = append(events, &event) + } + tr.SetAttributes(attribute.Int("events", len(events))) + if err := rows.Err(); err != nil { + return nil, err + } + return events, nil +} + +// See interface docstring. +func (s *telemetryEventsExportQueueStore) MarkAsExported(ctx context.Context, eventIDs []string) error { + if _, err := s.ShareableStore.Handle().ExecContext(ctx, ` + UPDATE telemetry_events_export_queue + SET exported_at = NOW() + WHERE id = ANY($1); + `, pq.Array(eventIDs)); err != nil { + return errors.Wrap(err, "failed to mark events as exported") + } + return nil +} + +func (s *telemetryEventsExportQueueStore) DeletedExported(ctx context.Context, before time.Time) (int64, error) { + result, err := s.ShareableStore.Handle().ExecContext(ctx, ` + DELETE FROM telemetry_events_export_queue + WHERE + exported_at IS NOT NULL + AND exported_at < $1; +`, before) + if err != nil { + return 0, errors.Wrap(err, "failed to mark events as exported") + } + return result.RowsAffected() +} diff --git a/internal/database/telemetry_export_store_test.go b/internal/database/telemetry_export_store_test.go new file mode 100644 index 00000000000..0b208abfbef --- /dev/null +++ b/internal/database/telemetry_export_store_test.go @@ -0,0 +1,101 @@ +package database + +import ( + "context" + "testing" + "time" + + "github.com/sourcegraph/log/logtest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/sourcegraph/sourcegraph/internal/database/dbtest" + "github.com/sourcegraph/sourcegraph/internal/featureflag" + telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1" +) + +func TestTelemetryEventsExportQueueLifecycle(t *testing.T) { + // Context with FF enabled. + ff := featureflag.NewMemoryStore( + nil, nil, map[string]bool{FeatureFlagTelemetryExport: true}) + ctx := featureflag.WithFlags(context.Background(), ff) + + logger := logtest.Scoped(t) + db := NewDB(logger, dbtest.NewDB(logger, t)) + + store := TelemetryEventsExportQueueWith(logger, db) + + events := []*telemetrygatewayv1.Event{{ + Id: "1", + Feature: "Feature", + Action: "View", + Timestamp: timestamppb.New(time.Date(2022, 11, 3, 1, 0, 0, 0, time.UTC)), + Parameters: &telemetrygatewayv1.EventParameters{ + Metadata: map[string]int64{"public": 1}, + PrivateMetadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{"sensitive": structpb.NewStringValue("sensitive")}, + }, + }, + }, { + Id: "2", + Feature: "Feature", + Action: "Click", + Timestamp: timestamppb.New(time.Date(2022, 11, 3, 2, 0, 0, 0, time.UTC)), + }, { + Id: "3", + Feature: "Feature", + Action: "Show", + Timestamp: timestamppb.New(time.Date(2022, 11, 3, 3, 0, 0, 0, time.UTC)), + }} + eventsToExport := []string{"1", "2"} + + t.Run("feature flag off", func(t *testing.T) { + require.NoError(t, store.QueueForExport(context.Background(), events)) + export, err := store.ListForExport(ctx, 100) + require.NoError(t, err) + assert.Len(t, export, 0) + }) + + t.Run("QueueForExport", func(t *testing.T) { + require.NoError(t, store.QueueForExport(ctx, events)) + }) + + t.Run("ListForExport", func(t *testing.T) { + limit := len(events) - 1 + export, err := store.ListForExport(ctx, limit) + require.NoError(t, err) + assert.Len(t, export, limit) + + // Check integrity of first item + original, err := proto.Marshal(events[0]) + require.NoError(t, err) + got, err := proto.Marshal(export[0]) + require.NoError(t, err) + require.Equal(t, string(original), string(got)) + }) + + t.Run("before export: DeleteExported", func(t *testing.T) { + affected, err := store.DeletedExported(ctx, time.Now()) + require.NoError(t, err) + assert.Zero(t, affected) + }) + + t.Run("MarkAsExported", func(t *testing.T) { + require.NoError(t, store.MarkAsExported(ctx, eventsToExport)) + }) + + t.Run("after export: QueueForExport", func(t *testing.T) { + export, err := store.ListForExport(ctx, len(events)) + require.NoError(t, err) + assert.Len(t, export, len(events)-len(eventsToExport)) + }) + + t.Run("after export: DeleteExported", func(t *testing.T) { + affected, err := store.DeletedExported(ctx, time.Now()) + require.NoError(t, err) + assert.Equal(t, int(affected), len(eventsToExport)) + }) +} diff --git a/internal/featureflag/cache.go b/internal/featureflag/cache.go index 5c7de508905..017b7e5b09e 100644 --- a/internal/featureflag/cache.go +++ b/internal/featureflag/cache.go @@ -40,7 +40,7 @@ func setEvaluatedFlagToCache(a *actor.Actor, flagName string, value bool) { return } - evalStore.HSet(getFlagCacheKey(flagName), visitorID, strconv.FormatBool(value)) + _ = evalStore.HSet(getFlagCacheKey(flagName), visitorID, strconv.FormatBool(value)) } func getVisitorIDForActor(a *actor.Actor) (string, error) { diff --git a/internal/telemetry/BUILD.bazel b/internal/telemetry/BUILD.bazel new file mode 100644 index 00000000000..65f9a33e934 --- /dev/null +++ b/internal/telemetry/BUILD.bazel @@ -0,0 +1,43 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("//dev:go_defs.bzl", "go_test") + +go_library( + name = "telemetry", + srcs = [ + "billing_categories.go", + "billing_products.go", + "events.go", + "telemetry.go", + "telemetrygateway.go", + ], + importpath = "github.com/sourcegraph/sourcegraph/internal/telemetry", + visibility = ["//:__subpackages__"], + deps = [ + "//internal/telemetry/teestore", + "//internal/telemetrygateway/v1:telemetrygateway", + "//internal/version", + "@org_golang_google_protobuf//types/known/structpb", + ], +) + +go_test( + name = "telemetry_test", + srcs = [ + "telemetry_test.go", + "telemetrygateway_test.go", + ], + embed = [":telemetry"], + tags = ["requires-network"], + deps = [ + "//internal/actor", + "//internal/database", + "//internal/database/dbtest", + "//internal/featureflag", + "//internal/telemetry/teestore", + "@com_github_hexops_autogold_v2//:autogold", + "@com_github_sourcegraph_log//logtest", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_protobuf//encoding/protojson", + ], +) diff --git a/internal/telemetry/billing_categories.go b/internal/telemetry/billing_categories.go new file mode 100644 index 00000000000..fc1ab292a2b --- /dev/null +++ b/internal/telemetry/billing_categories.go @@ -0,0 +1,12 @@ +package telemetry + +// billingCategory is used in EventBillingMetadata to identify a product. +// +// This is a private type, requiring the values to be declared in-package +// and preventing strings from being cast to this type. +type billingCategory string + +// All billing category IDs in Sourcegraph's Go services. +const ( + BillingCategoryExample billingCategory = "EXAMPLE" +) diff --git a/internal/telemetry/billing_products.go b/internal/telemetry/billing_products.go new file mode 100644 index 00000000000..a705918b1be --- /dev/null +++ b/internal/telemetry/billing_products.go @@ -0,0 +1,12 @@ +package telemetry + +// billingProduct is used in EventBillingMetadata to identify a product. +// +// This is a private type, requiring the values to be declared in-package +// and preventing strings from being cast to this type. +type billingProduct string + +// All billing product IDs in Sourcegraph's Go services. +const ( + BillingProductExample billingProduct = "EXAMPLE" +) diff --git a/internal/telemetry/events.go b/internal/telemetry/events.go new file mode 100644 index 00000000000..fcbafb80830 --- /dev/null +++ b/internal/telemetry/events.go @@ -0,0 +1,24 @@ +package telemetry + +// eventFeature defines the feature associated with an event. Values should +// be in camelCase, e.g. 'myFeature' +// +// This is a private type, requiring the values to be declared in-package +// and preventing strings from being cast to this type. +type eventFeature string + +// All event names in Sourcegraph's Go services. +const ( + FeatureExample eventFeature = "exampleFeature" +) + +// eventAction defines the action associated with an event. Values should +// be in camelCase, e.g. 'myAction' +// +// This is a private type, requiring the values to be declared in-package +// and preventing strings from being cast to this type. +type eventAction string + +const ( + ActionExample eventAction = "exampleAction" +) diff --git a/internal/telemetry/teestore/BUILD.bazel b/internal/telemetry/teestore/BUILD.bazel new file mode 100644 index 00000000000..5895bbb925c --- /dev/null +++ b/internal/telemetry/teestore/BUILD.bazel @@ -0,0 +1,31 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("//dev:go_defs.bzl", "go_test") + +go_library( + name = "teestore", + srcs = ["teestore.go"], + importpath = "github.com/sourcegraph/sourcegraph/internal/telemetry/teestore", + visibility = ["//:__subpackages__"], + deps = [ + "//internal/database", + "//internal/featureflag", + "//internal/telemetrygateway/v1:telemetrygateway", + "//lib/errors", + "//lib/pointers", + "@com_github_sourcegraph_conc//pool", + ], +) + +go_test( + name = "teestore_test", + srcs = ["teestore_test.go"], + embed = [":teestore"], + deps = [ + "//internal/telemetrygateway/v1:telemetrygateway", + "//lib/pointers", + "@com_github_hexops_autogold_v2//:autogold", + "@com_github_stretchr_testify//require", + "@org_golang_google_protobuf//types/known/structpb", + "@org_golang_google_protobuf//types/known/timestamppb", + ], +) diff --git a/internal/telemetry/teestore/teestore.go b/internal/telemetry/teestore/teestore.go new file mode 100644 index 00000000000..7325b16b822 --- /dev/null +++ b/internal/telemetry/teestore/teestore.go @@ -0,0 +1,151 @@ +package teestore + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/sourcegraph/conc/pool" + + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/featureflag" + telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1" + "github.com/sourcegraph/sourcegraph/lib/errors" + "github.com/sourcegraph/sourcegraph/lib/pointers" +) + +// Store tees events into both the event_logs table and the new telemetry export +// queue, translating the message into the existing event_logs format on a +// best-effort basis. +type Store interface { + StoreEvents(context.Context, []*telemetrygatewayv1.Event) error +} + +type store struct { + exportQueue database.TelemetryEventsExportQueueStore + eventLogs database.EventLogStore +} + +func NewStore(exportQueue database.TelemetryEventsExportQueueStore, eventLogs database.EventLogStore) Store { + return &store{exportQueue, eventLogs} +} + +func (s *store) StoreEvents(ctx context.Context, events []*telemetrygatewayv1.Event) error { + // Write to both stores at the same time. + wg := pool.New().WithErrors() + wg.Go(func() error { + if err := s.exportQueue.QueueForExport(ctx, events); err != nil { + return errors.Wrap(err, "bulk inserting telemetry events") + } + return nil + }) + wg.Go(func() error { + if err := s.eventLogs.BulkInsert(ctx, toEventLogs(time.Now, events)); err != nil { + return errors.Wrap(err, "bulk inserting events logs") + } + return nil + }) + return wg.Wait() +} + +func toEventLogs(now func() time.Time, telemetryEvents []*telemetrygatewayv1.Event) []*database.Event { + eventLogs := make([]*database.Event, len(telemetryEvents)) + for i, e := range telemetryEvents { + // Note that all generated proto getters are nil-safe, so use those to + // get fields rather than accessing fields directly. + eventLogs[i] = &database.Event{ + ID: 0, // not required on insert + InsertID: nil, // not required on insert + + // Identifiers + Name: fmt.Sprintf("%s%s.%s", maybeSourceEventNamePrefix(e.GetSource()), e.GetFeature(), e.GetAction()), + Timestamp: func() time.Time { + if e.GetTimestamp() == nil { + return now() + } + return e.GetTimestamp().AsTime() + }(), + + // User + UserID: uint32(e.GetUser().GetUserId()), + AnonymousUserID: e.GetUser().GetAnonymousUserId(), + + // GetParameters.Metadata + PublicArgument: func() json.RawMessage { + md := e.GetParameters().GetMetadata() + if len(md) == 0 { + return nil + } + data, err := json.Marshal(md) + if err != nil { + data, _ = json.Marshal(map[string]string{"marshal.error": err.Error()}) + } + return data + }(), + + // GetParameters.PrivateMetadata + Argument: func() json.RawMessage { + md := e.GetParameters().GetPrivateMetadata().AsMap() + if len(md) == 0 { + return nil + } + data, err := json.Marshal(md) + if err != nil { + data, _ = json.Marshal(map[string]string{"marshal.error": err.Error()}) + } + return data + }(), + + // Parameters.BillingMetadata + BillingProductCategory: pointers.NonZeroPtr(e.GetParameters().GetBillingMetadata().GetCategory()), + BillingEventID: nil, // No equivalents in telemetry events + + // Source.Client + Source: func() string { + if source := e.GetSource().GetClient().GetName(); source != "" { + return source + } + return "BACKEND" // must be non-empty + }(), + Client: func() *string { + if c := e.GetSource().GetClient(); c != nil { + return pointers.Ptr(fmt.Sprintf("%s:%s", + c.GetName(), c.GetVersion())) + } + return nil + }(), + + // Source.Server + Version: e.GetSource().GetServer().GetVersion(), + + // MarketingTracking + URL: e.GetMarketingTracking().GetUrl(), + CohortID: pointers.NonZeroPtr(e.GetMarketingTracking().GetCohortId()), + FirstSourceURL: pointers.NonZeroPtr(e.GetMarketingTracking().GetFirstSourceUrl()), + LastSourceURL: pointers.NonZeroPtr(e.GetMarketingTracking().GetLastSourceUrl()), + Referrer: pointers.NonZeroPtr(e.GetMarketingTracking().GetReferrer()), + DeviceID: pointers.NonZeroPtr(e.GetMarketingTracking().GetDeviceSessionId()), + + // FeatureFlags + EvaluatedFlagSet: func() featureflag.EvaluatedFlagSet { + flags := e.GetFeatureFlags().GetFlags() + set := make(featureflag.EvaluatedFlagSet, len(flags)) + for k, v := range flags { + // We can expect all values to be bools for now + set[k], _ = strconv.ParseBool(v) + } + return set + }(), + } + } + return eventLogs +} + +func maybeSourceEventNamePrefix(s *telemetrygatewayv1.EventSource) string { + if name := s.GetClient().GetName(); name != "" { + return name + ":" + } + return "" +} diff --git a/internal/telemetry/teestore/teestore_test.go b/internal/telemetry/teestore/teestore_test.go new file mode 100644 index 00000000000..4fe8b34c52a --- /dev/null +++ b/internal/telemetry/teestore/teestore_test.go @@ -0,0 +1,163 @@ +package teestore + +import ( + "encoding/json" + "testing" + "time" + + "github.com/hexops/autogold/v2" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" + + telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1" + "github.com/sourcegraph/sourcegraph/lib/pointers" +) + +// see TestRecorderEndToEnd for tests that include teestore.Store and the database + +func TestToEventLogs(t *testing.T) { + testCases := []struct { + name string + events []*telemetrygatewayv1.Event + expectEventLogs autogold.Value + }{ + { + name: "handles all nil", + events: nil, + expectEventLogs: autogold.Expect("[]"), + }, + { + name: "handles nil entry", + events: []*telemetrygatewayv1.Event{nil}, + expectEventLogs: autogold.Expect(`[ + { + "ID": 0, + "Name": ".", + "URL": "", + "UserID": 0, + "AnonymousUserID": "", + "Argument": null, + "PublicArgument": null, + "Source": "BACKEND", + "Version": "", + "Timestamp": "2022-11-03T02:00:00Z", + "EvaluatedFlagSet": {}, + "CohortID": null, + "FirstSourceURL": null, + "LastSourceURL": null, + "Referrer": null, + "DeviceID": null, + "InsertID": null, + "Client": null, + "BillingProductCategory": null, + "BillingEventID": null + } +]`), + }, + { + name: "handles nil fields", + events: []*telemetrygatewayv1.Event{{}}, + expectEventLogs: autogold.Expect(`[ + { + "ID": 0, + "Name": ".", + "URL": "", + "UserID": 0, + "AnonymousUserID": "", + "Argument": null, + "PublicArgument": null, + "Source": "BACKEND", + "Version": "", + "Timestamp": "2022-11-03T02:00:00Z", + "EvaluatedFlagSet": {}, + "CohortID": null, + "FirstSourceURL": null, + "LastSourceURL": null, + "Referrer": null, + "DeviceID": null, + "InsertID": null, + "Client": null, + "BillingProductCategory": null, + "BillingEventID": null + } +]`), + }, + { + name: "simple event", + events: []*telemetrygatewayv1.Event{{ + Id: "1", + Timestamp: timestamppb.New(time.Date(2022, 11, 2, 1, 0, 0, 0, time.UTC)), + Feature: "CodeSearch", + Action: "Seach", + Source: &telemetrygatewayv1.EventSource{ + Client: &telemetrygatewayv1.EventSource_Client{ + Name: "VSCODE", + Version: pointers.Ptr("1.2.3"), + }, + Server: &telemetrygatewayv1.EventSource_Server{ + Version: "dev", + }, + }, + Parameters: &telemetrygatewayv1.EventParameters{ + Metadata: map[string]int64{"public": 2}, + PrivateMetadata: &structpb.Struct{Fields: map[string]*structpb.Value{ + "private": structpb.NewStringValue("sensitive-data"), + }}, + BillingMetadata: &telemetrygatewayv1.EventBillingMetadata{ + Product: "product", + Category: "category", + }, + }, + User: &telemetrygatewayv1.EventUser{ + UserId: pointers.Ptr(int64(1234)), + AnonymousUserId: pointers.Ptr("anonymous"), + }, + MarketingTracking: &telemetrygatewayv1.EventMarketingTracking{ + Url: pointers.Ptr("sourcegraph.com/foobar"), + }, + }}, + expectEventLogs: autogold.Expect(`[ + { + "ID": 0, + "Name": "VSCODE:CodeSearch.Seach", + "URL": "sourcegraph.com/foobar", + "UserID": 1234, + "AnonymousUserID": "anonymous", + "Argument": { + "private": "sensitive-data" + }, + "PublicArgument": { + "public": 2 + }, + "Source": "VSCODE", + "Version": "dev", + "Timestamp": "2022-11-02T01:00:00Z", + "EvaluatedFlagSet": {}, + "CohortID": null, + "FirstSourceURL": null, + "LastSourceURL": null, + "Referrer": null, + "DeviceID": null, + "InsertID": null, + "Client": "VSCODE:1.2.3", + "BillingProductCategory": "category", + "BillingEventID": null + } +]`), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + eventLogs := toEventLogs( + func() time.Time { return time.Date(2022, 11, 3, 2, 0, 0, 0, time.UTC) }, + tc.events) + require.Len(t, eventLogs, len(tc.events)) + // Compare JSON for ease of reading + data, err := json.MarshalIndent(eventLogs, "", " ") + require.NoError(t, err) + tc.expectEventLogs.Equal(t, string(data)) + }) + } +} diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go new file mode 100644 index 00000000000..3cba49546d0 --- /dev/null +++ b/internal/telemetry/telemetry.go @@ -0,0 +1,76 @@ +// Package telemetry implements "Telemetry V2", which supercedes event_logs +// as the mechanism for reporting telemetry from all Sourcegraph instances to +// Sourcergraph. +package telemetry + +import ( + "context" + "time" + + "github.com/sourcegraph/sourcegraph/internal/telemetry/teestore" + + telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1" +) + +type Event struct { + Feature eventFeature + Action eventAction + Parameters EventParameters +} + +// constString effectively requires strings to be statically defined constants. +type constString string + +// EventMetadata is secure, PII-free metadata that can be attached to events. +// Keys must be const strings. +type EventMetadata map[constString]int64 + +// EventBillingMetadata records metadata that attributes the event to product +// billing categories. +type EventBillingMetadata struct { + // Product identifier. + Product billingProduct + // Category identifier. + Category billingCategory +} + +type EventParameters struct { + // Version can be used to denote the "shape" of this event. + Version int + // Metadata is PII-free metadata about the event that we export. + Metadata EventMetadata + // PrivateMetadata is arbitrary metadata that is generally not exported. + PrivateMetadata map[string]any + // BillingMetadata contains metadata we can use for billing purposes. + BillingMetadata *EventBillingMetadata +} + +// EventRecorder is for creating and recording telemetry events in the backend +// using Telemetry V2, which exports events to Sourcergraph. +type EventRecorder struct{ teestore teestore.Store } + +// â— Experimental - do not use! +func NewEventRecorder(store teestore.Store) *EventRecorder { + return &EventRecorder{teestore: store} +} + +// Record records a single telemetry event with the context's Sourcegraph +// actor. +func (r *EventRecorder) Record(ctx context.Context, feature eventFeature, action eventAction, parameters EventParameters) error { + return r.teestore.StoreEvents(ctx, []*telemetrygatewayv1.Event{ + newTelemetryGatewayEvent(ctx, time.Now(), telemetrygatewayv1.DefaultEventIDFunc, feature, action, parameters), + }) +} + +// BatchRecord records a set of telemetry events with the context's +// Sourcegraph actor. +func (r *EventRecorder) BatchRecord(ctx context.Context, events ...Event) error { + if len(events) == 0 { + return nil + } + rawEvents := make([]*telemetrygatewayv1.Event, len(events)) + for i, e := range events { + rawEvents[i] = newTelemetryGatewayEvent(ctx, time.Now(), telemetrygatewayv1.DefaultEventIDFunc, e.Feature, e.Action, e.Parameters) + } + return r.teestore.StoreEvents(ctx, rawEvents) +} diff --git a/internal/telemetry/telemetry_test.go b/internal/telemetry/telemetry_test.go new file mode 100644 index 00000000000..ac931130019 --- /dev/null +++ b/internal/telemetry/telemetry_test.go @@ -0,0 +1,67 @@ +package telemetry_test + +import ( + "context" + "testing" + + "github.com/sourcegraph/log/logtest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/dbtest" + "github.com/sourcegraph/sourcegraph/internal/featureflag" + "github.com/sourcegraph/sourcegraph/internal/telemetry" + "github.com/sourcegraph/sourcegraph/internal/telemetry/teestore" +) + +func TestRecorderEndToEnd(t *testing.T) { + var userID int32 = 123 + ctx := actor.WithActor(context.Background(), actor.FromMockUser(userID)) + + // Context with FF enabled. + ff := featureflag.NewMemoryStore( + map[string]bool{database.FeatureFlagTelemetryExport: true}, nil, nil) + ctx = featureflag.WithFlags(ctx, ff) + + logger := logtest.Scoped(t) + db := database.NewDB(logger, dbtest.NewDB(logger, t)) + + recorder := telemetry.NewEventRecorder(teestore.NewStore(db.TelemetryEventsExportQueue(), db.EventLogs())) + + wantEvents := 3 + t.Run("Record and BatchRecord", func(t *testing.T) { + assert.NoError(t, recorder.Record(ctx, + "Test", "Action1", + telemetry.EventParameters{ + Metadata: telemetry.EventMetadata{ + "metadata": 1, + }, + PrivateMetadata: map[string]any{ + "private": "sensitive", + }, + })) + assert.NoError(t, recorder.BatchRecord(ctx, + telemetry.Event{ + Feature: "Test", + Action: "Action2", + }, + telemetry.Event{ + Feature: "Test", + Action: "Action3", + })) + }) + + t.Run("tee to EventLogs", func(t *testing.T) { + eventLogs, err := db.EventLogs().ListAll(ctx, database.EventLogsListOptions{UserID: userID}) + require.NoError(t, err) + assert.Len(t, eventLogs, wantEvents) + }) + + t.Run("tee to TelemetryEvents", func(t *testing.T) { + telemetryEvents, err := db.TelemetryEventsExportQueue().ListForExport(ctx, 999) + require.NoError(t, err) + assert.Len(t, telemetryEvents, wantEvents) + }) +} diff --git a/internal/telemetry/telemetrygateway.go b/internal/telemetry/telemetrygateway.go new file mode 100644 index 00000000000..1322d2f8fe9 --- /dev/null +++ b/internal/telemetry/telemetrygateway.go @@ -0,0 +1,69 @@ +package telemetry + +import ( + "context" + "time" + + "google.golang.org/protobuf/types/known/structpb" + + telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1" + "github.com/sourcegraph/sourcegraph/internal/version" +) + +// newTelemetryGatewayEvent translates recording to raw events for storage and +// export. It extracts actor from context as the event user. +func newTelemetryGatewayEvent( + ctx context.Context, + now time.Time, + newUUID func() string, + feature eventFeature, + action eventAction, + parameters EventParameters, +) *telemetrygatewayv1.Event { + event := telemetrygatewayv1.NewEventWithDefaults(ctx, now, newUUID) + event.Feature = string(feature) + event.Action = string(action) + event.Source = &telemetrygatewayv1.EventSource{ + Server: &telemetrygatewayv1.EventSource_Server{ + Version: version.Version(), + }, + Client: nil, // no client, this is recorded directly in backend + } + event.Parameters = &telemetrygatewayv1.EventParameters{ + Version: int32(parameters.Version), + Metadata: func() map[string]int64 { + if len(parameters.Metadata) == 0 { + return nil + } + m := make(map[string]int64, len(parameters.Metadata)) + for k, v := range parameters.Metadata { + m[string(k)] = v + } + return m + }(), + PrivateMetadata: func() *structpb.Struct { + if len(parameters.PrivateMetadata) == 0 { + return nil + } + s, err := structpb.NewStruct(parameters.PrivateMetadata) + if err != nil { + return &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "telemetry.error": structpb.NewStringValue("failed to marshal private metadata: " + err.Error()), + }, + } + } + return s + }(), + BillingMetadata: func() *telemetrygatewayv1.EventBillingMetadata { + if parameters.BillingMetadata == nil { + return nil + } + return &telemetrygatewayv1.EventBillingMetadata{ + Product: string(parameters.BillingMetadata.Product), + Category: string(parameters.BillingMetadata.Category), + } + }(), + } + return event +} diff --git a/internal/telemetry/telemetrygateway_test.go b/internal/telemetry/telemetrygateway_test.go new file mode 100644 index 00000000000..1de8cbda2fd --- /dev/null +++ b/internal/telemetry/telemetrygateway_test.go @@ -0,0 +1,158 @@ +package telemetry + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/hexops/autogold/v2" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/sourcegraph/sourcegraph/internal/actor" +) + +func TestMakeRawEvent(t *testing.T) { + staticTime, err := time.Parse(time.RFC3339, "2023-02-24T14:48:30Z") + require.NoError(t, err) + + for _, tc := range []struct { + name string + ctx context.Context + event Event + expect autogold.Value + }{ + { + name: "basic", + ctx: context.Background(), + event: Event{ + Feature: FeatureExample, + Action: ActionExample, + }, + expect: autogold.Expect(`{ + "action": "exampleAction", + "feature": "exampleFeature", + "id": "basic", + "parameters": {}, + "source": { + "server": { + "version": "0.0.0+dev" + } + }, + "timestamp": "2023-02-24T14:48:30Z" +}`), + }, + { + name: "with anonymous user", + ctx: actor.WithActor(context.Background(), actor.FromAnonymousUser("1234")), + event: Event{ + Feature: FeatureExample, + Action: ActionExample, + }, + expect: autogold.Expect(`{ + "action": "exampleAction", + "feature": "exampleFeature", + "id": "with anonymous user", + "parameters": {}, + "source": { + "server": { + "version": "0.0.0+dev" + } + }, + "timestamp": "2023-02-24T14:48:30Z", + "user": { + "anonymousUserId": "1234" + } +}`), + }, + { + name: "with authenticated user", + ctx: actor.WithActor(context.Background(), actor.FromMockUser(1234)), + event: Event{ + Feature: FeatureExample, + Action: ActionExample, + }, + expect: autogold.Expect(`{ + "action": "exampleAction", + "feature": "exampleFeature", + "id": "with authenticated user", + "parameters": {}, + "source": { + "server": { + "version": "0.0.0+dev" + } + }, + "timestamp": "2023-02-24T14:48:30Z", + "user": { + "userId": "1234" + } +}`), + }, + { + name: "with parameters", + ctx: context.Background(), + event: Event{ + Feature: FeatureExample, + Action: ActionExample, + Parameters: EventParameters{ + Version: 0, + Metadata: EventMetadata{ + "foobar": 3, + }, + PrivateMetadata: map[string]any{ + "barbaz": "hello world!", + }, + BillingMetadata: &EventBillingMetadata{ + Product: BillingProductExample, + Category: BillingCategoryExample, + }, + }, + }, + expect: autogold.Expect(`{ + "action": "exampleAction", + "feature": "exampleFeature", + "id": "with parameters", + "parameters": { + "billingMetadata": { + "category": "EXAMPLE", + "product": "EXAMPLE" + }, + "metadata": { + "foobar": "3" + }, + "privateMetadata": { + "barbaz": "hello world!" + } + }, + "source": { + "server": { + "version": "0.0.0+dev" + } + }, + "timestamp": "2023-02-24T14:48:30Z" +}`), + }, + } { + t.Run(tc.name, func(t *testing.T) { + got := newTelemetryGatewayEvent(tc.ctx, + staticTime, + func() string { return tc.name }, + tc.event.Feature, + tc.event.Action, + tc.event.Parameters) + + protodata, err := protojson.Marshal(got) + require.NoError(t, err) + + // Protojson output isn't stable by injecting randomized whitespace, + // so we re-marshal it to stabilize the output for golden tests. + // https://github.com/golang/protobuf/issues/1082 + var gotJSON map[string]any + require.NoError(t, json.Unmarshal(protodata, &gotJSON)) + jsondata, err := json.MarshalIndent(gotJSON, "", " ") + require.NoError(t, err) + tc.expect.Equal(t, string(jsondata)) + }) + } +} diff --git a/internal/telemetrygateway/BUILD.bazel b/internal/telemetrygateway/BUILD.bazel new file mode 100644 index 00000000000..e69de29bb2d diff --git a/internal/telemetrygateway/v1/BUILD.bazel b/internal/telemetrygateway/v1/BUILD.bazel index 8b7157df4ee..e4aa411cae6 100644 --- a/internal/telemetrygateway/v1/BUILD.bazel +++ b/internal/telemetrygateway/v1/BUILD.bazel @@ -29,18 +29,30 @@ go_proto_library( go_library( name = "telemetrygateway", + srcs = ["event.go"], embed = [":v1_go_proto"], importpath = "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1", visibility = [ + "//cmd/frontend/internal/telemetry/resolvers:__pkg__", "//cmd/telemetrygateway/server:__pkg__", "//cmd/telemetrygateway/shared:__pkg__", "//internal/api:__pkg__", + "//internal/database:__pkg__", "//internal/extsvc/gitolite:__pkg__", + "//internal/telemetry:__pkg__", + "//internal/telemetry/teestore:__pkg__", "//internal/telemetrygateway:__pkg__", "//internal/telemetrygateway/gitdomain:__pkg__", "//internal/telemetrygateway/integration_tests:__pkg__", "//internal/telemetrygateway/protocol:__pkg__", ], + deps = [ + "//internal/actor", + "//internal/featureflag", + "//lib/pointers", + "@com_github_google_uuid//:uuid", + "@org_golang_google_protobuf//types/known/timestamppb", + ], ) # See https://github.com/sourcegraph/sourcegraph/issues/50032 @@ -56,3 +68,25 @@ buf_lint_test( config = "//internal:buf.yaml", targets = [":v1_proto"], ) + +go_test( + name = "telemetrygateway_test", + srcs = [ + "backcompat_test.go", + "event_test.go", + "main_test.go", + ], + data = glob(["testdata/**"]), + embed = [":telemetrygateway"], + deps = [ + "//internal/actor", + "//lib/pointers", + "@com_github_hexops_autogold_v2//:autogold", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_protobuf//encoding/protojson", + "@org_golang_google_protobuf//proto", + "@org_golang_google_protobuf//types/known/structpb", + "@org_golang_google_protobuf//types/known/timestamppb", + ], +) diff --git a/internal/telemetrygateway/v1/backcompat_test.go b/internal/telemetrygateway/v1/backcompat_test.go new file mode 100644 index 00000000000..e099e8d0605 --- /dev/null +++ b/internal/telemetrygateway/v1/backcompat_test.go @@ -0,0 +1,125 @@ +package v1_test + +import ( + "flag" + "fmt" + "io/fs" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/sourcegraph/sourcegraph/lib/pointers" + + telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1" +) + +var ( + createSnapshot = flag.Bool("snapshot", false, "generate a new snapshot") + snapshotsDir = filepath.Join("testdata", "snapshots") + + // sampleEvent should have all fields populated - a snapshot from this event + // is generated with the -snapshot flag, see TestBackcompat() for more + // details. + sampleEvent = &telemetrygatewayv1.Event{ + Id: "1234", + Feature: "Feature", + Action: "Action", + Timestamp: timestamppb.New(must(time.Parse(time.RFC3339, "2023-02-24T14:48:30Z"))), + Source: &telemetrygatewayv1.EventSource{ + Server: &telemetrygatewayv1.EventSource_Server{ + Version: "dev", + }, + Client: &telemetrygatewayv1.EventSource_Client{ + Name: "CLIENT", + Version: pointers.Ptr("VERSION"), + }, + }, + Parameters: &telemetrygatewayv1.EventParameters{ + Version: 1, + Metadata: map[string]int64{"metadata": 1}, + PrivateMetadata: must(structpb.NewStruct(map[string]any{"private": "data"})), + BillingMetadata: &telemetrygatewayv1.EventBillingMetadata{ + Product: "Product", + Category: "Category", + }, + }, + User: &telemetrygatewayv1.EventUser{ + UserId: pointers.Ptr(int64(1234)), + AnonymousUserId: pointers.Ptr("anonymous"), + }, + FeatureFlags: &telemetrygatewayv1.EventFeatureFlags{ + Flags: map[string]string{"feature": "true"}, + }, + MarketingTracking: &telemetrygatewayv1.EventMarketingTracking{ + Url: pointers.Ptr("value"), + FirstSourceUrl: pointers.Ptr("value"), + CohortId: pointers.Ptr("value"), + Referrer: pointers.Ptr("value"), + LastSourceUrl: pointers.Ptr("value"), + DeviceSessionId: pointers.Ptr("value"), + SessionReferrer: pointers.Ptr("value"), + SessionFirstUrl: pointers.Ptr("value"), + }, + } +) + +// TestBackcompat asserts that past events marshalled in the proto wire format, +// tracked in internal/telemetrygateway/v1/testdata/snapshots, continue to be +// able to be marshalled by the current v1 types to ensure we don't introduce +// any breaking changes. +// +// New snapshots should be manually created as the spec evolves by updating +// sampleEvent and running the test with the '-snapshot' flag: +// +// go test -v ./internal/telemetrygateway/v1 -snapshot +// +// Without the '-snapshot' flag, this test just loads existing snapshots and +// asserts they can still be unmarshalled. +func TestBackcompat(t *testing.T) { + if *createSnapshot { + data, err := proto.Marshal(sampleEvent) + require.NoError(t, err) + + f := filepath.Join(snapshotsDir, time.Now().Format(time.DateOnly)+".pb") + if _, err := os.Stat(f); err == nil { + t.Logf("Snapshot %s exists, recreating it", f) + _ = os.Remove(f) + } + require.NoError(t, os.WriteFile(f, data, 0644)) + t.Logf("Wrote snapshot to %s", f) + } + + var tested int + require.NoError(t, filepath.WalkDir(snapshotsDir, func(path string, d fs.DirEntry, err error) error { + if d.IsDir() { + return nil + } + + tested += 1 + t.Run(fmt.Sprintf("snapshot %s", path), func(t *testing.T) { + data, err := os.ReadFile(path) + require.NoError(t, err) + + // Existing snapshot must unmarshal without error. + var event telemetrygatewayv1.Event + assert.NoError(t, proto.Unmarshal(data, &event)) + // TODO: Assert somehow that the unmarshalled event looks as expected. + }) + return nil + })) + t.Logf("Tested %d snapshots", tested) +} + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} diff --git a/internal/telemetrygateway/v1/event.go b/internal/telemetrygateway/v1/event.go new file mode 100644 index 00000000000..65fae2eb154 --- /dev/null +++ b/internal/telemetrygateway/v1/event.go @@ -0,0 +1,50 @@ +package v1 + +import ( + "context" + "strconv" + "time" + + "github.com/google/uuid" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/featureflag" + "github.com/sourcegraph/sourcegraph/lib/pointers" +) + +// DefaultEventIDFunc is the default generator for telemetry event IDs. +var DefaultEventIDFunc = uuid.NewString + +// NewEventWithDefaults creates a uniform event with defaults filled in. All +// constructors making raw events should start with this. In particular, this +// adds any relevant data required from context. +func NewEventWithDefaults(ctx context.Context, now time.Time, newEventID func() string) *Event { + return &Event{ + Id: newEventID(), + Timestamp: timestamppb.New(now), + User: func() *EventUser { + act := actor.FromContext(ctx) + if !act.IsAuthenticated() && act.AnonymousUID == "" { + return nil + } + return &EventUser{ + UserId: pointers.NonZeroPtr(int64(act.UID)), + AnonymousUserId: pointers.NonZeroPtr(act.AnonymousUID), + } + }(), + FeatureFlags: func() *EventFeatureFlags { + flags := featureflag.GetEvaluatedFlagSet(ctx) + if len(flags) == 0 { + return nil + } + data := make(map[string]string, len(flags)) + for k, v := range flags { + data[k] = strconv.FormatBool(v) + } + return &EventFeatureFlags{ + Flags: data, + } + }(), + } +} diff --git a/internal/telemetrygateway/v1/event_test.go b/internal/telemetrygateway/v1/event_test.go new file mode 100644 index 00000000000..bd5934b95b9 --- /dev/null +++ b/internal/telemetrygateway/v1/event_test.go @@ -0,0 +1,53 @@ +package v1_test + +import ( + context "context" + "encoding/json" + "testing" + "time" + + "github.com/hexops/autogold/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/sourcegraph/sourcegraph/internal/actor" + + telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1" +) + +func TestNewEventWithDefaults(t *testing.T) { + staticTime, err := time.Parse(time.RFC3339, "2023-02-24T14:48:30Z") + require.NoError(t, err) + + t.Run("extract actor and flags", func(t *testing.T) { + var userID int32 = 123 + ctx := actor.WithActor(context.Background(), actor.FromMockUser(userID)) + + // NOTE: We can't test the feature flag part easily because + // featureflag.GetEvaluatedFlagSet depends on Redis, and the package + // is not designed for it to easily be stubbed out for testing. + // Since it's used for existing telemetry, we trust it works. + + got := telemetrygatewayv1.NewEventWithDefaults(ctx, staticTime, func() string { return "id" }) + assert.NotNil(t, got.User) + + protodata, err := protojson.Marshal(got) + require.NoError(t, err) + + // Protojson output isn't stable by injecting randomized whitespace, + // so we re-marshal it to stabilize the output for golden tests. + // https://github.com/golang/protobuf/issues/1082 + var gotJSON map[string]any + require.NoError(t, json.Unmarshal(protodata, &gotJSON)) + jsondata, err := json.MarshalIndent(gotJSON, "", " ") + require.NoError(t, err) + autogold.Expect(`{ + "id": "id", + "timestamp": "2023-02-24T14:48:30Z", + "user": { + "userId": "123" + } +}`).Equal(t, string(jsondata)) + }) +} diff --git a/internal/telemetrygateway/v1/main_test.go b/internal/telemetrygateway/v1/main_test.go new file mode 100644 index 00000000000..b98d6682c9e --- /dev/null +++ b/internal/telemetrygateway/v1/main_test.go @@ -0,0 +1,12 @@ +package v1 + +import ( + "flag" + "os" + "testing" +) + +func TestMain(m *testing.M) { + flag.Parse() + os.Exit(m.Run()) +} diff --git a/internal/telemetrygateway/v1/testdata/snapshots/2023-09-15.pb b/internal/telemetrygateway/v1/testdata/snapshots/2023-09-15.pb new file mode 100644 index 00000000000..772439abaa5 --- /dev/null +++ b/internal/telemetrygateway/v1/testdata/snapshots/2023-09-15.pb @@ -0,0 +1,12 @@ + +1234¾œãŸFeature"Action* + +dev +CLIENTVERSION2: +metadata + +privatedata" +ProductCategory:Ò  anonymousB + +featuretrueJ8 +valuevaluevalue"value*value2value:valueBvalue \ No newline at end of file diff --git a/migrations/frontend/1694806099_add_telemetryeventsexportqueuestore/down.sql b/migrations/frontend/1694806099_add_telemetryeventsexportqueuestore/down.sql new file mode 100644 index 00000000000..99ef58486b3 --- /dev/null +++ b/migrations/frontend/1694806099_add_telemetryeventsexportqueuestore/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS telemetry_events_export_queue; diff --git a/migrations/frontend/1694806099_add_telemetryeventsexportqueuestore/metadata.yaml b/migrations/frontend/1694806099_add_telemetryeventsexportqueuestore/metadata.yaml new file mode 100644 index 00000000000..10a6b371bce --- /dev/null +++ b/migrations/frontend/1694806099_add_telemetryeventsexportqueuestore/metadata.yaml @@ -0,0 +1,2 @@ +name: Add TelemetryEventsExportQueueStore +parents: [1691759644, 1693825517] diff --git a/migrations/frontend/1694806099_add_telemetryeventsexportqueuestore/up.sql b/migrations/frontend/1694806099_add_telemetryeventsexportqueuestore/up.sql new file mode 100644 index 00000000000..7c6d59db0cd --- /dev/null +++ b/migrations/frontend/1694806099_add_telemetryeventsexportqueuestore/up.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS telemetry_events_export_queue ( + id TEXT PRIMARY KEY, + timestamp TIMESTAMPTZ NOT NULL, + payload_pb BYTEA NOT NULL, + exported_at TIMESTAMPTZ DEFAULT NULL +); diff --git a/migrations/frontend/squashed.sql b/migrations/frontend/squashed.sql index 6f1b5b84762..87b8341b250 100755 --- a/migrations/frontend/squashed.sql +++ b/migrations/frontend/squashed.sql @@ -4677,6 +4677,13 @@ CREATE SEQUENCE teams_id_seq ALTER SEQUENCE teams_id_seq OWNED BY teams.id; +CREATE TABLE telemetry_events_export_queue ( + id text NOT NULL, + "timestamp" timestamp with time zone NOT NULL, + payload_pb bytea NOT NULL, + exported_at timestamp with time zone +); + CREATE TABLE temporary_settings ( id integer NOT NULL, user_id integer NOT NULL, @@ -5761,6 +5768,9 @@ ALTER TABLE ONLY team_members ALTER TABLE ONLY teams ADD CONSTRAINT teams_pkey PRIMARY KEY (id); +ALTER TABLE ONLY telemetry_events_export_queue + ADD CONSTRAINT telemetry_events_export_queue_pkey PRIMARY KEY (id); + ALTER TABLE ONLY temporary_settings ADD CONSTRAINT temporary_settings_pkey PRIMARY KEY (id);