telemetrygateway: add exporter and service (#56699)

This change adds:

- telemetry export background jobs: flagged behind `TELEMETRY_GATEWAY_EXPORTER_EXPORT_ADDR`, default empty => disabled
- telemetry redaction: configured in package `internal/telemetry/sensitivemetadataallowlist`
- telemetry-gateway service receiving events and forwarding them to a Pub/Sub topic (or just logging them, as configured in local dev)
- utilities for easily creating an event recorder: `internal/telemetry/telemetryrecorder`
Notes:

- all changes are feature-flagged to some degree, off by default, so the merge should be fairly low-risk.
- We decided that transmitting the full license key continues to be the way to go. We transmit it once per stream and attach it to all events in the telemetry-gateway. There is no auth mechanism at the moment.
- GraphQL return type `EventLog.Source` is now a plain string instead of string enum. This should not be a breaking change in our clients, but must be made so that our generated V2 events do not break requesting of event logs

Stacked on https://github.com/sourcegraph/sourcegraph/pull/56520

Closes https://github.com/sourcegraph/sourcegraph/issues/56289
Closes https://github.com/sourcegraph/sourcegraph/issues/56287

## Test plan

Add an override to make the export super frequent:

```
env:
  TELEMETRY_GATEWAY_EXPORTER_EXPORT_INTERVAL: "10s"
  TELEMETRY_GATEWAY_EXPORTER_EXPORTED_EVENTS_RETENTION: "5m"
```

Start sourcegraph:

```
sg start
```

Enable `telemetry-export` featureflag (from https://github.com/sourcegraph/sourcegraph/pull/56520)

Emit some events in GraphQL:

```gql
mutation {
  telemetry {
    recordEvents(events:[{
      feature:"foobar"
      action:"view"
      source:{
        client:"WEB"
      }
      parameters:{
        version:0
      }
    }]) {
      alwaysNil
    }
  }
```

See series of log events:

```
[         worker] INFO worker.telemetrygateway-exporter telemetrygatewayexporter/telemetrygatewayexporter.go:61 Telemetry Gateway export enabled - initializing background routines
[         worker] INFO worker.telemetrygateway-exporter telemetrygatewayexporter/exporter.go:99 exporting events {"maxBatchSize": 10000, "count": 1}
[telemetry-g...y] INFO telemetry-gateway.pubsub pubsub/topic.go:115 Publish {"TraceId": "7852903434f0d2f647d397ee83b4009b", "SpanId": "8d945234bccf319b", "message": "{\"event\":{\"id\":\"dc96ae84-4ac4-4760-968f-0a0307b8bb3d\",\"timestamp\":\"2023-09-19T01:57:13.590266Z\",\"feature\":\"foobar\", ....
```

Build:

```
export VERSION="insiders"
bazel run //cmd/telemetry-gateway:candidate_push --config darwin-docker --stamp --workspace_status_command=./dev/bazel_stamp_vars.sh -- --tag $VERSION --repository us.gcr.io/sourcegraph-dev/telemetry-gateway
```

Deploy: https://github.com/sourcegraph/managed-services/pull/7

Add override:

```yaml
env:
  # Port required. TODO: What's the best way to provide gRPC addresses, such that a
  # localhost address is also possible?
  TELEMETRY_GATEWAY_EXPORTER_EXPORT_ADDR: "https://telemetry-gateway.sgdev.org:443"
```

Repeat the above (`sg start` and emit some events):

```
[         worker] INFO worker.telemetrygateway-exporter telemetrygatewayexporter/exporter.go:94 exporting events {"maxBatchSize": 10000, "count": 6}
[         worker] INFO worker.telemetrygateway-exporter telemetrygatewayexporter/exporter.go:113 events exported {"maxBatchSize": 10000, "succeeded": 6}
[         worker] INFO worker.telemetrygateway-exporter telemetrygatewayexporter/exporter.go:94 exporting events {"maxBatchSize": 10000, "count": 1}
[         worker] INFO worker.telemetrygateway-exporter telemetrygatewayexporter/exporter.go:113 events exported {"maxBatchSize": 10000, "succeeded": 1}
```
This commit is contained in:
Robert Lin 2023-09-19 22:20:15 -07:00 committed by GitHub
parent fb2a4a670e
commit e835a66c76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
73 changed files with 2275 additions and 69 deletions

View File

@ -177,7 +177,7 @@ func (s *Source) checkAccessToken(ctx context.Context, token string) (*dotcom.Ch
}
for _, gqlerr := range gqlerrs {
if gqlerr.Extensions != nil && gqlerr.Extensions["code"] == codygateway.GQLErrCodeProductSubscriptionNotFound {
if gqlerr.Extensions != nil && gqlerr.Extensions["code"] == productsubscription.GQLErrCodeProductSubscriptionNotFound {
return nil, actor.ErrAccessTokenDenied{
Source: s.Name(),
Reason: "associated product subscription not found",

View File

@ -3,16 +3,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "auth",
srcs = [
"auth.go",
"bearer.go",
],
srcs = ["auth.go"],
importpath = "github.com/sourcegraph/sourcegraph/cmd/cody-gateway/internal/auth",
visibility = ["//cmd/cody-gateway:__subpackages__"],
deps = [
"//cmd/cody-gateway/internal/actor",
"//cmd/cody-gateway/internal/events",
"//cmd/cody-gateway/internal/response",
"//internal/authbearer",
"//internal/codygateway",
"//internal/trace",
"//lib/errors",
@ -35,6 +33,7 @@ go_test(
"//cmd/cody-gateway/internal/events",
"//internal/codygateway",
"//internal/licensing",
"//internal/productsubscription",
"//lib/errors",
"@com_github_derision_test_go_mockgen//testutil/require",
"@com_github_gregjones_httpcache//:httpcache",

View File

@ -8,6 +8,7 @@ import (
"github.com/sourcegraph/sourcegraph/cmd/cody-gateway/internal/actor"
"github.com/sourcegraph/sourcegraph/cmd/cody-gateway/internal/events"
"github.com/sourcegraph/sourcegraph/cmd/cody-gateway/internal/response"
"github.com/sourcegraph/sourcegraph/internal/authbearer"
"github.com/sourcegraph/sourcegraph/internal/codygateway"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/lib/errors"
@ -22,7 +23,7 @@ type Authenticator struct {
func (a *Authenticator) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger := trace.Logger(r.Context(), a.Logger)
token, err := ExtractBearer(r.Header)
token, err := authbearer.ExtractBearer(r.Header)
if err != nil {
response.JSONError(logger, w, http.StatusBadRequest, err)
return

View File

@ -22,6 +22,7 @@ import (
"github.com/sourcegraph/sourcegraph/cmd/cody-gateway/internal/events"
"github.com/sourcegraph/sourcegraph/internal/codygateway"
"github.com/sourcegraph/sourcegraph/internal/licensing"
internalproductsubscription "github.com/sourcegraph/sourcegraph/internal/productsubscription"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
@ -153,7 +154,7 @@ func TestAuthenticatorMiddleware(t *testing.T) {
return gqlerror.List{
{
Message: "access denied",
Extensions: map[string]any{"code": codygateway.GQLErrCodeProductSubscriptionNotFound},
Extensions: map[string]any{"code": internalproductsubscription.GQLErrCodeProductSubscriptionNotFound},
},
}
})

View File

@ -19,6 +19,7 @@ go_library(
"//cmd/cody-gateway/internal/limiter",
"//cmd/cody-gateway/internal/notify",
"//cmd/cody-gateway/internal/response",
"//internal/authbearer",
"//internal/httpcli",
"//internal/instrumentation",
"//internal/redispool",

View File

@ -12,9 +12,9 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"github.com/sourcegraph/sourcegraph/cmd/cody-gateway/internal/actor"
"github.com/sourcegraph/sourcegraph/cmd/cody-gateway/internal/auth"
"github.com/sourcegraph/sourcegraph/cmd/cody-gateway/internal/httpapi/requestlogger"
"github.com/sourcegraph/sourcegraph/cmd/cody-gateway/internal/response"
"github.com/sourcegraph/sourcegraph/internal/authbearer"
"github.com/sourcegraph/sourcegraph/internal/instrumentation"
"github.com/sourcegraph/sourcegraph/internal/redispool"
sgtrace "github.com/sourcegraph/sourcegraph/internal/trace"
@ -30,7 +30,7 @@ func NewDiagnosticsHandler(baseLogger log.Logger, next http.Handler, secret stri
baseLogger = baseLogger.Scoped("diagnostics", "healthz checks")
hasValidSecret := func(l log.Logger, w http.ResponseWriter, r *http.Request) (yes bool) {
token, err := auth.ExtractBearer(r.Header)
token, err := authbearer.ExtractBearer(r.Header)
if err != nil {
response.JSONError(l, w, http.StatusBadRequest, err)
return false

View File

@ -8791,7 +8791,7 @@ type EventLog {
"""
The source of the event.
"""
source: EventSource!
source: String!
"""
The additional argument information.
"""

View File

@ -5,8 +5,8 @@ import (
"fmt"
"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
"github.com/sourcegraph/sourcegraph/internal/codygateway"
"github.com/sourcegraph/sourcegraph/internal/errcode"
internalproductsubscription "github.com/sourcegraph/sourcegraph/internal/productsubscription"
)
type ErrProductSubscriptionNotFound struct {
@ -21,7 +21,7 @@ func (e ErrProductSubscriptionNotFound) Error() string {
}
func (e ErrProductSubscriptionNotFound) Extensions() map[string]any {
return map[string]any{"code": codygateway.GQLErrCodeProductSubscriptionNotFound}
return map[string]any{"code": internalproductsubscription.GQLErrCodeProductSubscriptionNotFound}
}
// ProductSubscriptionByAccessToken retrieves the subscription corresponding to the

View File

@ -17,7 +17,7 @@ import (
// Resolver is the GraphQL resolver of all things related to telemetry V2.
type Resolver struct {
logger log.Logger
teestore teestore.Store
teestore *teestore.Store
}
// New returns a new Resolver whose store uses the given database

View File

@ -0,0 +1,72 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
load("@rules_oci//oci:defs.bzl", "oci_image", "oci_push", "oci_tarball")
load("@rules_pkg//:pkg.bzl", "pkg_tar")
load("@container_structure_test//:defs.bzl", "container_structure_test")
load("//dev:oci_defs.bzl", "image_repository")
go_binary(
name = "telemetry-gateway",
embed = [":telemetry-gateway_lib"],
visibility = ["//visibility:public"],
x_defs = {
"github.com/sourcegraph/sourcegraph/internal/version.version": "{STABLE_VERSION}",
"github.com/sourcegraph/sourcegraph/internal/version.timestamp": "{VERSION_TIMESTAMP}",
},
)
pkg_tar(
name = "tar_telemetry-gateway",
srcs = [":telemetry-gateway"],
)
oci_image(
name = "image",
base = "@wolfi_base",
entrypoint = [
"/sbin/tini",
"--",
"/telemetry-gateway",
],
tars = [":tar_telemetry-gateway"],
user = "sourcegraph",
)
oci_tarball(
name = "image_tarball",
image = ":image",
repo_tags = ["telemetry-gateway:candidate"],
)
container_structure_test(
name = "image_test",
timeout = "short",
configs = ["image_test.yaml"],
driver = "docker",
image = ":image",
tags = [
"exclusive",
"requires-network",
],
)
oci_push(
name = "candidate_push",
image = ":image",
repository = image_repository("telemetry-gateway"),
)
go_library(
name = "telemetry-gateway_lib",
srcs = ["main.go"],
importpath = "github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway",
visibility = ["//visibility:private"],
deps = [
"//cmd/telemetry-gateway/shared",
"//internal/conf",
"//internal/env",
"//internal/sanitycheck",
"//internal/service/svcmain",
"@com_github_getsentry_sentry_go//:sentry-go",
"@com_github_sourcegraph_log//:log",
],
)

View File

@ -0,0 +1,3 @@
# See https://github.com/sourcegraph/codenotify for documentation.
**/* @core-services

View File

@ -0,0 +1,15 @@
schemaVersion: "2.0.0"
commandTests:
- name: "binary is runnable"
command: "/telemetry-gateway"
envVars:
- key: "SANITY_CHECK"
value: "true"
- name: "not running as root"
command: "/usr/bin/id"
args:
- -u
excludedOutput: ["^0"]
exitCode: 0

View File

@ -0,0 +1,14 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "diagnosticsserver",
srcs = ["diagnosticsserver.go"],
importpath = "github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/diagnosticsserver",
visibility = ["//cmd/telemetry-gateway:__subpackages__"],
deps = [
"//internal/authbearer",
"//internal/trace",
"//internal/version",
"@com_github_sourcegraph_log//:log",
],
)

View File

@ -0,0 +1,71 @@
package diagnosticsserver
import (
"context"
"encoding/json"
"net/http"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/authbearer"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/internal/version"
)
// NewDiagnosticsHandler builds an http.Handler serving diagnostic endpoints on
// "/-/..." paths. Place it in front of any authentication middleware: it guards
// its sensitive endpoints itself using a static secret, uniquely generated per
// deployment, supplied as a bearer token.
func NewDiagnosticsHandler(
	baseLogger log.Logger,
	secret string,
	healthCheck func(context.Context) error,
) http.Handler {
	scopedLogger := baseLogger.Scoped("diagnostics", "healthz checks")

	// authorized reports whether the request carries the expected bearer
	// secret, writing the appropriate error response when it does not.
	authorized := func(w http.ResponseWriter, r *http.Request) bool {
		bearer, err := authbearer.ExtractBearer(r.Header)
		switch {
		case err != nil:
			w.WriteHeader(http.StatusBadRequest)
			_ = json.NewEncoder(w).Encode(map[string]string{
				"error": err.Error(),
			})
			return false
		case bearer != secret:
			w.WriteHeader(http.StatusUnauthorized)
			return false
		}
		return true
	}

	mux := http.NewServeMux()

	// For sanity-checking what's live. Intentionally doesn't require the
	// secret for convenience, and it's a mostly harmless endpoint.
	mux.HandleFunc("/-/version", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(version.Version()))
	})

	// Authenticated health probe backed by the caller-provided healthCheck.
	mux.HandleFunc("/-/healthz", func(w http.ResponseWriter, r *http.Request) {
		logger := trace.Logger(r.Context(), scopedLogger)
		if !authorized(w, r) {
			return
		}
		if err := healthCheck(r.Context()); err != nil {
			logger.Error("check failed", log.Error(err))
			w.WriteHeader(http.StatusInternalServerError)
			_, _ = w.Write([]byte("healthz: " + err.Error()))
			return
		}
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("healthz: ok"))
	})

	return mux
}

View File

@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "events",
srcs = ["events.go"],
importpath = "github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/events",
visibility = ["//cmd/telemetry-gateway:__subpackages__"],
deps = [
"//internal/pubsub",
"//internal/telemetrygateway/v1:telemetrygateway",
"//lib/errors",
"@com_github_sourcegraph_conc//pool",
"@org_golang_google_protobuf//encoding/protojson",
],
)

View File

@ -0,0 +1,77 @@
package events
import (
"context"
"encoding/json"
"google.golang.org/protobuf/encoding/protojson"
"github.com/sourcegraph/conc/pool"
"github.com/sourcegraph/sourcegraph/internal/pubsub"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// Publisher publishes telemetry events to a Pub/Sub topic, attaching a fixed,
// pre-marshalled set of stream metadata to every event payload it publishes.
type Publisher struct {
	topic pubsub.TopicClient
	// metadataJSON is the protojson-encoded RecordEventsRequestMetadata shared
	// by all events published through this Publisher.
	metadataJSON json.RawMessage
}
// NewPublisherForStream constructs a Publisher for a single events stream,
// serializing the stream's metadata once up front so it can be cheaply
// attached to every published event.
func NewPublisherForStream(eventsTopic pubsub.TopicClient, metadata *telemetrygatewayv1.RecordEventsRequestMetadata) (*Publisher, error) {
	encodedMetadata, err := protojson.Marshal(metadata)
	if err != nil {
		return nil, errors.Wrap(err, "marshaling metadata")
	}
	p := &Publisher{
		topic:        eventsTopic,
		metadataJSON: encodedMetadata,
	}
	return p, nil
}
// PublishEventResult is the outcome of attempting to publish a single event.
type PublishEventResult struct {
	// EventID is the ID of the event that was published.
	EventID string
	// PublishError, if non-nil, indicates an error occurred publishing the event.
	PublishError error
}
// Publish submits each event to the configured topic as an individual message,
// with up to 100 publishes in flight at a time. It returns one result per
// event; callers should inspect each result's PublishError to determine which
// events failed.
func (p *Publisher) Publish(ctx context.Context, events []*telemetrygatewayv1.Event) []PublishEventResult {
	wg := pool.NewWithResults[PublishEventResult]().
		WithMaxGoroutines(100) // limit each batch to some degree

	for _, event := range events {
		// Shadow the loop variable: the wg.Go closure below runs concurrently
		// with the loop, so without this (pre-Go 1.22) every goroutine could
		// observe the final iteration's value.
		event := event

		// doPublish marshals one event and combines it with the stream's
		// shared metadata into a single message payload.
		doPublish := func(event *telemetrygatewayv1.Event) error {
			eventJSON, err := protojson.Marshal(event)
			if err != nil {
				return errors.Wrap(err, "marshalling event")
			}

			// Join our raw JSON payloads into a single message
			payload, err := json.Marshal(map[string]json.RawMessage{
				"metadata": p.metadataJSON,
				"event":    json.RawMessage(eventJSON),
			})
			if err != nil {
				return errors.Wrap(err, "marshalling event payload")
			}

			// Publish a single message per event so that partial failures
			// within a batch can be reported back per event.
			if err := p.topic.Publish(ctx, payload); err != nil {
				return errors.Wrap(err, "publishing event")
			}
			return nil
		}

		wg.Go(func() PublishEventResult {
			return PublishEventResult{
				EventID:      event.GetId(),
				PublishError: doPublish(event),
			}
		})
	}
	return wg.Wait()
}

View File

@ -0,0 +1,39 @@
load("//dev:go_defs.bzl", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "server",
srcs = [
"metrics.go",
"publish_events.go",
"server.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/server",
visibility = ["//cmd/telemetry-gateway:__subpackages__"],
deps = [
"//cmd/telemetry-gateway/internal/events",
"//internal/licensing",
"//internal/pubsub",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/trace",
"//lib/errors",
"@com_github_sourcegraph_log//:log",
"@io_opentelemetry_go_otel//:otel",
"@io_opentelemetry_go_otel//attribute",
"@io_opentelemetry_go_otel_metric//:metric",
"@io_opentelemetry_go_otel_trace//:trace",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
)
go_test(
name = "server_test",
srcs = ["publish_events_test.go"],
embed = [":server"],
deps = [
"//cmd/telemetry-gateway/internal/events",
"@com_github_hexops_autogold_v2//:autogold",
"@com_github_stretchr_testify//assert",
],
)

View File

@ -0,0 +1,46 @@
package server
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)
// meter is the OpenTelemetry meter for all instruments emitted by this package.
var meter = otel.GetMeterProvider().Meter("cmd/telemetry-gateway/internal/server")

// recordEventsMetrics bundles the instruments used to observe RecordEvents
// streaming requests.
type recordEventsMetrics struct {
	// Record total event lengths of requests
	totalLength metric.Int64Histogram
	// Record per-payload metrics
	payload recordEventsRequestPayloadMetrics
}

// recordEventsRequestPayloadMetrics bundles instruments recorded once per
// events payload received on a RecordEvents stream.
type recordEventsRequestPayloadMetrics struct {
	// Record event length of individual payloads
	length metric.Int64Histogram
	// Count number of failedEvents
	failedEvents metric.Int64Counter
}
// newRecordEventsMetrics initializes all instruments used to observe
// RecordEvents requests, returning an error if any instrument fails to be
// created. (Fixes "indvidiual" -> "individual" in the metric descriptions.)
func newRecordEventsMetrics() (m recordEventsMetrics, err error) {
	m.totalLength, err = meter.Int64Histogram(
		"telemetry-gateway.record_events.total_length",
		metric.WithDescription("Total number of events in record_events requests"))
	if err != nil {
		return m, err
	}
	m.payload.length, err = meter.Int64Histogram(
		"telemetry-gateway.record_events.payload_length",
		metric.WithDescription("Number of events in individual record_events request payloads"))
	if err != nil {
		return m, err
	}
	m.payload.failedEvents, err = meter.Int64Counter(
		"telemetry-gateway.record_events.payload_failed_events_count",
		metric.WithDescription("Number of events that failed to submit in individual record_events request payloads"))
	if err != nil {
		return m, err
	}
	return m, err
}

View File

@ -0,0 +1,113 @@
package server
import (
"context"
"fmt"
"github.com/sourcegraph/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/events"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
sgtrace "github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// handlePublishEvents publishes a single payload of events from a RecordEvents
// stream, recording the outcome on the current trace, on payload metrics, and
// in logs. It always returns a response; per-event failures are reported via
// the response's SucceededEvents (events absent from that list failed).
//
// NOTE: the events parameter shadows the imported events package within the
// body; the signature's events.Publisher still resolves to the package.
func handlePublishEvents(
	ctx context.Context,
	logger log.Logger,
	payloadMetrics *recordEventsRequestPayloadMetrics,
	publisher *events.Publisher,
	events []*telemetrygatewayv1.Event,
) *telemetrygatewayv1.RecordEventsResponse {
	var tr sgtrace.Trace
	tr, ctx = sgtrace.New(ctx, "handlePublishEvents",
		attribute.Int("events", len(events)))
	defer tr.End()
	logger = sgtrace.Logger(ctx, logger)
	// Send off our events
	results := publisher.Publish(ctx, events)
	// Aggregate failure details
	summary := summarizePublishEventsResults(results)
	// Record the result on the trace and metrics
	resultAttribute := attribute.String("result", summary.result)
	tr.SetAttributes(resultAttribute)
	payloadMetrics.length.Record(ctx, int64(len(events)),
		metric.WithAttributes(resultAttribute))
	payloadMetrics.failedEvents.Add(ctx, int64(len(summary.failedEvents)),
		metric.WithAttributes(resultAttribute))
	// Generate a log message for convenience
	summaryFields := []log.Field{
		log.String("result", summary.result),
		log.Int("submitted", len(events)),
		log.Int("succeeded", len(summary.succeededEvents)),
		log.Int("failed", len(summary.failedEvents)),
	}
	if len(summary.failedEvents) > 0 {
		// Attach the failure to the trace and log per-event error details.
		tr.RecordError(errors.New(summary.message),
			trace.WithAttributes(attribute.Int("failed", len(summary.failedEvents))))
		logger.Error(summary.message, append(summaryFields, summary.errorFields...)...)
	} else {
		logger.Info(summary.message, summaryFields...)
	}
	return &telemetrygatewayv1.RecordEventsResponse{
		SucceededEvents: summary.succeededEvents,
	}
}
// publishEventsSummary is an aggregated view over the results of publishing a
// batch of events, ready for reporting to traces, metrics, and logs.
type publishEventsSummary struct {
	// message is a human-readable summary summarizing the result
	message string
	// result is a low-cardinality indicator of the result category
	result string
	// errorFields carries one named error field per failed event, for logging.
	errorFields []log.Field
	// succeededEvents lists the IDs of events that published successfully.
	succeededEvents []string
	// failedEvents holds the full results of events that failed to publish.
	failedEvents []events.PublishEventResult
}
// summarizePublishEventsResults aggregates per-event publish results into a
// single summary categorizing the batch as a complete failure, a partial
// failure, or a success.
//
// Note that an empty results slice falls into the "complete_failure" category,
// since zero failed equals zero total.
func summarizePublishEventsResults(results []events.PublishEventResult) publishEventsSummary {
	var (
		errFields = make([]log.Field, 0)
		succeeded = make([]string, 0, len(results))
		failed    = make([]events.PublishEventResult, 0)
	)
	for i, result := range results {
		if result.PublishError == nil {
			succeeded = append(succeeded, result.EventID)
			continue
		}
		failed = append(failed, result)
		errFields = append(errFields, log.NamedError(fmt.Sprintf("error.%d", i), result.PublishError))
	}

	summary := publishEventsSummary{
		errorFields:     errFields,
		succeededEvents: succeeded,
		failedEvents:    failed,
	}
	switch {
	case len(failed) == len(results):
		summary.message = "all events in batch failed to submit"
		summary.result = "complete_failure"
	case len(failed) > 0 && len(failed) < len(results):
		summary.message = "some events in batch failed to submit"
		summary.result = "partial_failure"
	case len(failed) == 0:
		summary.message = "all events in batch submitted successfully"
		summary.result = "success"
	}
	return summary
}

View File

@ -0,0 +1,60 @@
package server
import (
"errors"
"strconv"
"testing"
"github.com/hexops/autogold/v2"
"github.com/stretchr/testify/assert"
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/events"
)
// TestSummarizeFailedEvents covers the three result categories of
// summarizePublishEventsResults.
func TestSummarizeFailedEvents(t *testing.T) {
	t.Run("all failed", func(t *testing.T) {
		// Build a non-empty batch where every event failed to publish. The
		// previous version of this subtest made a zero-length slice and ranged
		// over it, so the loop body never ran and the assertions were vacuous
		// (they only passed because an empty batch also maps to
		// "complete_failure").
		results := make([]events.PublishEventResult, 10)
		for i := range results {
			results[i].EventID = "id_" + strconv.Itoa(i)
			results[i].PublishError = errors.New("failed")
		}
		summary := summarizePublishEventsResults(results)
		autogold.Expect("all events in batch failed to submit").Equal(t, summary.message)
		autogold.Expect("complete_failure").Equal(t, summary.result)
		assert.Len(t, summary.errorFields, len(results))
		assert.Len(t, summary.succeededEvents, 0)
		assert.Len(t, summary.failedEvents, len(results))
	})
	t.Run("some failed", func(t *testing.T) {
		results := []events.PublishEventResult{{
			EventID:      "asdf",
			PublishError: errors.New("oh no"),
		}, {
			EventID: "asdfasdf",
		}}
		summary := summarizePublishEventsResults(results)
		autogold.Expect("some events in batch failed to submit").Equal(t, summary.message)
		autogold.Expect("partial_failure").Equal(t, summary.result)
		assert.Len(t, summary.errorFields, 1)
		assert.Len(t, summary.succeededEvents, 1)
		assert.Len(t, summary.failedEvents, 1)
	})
	t.Run("all succeeded", func(t *testing.T) {
		results := []events.PublishEventResult{{
			EventID: "asdf",
		}, {
			EventID: "asdfasdf",
		}}
		summary := summarizePublishEventsResults(results)
		autogold.Expect("all events in batch submitted successfully").Equal(t, summary.message)
		autogold.Expect("success").Equal(t, summary.result)
		assert.Len(t, summary.errorFields, 0)
		assert.Len(t, summary.succeededEvents, 2)
		assert.Len(t, summary.failedEvents, 0)
	})
}

View File

@ -0,0 +1,138 @@
package server
import (
"fmt"
"io"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/events"
"github.com/sourcegraph/sourcegraph/internal/licensing"
"github.com/sourcegraph/sourcegraph/internal/pubsub"
sgtrace "github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/lib/errors"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
)
// Server implements the Telemetry Gateway gRPC service, receiving streams of
// events and publishing them to a Pub/Sub topic.
type Server struct {
	logger log.Logger
	eventsTopic pubsub.TopicClient
	recordEventsMetrics recordEventsMetrics
	// Fallback unimplemented handler
	telemetrygatewayv1.UnimplementedTelemeteryGatewayServiceServer
}

// Compile-time interface satisfaction check.
var _ telemetrygatewayv1.TelemeteryGatewayServiceServer = (*Server)(nil)
// New instantiates a Server that publishes received events to eventsTopic,
// initializing the metrics instruments it records along the way.
func New(logger log.Logger, eventsTopic pubsub.TopicClient) (*Server, error) {
	metrics, err := newRecordEventsMetrics()
	if err != nil {
		return nil, err
	}
	s := &Server{
		logger:              logger.Scoped("server", "grpc server"),
		eventsTopic:         eventsTopic,
		recordEventsMetrics: metrics,
	}
	return s, nil
}
// RecordEvents handles a bidirectional events submission stream. The client
// must send a metadata payload exactly once, before any events payloads; each
// events payload is published and acknowledged with a response. The stream
// ends when the client closes it (io.EOF).
func (s *Server) RecordEvents(stream telemetrygatewayv1.TelemeteryGatewayService_RecordEventsServer) (err error) {
	var (
		logger = sgtrace.Logger(stream.Context(), s.logger)
		// publisher is initialized once for RecordEventsRequestMetadata.
		publisher *events.Publisher
		// count of all processed events, collected at the end of a request
		totalProcessedEvents int64
	)
	// Record the stream's total event count once at the end, tagged with
	// whether the stream terminated with an error (reads the named result).
	defer func() {
		s.recordEventsMetrics.totalLength.Record(stream.Context(),
			totalProcessedEvents,
			metric.WithAttributes(attribute.Bool("error", err != nil)))
	}()
	for {
		msg, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			// Client closed the stream - we are done.
			break
		}
		if err != nil {
			return err
		}
		switch msg.Payload.(type) {
		case *telemetrygatewayv1.RecordEventsRequest_Metadata:
			// Metadata must arrive exactly once, before any events payloads.
			if publisher != nil {
				return status.Error(codes.InvalidArgument, "received metadata more than once")
			}
			metadata := msg.GetMetadata()
			logger = logger.With(log.String("request_id", metadata.GetRequestId()))
			// Validate self-reported instance identifier
			switch metadata.GetIdentifier().Identifier.(type) {
			case *telemetrygatewayv1.Identifier_LicensedInstance:
				licenseKey := metadata.Identifier.GetLicensedInstance().GetLicenseKey()
				licenseInfo, _, err := licensing.ParseProductLicenseKey(licenseKey)
				if err != nil {
					return status.Errorf(codes.InvalidArgument, "invalid license key: %s", err)
				}
				logger.Info("handling events submission stream for licensed instance",
					log.Stringp("license.salesforceOpportunityID", licenseInfo.SalesforceOpportunityID),
					log.Stringp("license.salesforceSubscriptionID", licenseInfo.SalesforceSubscriptionID))
			default:
				logger.Error("unknown identifier type",
					log.String("type", fmt.Sprintf("%T", metadata.Identifier.Identifier)))
				return status.Error(codes.Unimplemented, "unsupported identifier type")
			}
			// Set up a publisher with the provided metadata
			publisher, err = events.NewPublisherForStream(s.eventsTopic, metadata)
			if err != nil {
				return status.Errorf(codes.Internal, "failed to create publisher: %v", err)
			}
		case *telemetrygatewayv1.RecordEventsRequest_Events:
			events := msg.GetEvents().GetEvents()
			if publisher == nil {
				return status.Error(codes.InvalidArgument, "got events when metadata not yet received")
			}
			// Publish events
			resp := handlePublishEvents(
				stream.Context(),
				logger,
				&s.recordEventsMetrics.payload,
				publisher,
				events)
			// Update total count
			totalProcessedEvents += int64(len(events))
			// Let the client know what happened
			if err := stream.Send(resp); err != nil {
				return err
			}
		case nil:
			// Empty payload - ignore and keep receiving.
			continue
		default:
			return status.Errorf(codes.InvalidArgument, "got malformed message %T", msg.Payload)
		}
	}
	logger.Info("request done")
	return nil
}

View File

@ -0,0 +1,33 @@
package main
import (
"github.com/getsentry/sentry-go"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/shared"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/sanitycheck"
"github.com/sourcegraph/sourcegraph/internal/service/svcmain"
)
// sentryDSN, when set, enables a Sentry log sink for this service.
var sentryDSN = env.Get("TELEMETRY_GATEWAY_SENTRY_DSN", "", "Sentry DSN")

func main() {
	sanitycheck.Pass()

	// Only configure a Sentry sink when a DSN was provided; the nil zero
	// value of conf.LogSinksSource leaves log sinks unconfigured, exactly as
	// the previous immediately-invoked closure did.
	var logSinks conf.LogSinksSource
	if sentryDSN != "" {
		logSinks = conf.NewStaticLogsSinksSource(log.SinksConfig{
			Sentry: &log.SentrySink{
				ClientOptions: sentry.ClientOptions{
					Dsn: sentryDSN,
				},
			},
		})
	}

	svcmain.SingleServiceMainWithoutConf(shared.Service, svcmain.Config{}, svcmain.OutOfBandConfiguration{
		Logging: logSinks,
		Tracing: nil,
	})
}

View File

@ -0,0 +1,43 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "shared",
srcs = [
"config.go",
"main.go",
"metrics.go",
"service.go",
"tracing.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/shared",
visibility = ["//visibility:public"],
deps = [
"//cmd/telemetry-gateway/internal/diagnosticsserver",
"//cmd/telemetry-gateway/internal/server",
"//internal/debugserver",
"//internal/env",
"//internal/goroutine",
"//internal/grpc",
"//internal/grpc/defaults",
"//internal/httpserver",
"//internal/observation",
"//internal/pubsub",
"//internal/service",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/trace/policy",
"//internal/tracer/oteldefaults",
"//internal/tracer/oteldefaults/exporters",
"//internal/version",
"//lib/errors",
"@com_github_googlecloudplatform_opentelemetry_operations_go_exporter_metric//:metric",
"@com_github_googlecloudplatform_opentelemetry_operations_go_exporter_trace//:trace",
"@com_github_sourcegraph_conc//:conc",
"@com_github_sourcegraph_log//:log",
"@io_opentelemetry_go_contrib_detectors_gcp//:gcp",
"@io_opentelemetry_go_otel//:otel",
"@io_opentelemetry_go_otel//semconv/v1.4.0:v1_4_0",
"@io_opentelemetry_go_otel_sdk//resource",
"@io_opentelemetry_go_otel_sdk//trace",
"@io_opentelemetry_go_otel_sdk_metric//:metric",
],
)

View File

@ -0,0 +1,54 @@
package shared
import (
"fmt"
"os"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/trace/policy"
)
// Config carries all environment-derived configuration for the Telemetry
// Gateway service. Populate it via Load.
type Config struct {
	env.BaseConfig
	// Port is the port the combined gRPC/HTTP server listens on.
	Port int
	// DiagnosticsSecret guards the authenticated diagnostics endpoints.
	DiagnosticsSecret string
	Events struct {
		PubSub struct {
			// Enabled toggles real Pub/Sub publishing; when false, messages
			// are logged instead of sent.
			Enabled bool
			ProjectID string
			TopicID string
		}
	}
	OpenTelemetry OpenTelemetryConfig
}

// OpenTelemetryConfig configures OpenTelemetry trace export for the service.
type OpenTelemetryConfig struct {
	TracePolicy policy.TracePolicy
	GCPProjectID string
}
// Load reads configuration from the environment via the embedded
// env.BaseConfig accessors.
func (c *Config) Load() {
	c.Port = c.GetInt("PORT", "10080", "Port to serve Telemetry Gateway service on, generally injected by Cloud Run.")
	c.DiagnosticsSecret = c.Get("DIAGNOSTICS_SECRET", "", "Secret for accessing diagnostics - "+
		"should be used as 'Authorization: Bearer $secret' header when accessing diagnostics endpoints.")
	c.Events.PubSub.Enabled = c.GetBool("TELEMETRY_GATEWAY_EVENTS_PUBSUB_ENABLED", "true",
		"If false, logs Pub/Sub messages instead of actually sending them")
	c.Events.PubSub.ProjectID = c.GetOptional("TELEMETRY_GATEWAY_EVENTS_PUBSUB_PROJECT_ID",
		"The project ID for the Pub/Sub.")
	c.Events.PubSub.TopicID = c.GetOptional("TELEMETRY_GATEWAY_EVENTS_PUBSUB_TOPIC_ID",
		"The topic ID for the Pub/Sub.")
	// NOTE(review): the two env vars below use the CODY_GATEWAY_ prefix even
	// though this is the telemetry-gateway service - this looks copy-pasted
	// from cmd/cody-gateway. Confirm whether they should be
	// TELEMETRY_GATEWAY_-prefixed before deployments start depending on them.
	c.OpenTelemetry.TracePolicy = policy.TracePolicy(c.Get("CODY_GATEWAY_TRACE_POLICY", "all", "Trace policy, one of 'all', 'selective', 'none'."))
	c.OpenTelemetry.GCPProjectID = c.GetOptional("CODY_GATEWAY_OTEL_GCP_PROJECT_ID", "Google Cloud Traces project ID.")
	// Fall back to the ambient GCP project when no explicit project is set.
	if c.OpenTelemetry.GCPProjectID == "" {
		c.OpenTelemetry.GCPProjectID = os.Getenv("GOOGLE_CLOUD_PROJECT")
	}
}
// GetListenAdress returns the ":port" address the server should listen on.
// NOTE(review): the name is misspelled ("Adress"); renaming to
// GetListenAddress would require updating callers (e.g. shared.Main).
func (c *Config) GetListenAdress() string {
	return fmt.Sprintf(":%d", c.Port)
}

View File

@ -0,0 +1,133 @@
package shared
import (
"context"
"net/http"
"time"
"github.com/sourcegraph/conc"
"github.com/sourcegraph/log"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
internalgrpc "github.com/sourcegraph/sourcegraph/internal/grpc"
"github.com/sourcegraph/sourcegraph/internal/grpc/defaults"
"github.com/sourcegraph/sourcegraph/internal/httpserver"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/pubsub"
"github.com/sourcegraph/sourcegraph/internal/service"
"github.com/sourcegraph/sourcegraph/internal/version"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/diagnosticsserver"
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/server"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
)
// Main is the entrypoint for the telemetry-gateway service. It initializes
// OpenTelemetry exporters, the events Pub/Sub topic client (or a logging stub
// when Pub/Sub publishing is disabled), and serves gRPC multiplexed with
// diagnostics endpoints on a single listener, blocking until shutdown.
func Main(ctx context.Context, obctx *observation.Context, ready service.ReadyFunc, config *Config) error {
	shutdownOtel, err := initOpenTelemetry(ctx, obctx.Logger, config.OpenTelemetry)
	if err != nil {
		return errors.Wrap(err, "initOpenTelemetry")
	}
	defer shutdownOtel()

	var eventsTopic pubsub.TopicClient
	if !config.Events.PubSub.Enabled {
		obctx.Logger.Warn("pub/sub events publishing disabled, logging messages instead")
		eventsTopic = pubsub.NewLoggingTopicClient(obctx.Logger)
	} else {
		eventsTopic, err = pubsub.NewTopicClient(config.Events.PubSub.ProjectID, config.Events.PubSub.TopicID)
		if err != nil {
			return errors.Errorf("create Events Pub/Sub client: %v", err)
		}
	}

	// Initialize our gRPC server
	// TODO(@bobheadxi): Maybe don't use defaults.NewServer, which is geared
	// towards in-Sourcegraph services.
	grpcServer := defaults.NewServer(obctx.Logger)
	defer grpcServer.GracefulStop()

	telemetryGatewayServer, err := server.New(obctx.Logger, eventsTopic)
	if err != nil {
		return errors.Wrap(err, "init telemetry gateway server")
	}
	telemetrygatewayv1.RegisterTelemeteryGatewayServiceServer(grpcServer, telemetryGatewayServer)

	// Start up the service. The local is named httpServer to avoid shadowing
	// the imported "server" package used above.
	addr := config.GetListenAdress()
	httpServer := httpserver.NewFromAddr(
		addr,
		&http.Server{
			ReadTimeout:  2 * time.Minute,
			WriteTimeout: 2 * time.Minute,
			Handler: internalgrpc.MultiplexHandlers(
				grpcServer,
				diagnosticsserver.NewDiagnosticsHandler(
					obctx.Logger,
					config.DiagnosticsSecret,
					// Health check: only report healthy if the events topic
					// is reachable.
					func(ctx context.Context) error {
						if err := eventsTopic.Ping(ctx); err != nil {
							return errors.Wrap(err, "eventsPubSubClient.Ping")
						}
						return nil
					},
				),
			),
		},
	)

	// Mark health server as ready and go!
	ready()
	obctx.Logger.Info("service ready", log.String("address", addr))

	// Block until done
	goroutine.MonitorBackgroundRoutines(ctx, httpServer)

	return nil
}
// initOpenTelemetry configures OpenTelemetry tracing and metrics for the
// service, returning a callback that shuts both down concurrently.
func initOpenTelemetry(ctx context.Context, logger log.Logger, config OpenTelemetryConfig) (func(), error) {
	res, err := getOpenTelemetryResource(ctx)
	if err != nil {
		return nil, err
	}

	// Enable tracing, at this point tracing wouldn't have been enabled yet
	// because we run Telemetry Gateway without conf, which means Sourcegraph
	// tracing is not enabled.
	shutdownTracing, err := maybeEnableTracing(ctx,
		logger.Scoped("tracing", "OpenTelemetry tracing"),
		config, res)
	if err != nil {
		return nil, errors.Wrap(err, "maybeEnableTracing")
	}

	shutdownMetrics, err := maybeEnableMetrics(ctx,
		logger.Scoped("metrics", "OpenTelemetry metrics"),
		config, res)
	if err != nil {
		return nil, errors.Wrap(err, "maybeEnableMetrics")
	}

	// Shut down tracing and metrics in parallel.
	return func() {
		var wg conc.WaitGroup
		wg.Go(shutdownTracing)
		wg.Go(shutdownMetrics)
		wg.Wait()
	}, nil
}
// getOpenTelemetryResource builds the OpenTelemetry resource describing this
// service, combining detected GCP platform metadata, SDK information, and
// explicit service name/version attributes.
func getOpenTelemetryResource(ctx context.Context) (*resource.Resource, error) {
	opts := []resource.Option{
		// Use the GCP resource detector to detect information about the GCP platform
		resource.WithDetectors(gcp.NewDetector()),
		// Keep the default detectors
		resource.WithTelemetrySDK(),
		// Add your own custom attributes to identify your application
		resource.WithAttributes(
			semconv.ServiceNameKey.String("telemetry-gateway"),
			semconv.ServiceVersionKey.String(version.Version()),
		),
	}
	return resource.New(ctx, opts...)
}

View File

@ -0,0 +1,58 @@
package shared
import (
"context"
"time"
gcpmetricexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"github.com/sourcegraph/log"
"go.opentelemetry.io/otel"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"github.com/sourcegraph/sourcegraph/internal/tracer/oteldefaults/exporters"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// maybeEnableMetrics configures an OpenTelemetry metrics pipeline: a GCP
// metrics exporter when a GCP project ID is configured, or a Prometheus
// exporter otherwise. It returns a callback that flushes and shuts down the
// meter provider.
func maybeEnableMetrics(_ context.Context, logger log.Logger, config OpenTelemetryConfig, otelResource *resource.Resource) (func(), error) {
	var reader sdkmetric.Reader
	if config.GCPProjectID != "" {
		// Fix: this log previously said "trace exporter" — this is the
		// metrics path.
		logger.Info("initializing GCP metrics exporter", log.String("projectID", config.GCPProjectID))
		exporter, err := gcpmetricexporter.New(
			gcpmetricexporter.WithProjectID(config.GCPProjectID))
		if err != nil {
			return nil, errors.Wrap(err, "gcpmetricexporter.New")
		}
		reader = sdkmetric.NewPeriodicReader(exporter,
			sdkmetric.WithInterval(30*time.Second))
	} else {
		logger.Info("initializing Prometheus exporter")
		var err error
		reader, err = exporters.NewPrometheusExporter()
		if err != nil {
			return nil, errors.Wrap(err, "exporters.NewPrometheusExporter")
		}
	}

	// Create and set the global meter provider
	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(reader),
		sdkmetric.WithResource(otelResource))
	otel.SetMeterProvider(provider)
	logger.Info("metrics configured")

	return func() {
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		start := time.Now()
		logger.Info("Shutting down metrics")
		if err := provider.ForceFlush(shutdownCtx); err != nil {
			logger.Warn("error occurred force-flushing metrics", log.Error(err))
		}
		if err := provider.Shutdown(shutdownCtx); err != nil {
			logger.Warn("error occurred shutting down metrics", log.Error(err))
		}
		logger.Info("metrics shut down", log.Duration("elapsed", time.Since(start)))
	}, nil
}

View File

@ -0,0 +1,31 @@
package shared
import (
"context"
"github.com/sourcegraph/sourcegraph/internal/debugserver"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/service"
)
// Service is the telemetry-gateway service (Name() returns
// "telemetry-gateway").
var Service service.Service = svc{}

// svc implements service.Service for the telemetry-gateway.
type svc struct{}

func (svc) Name() string { return "telemetry-gateway" }
// Configure loads the service configuration from the environment and declares
// debug endpoints.
func (svc) Configure() (env.Config, []debugserver.Endpoint) {
	cfg := &Config{}
	cfg.Load()

	endpoints := []debugserver.Endpoint{
		// Requires GRPC_WEB_UI_ENABLED to be set to enable - only use in local
		// development!
		debugserver.NewGRPCWebUIEndpoint("telemetry-gateway", cfg.GetListenAdress()),
	}
	return cfg, endpoints
}
// Start runs the service's Main entrypoint with the configuration produced by
// Configure.
func (svc) Start(ctx context.Context, observationCtx *observation.Context, ready service.ReadyFunc, config env.Config) error {
	return Main(ctx, observationCtx, ready, config.(*Config))
}

View File

@ -0,0 +1,77 @@
package shared
import (
"context"
"time"
"github.com/sourcegraph/log"
gcptraceexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"github.com/sourcegraph/sourcegraph/internal/trace/policy"
"github.com/sourcegraph/sourcegraph/internal/tracer/oteldefaults"
"github.com/sourcegraph/sourcegraph/internal/tracer/oteldefaults/exporters"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// maybeEnableTracing configures OpenTelemetry tracing if the GOOGLE_CLOUD_PROJECT is set.
// It differs from Sourcegraph's default tracing because we need to export directly to GCP,
// and the use case is more niche as a standalone service.
//
// Based on https://cloud.google.com/trace/docs/setup/go-ot
func maybeEnableTracing(ctx context.Context, logger log.Logger, config OpenTelemetryConfig, otelResource *resource.Resource) (func(), error) {
	// Set globals
	policy.SetTracePolicy(config.TracePolicy)
	otel.SetTextMapPropagator(oteldefaults.Propagator())
	otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
		logger.Debug("OpenTelemetry error", log.Error(err))
	}))

	// Initialize exporter
	var exporter sdktrace.SpanExporter
	if config.GCPProjectID != "" {
		logger.Info("initializing GCP trace exporter", log.String("projectID", config.GCPProjectID))
		var err error
		exporter, err = gcptraceexporter.New(
			gcptraceexporter.WithProjectID(config.GCPProjectID),
			gcptraceexporter.WithErrorHandler(otel.ErrorHandlerFunc(func(err error) {
				logger.Warn("gcptraceexporter error", log.Error(err))
			})),
		)
		if err != nil {
			return nil, errors.Wrap(err, "gcptraceexporter.New")
		}
	} else {
		logger.Info("initializing OTLP exporter")
		var err error
		exporter, err = exporters.NewOTLPTraceExporter(ctx, logger)
		if err != nil {
			// Fix: the wrap message previously named the wrong function
			// ("NewOTLPExporter").
			return nil, errors.Wrap(err, "exporters.NewOTLPTraceExporter")
		}
	}

	// Create and set global tracer
	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(otelResource))
	otel.SetTracerProvider(provider)
	logger.Info("tracing configured")

	return func() {
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		start := time.Now()
		logger.Info("Shutting down tracing")
		if err := provider.ForceFlush(shutdownCtx); err != nil {
			logger.Warn("error occurred force-flushing traces", log.Error(err))
		}
		if err := provider.Shutdown(shutdownCtx); err != nil {
			logger.Warn("error occurred shutting down tracing", log.Error(err))
		}
		logger.Info("Tracing shut down", log.Duration("elapsed", time.Since(start)))
	}, nil
}

View File

@ -73,3 +73,8 @@
targets:
# cody gateway
- host.docker.internal:6098
- labels:
job: telemetry-gateway
targets:
# telemetry gateway
- host.docker.internal:6080

View File

@ -73,3 +73,8 @@
targets:
# cody gateway
- 127.0.0.1:6098
- labels:
job: telemetry-gateway
targets:
# telemetry gateway
- 127.0.0.1:6080

View File

@ -135,6 +135,7 @@ Available commands in `sg.config.yaml`:
* symbols
* syntax-highlighter
* tauri: App shell (Tauri)
* telemetry-gateway
* web-integration-build-prod: Build production web application for integration tests
* web-integration-build: Build development web application for integration tests
* web-standalone-http-prod: Standalone web frontend (production) with API proxy to a configurable URL

View File

@ -70,6 +70,10 @@
- [How to set up Cody Gateway locally](cody_gateway.md)
## Telemetry Gateway
- [How to set up Telemetry Gateway locally](telemetry_gateway.md)
## Windows support
Running Sourcegraph on Windows is not actively tested, but should be possible within the Windows Subsystem for Linux (WSL).

View File

@ -0,0 +1,13 @@
# How to set up Telemetry Gateway locally
By default, exports of Telemetry V2 events to a local Telemetry Gateway instance are enabled in `sg start` and `sg start dotcom`.
You can increase the frequency of exports by setting the following in `sg.config.yaml`:
```yaml
env:
TELEMETRY_GATEWAY_EXPORTER_EXPORT_INTERVAL: "10s"
TELEMETRY_GATEWAY_EXPORTER_EXPORTED_EVENTS_RETENTION: "5m"
```
In development, a gRPC interface is enabled for Telemetry Gateway as well at `http://127.0.0.1:10085/debug/grpcui/`.

View File

@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "telemetrygatewayexporter",
srcs = [
"backlog_metrics.go",
"exporter.go",
"queue_cleanup.go",
"telemetrygatewayexporter.go",
],
importpath = "github.com/sourcegraph/sourcegraph/enterprise/cmd/worker/internal/telemetrygatewayexporter",
visibility = ["//enterprise/cmd/worker:__subpackages__"],
deps = [
"//cmd/worker/shared/init/db",
"//internal/conf",
"//internal/database",
"//internal/env",
"//internal/goroutine",
"//internal/metrics",
"//internal/observation",
"//internal/telemetrygateway",
"//internal/trace",
"//lib/errors",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_golang//prometheus/promauto",
"@com_github_sourcegraph_log//:log",
],
)

View File

@ -0,0 +1,48 @@
package telemetrygatewayexporter
import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// backlogMetricsJob periodically publishes the number of telemetry events
// still waiting to be exported as a Prometheus gauge.
type backlogMetricsJob struct {
	store database.TelemetryEventsExportQueueStore

	// sizeGauge reports the current count of unexported events.
	sizeGauge prometheus.Gauge
}
// newBacklogMetricsJob creates a background routine that refreshes the export
// backlog size gauge every 5 minutes.
func newBacklogMetricsJob(store database.TelemetryEventsExportQueueStore) goroutine.BackgroundRoutine {
	// Gauge tracking how many events are still waiting in the export queue.
	backlogSize := promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "src",
		Subsystem: "telemetrygatewayexport",
		Name:      "backlog_size",
		Help:      "Current number of events waiting to be exported.",
	})
	return goroutine.NewPeriodicGoroutine(
		context.Background(),
		&backlogMetricsJob{
			store:     store,
			sizeGauge: backlogSize,
		},
		goroutine.WithName("telemetrygatewayexporter.backlog_metrics"),
		goroutine.WithDescription("telemetrygatewayexporter backlog metrics"),
		goroutine.WithInterval(time.Minute*5),
	)
}
// Handle refreshes the backlog gauge from the current unexported-event count.
func (j *backlogMetricsJob) Handle(ctx context.Context) error {
	unexported, err := j.store.CountUnexported(ctx)
	if err != nil {
		return errors.Wrap(err, "store.CountUnexported")
	}
	j.sizeGauge.Set(float64(unexported))
	return nil
}

View File

@ -0,0 +1,114 @@
package telemetrygatewayexporter
import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/metrics"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/telemetrygateway"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// exporterJob periodically exports telemetry events from the database-backed
// queue to the Telemetry Gateway.
type exporterJob struct {
	logger   log.Logger
	store    database.TelemetryEventsExportQueueStore
	exporter telemetrygateway.Exporter

	// maxBatchSize caps how many events are fetched from the queue per run.
	maxBatchSize int

	// batchSizeHistogram records real batch sizes of each export.
	batchSizeHistogram prometheus.Histogram
	// exportedEventsCounter records successfully exported events.
	exportedEventsCounter prometheus.Counter
}
// newExporterJob creates the periodic background routine that exports queued
// telemetry events at the configured interval, instrumented with RED metrics.
func newExporterJob(
	obctx *observation.Context,
	store database.TelemetryEventsExportQueueStore,
	exporter telemetrygateway.Exporter,
	cfg config,
) goroutine.BackgroundRoutine {
	// Metrics describing export batches.
	batchSizes := promauto.NewHistogram(prometheus.HistogramOpts{
		Namespace: "src",
		Subsystem: "telemetrygatewayexport",
		Name:      "batch_size",
		Help:      "Size of event batches exported from the queue.",
	})
	exportedEvents := promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "src",
		Subsystem: "telemetrygatewayexport",
		Name:      "exported_events",
		Help:      "Number of events exported from the queue.",
	})

	j := &exporterJob{
		logger:                obctx.Logger,
		store:                 store,
		maxBatchSize:          cfg.MaxExportBatchSize,
		exporter:              exporter,
		batchSizeHistogram:    batchSizes,
		exportedEventsCounter: exportedEvents,
	}

	return goroutine.NewPeriodicGoroutine(
		context.Background(),
		j,
		goroutine.WithName("telemetrygatewayexporter.exporter"),
		goroutine.WithDescription("telemetrygatewayexporter events export job"),
		goroutine.WithInterval(cfg.ExportInterval),
		goroutine.WithOperation(obctx.Operation(observation.Op{
			Name:    "TelemetryGateway.Export",
			Metrics: metrics.NewREDMetrics(prometheus.DefaultRegisterer, "telemetrygatewayexporter_exporter"),
		})),
	)
}
var _ goroutine.Finalizer = (*exporterJob)(nil)

// OnShutdown closes the underlying exporter client; the close error is
// deliberately discarded (`_ =`).
func (j *exporterJob) OnShutdown() { _ = j.exporter.Close() }
// Handle runs one export cycle: it lists up to maxBatchSize pending events,
// sends them to the Telemetry Gateway, and marks the delivered events as
// exported. The export is skipped entirely when no license key is configured.
func (j *exporterJob) Handle(ctx context.Context) error {
	logger := trace.Logger(ctx, j.logger).
		With(log.Int("maxBatchSize", j.maxBatchSize))

	// No license key configured => skip quietly.
	if conf.Get().LicenseKey == "" {
		logger.Debug("license key not set, skipping export")
		return nil
	}

	// Get events from the queue
	batch, err := j.store.ListForExport(ctx, j.maxBatchSize)
	if err != nil {
		return errors.Wrap(err, "ListForExport")
	}
	// Observe the batch size even when zero, so idle cycles show up in the
	// histogram.
	j.batchSizeHistogram.Observe(float64(len(batch)))
	if len(batch) == 0 {
		logger.Debug("no events to export")
		return nil
	}

	logger.Info("exporting events", log.Int("count", len(batch)))

	// Send out events. The succeeded IDs are processed below even when
	// exportErr is non-nil — a partial export still marks what got through.
	succeeded, exportErr := j.exporter.ExportEvents(ctx, batch)

	// Mark succeeded events. A marking failure is logged rather than
	// returned: the events were already delivered, so the worst case is that
	// they may be exported again on a later run.
	j.exportedEventsCounter.Add(float64(len(succeeded)))
	if err := j.store.MarkAsExported(ctx, succeeded); err != nil {
		logger.Error("failed to mark exported events as exported",
			log.Strings("succeeded", succeeded),
			log.Error(err))
	}

	// Report export status
	if exportErr != nil {
		return exportErr
	}
	logger.Info("events exported", log.Int("succeeded", len(succeeded)))
	return nil
}

View File

@ -0,0 +1,50 @@
package telemetrygatewayexporter
import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// queueCleanupJob periodically deletes already-exported telemetry events that
// have aged past the retention window from the export queue table.
type queueCleanupJob struct {
	store database.TelemetryEventsExportQueueStore

	// retentionWindow is how long exported events are kept before deletion.
	retentionWindow time.Duration

	// prunedHistogram records how many rows each cleanup run removed.
	prunedHistogram prometheus.Histogram
}
// newQueueCleanupJob creates a background routine that periodically prunes
// already-exported events from the export queue once they are older than the
// configured retention window.
func newQueueCleanupJob(store database.TelemetryEventsExportQueueStore, cfg config) goroutine.BackgroundRoutine {
	job := &queueCleanupJob{
		store: store,
		// Fix: retentionWindow was previously never populated, leaving it at
		// zero so every exported event was eligible for deletion immediately,
		// ignoring TELEMETRY_GATEWAY_EXPORTER_EXPORTED_EVENTS_RETENTION.
		retentionWindow: cfg.ExportedEventsRetentionWindow,
		prunedHistogram: promauto.NewHistogram(prometheus.HistogramOpts{
			Namespace: "src",
			Subsystem: "telemetrygatewayexport",
			Name:      "pruned",
			Help:      "Size of exported events pruned from the queue table.",
		}),
	}
	return goroutine.NewPeriodicGoroutine(
		context.Background(),
		job,
		goroutine.WithName("telemetrygatewayexporter.queue_cleanup"),
		goroutine.WithDescription("telemetrygatewayexporter queue cleanup"),
		goroutine.WithInterval(cfg.QueueCleanupInterval),
	)
}
// Handle deletes exported events older than the retention window and records
// how many rows were pruned.
func (j *queueCleanupJob) Handle(ctx context.Context) error {
	cutoff := time.Now().Add(-j.retentionWindow)
	pruned, err := j.store.DeletedExported(ctx, cutoff)
	if err != nil {
		return errors.Wrap(err, "store.DeletedExported")
	}
	j.prunedHistogram.Observe(float64(pruned))
	return nil
}

View File

@ -0,0 +1,100 @@
package telemetrygatewayexporter
import (
"context"
"time"
"github.com/sourcegraph/log"
workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/telemetrygateway"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// config declares the environment configuration for the Telemetry Gateway
// exporter background jobs.
type config struct {
	env.BaseConfig

	// ExportAddress is the target Telemetry Gateway address; when empty, the
	// export feature is disabled entirely (see Routines).
	ExportAddress string

	// ExportInterval is how often the export job runs.
	ExportInterval time.Duration
	// MaxExportBatchSize caps the number of events exported in each batch.
	MaxExportBatchSize int

	// ExportedEventsRetentionWindow is how long already-exported events are
	// retained before being deleted by queue cleanup.
	ExportedEventsRetentionWindow time.Duration
	// QueueCleanupInterval is how often the queue cleanup job runs.
	QueueCleanupInterval time.Duration
}
// ConfigInst is the shared configuration instance registered by the job.
var ConfigInst = &config{}

// Load reads the exporter configuration from the environment.
func (c *config) Load() {
	// exportAddress currently has no default value, as the feature is not enabled
	// by default. In a future release, the default will be something like
	// 'https://telemetry-gateway.sourcegraph.com', and eventually, won't be configurable.
	c.ExportAddress = env.Get("TELEMETRY_GATEWAY_EXPORTER_EXPORT_ADDR", "", "Target Telemetry Gateway address")

	c.ExportInterval = env.MustGetDuration("TELEMETRY_GATEWAY_EXPORTER_EXPORT_INTERVAL", 10*time.Minute,
		"Interval at which to export telemetry")
	c.MaxExportBatchSize = env.MustGetInt("TELEMETRY_GATEWAY_EXPORTER_EXPORT_BATCH_SIZE", 5000,
		"Maximum number of events to export in each batch")

	c.ExportedEventsRetentionWindow = env.MustGetDuration("TELEMETRY_GATEWAY_EXPORTER_EXPORTED_EVENTS_RETENTION",
		2*24*time.Hour, "Duration to retain already-exported telemetry events before deleting")
	c.QueueCleanupInterval = env.MustGetDuration("TELEMETRY_GATEWAY_EXPORTER_QUEUE_CLEANUP_INTERVAL",
		1*time.Hour, "Interval at which to clean up telemetry export queue")
}
// telemetryGatewayExporter is the worker job that runs the telemetry export
// background routines.
type telemetryGatewayExporter struct{}

// NewJob creates the telemetry gateway exporter worker job.
func NewJob() *telemetryGatewayExporter {
	return &telemetryGatewayExporter{}
}

func (t *telemetryGatewayExporter) Description() string {
	return "A background routine that exports telemetry events to Sourcegraph's Telemetry Gateway"
}

func (t *telemetryGatewayExporter) Config() []env.Config {
	return []env.Config{ConfigInst}
}
// Routines initializes the telemetry export background routines (exporter,
// queue cleanup, and backlog metrics). When no export address is configured,
// it returns no routines, leaving the feature disabled.
func (t *telemetryGatewayExporter) Routines(initCtx context.Context, observationCtx *observation.Context) ([]goroutine.BackgroundRoutine, error) {
	if ConfigInst.ExportAddress == "" {
		return nil, nil
	}

	observationCtx.Logger.Info("Telemetry Gateway export enabled - initializing background routines")

	db, err := workerdb.InitDB(observationCtx)
	if err != nil {
		return nil, err
	}

	// The exporter client is shared by the export routine and closed via its
	// OnShutdown finalizer.
	exporter, err := telemetrygateway.NewExporter(
		initCtx,
		observationCtx.Logger.Scoped("exporter", "exporter client"),
		conf.DefaultClient(),
		ConfigInst.ExportAddress,
	)
	if err != nil {
		return nil, errors.Wrap(err, "initializing export client")
	}

	observationCtx.Logger.Info("connected to Telemetry Gateway",
		log.String("address", ConfigInst.ExportAddress))

	return []goroutine.BackgroundRoutine{
		newExporterJob(
			observationCtx,
			db.TelemetryEventsExportQueue(),
			exporter,
			*ConfigInst,
		),
		newQueueCleanupJob(db.TelemetryEventsExportQueue(), *ConfigInst),
		newBacklogMetricsJob(db.TelemetryEventsExportQueue()),
	}, nil
}

View File

@ -26,6 +26,7 @@ go_library(
"//enterprise/cmd/worker/internal/permissions",
"//enterprise/cmd/worker/internal/search",
"//enterprise/cmd/worker/internal/telemetry",
"//enterprise/cmd/worker/internal/telemetrygatewayexporter",
"//internal/auth/userpasswd",
"//internal/authz",
"//internal/authz/providers",

View File

@ -10,6 +10,7 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/cmd/worker/internal/githubapps"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/worker/internal/own"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/worker/internal/search"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/worker/internal/telemetrygatewayexporter"
"github.com/sourcegraph/sourcegraph/cmd/frontend/globals"
"github.com/sourcegraph/sourcegraph/cmd/worker/job"
@ -50,6 +51,7 @@ var additionalJobs = map[string]job.Job{
"permission-sync-job-cleaner": permissions.NewPermissionSyncJobCleaner(),
"permission-sync-job-scheduler": permissions.NewPermissionSyncJobScheduler(),
"export-usage-telemetry": telemetry.NewTelemetryJob(),
"telemetrygateway-exporter": telemetrygatewayexporter.NewJob(),
"codeintel-policies-repository-matcher": codeintel.NewPoliciesRepositoryMatcherJob(),
"codeintel-autoindexing-summary-builder": codeintel.NewAutoindexingSummaryBuilder(),

View File

@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "authbearer",
srcs = ["authbearer.go"],
importpath = "github.com/sourcegraph/sourcegraph/internal/authbearer",
visibility = ["//:__subpackages__"],
deps = ["//lib/errors"],
)

View File

@ -1,4 +1,4 @@
package auth
package authbearer
import (
"net/http"

View File

@ -24,10 +24,6 @@ const (
const FeatureHeaderName = "X-Sourcegraph-Feature"
// GQLErrCodeProductSubscriptionNotFound is the GraphQL error code returned when
// attempting to look up a product subscription failed by any means.
const GQLErrCodeProductSubscriptionNotFound = "ErrProductSubscriptionNotFound"
// GQLErrCodeDotcomUserNotFound is the GraphQL error code returned when
// attempting to look up a dotcom user failed.
const GQLErrCodeDotcomUserNotFound = "ErrDotcomUserNotFound"

View File

@ -142,6 +142,7 @@ go_library(
"//internal/rbac/types",
"//internal/search/result",
"//internal/security",
"//internal/telemetry/sensitivemetadataallowlist",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/temporarysettings",
"//internal/timeutil",

View File

@ -13,6 +13,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/batch"
"github.com/sourcegraph/sourcegraph/internal/featureflag"
"github.com/sourcegraph/sourcegraph/internal/telemetry/sensitivemetadataallowlist"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/lib/errors"
@ -33,9 +34,8 @@ type TelemetryEventsExportQueueStore interface {
// ListForExport returns the cached events that should be exported next. All
// events returned should be exported.
//
// 🚨 SECURITY: Potentially sensitive parts of the payload are retained at
// this stage. The caller is responsible for ensuring sensitive data is
// stripped.
// 🚨 SECURITY: The implementation strips out sensitive contents from events
// that are not in sensitivemetadataallowlist.AllowedEventTypes().
ListForExport(ctx context.Context, limit int) ([]*telemetrygatewayv1.Event, error)
// MarkAsExported marks all events in the set of IDs as exported.
@ -44,6 +44,9 @@ type TelemetryEventsExportQueueStore interface {
// DeletedExported deletes all events exported before the given timestamp,
// returning the number of affected events.
DeletedExported(ctx context.Context, before time.Time) (int64, error)
// CountUnexported returns the number of events not yet exported.
CountUnexported(ctx context.Context) (int64, error)
}
func TelemetryEventsExportQueueWith(logger log.Logger, other basestore.ShareableStore) TelemetryEventsExportQueueStore {
@ -137,6 +140,8 @@ func (s *telemetryEventsExportQueueStore) ListForExport(ctx context.Context, lim
}
defer rows.Close()
sensitiveAllowlist := sensitivemetadataallowlist.AllowedEventTypes()
events := make([]*telemetrygatewayv1.Event, 0, limit)
for rows.Next() {
var id string
@ -145,8 +150,9 @@ func (s *telemetryEventsExportQueueStore) ListForExport(ctx context.Context, lim
if err != nil {
return nil, err
}
var event telemetrygatewayv1.Event
if err := proto.Unmarshal(payloadPB, &event); err != nil {
event := &telemetrygatewayv1.Event{}
if err := proto.Unmarshal(payloadPB, event); err != nil {
tr.RecordError(err)
logger.Error("failed to unmarshal telemetry event payload",
log.String("id", id),
@ -155,7 +161,11 @@ func (s *telemetryEventsExportQueueStore) ListForExport(ctx context.Context, lim
// investigation.
continue
}
events = append(events, &event)
// 🚨 SECURITY: Apply sensitive data redaction of the payload.
sensitiveAllowlist.Redact(event)
events = append(events, event)
}
tr.SetAttributes(attribute.Int("events", len(events)))
if err := rows.Err(); err != nil {
@ -188,3 +198,12 @@ func (s *telemetryEventsExportQueueStore) DeletedExported(ctx context.Context, b
}
return result.RowsAffected()
}
// CountUnexported returns the number of queued telemetry events that have not
// yet been exported (exported_at IS NULL).
func (s *telemetryEventsExportQueueStore) CountUnexported(ctx context.Context) (int64, error) {
	row := s.ShareableStore.Handle().QueryRowContext(ctx, `
SELECT COUNT(*)
FROM telemetry_events_export_queue
WHERE exported_at IS NULL
`)
	var count int64
	err := row.Scan(&count)
	return count, err
}

View File

@ -35,15 +35,17 @@ func TestTelemetryEventsExportQueueLifecycle(t *testing.T) {
Timestamp: timestamppb.New(time.Date(2022, 11, 3, 1, 0, 0, 0, time.UTC)),
Parameters: &telemetrygatewayv1.EventParameters{
Metadata: map[string]int64{"public": 1},
PrivateMetadata: &structpb.Struct{
Fields: map[string]*structpb.Value{"sensitive": structpb.NewStringValue("sensitive")},
},
},
}, {
Id: "2",
Feature: "Feature",
Action: "Click",
Timestamp: timestamppb.New(time.Date(2022, 11, 3, 2, 0, 0, 0, time.UTC)),
Parameters: &telemetrygatewayv1.EventParameters{
PrivateMetadata: &structpb.Struct{
Fields: map[string]*structpb.Value{"sensitive": structpb.NewStringValue("sensitive")},
},
},
}, {
Id: "3",
Feature: "Feature",
@ -63,6 +65,12 @@ func TestTelemetryEventsExportQueueLifecycle(t *testing.T) {
require.NoError(t, store.QueueForExport(ctx, events))
})
t.Run("CountUnexported", func(t *testing.T) {
count, err := store.CountUnexported(ctx)
require.NoError(t, err)
require.Equal(t, count, int64(3))
})
t.Run("ListForExport", func(t *testing.T) {
limit := len(events) - 1
export, err := store.ListForExport(ctx, limit)
@ -74,7 +82,11 @@ func TestTelemetryEventsExportQueueLifecycle(t *testing.T) {
require.NoError(t, err)
got, err := proto.Marshal(export[0])
require.NoError(t, err)
require.Equal(t, string(original), string(got))
assert.Equal(t, string(original), string(got))
// Check second item's private meta is stripped
assert.NotNil(t, events[1].Parameters.PrivateMetadata) // original
assert.Nil(t, export[1].Parameters.PrivateMetadata) // got
})
t.Run("before export: DeleteExported", func(t *testing.T) {

View File

@ -25,6 +25,7 @@ go_library(
"@com_github_sourcegraph_log//:log",
"@io_opentelemetry_go_contrib_instrumentation_google_golang_org_grpc_otelgrpc//:otelgrpc",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//reflection",
],

View File

@ -6,18 +6,21 @@ package defaults
import (
"context"
"crypto/tls"
"sync"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/grpc/contextconv"
"github.com/sourcegraph/sourcegraph/internal/grpc/messagesize"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"
"github.com/sourcegraph/sourcegraph/internal/grpc/contextconv"
"github.com/sourcegraph/sourcegraph/internal/grpc/messagesize"
"github.com/sourcegraph/sourcegraph/internal/actor"
internalgrpc "github.com/sourcegraph/sourcegraph/internal/grpc"
"github.com/sourcegraph/sourcegraph/internal/grpc/internalerrs"
@ -46,6 +49,23 @@ const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB
// **Note**: Do not append to this slice directly, instead provide extra options
// via "additionalOptions".
func DialOptions(logger log.Logger, additionalOptions ...grpc.DialOption) []grpc.DialOption {
	// In-deployment traffic uses plaintext (insecure) transport credentials.
	return defaultDialOptions(logger, insecure.NewCredentials(), additionalOptions...)
}
// ExternalDialOptions is a set of default dial options that should be used for
// gRPC clients external to a Sourcegraph deployment, e.g. Telemetry Gateway,
// along with any additional client-specific options. In particular, these
// options enforce TLS.
//
// Traffic within a Sourcegraph deployment should use DialOptions instead.
//
// **Note**: Do not append to this slice directly, instead provide extra options
// via "additionalOptions".
func ExternalDialOptions(logger log.Logger, additionalOptions ...grpc.DialOption) []grpc.DialOption {
	// An empty tls.Config uses standard TLS verification against the host's
	// root CA set.
	return defaultDialOptions(logger, credentials.NewTLS(&tls.Config{}), additionalOptions...)
}
func defaultDialOptions(logger log.Logger, creds credentials.TransportCredentials, additionalOptions ...grpc.DialOption) []grpc.DialOption {
// Generate the options dynamically rather than using a static slice
// because these options depend on some globals (tracer, trace sampling)
// that are not initialized during init time.
@ -53,7 +73,7 @@ func DialOptions(logger log.Logger, additionalOptions ...grpc.DialOption) []grpc
metrics := mustGetClientMetrics()
out := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithTransportCredentials(creds),
grpc.WithChainStreamInterceptor(
metrics.StreamClientInterceptor(),
messagesize.StreamClientInterceptor,

View File

@ -8,3 +8,7 @@ package productsubscription
// In the future, this prefix can be used if we ever have access tokens for
// product subscriptions that are not based on a Sourcegraph license key.
const AccessTokenPrefix = "sgs_" // "(S)ource(g)raph (S)ubscription"
// GQLErrCodeProductSubscriptionNotFound is the GraphQL error code returned when
// attempting to look up a product subscription failed by any means.
const GQLErrCodeProductSubscriptionNotFound = "ErrProductSubscriptionNotFound"

View File

@ -7,7 +7,9 @@ go_library(
visibility = ["//:__subpackages__"],
deps = [
"//internal/env",
"//internal/trace",
"//lib/errors",
"@com_github_sourcegraph_log//:log",
"@com_google_cloud_go_pubsub//:pubsub",
"@org_golang_google_api//option",
],

View File

@ -7,7 +7,10 @@ import (
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
@ -16,6 +19,8 @@ type TopicClient interface {
// Ping checks if the connection to the topic is valid.
Ping(ctx context.Context) error
// Publish publishes messages and waits for all the results synchronously.
// It returns the first error encountered or nil if all succeeded. To collect
// individual errors, call Publish with only 1 message.
Publish(ctx context.Context, messages ...[]byte) error
// Stop stops the topic publishing channel. The client should not be used after
// calling Stop.
@ -91,3 +96,23 @@ type noopTopicClient struct{}
// noopTopicClient methods: every operation succeeds without doing anything.
func (c *noopTopicClient) Ping(context.Context) error { return nil }
func (c *noopTopicClient) Publish(context.Context, ...[]byte) error { return nil }
func (c *noopTopicClient) Stop() {}
// NewLoggingTopicClient creates a Pub/Sub client that just logs all messages,
// and does nothing otherwise. This is also a useful stub implementation of the
// TopicClient for testing/debugging purposes.
func NewLoggingTopicClient(logger log.Logger) TopicClient {
	scoped := logger.Scoped("pubsub", "")
	return &loggingTopicClient{logger: scoped}
}
// loggingTopicClient logs every published message and otherwise behaves as a
// no-op TopicClient (Ping and Stop come from the embedded noopTopicClient).
type loggingTopicClient struct {
	logger log.Logger
	noopTopicClient
}
// Publish logs each message at info level and always reports success. The
// logger is derived from ctx so that entries are correlated with any active
// trace.
func (c *loggingTopicClient) Publish(ctx context.Context, messages ...[]byte) error {
	logger := trace.Logger(ctx, c.logger)
	for _, payload := range messages {
		logger.Info("Publish", log.String("message", string(payload)))
	}
	return nil
}

View File

@ -4,6 +4,7 @@ load("//dev:go_defs.bzl", "go_test")
go_library(
name = "telemetry",
srcs = [
"besteffort.go",
"billing_categories.go",
"billing_products.go",
"events.go",
@ -13,9 +14,10 @@ go_library(
importpath = "github.com/sourcegraph/sourcegraph/internal/telemetry",
visibility = ["//:__subpackages__"],
deps = [
"//internal/telemetry/teestore",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/trace",
"//internal/version",
"@com_github_sourcegraph_log//:log",
"@org_golang_google_protobuf//types/known/structpb",
],
)

View File

@ -0,0 +1,33 @@
package telemetry
import (
"context"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/trace"
)
// BestEffortEventRecorder is a version of EventRecorder that logs errors instead
// of returning them. This is useful for non-critical telemetry collection.
type BestEffortEventRecorder struct {
	// logger receives recording failures.
	logger log.Logger
	// recorder is the underlying recorder whose errors are logged rather than
	// propagated.
	recorder *EventRecorder
}
// NewBestEffortEventRecorder creates a recorder that wraps the given
// EventRecorder, logging recording failures with the given logger instead of
// returning them. In general, prefer to use the
// telemetryrecorder.NewBestEffort() constructor instead.
func NewBestEffortEventRecorder(logger log.Logger, recorder *EventRecorder) *BestEffortEventRecorder {
	return &BestEffortEventRecorder{logger: logger, recorder: recorder}
}
// Record records a single telemetry event with the context's Sourcegraph
// actor. Unlike EventRecorder.Record, any failure is logged (with the event's
// identifying fields attached) instead of being returned to the caller.
func (r *BestEffortEventRecorder) Record(ctx context.Context, feature eventFeature, action eventAction, parameters EventParameters) {
	if err := r.recorder.Record(ctx, feature, action, parameters); err != nil {
		// Use a trace-aware logger so the failure is correlated with any
		// ongoing trace in ctx.
		trace.Logger(ctx, r.logger).Error("failed to record telemetry event",
			log.String("feature", string(feature)),
			log.String("action", string(action)),
			log.Int("parameters.version", parameters.Version),
			log.Error(err))
	}
}

View File

@ -10,6 +10,9 @@ type eventFeature string
// All event names in Sourcegraph's Go services.
const (
FeatureExample eventFeature = "exampleFeature"
FeatureSignIn eventFeature = "signIn"
FeatureSignOut eventFeature = "signOut"
)
// eventAction defines the action associated with an event. Values should
@ -21,4 +24,8 @@ type eventAction string
const (
ActionExample eventAction = "exampleAction"
ActionFailed eventAction = "failed"
ActionSucceeded eventAction = "succeeded"
ActionAttempted eventAction = "attempted"
)

View File

@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//dev:go_defs.bzl", "go_test")
go_library(
name = "sensitivemetadataallowlist",
srcs = [
"redact.go",
"sensitiviemetadataallowlist.go",
],
importpath = "github.com/sourcegraph/sourcegraph/internal/telemetry/sensitivemetadataallowlist",
visibility = ["//:__subpackages__"],
deps = [
"//cmd/frontend/envvar",
"//internal/telemetry",
"//internal/telemetrygateway/v1:telemetrygateway",
"//lib/errors",
],
)
go_test(
name = "sensitivemetadataallowlist_test",
srcs = [
"redact_test.go",
"sensitivemetadataallowlist_test.go",
],
embed = [":sensitivemetadataallowlist"],
deps = [
"//internal/telemetry",
"//internal/telemetrygateway/v1:telemetrygateway",
"//lib/pointers",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_protobuf//types/known/structpb",
],
)

View File

@ -0,0 +1,37 @@
package sensitivemetadataallowlist
import (
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
)
// redactMode dictates how much to redact. The lowest value indicates our
// strictest redaction mode - higher values indicate less redaction.
type redactMode int

const (
	// redactAllSensitive is the strictest mode: marketing and private metadata
	// are both stripped.
	redactAllSensitive redactMode = iota
	// redactMarketing only redacts marketing-related fields.
	redactMarketing
	// redactNothing is only used in dotcom mode.
	redactNothing
)
// redactEvent strips fields from the event in-place according to mode, which
// is ordered from strictest (redactAllSensitive) to most permissive
// (redactNothing).
//
// 🚨 SECURITY: Be very careful with the redaction mechanisms here, as it impacts
// what data we export from customer Sourcegraph instances.
func redactEvent(event *telemetrygatewayv1.Event, mode redactMode) {
	// redactNothing: keep everything.
	if mode >= redactNothing {
		return
	}
	// redactMarketing and stricter: always drop marketing tracking.
	event.MarketingTracking = nil
	if mode >= redactMarketing {
		return
	}
	// redactAllSensitive: additionally drop private metadata. Guard against a
	// nil Parameters message on sparse events.
	if event.Parameters != nil {
		event.Parameters.PrivateMetadata = nil
	}
}

View File

@ -0,0 +1,79 @@
package sensitivemetadataallowlist
import (
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/structpb"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/lib/pointers"
)
func TestRedactEvent(t *testing.T) {
	// makeFullEvent returns an event with all redactable fields populated.
	makeFullEvent := func() *telemetrygatewayv1.Event {
		return &telemetrygatewayv1.Event{
			Parameters: &telemetrygatewayv1.EventParameters{
				PrivateMetadata: &structpb.Struct{
					Fields: map[string]*structpb.Value{
						"testField": structpb.NewBoolValue(true),
					},
				},
			},
			MarketingTracking: &telemetrygatewayv1.EventMarketingTracking{
				Url: pointers.Ptr("sourcegraph.com"),
			},
		}
	}
	tests := []struct {
		name   string
		mode   redactMode
		event  *telemetrygatewayv1.Event
		assert func(t *testing.T, got *telemetrygatewayv1.Event)
	}{
		{
			name:  "redact all sensitive",
			mode:  redactAllSensitive,
			event: makeFullEvent(),
			assert: func(t *testing.T, got *telemetrygatewayv1.Event) {
				assert.Nil(t, got.Parameters.PrivateMetadata)
				assert.Nil(t, got.MarketingTracking)
			},
		},
		{
			name:  "redact all sensitive on empty event",
			mode:  redactAllSensitive,
			event: &telemetrygatewayv1.Event{},
			assert: func(t *testing.T, got *telemetrygatewayv1.Event) {
				// Use nil-safe getters: the empty event has no Parameters
				// message at all.
				assert.Nil(t, got.GetParameters().GetPrivateMetadata())
				assert.Nil(t, got.MarketingTracking)
			},
		},
		{
			name:  "redact marketing",
			mode:  redactMarketing,
			event: makeFullEvent(),
			assert: func(t *testing.T, got *telemetrygatewayv1.Event) {
				assert.NotNil(t, got.Parameters.PrivateMetadata)
				assert.Nil(t, got.MarketingTracking)
			},
		},
		{
			name:  "redact nothing",
			mode:  redactNothing,
			event: makeFullEvent(),
			assert: func(t *testing.T, got *telemetrygatewayv1.Event) {
				assert.NotNil(t, got.Parameters.PrivateMetadata)
				assert.NotNil(t, got.MarketingTracking)
			},
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// Redact the per-case event. Previously this always redacted a
			// fresh makeFullEvent(), leaving tc.event unused - so the
			// "empty event" case never actually exercised an empty event.
			redactEvent(tc.event, tc.mode)
			tc.assert(t, tc.event)
		})
	}
}

View File

@ -0,0 +1,24 @@
package sensitivemetadataallowlist
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/sourcegraph/internal/telemetry"
v1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
)
func TestIsAllowed(t *testing.T) {
	allowed := AllowedEventTypes()
	require.NotEmpty(t, allowed)

	// The example event is explicitly allowlisted.
	knownEvent := &v1.Event{
		Feature: string(telemetry.FeatureExample),
		Action:  string(telemetry.ActionExample),
	}
	assert.True(t, allowed.IsAllowed(knownEvent))

	// Anything not on the allowlist must be rejected.
	unknownEvent := &v1.Event{
		Feature: "disallowedFeature",
		Action:  "disallowedAction",
	}
	assert.False(t, allowed.IsAllowed(unknownEvent))
}

View File

@ -0,0 +1,87 @@
package sensitivemetadataallowlist
import (
"fmt"
"github.com/sourcegraph/sourcegraph/cmd/frontend/envvar"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/internal/telemetry"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
)
// AllowedEventTypes denotes a list of all events allowed to export sensitive
// telemetry metadata.
//
// NOTE(review): each entry here permits that event type's private metadata to
// leave the instance - additions warrant careful review (see EventTypes.Redact).
func AllowedEventTypes() EventTypes {
	return eventTypes(
		// Example event for testing.
		EventType{
			Feature: string(telemetry.FeatureExample),
			Action:  string(telemetry.ActionExample),
		},
	)
}
// EventTypes is a set of event types with a precomputed membership index.
type EventTypes struct {
	types []EventType
	// index of '{feature}.{action}' for checking
	index map[string]struct{}
}

// eventTypes builds an EventTypes set from the given types, precomputing the
// '{feature}.{action}' membership index.
func eventTypes(types ...EventType) EventTypes {
	set := EventTypes{
		types: types,
		index: make(map[string]struct{}, len(types)),
	}
	for _, et := range types {
		set.index[fmt.Sprintf("%s.%s", et.Feature, et.Action)] = struct{}{}
	}
	return set
}
// Redact strips the event of sensitive data based on the allowlist.
//
// 🚨 SECURITY: Be very careful with the redaction modes used here, as it impacts
// what data we export from customer Sourcegraph instances.
func (e EventTypes) Redact(event *telemetrygatewayv1.Event) {
	var mode redactMode
	switch {
	case envvar.SourcegraphDotComMode():
		// Dotcom exports events without redaction.
		mode = redactNothing
	case e.IsAllowed(event):
		// Allowlisted events keep private metadata; marketing fields are
		// still stripped.
		mode = redactMarketing
	default:
		// Everything else gets the strictest redaction.
		mode = redactAllSensitive
	}
	redactEvent(event, mode)
}
// IsAllowed indicates an event is on the sensitive telemetry allowlist.
func (e EventTypes) IsAllowed(event *telemetrygatewayv1.Event) bool {
	// Keys are built exactly as in eventTypes: '{feature}.{action}'.
	_, allowed := e.index[fmt.Sprintf("%s.%s", event.GetFeature(), event.GetAction())]
	return allowed
}
// validate returns the first validation error found among the set's event
// types, or nil if all are well-formed.
func (e EventTypes) validate() error {
	for _, et := range e.types {
		if err := et.validate(); err != nil {
			return err
		}
	}
	return nil
}
// EventType identifies a single '{feature}.{action}' event type.
type EventType struct {
	Feature string
	Action  string
	// Future: maybe restrict to specific, known private metadata fields as well
}

// validate asserts that both identifying fields are set.
func (e EventType) validate() error {
	if e.Feature != "" && e.Action != "" {
		return nil
	}
	return errors.New("feature and action are required")
}
// init guards against malformed allowlist entries at startup - a panic here
// indicates a programmer error in AllowedEventTypes.
func init() {
	if err := AllowedEventTypes().validate(); err != nil {
		panic(errors.Wrap(err, "AllowedEvents has invalid event(s)"))
	}
}

View File

@ -3,12 +3,16 @@ load("//dev:go_defs.bzl", "go_test")
go_library(
name = "teestore",
srcs = ["teestore.go"],
srcs = [
"option.go",
"teestore.go",
],
importpath = "github.com/sourcegraph/sourcegraph/internal/telemetry/teestore",
visibility = ["//:__subpackages__"],
deps = [
"//internal/database",
"//internal/featureflag",
"//internal/telemetry/sensitivemetadataallowlist",
"//internal/telemetrygateway/v1:telemetrygateway",
"//lib/errors",
"//lib/pointers",

View File

@ -0,0 +1,21 @@
package teestore
import "context"
type contextKey int
const withoutV1Key contextKey = iota
// WithoutV1 adds a special flag to context that indicates to an underlying
// events teestore.Store that it should not persist the event as a V1 event.
//
// This is useful for callsites where the shape of the legacy event must be
// preserved, such that it continues to be logged manually.
func WithoutV1(ctx context.Context) context.Context {
return context.WithValue(ctx, withoutV1Key, true)
}
func shouldDisableV1(ctx context.Context) bool {
v, ok := ctx.Value(withoutV1Key).(bool)
return ok && v
}

View File

@ -11,6 +11,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/featureflag"
"github.com/sourcegraph/sourcegraph/internal/telemetry/sensitivemetadataallowlist"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/pointers"
@ -19,20 +20,16 @@ import (
// Store tees events into both the event_logs table and the new telemetry export
// queue, translating the message into the existing event_logs format on a
// best-effort basis.
type Store interface {
StoreEvents(context.Context, []*telemetrygatewayv1.Event) error
}
type store struct {
type Store struct {
exportQueue database.TelemetryEventsExportQueueStore
eventLogs database.EventLogStore
}
func NewStore(exportQueue database.TelemetryEventsExportQueueStore, eventLogs database.EventLogStore) Store {
return &store{exportQueue, eventLogs}
func NewStore(exportQueue database.TelemetryEventsExportQueueStore, eventLogs database.EventLogStore) *Store {
return &Store{exportQueue, eventLogs}
}
func (s *store) StoreEvents(ctx context.Context, events []*telemetrygatewayv1.Event) error {
func (s *Store) StoreEvents(ctx context.Context, events []*telemetrygatewayv1.Event) error {
// Write to both stores at the same time.
wg := pool.New().WithErrors()
wg.Go(func() error {
@ -41,16 +38,20 @@ func (s *store) StoreEvents(ctx context.Context, events []*telemetrygatewayv1.Ev
}
return nil
})
wg.Go(func() error {
if err := s.eventLogs.BulkInsert(ctx, toEventLogs(time.Now, events)); err != nil {
return errors.Wrap(err, "bulk inserting events logs")
}
return nil
})
if !shouldDisableV1(ctx) {
wg.Go(func() error {
if err := s.eventLogs.BulkInsert(ctx, toEventLogs(time.Now, events)); err != nil {
return errors.Wrap(err, "bulk inserting events logs")
}
return nil
})
}
return wg.Wait()
}
func toEventLogs(now func() time.Time, telemetryEvents []*telemetrygatewayv1.Event) []*database.Event {
sensitiveMetadataAllowlist := sensitivemetadataallowlist.AllowedEventTypes()
eventLogs := make([]*database.Event, len(telemetryEvents))
for i, e := range telemetryEvents {
// Note that all generated proto getters are nil-safe, so use those to
@ -60,7 +61,7 @@ func toEventLogs(now func() time.Time, telemetryEvents []*telemetrygatewayv1.Eve
InsertID: nil, // not required on insert
// Identifiers
Name: fmt.Sprintf("%s%s.%s", maybeSourceEventNamePrefix(e.GetSource()), e.GetFeature(), e.GetAction()),
Name: fmt.Sprintf("V2:%s.%s", e.GetFeature(), e.GetAction()),
Timestamp: func() time.Time {
if e.GetTimestamp() == nil {
return now()
@ -91,6 +92,11 @@ func toEventLogs(now func() time.Time, telemetryEvents []*telemetrygatewayv1.Eve
if len(md) == 0 {
return nil
}
// Attach a simple indicator to denote if this metadata will
// be exported.
md["privateMetadata.export"] = sensitiveMetadataAllowlist.IsAllowed(e)
data, err := json.Marshal(md)
if err != nil {
data, _ = json.Marshal(map[string]string{"marshal.error": err.Error()})
@ -142,10 +148,3 @@ func toEventLogs(now func() time.Time, telemetryEvents []*telemetrygatewayv1.Eve
}
return eventLogs
}
func maybeSourceEventNamePrefix(s *telemetrygatewayv1.EventSource) string {
if name := s.GetClient().GetName(); name != "" {
return name + ":"
}
return ""
}

View File

@ -33,7 +33,7 @@ func TestToEventLogs(t *testing.T) {
expectEventLogs: autogold.Expect(`[
{
"ID": 0,
"Name": ".",
"Name": "V2:.",
"URL": "",
"UserID": 0,
"AnonymousUserID": "",
@ -61,7 +61,7 @@ func TestToEventLogs(t *testing.T) {
expectEventLogs: autogold.Expect(`[
{
"ID": 0,
"Name": ".",
"Name": "V2:.",
"URL": "",
"UserID": 0,
"AnonymousUserID": "",
@ -89,7 +89,7 @@ func TestToEventLogs(t *testing.T) {
Id: "1",
Timestamp: timestamppb.New(time.Date(2022, 11, 2, 1, 0, 0, 0, time.UTC)),
Feature: "CodeSearch",
Action: "Seach",
Action: "Search",
Source: &telemetrygatewayv1.EventSource{
Client: &telemetrygatewayv1.EventSource_Client{
Name: "VSCODE",
@ -120,12 +120,13 @@ func TestToEventLogs(t *testing.T) {
expectEventLogs: autogold.Expect(`[
{
"ID": 0,
"Name": "VSCODE:CodeSearch.Seach",
"Name": "V2:CodeSearch.Search",
"URL": "sourcegraph.com/foobar",
"UserID": 1234,
"AnonymousUserID": "anonymous",
"Argument": {
"private": "sensitive-data"
"private": "sensitive-data",
"privateMetadata.export": false
},
"PublicArgument": {
"public": 2

View File

@ -7,8 +7,6 @@ import (
"context"
"time"
"github.com/sourcegraph/sourcegraph/internal/telemetry/teestore"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
)
@ -45,19 +43,28 @@ type EventParameters struct {
BillingMetadata *EventBillingMetadata
}
// EventsStore is the storage backend used by EventRecorder to persist
// recorded telemetry events.
type EventsStore interface {
	StoreEvents(context.Context, []*telemetrygatewayv1.Event) error
}
// EventRecorder is for creating and recording telemetry events in the backend
// using Telemetry V2, which exports events to Sourcegraph.
type EventRecorder struct{ teestore teestore.Store }
type EventRecorder struct{ store EventsStore }
// ❗ Experimental - do not use!
func NewEventRecorder(store teestore.Store) *EventRecorder {
return &EventRecorder{teestore: store}
// NewEventRecorder creates a custom event recorder backed by a store
// implementation. In general, prefer to use the telemetryrecorder.New()
// constructor instead.
//
// If you don't care about event recording failures, consider using a
// BestEffortEventRecorder instead.
func NewEventRecorder(store EventsStore) *EventRecorder {
return &EventRecorder{store: store}
}
// Record records a single telemetry event with the context's Sourcegraph
// actor.
func (r *EventRecorder) Record(ctx context.Context, feature eventFeature, action eventAction, parameters EventParameters) error {
return r.teestore.StoreEvents(ctx, []*telemetrygatewayv1.Event{
return r.store.StoreEvents(ctx, []*telemetrygatewayv1.Event{
newTelemetryGatewayEvent(ctx, time.Now(), telemetrygatewayv1.DefaultEventIDFunc, feature, action, parameters),
})
}
@ -72,5 +79,5 @@ func (r *EventRecorder) BatchRecord(ctx context.Context, events ...Event) error
for i, e := range events {
rawEvents[i] = newTelemetryGatewayEvent(ctx, time.Now(), telemetrygatewayv1.DefaultEventIDFunc, e.Feature, e.Action, e.Parameters)
}
return r.teestore.StoreEvents(ctx, rawEvents)
return r.store.StoreEvents(ctx, rawEvents)
}

View File

@ -64,4 +64,17 @@ func TestRecorderEndToEnd(t *testing.T) {
require.NoError(t, err)
assert.Len(t, telemetryEvents, wantEvents)
})
t.Run("record without v1", func(t *testing.T) {
ctx := teestore.WithoutV1(ctx)
assert.NoError(t, recorder.Record(ctx, "Test", "Action1", telemetry.EventParameters{}))
telemetryEvents, err := db.TelemetryEventsExportQueue().ListForExport(ctx, 999)
require.NoError(t, err)
assert.Len(t, telemetryEvents, wantEvents+1)
eventLogs, err := db.EventLogs().ListAll(ctx, database.EventLogsListOptions{UserID: userID})
require.NoError(t, err)
assert.Len(t, eventLogs, wantEvents) // v1 unchanged
})
}

View File

@ -0,0 +1,14 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "telemetryrecorder",
srcs = ["telemetryrecorder.go"],
importpath = "github.com/sourcegraph/sourcegraph/internal/telemetry/telemetryrecorder",
visibility = ["//:__subpackages__"],
deps = [
"//internal/database",
"//internal/telemetry",
"//internal/telemetry/teestore",
"@com_github_sourcegraph_log//:log",
],
)

View File

@ -0,0 +1,34 @@
// Package telemetryrecorder provides default constructors for telemetry
// recorders.
//
// This package partly exists to avoid dependency cycles with the database
// package and the telemetry package.
package telemetryrecorder
import (
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/telemetry"
"github.com/sourcegraph/sourcegraph/internal/telemetry/teestore"
)
// New creates a default EventRecorder for Telemetry V2, which exports recorded
// events to Sourcegraph's Telemetry Gateway service.
//
// The current defaults tee events to both the legacy event_logs table, as well
// as the new Telemetry Gateway export queue.
func New(db database.DB) *telemetry.EventRecorder {
	store := teestore.NewStore(db.TelemetryEventsExportQueue(), db.EventLogs())
	return telemetry.NewEventRecorder(store)
}
// NewBestEffort creates a default BestEffortEventRecorder for Telemetry V2,
// which exports recorded events to Sourcegraph's Telemetry Gateway service,
// logging (rather than returning) any recording failures.
//
// The current defaults tee events to both the legacy event_logs table, as well
// as the new Telemetry Gateway export queue.
func NewBestEffort(logger log.Logger, db database.DB) *telemetry.BestEffortEventRecorder {
	return telemetry.NewBestEffortEventRecorder(
		logger.Scoped("telemetry", "telemetry event recorder"),
		New(db))
}

View File

@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "telemetrygateway",
srcs = [
"exporter.go",
"identifier.go",
],
importpath = "github.com/sourcegraph/sourcegraph/internal/telemetrygateway",
visibility = ["//:__subpackages__"],
deps = [
"//internal/conf/conftypes",
"//internal/env",
"//internal/grpc/chunk",
"//internal/grpc/defaults",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/trace",
"//lib/errors",
"@com_github_google_uuid//:uuid",
"@com_github_sourcegraph_log//:log",
"@io_opentelemetry_go_otel//attribute",
"@org_golang_google_grpc//:go_default_library",
],
)

View File

@ -0,0 +1,175 @@
package telemetrygateway
import (
"context"
"io"
"net/url"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
"github.com/google/uuid"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/grpc/chunk"
"github.com/sourcegraph/sourcegraph/internal/grpc/defaults"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// Exporter exports telemetry events to a remote Telemetry Gateway service.
type Exporter interface {
	// ExportEvents sends the given events, returning the IDs of events the
	// server reported as successfully recorded.
	ExportEvents(context.Context, []*telemetrygatewayv1.Event) ([]string, error)
	// Close releases the underlying connection. The Exporter should not be
	// used after Close.
	Close() error
}
// NewExporter dials a Telemetry Gateway instance at exportAddress and returns
// an Exporter backed by that connection. Non-HTTPS targets are only permitted
// in insecure dev mode.
func NewExporter(ctx context.Context, logger log.Logger, c conftypes.SiteConfigQuerier, exportAddress string) (Exporter, error) {
	u, err := url.Parse(exportAddress)
	if err != nil {
		return nil, errors.Wrap(err, "invalid export address")
	}
	insecureTarget := u.Scheme != "https"
	if insecureTarget && !env.InsecureDev {
		// Previously this wrapped a nil err (errors.Wrap(nil, ...) == nil),
		// which made the guard silently return (nil, nil) instead of
		// rejecting the insecure address.
		return nil, errors.New("insecure export address used outside of dev mode")
	}
	// TODO(@bobheadxi): Maybe don't use defaults.DialOptions etc, which are
	// geared towards in-Sourcegraph services.
	var opts []grpc.DialOption
	if insecureTarget {
		opts = defaults.DialOptions(logger)
	} else {
		opts = defaults.ExternalDialOptions(logger)
	}
	conn, err := grpc.DialContext(ctx, u.Host, opts...)
	if err != nil {
		return nil, errors.Wrap(err, "dialing telemetry gateway")
	}
	return &exporter{
		client: telemetrygatewayv1.NewTelemeteryGatewayServiceClient(conn),
		conf:   c,
		conn:   conn,
	}, nil
}
// exporter is the gRPC-backed implementation of Exporter.
type exporter struct {
	client telemetrygatewayv1.TelemeteryGatewayServiceClient
	// conf provides site configuration, from which the instance identifier
	// (license key) is derived per export.
	conf conftypes.SiteConfigQuerier
	// conn is retained so Close can release it.
	conn *grpc.ClientConn
}
// ExportEvents sends events to the Telemetry Gateway, returning the IDs of
// events the server reported as successfully recorded. A request ID (trace ID
// when recording, otherwise a fresh UUID) is attached to the stream and to any
// returned error for cross-service log correlation.
func (e *exporter) ExportEvents(ctx context.Context, events []*telemetrygatewayv1.Event) ([]string, error) {
	tr, ctx := trace.New(ctx, "ExportEvents", attribute.Int("events", len(events)))
	defer tr.End()
	// Identify this instance - derived per export so license changes are
	// picked up without restarting.
	identifier, err := newIdentifier(e.conf)
	if err != nil {
		tr.SetError(err)
		return nil, err
	}
	var requestID string
	if tr.IsRecording() {
		// Reuse the trace ID so the request can be correlated with this span.
		requestID = tr.SpanContext().TraceID().String()
	} else {
		requestID = uuid.NewString()
	}
	succeeded, err := e.doExportEvents(ctx, requestID, identifier, events)
	if err != nil {
		tr.SetError(err)
		// Surface request ID to help us correlate log entries more easily on
		// our end, because Telemetry Gateway doesn't return granular failure
		// details.
		return succeeded, errors.Wrapf(err, "request %q", requestID)
	}
	return succeeded, nil
}
// doExportEvents runs one RecordEvents stream end-to-end: it sends the request
// metadata (request ID and instance identifier), streams events in
// size-bounded chunks, then collects server responses. It is split out from
// ExportEvents to make it easier for us to wrap all errors in our request ID
// for ease of investigating failures.
//
// It returns the IDs of events the server reported as succeeded; the returned
// slice may be non-empty even when an error is returned.
func (e *exporter) doExportEvents(
	ctx context.Context,
	requestID string,
	identifier *telemetrygatewayv1.Identifier,
	events []*telemetrygatewayv1.Event,
) ([]string, error) {
	// Start the stream
	stream, err := e.client.RecordEvents(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "start export")
	}
	// Send initial metadata. This must be the first message on the stream -
	// the server uses it to identify and authenticate the whole request.
	if err := stream.Send(&telemetrygatewayv1.RecordEventsRequest{
		Payload: &telemetrygatewayv1.RecordEventsRequest_Metadata{
			Metadata: &telemetrygatewayv1.RecordEventsRequestMetadata{
				RequestId:  requestID,
				Identifier: identifier,
			},
		},
	}); err != nil {
		return nil, errors.Wrap(err, "send initial metadata")
	}
	// Set up a callback that makes sure we pick up all responses from the
	// server.
	collectResults := func() ([]string, error) {
		// We're collecting results now - end the request send stream. From here,
		// the server will eventually get io.EOF and return, then we will eventually
		// get an io.EOF and return. Discard the error because we don't really
		// care - in examples, the error gets discarded as well:
		// https://github.com/grpc/grpc-go/blob/130bc4281c39ac1ed287ec988364d36322d3cd34/examples/route_guide/client/client.go#L145
		//
		// If anything goes wrong stream.Recv() will let us know.
		_ = stream.CloseSend()
		// Wait for responses from server.
		succeededEvents := make([]string, 0, len(events))
		for {
			resp, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				// Server finished responding - we have all results.
				break
			}
			if err != nil {
				return succeededEvents, err
			}
			if len(resp.GetSucceededEvents()) > 0 {
				succeededEvents = append(succeededEvents, resp.GetSucceededEvents()...)
			}
		}
		// Partial success is still an error: the caller should retry or
		// investigate the missing events.
		if len(succeededEvents) < len(events) {
			return succeededEvents, errors.Newf("%d events did not get recorded successfully",
				len(events)-len(succeededEvents))
		}
		return succeededEvents, nil
	}
	// Start streaming our set of events, chunking them based on message size
	// as determined internally by chunk.Chunker.
	chunker := chunk.New(func(chunkedEvents []*telemetrygatewayv1.Event) error {
		return stream.Send(&telemetrygatewayv1.RecordEventsRequest{
			Payload: &telemetrygatewayv1.RecordEventsRequest_Events{
				Events: &telemetrygatewayv1.RecordEventsRequest_EventsPayload{
					Events: chunkedEvents,
				},
			},
		})
	})
	// On send failures, still drain responses so we can report any events
	// that did make it through before surfacing the send error.
	if err := chunker.Send(events...); err != nil {
		succeeded, _ := collectResults()
		return succeeded, errors.Wrap(err, "chunk and send events")
	}
	if err := chunker.Flush(); err != nil {
		succeeded, _ := collectResults()
		return succeeded, errors.Wrap(err, "flush events")
	}
	return collectResults()
}
// Close releases the underlying gRPC connection.
func (e *exporter) Close() error { return e.conn.Close() }

View File

@ -0,0 +1,23 @@
package telemetrygateway
import (
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// newIdentifier builds the instance identifier attached to export streams.
// Currently only licensed instances are supported: the full license key from
// site configuration is used as the identifier.
func newIdentifier(c conftypes.SiteConfigQuerier) (*telemetrygatewayv1.Identifier, error) {
	if lk := c.SiteConfig().LicenseKey; lk != "" {
		return &telemetrygatewayv1.Identifier{
			Identifier: &telemetrygatewayv1.Identifier_LicensedInstance{
				LicensedInstance: &telemetrygatewayv1.Identifier_LicensedInstanceIdentifier{
					LicenseKey: lk,
				},
			},
		}, nil
	}
	// TODO: Add support for unlicensed instances
	// Fixed typo in error message: "identifer" -> "identifier".
	return nil, errors.New("cannot infer an identifier for this instance")
}

View File

@ -34,12 +34,16 @@ go_library(
importpath = "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1",
visibility = [
"//cmd/frontend/internal/telemetry/resolvers:__pkg__",
"//cmd/telemetry-gateway/internal/events:__pkg__",
"//cmd/telemetry-gateway/internal/server:__pkg__",
"//cmd/telemetry-gateway/shared:__pkg__",
"//cmd/telemetrygateway/server:__pkg__",
"//cmd/telemetrygateway/shared:__pkg__",
"//internal/api:__pkg__",
"//internal/database:__pkg__",
"//internal/extsvc/gitolite:__pkg__",
"//internal/telemetry:__pkg__",
"//internal/telemetry/sensitivemetadataallowlist:__pkg__",
"//internal/telemetry/teestore:__pkg__",
"//internal/telemetrygateway:__pkg__",
"//internal/telemetrygateway/gitdomain:__pkg__",

View File

@ -15,7 +15,7 @@ func NonZeroPtr[T comparable](val T) *T {
return Ptr(val)
}
// Deref safely dereferences a pointer. If pointer is nil, returns zero value,
// Deref safely dereferences a pointer. If pointer is nil, returns default value,
// otherwise returns dereferenced value.
func Deref[T any](v *T, defaultValue T) T {
if v != nil {

View File

@ -132,6 +132,8 @@ env:
GRPC_INTERNAL_ERROR_LOGGING_LOG_PROTOBUF_MESSAGES_JSON_TRUNCATION_SIZE_BYTES: "1KB"
GRPC_INTERNAL_ERROR_LOGGING_LOG_PROTOBUF_MESSAGES_HANDLING_MAX_MESSAGE_SIZE_BYTES: "100MB"
TELEMETRY_GATEWAY_EXPORTER_EXPORT_ADDR: "http://127.0.0.1:10080"
commands:
server:
description: Run an all-in-one sourcegraph/server image
@ -312,6 +314,31 @@ commands:
- internal
- cmd/cody-gateway
telemetry-gateway:
cmd: |
# Telemetry Gateway needs this to parse and validate incoming license keys.
export SOURCEGRAPH_LICENSE_GENERATION_KEY=$(cat ../dev-private/enterprise/dev/test-license-generation-key.pem)
.bin/telemetry-gateway
install: |
if [ -n "$DELVE" ]; then
export GCFLAGS='all=-N -l'
fi
go build -gcflags="$GCFLAGS" -o .bin/telemetry-gateway github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway
checkBinary: .bin/telemetry-gateway
env:
PORT: "10080"
DIAGNOSTICS_SECRET: sekret
TELEMETRY_GATEWAY_EVENTS_PUBSUB_ENABLED: false
SRC_LOG_LEVEL: info
# Enables metrics in dev via debugserver
SRC_PROF_HTTP: "127.0.0.1:6080"
GRPC_WEB_UI_ENABLED: true
watch:
- lib
- internal
- cmd/telemetry-gateway
- internal/telemetrygateway
pings:
cmd: |
.bin/pings
@ -1080,6 +1107,7 @@ commandsets:
- zoekt-web-1
- blobstore
- embeddings
- telemetry-gateway
env:
DISABLE_CODE_INSIGHTS_HISTORICAL: false
DISABLE_CODE_INSIGHTS: false
@ -1118,6 +1146,7 @@ commandsets:
- blobstore
- cody-gateway
- embeddings
- telemetry-gateway
env:
SOURCEGRAPHDOTCOM_MODE: true