telemetry: init internal APIs and events cache store layer (#56520)

This PR introduces:

1. a new backend telemetry API, with `internal/telemetry.EventRecorder`, for backend services to generate their own events, based on [the proposal](https://docs.google.com/document/d/14WBt80sbmVm73B-R1Srs5cSunZo2IhDMtKiyAgkYujU/edit#bookmark=id.wy4lr5f3lxyk)
   - this is similar to the clientside SDK: https://github.com/sourcegraph/telemetry
2. a caching layer in the database, `internal/database.TelemetryEventsExportQueueStore`, that stores raw Protobuf messages so that a worker can be implemented on top to pull events out and export them in batches
   - planned strategy is to mark exported entries as exported, and then periodically prune exported events from the table after N day(s)
3. an `internal/telemetry/teestore.Store` which tees events into the existing `event_logs` table (which we are no longer considering revamping due to extensive existing integrations with it) as well as the new `TelemetryEventsExportQueueStore`
   - adding things to this store is behind an off-by-default feature flag `telemetry-export` - when the flag is disabled, adding to the store is a no-op
4. adapters to send events from `telemetry.EventRecorder` and the GraphQL mutation added in https://github.com/sourcegraph/sourcegraph/pull/56297 to the `teestore.Store`

Actually exporting things, and the destination service itself, will be built in https://github.com/sourcegraph/sourcegraph/pull/56699
Stacked on https://github.com/sourcegraph/sourcegraph/pull/56519

Closes https://github.com/sourcegraph/sourcegraph/issues/56283
Closes https://github.com/sourcegraph/sourcegraph/issues/56285
Closes https://github.com/sourcegraph/sourcegraph/issues/56286

## Test plan

Unit and integration tests, and manual test plan in https://github.com/sourcegraph/sourcegraph/pull/56699
This commit is contained in:
Robert Lin 2023-09-19 21:53:42 -07:00 committed by GitHub
parent 9e717bb874
commit fb2a4a670e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 1938 additions and 6 deletions

View File

@ -1,14 +1,36 @@
load("//dev:go_defs.bzl", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "resolvers",
srcs = ["resolvers.go"],
srcs = [
"resolvers.go",
"telemetrygateway.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/frontend/internal/telemetry/resolvers",
visibility = ["//cmd/frontend:__subpackages__"],
deps = [
"//cmd/frontend/graphqlbackend",
"//internal/database",
"//internal/telemetry/teestore",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/version",
"//lib/errors",
"@com_github_sourcegraph_log//:log",
"@org_golang_google_protobuf//types/known/structpb",
],
)
go_test(
name = "resolvers_test",
srcs = ["telemetrygateway_test.go"],
embed = [":resolvers"],
deps = [
"//cmd/frontend/graphqlbackend",
"//internal/actor",
"//lib/pointers",
"@com_github_hexops_autogold_v2//:autogold",
"@com_github_stretchr_testify//require",
"@org_golang_google_protobuf//encoding/protojson",
],
)

View File

@ -2,27 +2,47 @@ package resolvers
import (
"context"
"encoding/json"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/telemetry/teestore"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// Resolver is the GraphQL resolver of all things related to telemetry V2.
//
// NOTE(review): this hunk appears to interleave the pre-change fields
// (logger, db) with the post-change fields (logger, teestore); only one
// logger field can exist in the real struct - confirm against the file.
type Resolver struct {
logger log.Logger
db database.DB
// logger is used by RecordEvents to report events that fail conversion.
logger log.Logger
// teestore is the store RecordEvents writes converted events to.
teestore teestore.Store
}
// New returns a new Resolver whose store uses the given database.
//
// NOTE(review): the first return statement looks like the removed line of the
// diff this hunk was rendered from; the second one, which builds a teestore
// from db.TelemetryEventsExportQueue() and db.EventLogs(), appears to be the
// current implementation - confirm against the file.
func New(logger log.Logger, db database.DB) graphqlbackend.TelemetryResolver {
return &Resolver{logger: logger, db: db}
return &Resolver{logger: logger, teestore: teestore.NewStore(db.TelemetryEventsExportQueue(), db.EventLogs())}
}
// Compile-time check that *Resolver implements graphqlbackend.TelemetryResolver.
var _ graphqlbackend.TelemetryResolver = &Resolver{}
// RecordEvents handles the telemetry events provided in args. The code after
// the first return rejects empty input, converts the events with
// newTelemetryGatewayEvents, and stores the result via r.teestore, returning
// an EmptyResponse on success.
//
// NOTE(review): the unconditional "not implemented" return below looks like
// the removed line of the diff this hunk was rendered from; as written it
// makes everything after it unreachable - confirm against the file.
func (r *Resolver) RecordEvents(ctx context.Context, args *graphqlbackend.RecordEventsArgs) (*graphqlbackend.EmptyResponse, error) {
return nil, errors.New("not implemented")
if args == nil || len(args.Events) == 0 {
return nil, errors.New("no events provided")
}
gatewayEvents, err := newTelemetryGatewayEvents(ctx, time.Now(), telemetrygatewayv1.DefaultEventIDFunc, args.Events)
if err != nil {
// This is an important failure, make sure we surface it, as it could be
// an implementation error.
// The marshal error is deliberately ignored: data is only used to enrich
// the log entry below.
data, _ := json.Marshal(args.Events)
r.logger.Error("failed to convert telemetry events to internal format",
log.Error(err),
log.String("eventData", string(data)))
return nil, errors.Wrap(err, "invalid events provided")
}
if err := r.teestore.StoreEvents(ctx, gatewayEvents); err != nil {
return nil, errors.Wrap(err, "error storing events")
}
return &graphqlbackend.EmptyResponse{}, nil
}

View File

@ -0,0 +1,97 @@
package resolvers
import (
"context"
"encoding/json"
"time"
"google.golang.org/protobuf/types/known/structpb"
"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/internal/version"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// newTelemetryGatewayEvents converts GraphQL telemetry event inputs into
// telemetry gateway events, producing one output event per input event in the
// same order. Each event starts from NewEventWithDefaults(ctx, now, newUUID)
// and is then populated with the input's feature, action, parameters, source
// and, when provided, marketing tracking data.
//
// An error is returned as soon as the private metadata of any event cannot be
// converted into a protobuf struct; no events are returned in that case.
func newTelemetryGatewayEvents(
	ctx context.Context,
	now time.Time,
	newUUID func() string,
	events []graphqlbackend.TelemetryEventInput,
) ([]*telemetrygatewayv1.Event, error) {
	converted := make([]*telemetrygatewayv1.Event, 0, len(events))
	for idx, input := range events {
		ev := telemetrygatewayv1.NewEventWithDefaults(ctx, now, newUUID)
		ev.Feature = input.Feature
		ev.Action = input.Action

		// Private metadata arrives as raw JSON and is round-tripped through a
		// generic map to build the protobuf struct. Absent or empty input
		// leaves the field nil.
		var privateMetadata *structpb.Struct
		if raw := input.Parameters.PrivateMetadata; raw != nil && len(*raw) > 0 {
			data, err := raw.MarshalJSON()
			if err != nil {
				return nil, errors.Wrapf(err, "error marshaling privateMetadata for event %d", idx)
			}
			var fields map[string]any
			if err := json.Unmarshal(data, &fields); err != nil {
				return nil, errors.Wrapf(err, "error unmarshaling privateMetadata for event %d", idx)
			}
			privateMetadata, err = structpb.NewStruct(fields)
			if err != nil {
				return nil, errors.Wrapf(err, "error converting privateMetadata to protobuf for event %d", idx)
			}
		}

		// Public metadata is flattened from a key/value list into a map; an
		// absent or empty list leaves the map nil.
		var metadata map[string]int64
		if kvs := input.Parameters.Metadata; kvs != nil && len(*kvs) > 0 {
			metadata = make(map[string]int64, len(*kvs))
			for _, kv := range *kvs {
				metadata[kv.Key] = int64(kv.Value)
			}
		}

		// Billing metadata is optional.
		var billingMetadata *telemetrygatewayv1.EventBillingMetadata
		if billing := input.Parameters.BillingMetadata; billing != nil {
			billingMetadata = &telemetrygatewayv1.EventBillingMetadata{
				Product:  billing.Product,
				Category: billing.Category,
			}
		}

		ev.Parameters = &telemetrygatewayv1.EventParameters{
			Version:         input.Parameters.Version,
			Metadata:        metadata,
			PrivateMetadata: privateMetadata,
			BillingMetadata: billingMetadata,
		}

		// The server half of the source is always this instance's version;
		// the client half is whatever the caller reported.
		ev.Source = &telemetrygatewayv1.EventSource{
			Server: &telemetrygatewayv1.EventSource_Server{
				Version: version.Version(),
			},
			Client: &telemetrygatewayv1.EventSource_Client{
				Name:    input.Source.Client,
				Version: input.Source.ClientVersion,
			},
		}

		if tracking := input.MarketingTracking; tracking != nil {
			ev.MarketingTracking = &telemetrygatewayv1.EventMarketingTracking{
				Url:             tracking.Url,
				FirstSourceUrl:  tracking.FirstSourceURL,
				CohortId:        tracking.CohortID,
				Referrer:        tracking.Referrer,
				LastSourceUrl:   tracking.LastSourceURL,
				DeviceSessionId: tracking.DeviceSessionID,
				SessionReferrer: tracking.SessionReferrer,
				SessionFirstUrl: tracking.SessionFirstURL,
			}
		}

		converted = append(converted, ev)
	}
	return converted, nil
}

View File

@ -0,0 +1,167 @@
package resolvers
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/hexops/autogold/v2"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"
"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/lib/pointers"
)
// TestNewTelemetryGatewayEvents converts one GraphQL event input per test case
// with newTelemetryGatewayEvents, using a fixed timestamp and the test case
// name as the event ID, and compares the JSON rendering of the resulting
// event against a golden value.
func TestNewTelemetryGatewayEvents(t *testing.T) {
staticTime, err := time.Parse(time.RFC3339, "2023-02-24T14:48:30Z")
require.NoError(t, err)
for _, tc := range []struct {
name string
ctx context.Context
event graphqlbackend.TelemetryEventInput
expect autogold.Value
}{
{
name: "basic",
ctx: context.Background(),
event: graphqlbackend.TelemetryEventInput{
Feature: "Feature",
Action: "Example",
},
expect: autogold.Expect(`{
"action": "Example",
"feature": "Feature",
"id": "basic",
"parameters": {},
"source": {
"client": {},
"server": {
"version": "0.0.0+dev"
}
},
"timestamp": "2023-02-24T14:48:30Z"
}`),
},
{
name: "with anonymous user",
ctx: actor.WithActor(context.Background(), actor.FromAnonymousUser("1234")),
event: graphqlbackend.TelemetryEventInput{
Feature: "Feature",
Action: "Example",
},
expect: autogold.Expect(`{
"action": "Example",
"feature": "Feature",
"id": "with anonymous user",
"parameters": {},
"source": {
"client": {},
"server": {
"version": "0.0.0+dev"
}
},
"timestamp": "2023-02-24T14:48:30Z",
"user": {
"anonymousUserId": "1234"
}
}`),
},
{
name: "with authenticated user",
ctx: actor.WithActor(context.Background(), actor.FromMockUser(1234)),
event: graphqlbackend.TelemetryEventInput{
Feature: "Feature",
Action: "Example",
},
expect: autogold.Expect(`{
"action": "Example",
"feature": "Feature",
"id": "with authenticated user",
"parameters": {},
"source": {
"client": {},
"server": {
"version": "0.0.0+dev"
}
},
"timestamp": "2023-02-24T14:48:30Z",
"user": {
"userId": "1234"
}
}`),
},
{
name: "with parameters",
ctx: context.Background(),
event: graphqlbackend.TelemetryEventInput{
Feature: "Feature",
Action: "Example",
Parameters: graphqlbackend.TelemetryEventParametersInput{
Version: 0,
Metadata: &[]graphqlbackend.TelemetryEventMetadataInput{
{
Key: "metadata",
Value: 123,
},
},
PrivateMetadata: pointers.Ptr(json.RawMessage(`{"private": "super-sensitive"}`)),
BillingMetadata: &graphqlbackend.TelemetryEventBillingMetadataInput{
Product: "Product",
Category: "Category",
},
},
},
expect: autogold.Expect(`{
"action": "Example",
"feature": "Feature",
"id": "with parameters",
"parameters": {
"billingMetadata": {
"category": "Category",
"product": "Product"
},
"metadata": {
"metadata": "123"
},
"privateMetadata": {
"private": "super-sensitive"
}
},
"source": {
"client": {},
"server": {
"version": "0.0.0+dev"
}
},
"timestamp": "2023-02-24T14:48:30Z"
}`),
},
} {
t.Run(tc.name, func(t *testing.T) {
got, err := newTelemetryGatewayEvents(tc.ctx,
staticTime,
// Use the test case name as a deterministic event ID so it can be
// pinned in the golden value.
func() string { return tc.name },
[]graphqlbackend.TelemetryEventInput{
tc.event,
})
require.NoError(t, err)
require.Len(t, got, 1)
protodata, err := protojson.Marshal(got[0])
require.NoError(t, err)
// Protojson output is not stable because it injects randomized
// whitespace, so we re-marshal it with encoding/json to stabilize the
// output for golden tests.
// https://github.com/golang/protobuf/issues/1082
var gotJSON map[string]any
require.NoError(t, json.Unmarshal(protodata, &gotJSON))
jsondata, err := json.MarshalIndent(gotJSON, "", " ")
require.NoError(t, err)
tc.expect.Equal(t, string(jsondata))
})
}
}

View File

@ -80,6 +80,7 @@ go_library(
"sub_repo_perms_store.go",
"survey_responses.go",
"teams.go",
"telemetry_export_store.go",
"temporary_settings.go",
"user_credentials.go",
"user_emails.go",
@ -141,6 +142,7 @@ go_library(
"//internal/rbac/types",
"//internal/search/result",
"//internal/security",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/temporarysettings",
"//internal/timeutil",
"//internal/trace",
@ -249,6 +251,7 @@ go_test(
"sub_repo_perms_store_test.go",
"survey_responses_test.go",
"teams_test.go",
"telemetry_export_store_test.go",
"temporary_settings_test.go",
"user_credentials_test.go",
"user_emails_test.go",
@ -295,6 +298,7 @@ go_test(
"//internal/perforce",
"//internal/rbac/types",
"//internal/search/result",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/temporarysettings",
"//internal/timeutil",
"//internal/trace",
@ -322,7 +326,10 @@ go_test(
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tidwall_gjson//:gjson",
"@org_golang_google_protobuf//proto",
"@org_golang_google_protobuf//testing/protocmp",
"@org_golang_google_protobuf//types/known/structpb",
"@org_golang_google_protobuf//types/known/timestamppb",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",

View File

@ -64,6 +64,7 @@ type DB interface {
Settings() SettingsStore
SubRepoPerms() SubRepoPermsStore
TemporarySettings() TemporarySettingsStore
TelemetryEventsExportQueue() TelemetryEventsExportQueueStore
UserCredentials(encryption.Key) UserCredentialsStore
UserEmails() UserEmailsStore
UserExternalAccounts() UserExternalAccountsStore
@ -301,6 +302,13 @@ func (d *db) TemporarySettings() TemporarySettingsStore {
return TemporarySettingsWith(d.Store)
}
// TelemetryEventsExportQueue returns a TelemetryEventsExportQueueStore built
// on this database's store, with a logger scoped to "telemetry_events".
func (d *db) TelemetryEventsExportQueue() TelemetryEventsExportQueueStore {
	scopedLogger := d.logger.Scoped("telemetry_events", "telemetry events export queue store")
	return TelemetryEventsExportQueueWith(scopedLogger, d.Store)
}
// UserCredentials returns a UserCredentialsStore built from this database's
// logger and store together with the given encryption key.
func (d *db) UserCredentials(key encryption.Key) UserCredentialsStore {
return UserCredentialsWith(d.logger, d.Store, key)
}

View File

@ -15061,6 +15061,10 @@ type MockDB struct {
// TeamsFunc is an instance of a mock function object controlling the
// behavior of the method Teams.
TeamsFunc *DBTeamsFunc
// TelemetryEventsExportQueueFunc is an instance of a mock function
// object controlling the behavior of the method
// TelemetryEventsExportQueue.
TelemetryEventsExportQueueFunc *DBTelemetryEventsExportQueueFunc
// TemporarySettingsFunc is an instance of a mock function object
// controlling the behavior of the method TemporarySettings.
TemporarySettingsFunc *DBTemporarySettingsFunc
@ -15372,6 +15376,11 @@ func NewMockDB() *MockDB {
return
},
},
TelemetryEventsExportQueueFunc: &DBTelemetryEventsExportQueueFunc{
defaultHook: func() (r0 database.TelemetryEventsExportQueueStore) {
return
},
},
TemporarySettingsFunc: &DBTemporarySettingsFunc{
defaultHook: func() (r0 database.TemporarySettingsStore) {
return
@ -15704,6 +15713,11 @@ func NewStrictMockDB() *MockDB {
panic("unexpected invocation of MockDB.Teams")
},
},
TelemetryEventsExportQueueFunc: &DBTelemetryEventsExportQueueFunc{
defaultHook: func() database.TelemetryEventsExportQueueStore {
panic("unexpected invocation of MockDB.TelemetryEventsExportQueue")
},
},
TemporarySettingsFunc: &DBTemporarySettingsFunc{
defaultHook: func() database.TemporarySettingsStore {
panic("unexpected invocation of MockDB.TemporarySettings")
@ -15926,6 +15940,9 @@ func NewMockDBFrom(i database.DB) *MockDB {
TeamsFunc: &DBTeamsFunc{
defaultHook: i.Teams,
},
TelemetryEventsExportQueueFunc: &DBTelemetryEventsExportQueueFunc{
defaultHook: i.TelemetryEventsExportQueue,
},
TemporarySettingsFunc: &DBTemporarySettingsFunc{
defaultHook: i.TemporarySettings,
},
@ -21443,6 +21460,106 @@ func (c DBTeamsFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}
// DBTelemetryEventsExportQueueFunc describes the behavior when the
// TelemetryEventsExportQueue method of the parent MockDB instance is
// invoked.
type DBTelemetryEventsExportQueueFunc struct {
// defaultHook is the hook used when the hooks queue is empty.
defaultHook func() database.TelemetryEventsExportQueueStore
// hooks is the queue of one-shot hooks added via PushHook; the front hook is
// consumed on each invocation.
hooks []func() database.TelemetryEventsExportQueueStore
// history holds one entry per recorded invocation.
history []DBTelemetryEventsExportQueueFuncCall
// mutex is held while hooks and history are read or modified.
mutex sync.Mutex
}
// TelemetryEventsExportQueue invokes the next hook in the queue (or the
// default hook when the queue is empty), records the result of this
// invocation in the call history, and returns it.
func (m *MockDB) TelemetryEventsExportQueue() database.TelemetryEventsExportQueueStore {
	hook := m.TelemetryEventsExportQueueFunc.nextHook()
	result := hook()
	m.TelemetryEventsExportQueueFunc.appendCall(DBTelemetryEventsExportQueueFuncCall{Result0: result})
	return result
}
// SetDefaultHook sets the function that is called when the
// TelemetryEventsExportQueue method of the parent MockDB instance is
// invoked and the hook queue is empty.
func (f *DBTelemetryEventsExportQueueFunc) SetDefaultHook(hook func() database.TelemetryEventsExportQueueStore) {
f.defaultHook = hook
}
// PushHook appends a function to the end of the hook queue. Every invocation
// of the TelemetryEventsExportQueue method of the parent MockDB instance
// consumes the hook at the front of the queue; once the queue is empty, the
// default hook function is used for all further invocations.
func (f *DBTelemetryEventsExportQueueFunc) PushHook(hook func() database.TelemetryEventsExportQueueStore) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.hooks = append(f.hooks, hook)
}
// SetDefaultReturn installs, via SetDefaultHook, a default hook that always
// returns the given value.
func (f *DBTelemetryEventsExportQueueFunc) SetDefaultReturn(r0 database.TelemetryEventsExportQueueStore) {
	constant := func() database.TelemetryEventsExportQueueStore { return r0 }
	f.SetDefaultHook(constant)
}
// PushReturn queues, via PushHook, a hook that returns the given value.
func (f *DBTelemetryEventsExportQueueFunc) PushReturn(r0 database.TelemetryEventsExportQueueStore) {
	constant := func() database.TelemetryEventsExportQueueStore { return r0 }
	f.PushHook(constant)
}
// nextHook pops and returns the hook at the front of the queue, or returns
// the default hook when the queue is empty.
func (f *DBTelemetryEventsExportQueueFunc) nextHook() func() database.TelemetryEventsExportQueueStore {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.hooks) > 0 {
		next := f.hooks[0]
		f.hooks = f.hooks[1:]
		return next
	}
	return f.defaultHook
}
// appendCall records the given invocation at the end of the call history.
func (f *DBTelemetryEventsExportQueueFunc) appendCall(r0 DBTelemetryEventsExportQueueFuncCall) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.history = append(f.history, r0)
}
// History returns a copy of the recorded
// DBTelemetryEventsExportQueueFuncCall objects describing the invocations of
// this function so far.
func (f *DBTelemetryEventsExportQueueFunc) History() []DBTelemetryEventsExportQueueFuncCall {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	snapshot := make([]DBTelemetryEventsExportQueueFuncCall, len(f.history))
	copy(snapshot, f.history)
	return snapshot
}
// DBTelemetryEventsExportQueueFuncCall is an object that describes an
// invocation of method TelemetryEventsExportQueue on an instance of MockDB.
// The method takes no arguments, so only its result is recorded.
type DBTelemetryEventsExportQueueFuncCall struct {
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 database.TelemetryEventsExportQueueStore
}
// Args returns the arguments of this invocation as an interface slice. The
// mocked method takes no arguments, so the slice is always empty (and
// non-nil).
func (c DBTelemetryEventsExportQueueFuncCall) Args() []interface{} {
	args := make([]interface{}, 0)
	return args
}
// Results returns the results of this invocation as an interface slice with
// a single element, Result0.
func (c DBTelemetryEventsExportQueueFuncCall) Results() []interface{} {
	results := make([]interface{}, 0, 1)
	results = append(results, c.Result0)
	return results
}
// DBTemporarySettingsFunc describes the behavior when the TemporarySettings
// method of the parent MockDB instance is invoked.
type DBTemporarySettingsFunc struct {

View File

@ -12,6 +12,7 @@ import (
"github.com/gofrs/uuid"
"github.com/keegancsmith/sqlf"
"github.com/lib/pq"
"go.opentelemetry.io/otel/attribute"
sgactor "github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/conf"
@ -22,6 +23,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/featureflag"
"github.com/sourcegraph/sourcegraph/internal/jsonc"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/internal/types"
"github.com/sourcegraph/sourcegraph/internal/version"
"github.com/sourcegraph/sourcegraph/lib/errors"
@ -216,6 +218,11 @@ func (l *eventLogStore) Insert(ctx context.Context, e *Event) error {
const EventLogsSourcegraphOperatorKey = "sourcegraph_operator"
func (l *eventLogStore) BulkInsert(ctx context.Context, events []*Event) error {
var tr trace.Trace
tr, ctx = trace.New(ctx, "eventLogs.BulkInsert",
attribute.Int("events", len(events)))
defer tr.End()
coalesce := func(v json.RawMessage) json.RawMessage {
if v != nil {
return v

View File

@ -26567,6 +26567,78 @@
],
"Triggers": []
},
{
"Name": "telemetry_events_export_queue",
"Comment": "",
"Columns": [
{
"Name": "exported_at",
"Index": 4,
"TypeName": "timestamp with time zone",
"IsNullable": true,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "id",
"Index": 1,
"TypeName": "text",
"IsNullable": false,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "payload_pb",
"Index": 3,
"TypeName": "bytea",
"IsNullable": false,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "timestamp",
"Index": 2,
"TypeName": "timestamp with time zone",
"IsNullable": false,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
}
],
"Indexes": [
{
"Name": "telemetry_events_export_queue_pkey",
"IsPrimaryKey": true,
"IsUnique": true,
"IsExclusion": false,
"IsDeferrable": false,
"IndexDefinition": "CREATE UNIQUE INDEX telemetry_events_export_queue_pkey ON telemetry_events_export_queue USING btree (id)",
"ConstraintType": "p",
"ConstraintDefinition": "PRIMARY KEY (id)"
}
],
"Constraints": null,
"Triggers": []
},
{
"Name": "temporary_settings",
"Comment": "Stores per-user temporary settings used in the UI, for example, which modals have been dimissed or what theme is preferred.",

View File

@ -4064,6 +4064,19 @@ Referenced by:
```
# Table "public.telemetry_events_export_queue"
```
Column | Type | Collation | Nullable | Default
-------------+--------------------------+-----------+----------+---------
id | text | | not null |
timestamp | timestamp with time zone | | not null |
payload_pb | bytea | | not null |
exported_at | timestamp with time zone | | |
Indexes:
"telemetry_events_export_queue_pkey" PRIMARY KEY, btree (id)
```
# Table "public.temporary_settings"
```
Column | Type | Collation | Nullable | Default

View File

@ -0,0 +1,190 @@
package database
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/protobuf/proto"
"github.com/lib/pq"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/batch"
"github.com/sourcegraph/sourcegraph/internal/featureflag"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// FeatureFlagTelemetryExport enables telemetry export by allowing events to be
// queued for export via (TelemetryEventsExportQueueStore).QueueForExport
const FeatureFlagTelemetryExport = "telemetry-export"
// TelemetryEventsExportQueueStore queues telemetry events for later export and
// tracks which of the queued events have already been exported.
type TelemetryEventsExportQueueStore interface {
basestore.ShareableStore
// QueueForExport caches a set of events for later export. It is currently
// feature-flagged, such that if the flag is not enabled for the given
// context, we do not cache the event for export.
QueueForExport(context.Context, []*telemetrygatewayv1.Event) error
// ListForExport returns the cached events that should be exported next. All
// events returned should be exported. At most limit events that have not
// yet been marked as exported are returned, ordered by ascending event
// timestamp.
//
// 🚨 SECURITY: Potentially sensitive parts of the payload are retained at
// this stage. The caller is responsible for ensuring sensitive data is
// stripped.
ListForExport(ctx context.Context, limit int) ([]*telemetrygatewayv1.Event, error)
// MarkAsExported marks all events in the set of IDs as exported.
MarkAsExported(ctx context.Context, eventIDs []string) error
// DeletedExported deletes all events exported before the given timestamp,
// returning the number of affected events. Events that have not been
// exported are never deleted.
DeletedExported(ctx context.Context, before time.Time) (int64, error)
}
// TelemetryEventsExportQueueWith returns a TelemetryEventsExportQueueStore
// that wraps the given shareable store and logs with the given logger.
func TelemetryEventsExportQueueWith(logger log.Logger, other basestore.ShareableStore) TelemetryEventsExportQueueStore {
	store := new(telemetryEventsExportQueueStore)
	store.logger = logger
	store.ShareableStore = other
	return store
}
// telemetryEventsExportQueueStore is the TelemetryEventsExportQueueStore
// implementation returned by TelemetryEventsExportQueueWith.
type telemetryEventsExportQueueStore struct {
// logger is the base logger; methods derive a trace-aware logger from it.
logger log.Logger
// ShareableStore provides the database handle used for all queries.
basestore.ShareableStore
}
// QueueForExport implements TelemetryEventsExportQueueStore; see the interface
// docstring. When the telemetry export feature flag is not enabled for ctx,
// or when events is empty, nothing is inserted and nil is returned.
func (s *telemetryEventsExportQueueStore) QueueForExport(ctx context.Context, events []*telemetrygatewayv1.Event) error {
	var tr trace.Trace
	tr, ctx = trace.New(ctx, "telemetryevents.QueueForExport",
		attribute.Int("events", len(events)))
	defer tr.End()

	logger := trace.Logger(ctx, s.logger)

	// Export is off unless the flag is explicitly enabled; record the outcome
	// on the trace either way.
	flags := featureflag.FromContext(ctx)
	enabled := flags != nil && flags.GetBoolOr(FeatureFlagTelemetryExport, false)
	tr.SetAttributes(attribute.Bool("enabled", enabled))
	if !enabled {
		return nil // no-op
	}

	if len(events) == 0 {
		return nil
	}

	columns := []string{
		"id",
		"timestamp",
		"payload_pb",
	}
	return batch.InsertValues(ctx,
		s.Handle(),
		"telemetry_events_export_queue",
		batch.MaxNumPostgresParameters,
		columns,
		insertChannel(logger, events))
}
// insertChannel returns a channel that yields one row of column values (id,
// timestamp, payload_pb) per event and is closed once all events have been
// processed. Events that cannot be marshaled are logged and skipped.
//
// The channel is buffered to len(events), so the producing goroutine never
// blocks on a send and always runs to completion.
func insertChannel(logger log.Logger, events []*telemetrygatewayv1.Event) <-chan []any {
	rows := make(chan []any, len(events))

	produce := func() {
		defer close(rows)

		for _, event := range events {
			payload, err := proto.Marshal(event)
			if err == nil {
				rows <- []any{
					event.Id,                 // id
					event.Timestamp.AsTime(), // timestamp
					payload,                  // payload_pb
				}
				continue
			}

			logger.Error("failed to marshal telemetry event",
				log.String("event.feature", event.GetFeature()),
				log.String("event.action", event.GetAction()),
				log.String("event.source.client.name", event.GetSource().GetClient().GetName()),
				log.String("event.source.client.version", event.GetSource().GetClient().GetVersion()),
				log.Error(err))
		}
	}
	go produce()

	return rows
}
// ListForExport implements TelemetryEventsExportQueueStore; see the interface
// docstring. It selects up to limit rows whose exported_at is NULL, ordered by
// ascending timestamp, and decodes each payload into an event.
//
// Rows whose payload cannot be unmarshaled are logged, recorded on the trace,
// and left out of the result; they are not modified in the database.
func (s *telemetryEventsExportQueueStore) ListForExport(ctx context.Context, limit int) ([]*telemetrygatewayv1.Event, error) {
	var tr trace.Trace
	tr, ctx = trace.New(ctx, "telemetryevents.ListForExport",
		attribute.Int("limit", limit))
	defer tr.End()

	logger := trace.Logger(ctx, s.logger)

	const listQuery = `
SELECT id, payload_pb
FROM telemetry_events_export_queue
WHERE exported_at IS NULL
ORDER BY timestamp ASC
LIMIT $1`

	rows, err := s.Handle().QueryContext(ctx, listQuery, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	exportable := make([]*telemetrygatewayv1.Event, 0, limit)
	for rows.Next() {
		var (
			id      string
			payload []byte
		)
		if err := rows.Scan(&id, &payload); err != nil {
			return nil, err
		}

		event := new(telemetrygatewayv1.Event)
		if err := proto.Unmarshal(payload, event); err != nil {
			tr.RecordError(err)
			logger.Error("failed to unmarshal telemetry event payload",
				log.String("id", id),
				log.Error(err))
			// Not fatal, just ignore this event for now, leaving it in DB for
			// investigation.
			continue
		}
		exportable = append(exportable, event)
	}
	tr.SetAttributes(attribute.Int("events", len(exportable)))

	if err := rows.Err(); err != nil {
		return nil, err
	}
	return exportable, nil
}
// MarkAsExported implements TelemetryEventsExportQueueStore; see the interface
// docstring. It sets exported_at to the database's current time for every row
// whose id is in eventIDs.
func (s *telemetryEventsExportQueueStore) MarkAsExported(ctx context.Context, eventIDs []string) error {
	const markQuery = `
UPDATE telemetry_events_export_queue
SET exported_at = NOW()
WHERE id = ANY($1);
`
	_, err := s.Handle().ExecContext(ctx, markQuery, pq.Array(eventIDs))
	if err != nil {
		return errors.Wrap(err, "failed to mark events as exported")
	}
	return nil
}
// DeletedExported implements TelemetryEventsExportQueueStore; see the
// interface docstring. It deletes every row whose exported_at is set and is
// earlier than before, and returns the number of rows deleted.
func (s *telemetryEventsExportQueueStore) DeletedExported(ctx context.Context, before time.Time) (int64, error) {
	result, err := s.ShareableStore.Handle().ExecContext(ctx, `
DELETE FROM telemetry_events_export_queue
WHERE
exported_at IS NOT NULL
AND exported_at < $1;
`, before)
	if err != nil {
		// The message names the delete explicitly so it cannot be confused
		// with a MarkAsExported failure in logs.
		return 0, errors.Wrap(err, "failed to delete exported events")
	}
	return result.RowsAffected()
}

View File

@ -0,0 +1,101 @@
package database
import (
"context"
"testing"
"time"
"github.com/sourcegraph/log/logtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/featureflag"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
)
// TestTelemetryEventsExportQueueLifecycle walks a fixed set of events through
// the export queue: queueing (with the feature flag off, then on), listing,
// marking as exported, and deleting exported entries.
//
// The subtests share one database and build on each other's state, so they
// must run in the order they are declared.
func TestTelemetryEventsExportQueueLifecycle(t *testing.T) {
	// Context with FF enabled.
	ff := featureflag.NewMemoryStore(
		nil, nil, map[string]bool{FeatureFlagTelemetryExport: true})
	ctx := featureflag.WithFlags(context.Background(), ff)

	logger := logtest.Scoped(t)
	db := NewDB(logger, dbtest.NewDB(logger, t))
	store := TelemetryEventsExportQueueWith(logger, db)

	events := []*telemetrygatewayv1.Event{{
		Id:        "1",
		Feature:   "Feature",
		Action:    "View",
		Timestamp: timestamppb.New(time.Date(2022, 11, 3, 1, 0, 0, 0, time.UTC)),
		Parameters: &telemetrygatewayv1.EventParameters{
			Metadata: map[string]int64{"public": 1},
			PrivateMetadata: &structpb.Struct{
				Fields: map[string]*structpb.Value{"sensitive": structpb.NewStringValue("sensitive")},
			},
		},
	}, {
		Id:        "2",
		Feature:   "Feature",
		Action:    "Click",
		Timestamp: timestamppb.New(time.Date(2022, 11, 3, 2, 0, 0, 0, time.UTC)),
	}, {
		Id:        "3",
		Feature:   "Feature",
		Action:    "Show",
		Timestamp: timestamppb.New(time.Date(2022, 11, 3, 3, 0, 0, 0, time.UTC)),
	}}
	eventsToExport := []string{"1", "2"}

	t.Run("feature flag off", func(t *testing.T) {
		// A context without the flag must make queueing a no-op.
		require.NoError(t, store.QueueForExport(context.Background(), events))
		export, err := store.ListForExport(ctx, 100)
		require.NoError(t, err)
		assert.Len(t, export, 0)
	})

	t.Run("QueueForExport", func(t *testing.T) {
		require.NoError(t, store.QueueForExport(ctx, events))
	})

	t.Run("ListForExport", func(t *testing.T) {
		limit := len(events) - 1
		export, err := store.ListForExport(ctx, limit)
		require.NoError(t, err)
		// require (not assert): export[0] is indexed below and would panic
		// on an empty result.
		require.Len(t, export, limit)

		// Check integrity of first item
		original, err := proto.Marshal(events[0])
		require.NoError(t, err)
		got, err := proto.Marshal(export[0])
		require.NoError(t, err)
		require.Equal(t, string(original), string(got))
	})

	t.Run("before export: DeleteExported", func(t *testing.T) {
		affected, err := store.DeletedExported(ctx, time.Now())
		require.NoError(t, err)
		assert.Zero(t, affected)
	})

	t.Run("MarkAsExported", func(t *testing.T) {
		require.NoError(t, store.MarkAsExported(ctx, eventsToExport))
	})

	t.Run("after export: ListForExport", func(t *testing.T) {
		// Only the events that were not marked as exported remain listable.
		export, err := store.ListForExport(ctx, len(events))
		require.NoError(t, err)
		assert.Len(t, export, len(events)-len(eventsToExport))
	})

	t.Run("after export: DeleteExported", func(t *testing.T) {
		affected, err := store.DeletedExported(ctx, time.Now())
		require.NoError(t, err)
		// Expected value first, actual second, so failure output is labelled
		// correctly.
		assert.Equal(t, len(eventsToExport), int(affected))
	})
}

View File

@ -40,7 +40,7 @@ func setEvaluatedFlagToCache(a *actor.Actor, flagName string, value bool) {
return
}
evalStore.HSet(getFlagCacheKey(flagName), visitorID, strconv.FormatBool(value))
_ = evalStore.HSet(getFlagCacheKey(flagName), visitorID, strconv.FormatBool(value))
}
func getVisitorIDForActor(a *actor.Actor) (string, error) {

View File

@ -0,0 +1,43 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//dev:go_defs.bzl", "go_test")
go_library(
name = "telemetry",
srcs = [
"billing_categories.go",
"billing_products.go",
"events.go",
"telemetry.go",
"telemetrygateway.go",
],
importpath = "github.com/sourcegraph/sourcegraph/internal/telemetry",
visibility = ["//:__subpackages__"],
deps = [
"//internal/telemetry/teestore",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/version",
"@org_golang_google_protobuf//types/known/structpb",
],
)
go_test(
name = "telemetry_test",
srcs = [
"telemetry_test.go",
"telemetrygateway_test.go",
],
embed = [":telemetry"],
tags = ["requires-network"],
deps = [
"//internal/actor",
"//internal/database",
"//internal/database/dbtest",
"//internal/featureflag",
"//internal/telemetry/teestore",
"@com_github_hexops_autogold_v2//:autogold",
"@com_github_sourcegraph_log//logtest",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_protobuf//encoding/protojson",
],
)

View File

@ -0,0 +1,12 @@
package telemetry
// billingCategory is used in EventBillingMetadata to identify a billing
// category.
//
// This is a private type, requiring the values to be declared in-package
// and preventing strings from being cast to this type.
type billingCategory string
// All billing category IDs in Sourcegraph's Go services.
const (
// BillingCategoryExample is a sample category ID.
// NOTE(review): presumably a placeholder until real categories exist - confirm.
BillingCategoryExample billingCategory = "EXAMPLE"
)

View File

@ -0,0 +1,12 @@
package telemetry
// billingProduct is used in EventBillingMetadata to identify a product.
//
// This is a private type, requiring the values to be declared in-package
// and preventing strings from being cast to this type.
type billingProduct string
// All billing product IDs in Sourcegraph's Go services.
const (
// BillingProductExample is a sample product ID.
// NOTE(review): presumably a placeholder until real products exist - confirm.
BillingProductExample billingProduct = "EXAMPLE"
)

View File

@ -0,0 +1,24 @@
package telemetry
// eventFeature defines the feature associated with an event. Values should
// be in camelCase, e.g. 'myFeature'
//
// This is a private type, requiring the values to be declared in-package
// and preventing strings from being cast to this type.
type eventFeature string
// All event features in Sourcegraph's Go services.
const (
// FeatureExample is a sample feature.
FeatureExample eventFeature = "exampleFeature"
)
// eventAction defines the action associated with an event. Values should
// be in camelCase, e.g. 'myAction'
//
// This is a private type, requiring the values to be declared in-package
// and preventing strings from being cast to this type.
type eventAction string
// All event actions in Sourcegraph's Go services.
const (
// ActionExample is a sample action.
ActionExample eventAction = "exampleAction"
)

View File

@ -0,0 +1,31 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//dev:go_defs.bzl", "go_test")
go_library(
name = "teestore",
srcs = ["teestore.go"],
importpath = "github.com/sourcegraph/sourcegraph/internal/telemetry/teestore",
visibility = ["//:__subpackages__"],
deps = [
"//internal/database",
"//internal/featureflag",
"//internal/telemetrygateway/v1:telemetrygateway",
"//lib/errors",
"//lib/pointers",
"@com_github_sourcegraph_conc//pool",
],
)
go_test(
name = "teestore_test",
srcs = ["teestore_test.go"],
embed = [":teestore"],
deps = [
"//internal/telemetrygateway/v1:telemetrygateway",
"//lib/pointers",
"@com_github_hexops_autogold_v2//:autogold",
"@com_github_stretchr_testify//require",
"@org_golang_google_protobuf//types/known/structpb",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)

View File

@ -0,0 +1,151 @@
package teestore
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/sourcegraph/conc/pool"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/featureflag"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/pointers"
)
// Store tees events into both the event_logs table and the new telemetry export
// queue, translating the message into the existing event_logs format on a
// best-effort basis.
type Store interface {
// StoreEvents writes the given batch of events to both destinations.
StoreEvents(context.Context, []*telemetrygatewayv1.Event) error
}
// store is the Store implementation returned by NewStore.
type store struct {
// exportQueue receives the telemetry events as-is.
exportQueue database.TelemetryEventsExportQueueStore
// eventLogs receives the events after translation by toEventLogs.
eventLogs database.EventLogStore
}
// NewStore creates a Store that writes every batch of events to both the given
// telemetry export queue and the given event_logs store.
func NewStore(exportQueue database.TelemetryEventsExportQueueStore, eventLogs database.EventLogStore) Store {
	return &store{
		exportQueue: exportQueue,
		eventLogs:   eventLogs,
	}
}
// StoreEvents queues events for export and, translated via toEventLogs,
// inserts them into event_logs. The two writes do not depend on each other
// and are submitted to a pool so that they happen at the same time; the
// pool's collected result is returned.
func (s *store) StoreEvents(ctx context.Context, events []*telemetrygatewayv1.Event) error {
	queueForExport := func() error {
		err := s.exportQueue.QueueForExport(ctx, events)
		if err == nil {
			return nil
		}
		return errors.Wrap(err, "bulk inserting telemetry events")
	}
	insertEventLogs := func() error {
		err := s.eventLogs.BulkInsert(ctx, toEventLogs(time.Now, events))
		if err == nil {
			return nil
		}
		return errors.Wrap(err, "bulk inserting events logs")
	}

	writers := pool.New().WithErrors()
	writers.Go(queueForExport)
	writers.Go(insertEventLogs)
	return writers.Wait()
}
// toEventLogs translates telemetry events into event_logs rows on a
// best-effort basis. Exactly one row is produced per input event, including
// nil entries. now supplies the timestamp of events that do not carry one.
func toEventLogs(now func() time.Time, telemetryEvents []*telemetrygatewayv1.Event) []*database.Event {
	// marshalOrError encodes v as JSON. If that fails, the error message is
	// encoded in its place so that the row can still be written.
	marshalOrError := func(v any) json.RawMessage {
		encoded, err := json.Marshal(v)
		if err != nil {
			encoded, _ = json.Marshal(map[string]string{"marshal.error": err.Error()})
		}
		return encoded
	}

	rows := make([]*database.Event, 0, len(telemetryEvents))
	for _, event := range telemetryEvents {
		// Note that all generated proto getters are nil-safe, so use those to
		// get fields rather than accessing fields directly.
		source := event.GetSource()
		client := source.GetClient()
		user := event.GetUser()
		parameters := event.GetParameters()
		tracking := event.GetMarketingTracking()

		row := &database.Event{
			ID:       0,   // not required on insert
			InsertID: nil, // not required on insert

			// Identifiers
			Name: fmt.Sprintf("%s%s.%s", maybeSourceEventNamePrefix(source), event.GetFeature(), event.GetAction()),

			// User
			UserID:          uint32(user.GetUserId()),
			AnonymousUserID: user.GetAnonymousUserId(),

			// Parameters.BillingMetadata
			BillingProductCategory: pointers.NonZeroPtr(parameters.GetBillingMetadata().GetCategory()),
			BillingEventID:         nil, // No equivalents in telemetry events

			// Source.Client - replaced below if the client has a name
			Source: "BACKEND", // must be non-empty

			// Source.Server
			Version: source.GetServer().GetVersion(),

			// MarketingTracking
			URL:            tracking.GetUrl(),
			CohortID:       pointers.NonZeroPtr(tracking.GetCohortId()),
			FirstSourceURL: pointers.NonZeroPtr(tracking.GetFirstSourceUrl()),
			LastSourceURL:  pointers.NonZeroPtr(tracking.GetLastSourceUrl()),
			Referrer:       pointers.NonZeroPtr(tracking.GetReferrer()),
			DeviceID:       pointers.NonZeroPtr(tracking.GetDeviceSessionId()),
		}

		// Timestamp, falling back to the current time.
		if ts := event.GetTimestamp(); ts != nil {
			row.Timestamp = ts.AsTime()
		} else {
			row.Timestamp = now()
		}

		// GetParameters.Metadata
		if metadata := parameters.GetMetadata(); len(metadata) > 0 {
			row.PublicArgument = marshalOrError(metadata)
		}

		// GetParameters.PrivateMetadata
		if private := parameters.GetPrivateMetadata().AsMap(); len(private) > 0 {
			row.Argument = marshalOrError(private)
		}

		// Source.Client
		if name := client.GetName(); name != "" {
			row.Source = name
		}
		if client != nil {
			row.Client = pointers.Ptr(fmt.Sprintf("%s:%s", client.GetName(), client.GetVersion()))
		}

		// FeatureFlags
		flags := event.GetFeatureFlags().GetFlags()
		row.EvaluatedFlagSet = make(featureflag.EvaluatedFlagSet, len(flags))
		for name, raw := range flags {
			// We can expect all values to be bools for now
			row.EvaluatedFlagSet[name], _ = strconv.ParseBool(raw)
		}

		rows = append(rows, row)
	}
	return rows
}
// maybeSourceEventNamePrefix returns the client name of the source followed
// by a colon, or an empty string if the source has no client name.
func maybeSourceEventNamePrefix(s *telemetrygatewayv1.EventSource) string {
	clientName := s.GetClient().GetName()
	if clientName == "" {
		return ""
	}
	return clientName + ":"
}

View File

@ -0,0 +1,163 @@
package teestore
import (
"encoding/json"
"testing"
"time"
"github.com/hexops/autogold/v2"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/lib/pointers"
)
// TestToEventLogs checks the translation performed by toEventLogs by comparing
// the indented JSON rendering of the resulting rows against golden values.
//
// see TestRecorderEndToEnd for tests that include teestore.Store and the database
func TestToEventLogs(t *testing.T) {
testCases := []struct {
name string
events []*telemetrygatewayv1.Event
expectEventLogs autogold.Value
}{
{
name: "handles all nil",
events: nil,
expectEventLogs: autogold.Expect("[]"),
},
{
// A nil event still yields a row, with the fallback source and timestamp.
name: "handles nil entry",
events: []*telemetrygatewayv1.Event{nil},
expectEventLogs: autogold.Expect(`[
{
"ID": 0,
"Name": ".",
"URL": "",
"UserID": 0,
"AnonymousUserID": "",
"Argument": null,
"PublicArgument": null,
"Source": "BACKEND",
"Version": "",
"Timestamp": "2022-11-03T02:00:00Z",
"EvaluatedFlagSet": {},
"CohortID": null,
"FirstSourceURL": null,
"LastSourceURL": null,
"Referrer": null,
"DeviceID": null,
"InsertID": null,
"Client": null,
"BillingProductCategory": null,
"BillingEventID": null
}
]`),
},
{
// An empty event is expected to translate the same way as a nil one.
name: "handles nil fields",
events: []*telemetrygatewayv1.Event{{}},
expectEventLogs: autogold.Expect(`[
{
"ID": 0,
"Name": ".",
"URL": "",
"UserID": 0,
"AnonymousUserID": "",
"Argument": null,
"PublicArgument": null,
"Source": "BACKEND",
"Version": "",
"Timestamp": "2022-11-03T02:00:00Z",
"EvaluatedFlagSet": {},
"CohortID": null,
"FirstSourceURL": null,
"LastSourceURL": null,
"Referrer": null,
"DeviceID": null,
"InsertID": null,
"Client": null,
"BillingProductCategory": null,
"BillingEventID": null
}
]`),
},
{
// An event with a client source, parameters, user and marketing URL set.
name: "simple event",
events: []*telemetrygatewayv1.Event{{
Id: "1",
Timestamp: timestamppb.New(time.Date(2022, 11, 2, 1, 0, 0, 0, time.UTC)),
Feature: "CodeSearch",
Action: "Seach",
Source: &telemetrygatewayv1.EventSource{
Client: &telemetrygatewayv1.EventSource_Client{
Name: "VSCODE",
Version: pointers.Ptr("1.2.3"),
},
Server: &telemetrygatewayv1.EventSource_Server{
Version: "dev",
},
},
Parameters: &telemetrygatewayv1.EventParameters{
Metadata: map[string]int64{"public": 2},
PrivateMetadata: &structpb.Struct{Fields: map[string]*structpb.Value{
"private": structpb.NewStringValue("sensitive-data"),
}},
BillingMetadata: &telemetrygatewayv1.EventBillingMetadata{
Product: "product",
Category: "category",
},
},
User: &telemetrygatewayv1.EventUser{
UserId: pointers.Ptr(int64(1234)),
AnonymousUserId: pointers.Ptr("anonymous"),
},
MarketingTracking: &telemetrygatewayv1.EventMarketingTracking{
Url: pointers.Ptr("sourcegraph.com/foobar"),
},
}},
expectEventLogs: autogold.Expect(`[
{
"ID": 0,
"Name": "VSCODE:CodeSearch.Seach",
"URL": "sourcegraph.com/foobar",
"UserID": 1234,
"AnonymousUserID": "anonymous",
"Argument": {
"private": "sensitive-data"
},
"PublicArgument": {
"public": 2
},
"Source": "VSCODE",
"Version": "dev",
"Timestamp": "2022-11-02T01:00:00Z",
"EvaluatedFlagSet": {},
"CohortID": null,
"FirstSourceURL": null,
"LastSourceURL": null,
"Referrer": null,
"DeviceID": null,
"InsertID": null,
"Client": "VSCODE:1.2.3",
"BillingProductCategory": "category",
"BillingEventID": null
}
]`),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Use a fixed clock so that the fallback timestamp is deterministic.
eventLogs := toEventLogs(
func() time.Time { return time.Date(2022, 11, 3, 2, 0, 0, 0, time.UTC) },
tc.events)
// One row is expected per input event.
require.Len(t, eventLogs, len(tc.events))
// Compare JSON for ease of reading
data, err := json.MarshalIndent(eventLogs, "", " ")
require.NoError(t, err)
tc.expectEventLogs.Equal(t, string(data))
})
}
}

View File

@ -0,0 +1,76 @@
// Package telemetry implements "Telemetry V2", which supersedes event_logs
// as the mechanism for reporting telemetry from all Sourcegraph instances to
// Sourcegraph.
package telemetry
import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/telemetry/teestore"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
)
// Event describes a single telemetry event to record. It is the argument type
// of EventRecorder.BatchRecord.
type Event struct {
// Feature is the feature the event is associated with.
Feature eventFeature
// Action is the action the event is associated with.
Action eventAction
// Parameters carries the optional data attached to the event.
Parameters EventParameters
}
// constString effectively requires strings to be statically defined constants.
//
// The type is unexported, so code outside this package cannot convert a string
// value to it and can only supply untyped string constants, such as literals.
type constString string
// EventMetadata is secure, PII-free metadata that can be attached to events.
// Keys must be const strings.
type EventMetadata map[constString]int64
// EventBillingMetadata records metadata that attributes the event to product
// billing categories. It is attached to events through
// EventParameters.BillingMetadata.
type EventBillingMetadata struct {
// Product identifier.
Product billingProduct
// Category identifier.
Category billingCategory
}
// EventParameters is the optional data that can be attached to an event. The
// zero value is valid and attaches nothing.
type EventParameters struct {
// Version can be used to denote the "shape" of this event.
Version int
// Metadata is PII-free metadata about the event that we export.
Metadata EventMetadata
// PrivateMetadata is arbitrary metadata that is generally not exported.
PrivateMetadata map[string]any
// BillingMetadata contains metadata we can use for billing purposes.
// It may be nil.
BillingMetadata *EventBillingMetadata
}
// EventRecorder is for creating and recording telemetry events in the backend
// using Telemetry V2, which exports events to Sourcegraph.
//
// Recorded events are written to the wrapped teestore.Store.
type EventRecorder struct{ teestore teestore.Store }
// NewEventRecorder creates an EventRecorder that writes recorded events to
// the given store.
//
// ❗ Experimental - do not use!
func NewEventRecorder(store teestore.Store) *EventRecorder {
	recorder := EventRecorder{teestore: store}
	return &recorder
}
// Record records a single telemetry event with the context's Sourcegraph
// actor. The event is timestamped with the current time and given an ID from
// telemetrygatewayv1.DefaultEventIDFunc before being stored.
func (r *EventRecorder) Record(ctx context.Context, feature eventFeature, action eventAction, parameters EventParameters) error {
	event := newTelemetryGatewayEvent(ctx, time.Now(), telemetrygatewayv1.DefaultEventIDFunc, feature, action, parameters)
	return r.teestore.StoreEvents(ctx, []*telemetrygatewayv1.Event{event})
}
// BatchRecord records a set of telemetry events with the context's
// Sourcegraph actor. All events are stored in a single call to the underlying
// store; recording no events at all is a no-op.
func (r *EventRecorder) BatchRecord(ctx context.Context, events ...Event) error {
	if len(events) == 0 {
		return nil
	}

	rawEvents := make([]*telemetrygatewayv1.Event, 0, len(events))
	for i := range events {
		rawEvents = append(rawEvents, newTelemetryGatewayEvent(
			ctx,
			time.Now(),
			telemetrygatewayv1.DefaultEventIDFunc,
			events[i].Feature,
			events[i].Action,
			events[i].Parameters,
		))
	}
	return r.teestore.StoreEvents(ctx, rawEvents)
}

View File

@ -0,0 +1,67 @@
package telemetry_test
import (
"context"
"testing"
"github.com/sourcegraph/log/logtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/featureflag"
"github.com/sourcegraph/sourcegraph/internal/telemetry"
"github.com/sourcegraph/sourcegraph/internal/telemetry/teestore"
)
// TestRecorderEndToEnd records events through an EventRecorder backed by a
// teestore.Store on top of a test database, and checks that the events show
// up in both event_logs and the telemetry events export queue.
func TestRecorderEndToEnd(t *testing.T) {
var userID int32 = 123
ctx := actor.WithActor(context.Background(), actor.FromMockUser(userID))
// Context with FF enabled.
ff := featureflag.NewMemoryStore(
map[string]bool{database.FeatureFlagTelemetryExport: true}, nil, nil)
ctx = featureflag.WithFlags(ctx, ff)
logger := logtest.Scoped(t)
db := database.NewDB(logger, dbtest.NewDB(logger, t))
recorder := telemetry.NewEventRecorder(teestore.NewStore(db.TelemetryEventsExportQueue(), db.EventLogs()))
// One event from Record plus two from BatchRecord.
wantEvents := 3
t.Run("Record and BatchRecord", func(t *testing.T) {
assert.NoError(t, recorder.Record(ctx,
"Test", "Action1",
telemetry.EventParameters{
Metadata: telemetry.EventMetadata{
"metadata": 1,
},
PrivateMetadata: map[string]any{
"private": "sensitive",
},
}))
assert.NoError(t, recorder.BatchRecord(ctx,
telemetry.Event{
Feature: "Test",
Action: "Action2",
},
telemetry.Event{
Feature: "Test",
Action: "Action3",
}))
})
// The subtests below depend on the events recorded by the subtest above.
t.Run("tee to EventLogs", func(t *testing.T) {
eventLogs, err := db.EventLogs().ListAll(ctx, database.EventLogsListOptions{UserID: userID})
require.NoError(t, err)
assert.Len(t, eventLogs, wantEvents)
})
t.Run("tee to TelemetryEvents", func(t *testing.T) {
// The limit only needs to be larger than wantEvents.
telemetryEvents, err := db.TelemetryEventsExportQueue().ListForExport(ctx, 999)
require.NoError(t, err)
assert.Len(t, telemetryEvents, wantEvents)
})
}

View File

@ -0,0 +1,69 @@
package telemetry
import (
"context"
"time"
"google.golang.org/protobuf/types/known/structpb"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/internal/version"
)
// newTelemetryGatewayEvent builds the raw event that is stored and exported
// for a recording. It starts from telemetrygatewayv1.NewEventWithDefaults
// (which is given ctx, now and newUUID), marks the event as recorded by the
// server at the current version, and converts parameters into their raw
// counterparts. Empty metadata maps and missing billing metadata are left
// unset on the resulting event.
func newTelemetryGatewayEvent(
	ctx context.Context,
	now time.Time,
	newUUID func() string,
	feature eventFeature,
	action eventAction,
	parameters EventParameters,
) *telemetrygatewayv1.Event {
	event := telemetrygatewayv1.NewEventWithDefaults(ctx, now, newUUID)
	event.Feature = string(feature)
	event.Action = string(action)
	event.Source = &telemetrygatewayv1.EventSource{
		Server: &telemetrygatewayv1.EventSource_Server{Version: version.Version()},
		Client: nil, // no client, this is recorded directly in backend
	}

	converted := &telemetrygatewayv1.EventParameters{
		Version: int32(parameters.Version),
	}

	if count := len(parameters.Metadata); count > 0 {
		converted.Metadata = make(map[string]int64, count)
		for key, value := range parameters.Metadata {
			converted.Metadata[string(key)] = value
		}
	}

	if len(parameters.PrivateMetadata) > 0 {
		private, err := structpb.NewStruct(parameters.PrivateMetadata)
		if err != nil {
			// Keep the event, and record why its private metadata is missing.
			private = &structpb.Struct{
				Fields: map[string]*structpb.Value{
					"telemetry.error": structpb.NewStringValue("failed to marshal private metadata: " + err.Error()),
				},
			}
		}
		converted.PrivateMetadata = private
	}

	if billing := parameters.BillingMetadata; billing != nil {
		converted.BillingMetadata = &telemetrygatewayv1.EventBillingMetadata{
			Product:  string(billing.Product),
			Category: string(billing.Category),
		}
	}

	event.Parameters = converted
	return event
}

View File

@ -0,0 +1,158 @@
package telemetry
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/hexops/autogold/v2"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"
"github.com/sourcegraph/sourcegraph/internal/actor"
)
// TestMakeRawEvent checks the events built by newTelemetryGatewayEvent for
// various contexts and parameters, comparing their JSON rendering against
// golden values. The timestamp is fixed and the test case name is used as the
// event ID to keep the output deterministic.
func TestMakeRawEvent(t *testing.T) {
staticTime, err := time.Parse(time.RFC3339, "2023-02-24T14:48:30Z")
require.NoError(t, err)
for _, tc := range []struct {
name string
ctx context.Context
event Event
expect autogold.Value
}{
{
// No actor in context: no "user" is expected in the output.
name: "basic",
ctx: context.Background(),
event: Event{
Feature: FeatureExample,
Action: ActionExample,
},
expect: autogold.Expect(`{
"action": "exampleAction",
"feature": "exampleFeature",
"id": "basic",
"parameters": {},
"source": {
"server": {
"version": "0.0.0+dev"
}
},
"timestamp": "2023-02-24T14:48:30Z"
}`),
},
{
name: "with anonymous user",
ctx: actor.WithActor(context.Background(), actor.FromAnonymousUser("1234")),
event: Event{
Feature: FeatureExample,
Action: ActionExample,
},
expect: autogold.Expect(`{
"action": "exampleAction",
"feature": "exampleFeature",
"id": "with anonymous user",
"parameters": {},
"source": {
"server": {
"version": "0.0.0+dev"
}
},
"timestamp": "2023-02-24T14:48:30Z",
"user": {
"anonymousUserId": "1234"
}
}`),
},
{
name: "with authenticated user",
ctx: actor.WithActor(context.Background(), actor.FromMockUser(1234)),
event: Event{
Feature: FeatureExample,
Action: ActionExample,
},
expect: autogold.Expect(`{
"action": "exampleAction",
"feature": "exampleFeature",
"id": "with authenticated user",
"parameters": {},
"source": {
"server": {
"version": "0.0.0+dev"
}
},
"timestamp": "2023-02-24T14:48:30Z",
"user": {
"userId": "1234"
}
}`),
},
{
// All parameter kinds set: public, private and billing metadata.
name: "with parameters",
ctx: context.Background(),
event: Event{
Feature: FeatureExample,
Action: ActionExample,
Parameters: EventParameters{
Version: 0,
Metadata: EventMetadata{
"foobar": 3,
},
PrivateMetadata: map[string]any{
"barbaz": "hello world!",
},
BillingMetadata: &EventBillingMetadata{
Product: BillingProductExample,
Category: BillingCategoryExample,
},
},
},
expect: autogold.Expect(`{
"action": "exampleAction",
"feature": "exampleFeature",
"id": "with parameters",
"parameters": {
"billingMetadata": {
"category": "EXAMPLE",
"product": "EXAMPLE"
},
"metadata": {
"foobar": "3"
},
"privateMetadata": {
"barbaz": "hello world!"
}
},
"source": {
"server": {
"version": "0.0.0+dev"
}
},
"timestamp": "2023-02-24T14:48:30Z"
}`),
},
} {
t.Run(tc.name, func(t *testing.T) {
got := newTelemetryGatewayEvent(tc.ctx,
staticTime,
func() string { return tc.name },
tc.event.Feature,
tc.event.Action,
tc.event.Parameters)
protodata, err := protojson.Marshal(got)
require.NoError(t, err)
// Protojson output is not stable because protojson injects randomized
// whitespace, so we re-marshal it with encoding/json to stabilize the
// output for golden tests.
// https://github.com/golang/protobuf/issues/1082
var gotJSON map[string]any
require.NoError(t, json.Unmarshal(protodata, &gotJSON))
jsondata, err := json.MarshalIndent(gotJSON, "", " ")
require.NoError(t, err)
tc.expect.Equal(t, string(jsondata))
})
}
}

View File

View File

@ -29,18 +29,30 @@ go_proto_library(
go_library(
name = "telemetrygateway",
srcs = ["event.go"],
embed = [":v1_go_proto"],
importpath = "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1",
visibility = [
"//cmd/frontend/internal/telemetry/resolvers:__pkg__",
"//cmd/telemetrygateway/server:__pkg__",
"//cmd/telemetrygateway/shared:__pkg__",
"//internal/api:__pkg__",
"//internal/database:__pkg__",
"//internal/extsvc/gitolite:__pkg__",
"//internal/telemetry:__pkg__",
"//internal/telemetry/teestore:__pkg__",
"//internal/telemetrygateway:__pkg__",
"//internal/telemetrygateway/gitdomain:__pkg__",
"//internal/telemetrygateway/integration_tests:__pkg__",
"//internal/telemetrygateway/protocol:__pkg__",
],
deps = [
"//internal/actor",
"//internal/featureflag",
"//lib/pointers",
"@com_github_google_uuid//:uuid",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)
# See https://github.com/sourcegraph/sourcegraph/issues/50032
@ -56,3 +68,25 @@ buf_lint_test(
config = "//internal:buf.yaml",
targets = [":v1_proto"],
)
go_test(
name = "telemetrygateway_test",
srcs = [
"backcompat_test.go",
"event_test.go",
"main_test.go",
],
data = glob(["testdata/**"]),
embed = [":telemetrygateway"],
deps = [
"//internal/actor",
"//lib/pointers",
"@com_github_hexops_autogold_v2//:autogold",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_protobuf//encoding/protojson",
"@org_golang_google_protobuf//proto",
"@org_golang_google_protobuf//types/known/structpb",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)

View File

@ -0,0 +1,125 @@
package v1_test
import (
"flag"
"fmt"
"io/fs"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/sourcegraph/sourcegraph/lib/pointers"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
)
var (
// createSnapshot is the '-snapshot' flag. When set, TestBackcompat writes a
// new snapshot of sampleEvent before checking the existing snapshots.
createSnapshot = flag.Bool("snapshot", false, "generate a new snapshot")
// snapshotsDir is the directory snapshots are written to and loaded from.
snapshotsDir = filepath.Join("testdata", "snapshots")
// sampleEvent should have all fields populated - a snapshot from this event
// is generated with the -snapshot flag, see TestBackcompat() for more
// details.
sampleEvent = &telemetrygatewayv1.Event{
Id: "1234",
Feature: "Feature",
Action: "Action",
Timestamp: timestamppb.New(must(time.Parse(time.RFC3339, "2023-02-24T14:48:30Z"))),
Source: &telemetrygatewayv1.EventSource{
Server: &telemetrygatewayv1.EventSource_Server{
Version: "dev",
},
Client: &telemetrygatewayv1.EventSource_Client{
Name: "CLIENT",
Version: pointers.Ptr("VERSION"),
},
},
Parameters: &telemetrygatewayv1.EventParameters{
Version: 1,
Metadata: map[string]int64{"metadata": 1},
PrivateMetadata: must(structpb.NewStruct(map[string]any{"private": "data"})),
BillingMetadata: &telemetrygatewayv1.EventBillingMetadata{
Product: "Product",
Category: "Category",
},
},
User: &telemetrygatewayv1.EventUser{
UserId: pointers.Ptr(int64(1234)),
AnonymousUserId: pointers.Ptr("anonymous"),
},
FeatureFlags: &telemetrygatewayv1.EventFeatureFlags{
Flags: map[string]string{"feature": "true"},
},
MarketingTracking: &telemetrygatewayv1.EventMarketingTracking{
Url: pointers.Ptr("value"),
FirstSourceUrl: pointers.Ptr("value"),
CohortId: pointers.Ptr("value"),
Referrer: pointers.Ptr("value"),
LastSourceUrl: pointers.Ptr("value"),
DeviceSessionId: pointers.Ptr("value"),
SessionReferrer: pointers.Ptr("value"),
SessionFirstUrl: pointers.Ptr("value"),
},
}
)
// TestBackcompat asserts that past events marshalled in the proto wire format,
// tracked in internal/telemetrygateway/v1/testdata/snapshots, continue to be
// able to be unmarshalled by the current v1 types to ensure we don't introduce
// any breaking changes.
//
// New snapshots should be manually created as the spec evolves by updating
// sampleEvent and running the test with the '-snapshot' flag:
//
//	go test -v ./internal/telemetrygateway/v1 -snapshot
//
// Without the '-snapshot' flag, this test just loads existing snapshots and
// asserts they can still be unmarshalled.
func TestBackcompat(t *testing.T) {
	if *createSnapshot {
		data, err := proto.Marshal(sampleEvent)
		require.NoError(t, err)

		// Make sure there is a directory to write the snapshot into, so that
		// creating the very first snapshot works as well.
		require.NoError(t, os.MkdirAll(snapshotsDir, 0o755))

		// Snapshots are named after the current date, so at most one
		// snapshot is kept per day.
		f := filepath.Join(snapshotsDir, time.Now().Format(time.DateOnly)+".pb")
		if _, err := os.Stat(f); err == nil {
			t.Logf("Snapshot %s exists, recreating it", f)
			// Best-effort only: WriteFile below truncates an existing file.
			_ = os.Remove(f)
		}
		require.NoError(t, os.WriteFile(f, data, 0644))
		t.Logf("Wrote snapshot to %s", f)
	}

	var tested int
	require.NoError(t, filepath.WalkDir(snapshotsDir, func(path string, d fs.DirEntry, err error) error {
		// WalkDir reports failures through err, in which case d may be nil
		// (e.g. when snapshotsDir itself cannot be read), so this must be
		// checked before d is used.
		if err != nil {
			return err
		}
		if d.IsDir() {
			return nil
		}

		tested += 1
		t.Run(fmt.Sprintf("snapshot %s", path), func(t *testing.T) {
			data, err := os.ReadFile(path)
			require.NoError(t, err)

			// Existing snapshot must unmarshal without error.
			var event telemetrygatewayv1.Event
			assert.NoError(t, proto.Unmarshal(data, &event))

			// TODO: Assert somehow that the unmarshalled event looks as expected.
		})
		return nil
	}))
	t.Logf("Tested %d snapshots", tested)
}
// must returns v unchanged when err is nil and panics with err otherwise. It
// is meant for wrapping calls that are not expected to fail, such as the ones
// used to build test fixtures.
func must[T any](v T, err error) T {
	if err == nil {
		return v
	}
	panic(err)
}

View File

@ -0,0 +1,50 @@
package v1
import (
"context"
"strconv"
"time"
"github.com/google/uuid"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/featureflag"
"github.com/sourcegraph/sourcegraph/lib/pointers"
)
// DefaultEventIDFunc is the default generator for telemetry event IDs. It is
// uuid.NewString, and has the signature expected by the newEventID parameter
// of NewEventWithDefaults.
var DefaultEventIDFunc = uuid.NewString
// NewEventWithDefaults creates a uniform event with defaults filled in. All
// constructors making raw events should start with this. In particular, this
// adds any relevant data required from context:
//
//   - the user, if the context's actor is authenticated or has an anonymous
//     user ID
//   - the evaluated feature flags, if there are any
//
// The event ID is taken from newEventID and the timestamp from now.
func NewEventWithDefaults(ctx context.Context, now time.Time, newEventID func() string) *Event {
	event := &Event{
		Id:        newEventID(),
		Timestamp: timestamppb.New(now),
	}

	// Only attach a user if the actor is identifiable in some way.
	if act := actor.FromContext(ctx); act.IsAuthenticated() || act.AnonymousUID != "" {
		event.User = &EventUser{
			UserId:          pointers.NonZeroPtr(int64(act.UID)),
			AnonymousUserId: pointers.NonZeroPtr(act.AnonymousUID),
		}
	}

	// Flag values are recorded in their string form.
	if evaluated := featureflag.GetEvaluatedFlagSet(ctx); len(evaluated) > 0 {
		flags := make(map[string]string, len(evaluated))
		for name, enabled := range evaluated {
			flags[name] = strconv.FormatBool(enabled)
		}
		event.FeatureFlags = &EventFeatureFlags{Flags: flags}
	}

	return event
}

View File

@ -0,0 +1,53 @@
package v1_test
import (
context "context"
"encoding/json"
"testing"
"time"
"github.com/hexops/autogold/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"
"github.com/sourcegraph/sourcegraph/internal/actor"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
)
// TestNewEventWithDefaults checks that NewEventWithDefaults fills in the ID,
// timestamp and the user taken from the context's actor, comparing the JSON
// rendering of the event against a golden value.
func TestNewEventWithDefaults(t *testing.T) {
staticTime, err := time.Parse(time.RFC3339, "2023-02-24T14:48:30Z")
require.NoError(t, err)
t.Run("extract actor and flags", func(t *testing.T) {
var userID int32 = 123
ctx := actor.WithActor(context.Background(), actor.FromMockUser(userID))
// NOTE: We can't test the feature flag part easily because
// featureflag.GetEvaluatedFlagSet depends on Redis, and the package
// is not designed for it to easily be stubbed out for testing.
// Since it's used for existing telemetry, we trust it works.
got := telemetrygatewayv1.NewEventWithDefaults(ctx, staticTime, func() string { return "id" })
assert.NotNil(t, got.User)
protodata, err := protojson.Marshal(got)
require.NoError(t, err)
// Protojson output is not stable because protojson injects randomized
// whitespace, so we re-marshal it with encoding/json to stabilize the
// output for golden tests.
// https://github.com/golang/protobuf/issues/1082
var gotJSON map[string]any
require.NoError(t, json.Unmarshal(protodata, &gotJSON))
jsondata, err := json.MarshalIndent(gotJSON, "", " ")
require.NoError(t, err)
autogold.Expect(`{
"id": "id",
"timestamp": "2023-02-24T14:48:30Z",
"user": {
"userId": "123"
}
}`).Equal(t, string(jsondata))
})
}

View File

@ -0,0 +1,12 @@
package v1
import (
"flag"
"os"
"testing"
)
// TestMain parses the command-line flags before running the tests of this
// package, and exits with the status reported by the test run.
func TestMain(m *testing.M) {
	flag.Parse()
	status := m.Run()
	os.Exit(status)
}

View File

@ -0,0 +1,12 @@
1234¾œãŸFeature"Action*

dev
CLIENTVERSION2:
metadata

privatedata"
ProductCategory:Ò  anonymousB

featuretrueJ8
valuevaluevalue"value*value2value:valueBvalue

View File

@ -0,0 +1 @@
DROP TABLE IF EXISTS telemetry_events_export_queue;

View File

@ -0,0 +1,2 @@
name: Add TelemetryEventsExportQueueStore
parents: [1691759644, 1693825517]

View File

@ -0,0 +1,6 @@
-- Queue of telemetry events that are stored for export.
CREATE TABLE IF NOT EXISTS telemetry_events_export_queue (
-- NOTE(review): presumably the ID of the telemetry event - confirm against the store.
id TEXT PRIMARY KEY,
-- NOTE(review): presumably the timestamp of the event - confirm against the store.
timestamp TIMESTAMPTZ NOT NULL,
-- The event as a raw Protobuf message.
payload_pb BYTEA NOT NULL,
-- NULL by default. NOTE(review): presumably set once the event has been exported - confirm.
exported_at TIMESTAMPTZ DEFAULT NULL
);

View File

@ -4677,6 +4677,13 @@ CREATE SEQUENCE teams_id_seq
ALTER SEQUENCE teams_id_seq OWNED BY teams.id;
CREATE TABLE telemetry_events_export_queue (
id text NOT NULL,
"timestamp" timestamp with time zone NOT NULL,
payload_pb bytea NOT NULL,
exported_at timestamp with time zone
);
CREATE TABLE temporary_settings (
id integer NOT NULL,
user_id integer NOT NULL,
@ -5761,6 +5768,9 @@ ALTER TABLE ONLY team_members
ALTER TABLE ONLY teams
ADD CONSTRAINT teams_pkey PRIMARY KEY (id);
ALTER TABLE ONLY telemetry_events_export_queue
ADD CONSTRAINT telemetry_events_export_queue_pkey PRIMARY KEY (id);
ALTER TABLE ONLY temporary_settings
ADD CONSTRAINT temporary_settings_pkey PRIMARY KEY (id);