mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 12:51:55 +00:00
telemetry-gateway: migrate to MSP runtime (#58814)
This change migrates Telemetry Gateway to use the MSP runtime library for service initialization, which now handles Sentry, OpenTelemetry, etc and offers a simpler interface for defining services. Because we now only expose 1 port (i.e. no debugserver port), I've made the default in local dev `6080`, because my browser was complaining about `10080`. ## Test plan ```sh sg run telemetry-gateway curl http://localhost:6080/-/version # 0.0.0+dev% curl http://localhost:6080/-/healthz # unauthorized% curl -H 'Authorization: bearer sekret' http://localhost:6080/-/healthz # healthz: ok% ``` Also visit http://localhost:6080/debug/grpcui/ and http://localhost:6080/metrics, which are expected to be enabled in local dev. Then try with full Sourcegraph stack: ``` sg start ``` <img width="660" alt="image" src="https://github.com/sourcegraph/sourcegraph/assets/23356519/9e799c58-4d02-4752-9f9f-da3108ba762f">
This commit is contained in:
parent
7b5dd65e03
commit
5fa93155fc
@ -61,12 +61,7 @@ go_library(
|
||||
importpath = "github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway",
|
||||
visibility = ["//visibility:private"],
|
||||
deps = [
|
||||
"//cmd/telemetry-gateway/shared",
|
||||
"//internal/conf",
|
||||
"//internal/env",
|
||||
"//internal/sanitycheck",
|
||||
"//internal/service/svcmain",
|
||||
"@com_github_getsentry_sentry_go//:sentry-go",
|
||||
"@com_github_sourcegraph_log//:log",
|
||||
"//cmd/telemetry-gateway/service",
|
||||
"//lib/managedservicesplatform/runtime",
|
||||
],
|
||||
)
|
||||
|
||||
@ -1,33 +1,10 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/sourcegraph/log"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/shared"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/env"
|
||||
"github.com/sourcegraph/sourcegraph/internal/sanitycheck"
|
||||
"github.com/sourcegraph/sourcegraph/internal/service/svcmain"
|
||||
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/service"
|
||||
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
|
||||
)
|
||||
|
||||
var sentryDSN = env.Get("TELEMETRY_GATEWAY_SENTRY_DSN", "", "Sentry DSN")
|
||||
|
||||
func main() {
|
||||
sanitycheck.Pass()
|
||||
svcmain.SingleServiceMainWithoutConf(shared.Service, svcmain.OutOfBandConfiguration{
|
||||
Logging: func() conf.LogSinksSource {
|
||||
if sentryDSN == "" {
|
||||
return nil
|
||||
}
|
||||
return conf.NewStaticLogsSinksSource(log.SinksConfig{
|
||||
Sentry: &log.SentrySink{
|
||||
ClientOptions: sentry.ClientOptions{
|
||||
Dsn: sentryDSN,
|
||||
},
|
||||
},
|
||||
})
|
||||
}(),
|
||||
Tracing: nil,
|
||||
})
|
||||
runtime.Start[service.Config](&service.Service{})
|
||||
}
|
||||
|
||||
52
cmd/telemetry-gateway/service/BUILD.bazel
Normal file
52
cmd/telemetry-gateway/service/BUILD.bazel
Normal file
@ -0,0 +1,52 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "shared",
|
||||
srcs = [
|
||||
"config.go",
|
||||
"service.go",
|
||||
],
|
||||
importpath = "github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/shared",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//cmd/telemetry-gateway/internal/server",
|
||||
"//internal/debugserver",
|
||||
"//internal/grpc",
|
||||
"//internal/grpc/defaults",
|
||||
"//internal/httpserver",
|
||||
"//internal/pubsub",
|
||||
"//internal/telemetrygateway/v1:telemetrygateway",
|
||||
"//internal/trace/policy",
|
||||
"//internal/version",
|
||||
"//lib/background",
|
||||
"//lib/errors",
|
||||
"//lib/managedservicesplatform/runtime",
|
||||
"@com_github_sourcegraph_log//:log",
|
||||
"@org_golang_google_grpc//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "service",
|
||||
srcs = [
|
||||
"config.go",
|
||||
"service.go",
|
||||
],
|
||||
importpath = "github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/service",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//cmd/telemetry-gateway/internal/server",
|
||||
"//internal/debugserver",
|
||||
"//internal/grpc",
|
||||
"//internal/grpc/defaults",
|
||||
"//internal/httpserver",
|
||||
"//internal/pubsub",
|
||||
"//internal/telemetrygateway/v1:telemetrygateway",
|
||||
"//internal/trace/policy",
|
||||
"//internal/version",
|
||||
"//lib/background",
|
||||
"//lib/errors",
|
||||
"//lib/managedservicesplatform/runtime",
|
||||
"@com_github_sourcegraph_log//:log",
|
||||
],
|
||||
)
|
||||
24
cmd/telemetry-gateway/service/config.go
Normal file
24
cmd/telemetry-gateway/service/config.go
Normal file
@ -0,0 +1,24 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Events struct {
|
||||
PubSub struct {
|
||||
Enabled bool
|
||||
ProjectID *string
|
||||
TopicID *string
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) Load(env *runtime.Env) {
|
||||
c.Events.PubSub.Enabled = env.GetBool("TELEMETRY_GATEWAY_EVENTS_PUBSUB_ENABLED", "true",
|
||||
"If false, logs Pub/Sub messages instead of actually sending them")
|
||||
c.Events.PubSub.ProjectID = env.GetOptional("TELEMETRY_GATEWAY_EVENTS_PUBSUB_PROJECT_ID",
|
||||
"The project ID for the Pub/Sub.")
|
||||
c.Events.PubSub.TopicID = env.GetOptional("TELEMETRY_GATEWAY_EVENTS_PUBSUB_TOPIC_ID",
|
||||
"The topic ID for the Pub/Sub.")
|
||||
}
|
||||
105
cmd/telemetry-gateway/service/service.go
Normal file
105
cmd/telemetry-gateway/service/service.go
Normal file
@ -0,0 +1,105 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/sourcegraph/log"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/debugserver"
|
||||
internalgrpc "github.com/sourcegraph/sourcegraph/internal/grpc"
|
||||
"github.com/sourcegraph/sourcegraph/internal/grpc/defaults"
|
||||
"github.com/sourcegraph/sourcegraph/internal/httpserver"
|
||||
"github.com/sourcegraph/sourcegraph/internal/pubsub"
|
||||
"github.com/sourcegraph/sourcegraph/internal/trace/policy"
|
||||
"github.com/sourcegraph/sourcegraph/internal/version"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/lib/background"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/server"
|
||||
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
|
||||
)
|
||||
|
||||
type Service struct{}
|
||||
|
||||
var _ runtime.Service[Config] = (*Service)(nil)
|
||||
|
||||
func (Service) Name() string { return "telemetry-gateway" }
|
||||
func (Service) Version() string { return version.Version() }
|
||||
|
||||
func (Service) Initialize(ctx context.Context, logger log.Logger, contract runtime.Contract, config Config) (background.CombinedRoutine, error) {
|
||||
// We use Sourcegraph tracing code, so explicitly configure a trace policy
|
||||
policy.SetTracePolicy(policy.TraceAll)
|
||||
|
||||
// Prepare pubsub client
|
||||
var err error
|
||||
var eventsTopic pubsub.TopicClient
|
||||
if !config.Events.PubSub.Enabled {
|
||||
logger.Warn("pub/sub events publishing disabled, logging messages instead")
|
||||
eventsTopic = pubsub.NewLoggingTopicClient(logger)
|
||||
} else {
|
||||
eventsTopic, err = pubsub.NewTopicClient(*config.Events.PubSub.ProjectID, *config.Events.PubSub.TopicID)
|
||||
if err != nil {
|
||||
return nil, errors.Errorf("create Events Pub/Sub client: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize our gRPC server
|
||||
// TODO(@bobheadxi): Maybe don't use defaults.NewServer, which is geared
|
||||
// towards in-Sourcegraph services.
|
||||
grpcServer := defaults.NewServer(logger)
|
||||
telemetryGatewayServer, err := server.New(logger, eventsTopic)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "init telemetry gateway server")
|
||||
}
|
||||
telemetrygatewayv1.RegisterTelemeteryGatewayServiceServer(grpcServer, telemetryGatewayServer)
|
||||
|
||||
listenAddr := fmt.Sprintf(":%d", contract.Port)
|
||||
|
||||
// Set up diagnostics endpoints
|
||||
diagnosticsServer := http.NewServeMux()
|
||||
contract.RegisterDiagnosticsHandlers(diagnosticsServer, &serviceStatus{
|
||||
eventsTopic: eventsTopic,
|
||||
})
|
||||
if !contract.MSP {
|
||||
// Requires GRPC_WEB_UI_ENABLED to be set to enable - only use in local
|
||||
// development!
|
||||
grpcUI := debugserver.NewGRPCWebUIEndpoint("telemetry-gateway", listenAddr)
|
||||
diagnosticsServer.Handle(grpcUI.Path, grpcUI.Handler)
|
||||
}
|
||||
|
||||
return background.CombinedRoutine{
|
||||
httpserver.NewFromAddr(
|
||||
listenAddr,
|
||||
&http.Server{
|
||||
ReadTimeout: 2 * time.Minute,
|
||||
WriteTimeout: 2 * time.Minute,
|
||||
Handler: internalgrpc.MultiplexHandlers(
|
||||
grpcServer,
|
||||
diagnosticsServer,
|
||||
),
|
||||
},
|
||||
),
|
||||
background.CallbackRoutine{
|
||||
// No Start - serving is handled by httpserver
|
||||
StopFunc: func() { grpcServer.GracefulStop() },
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
type serviceStatus struct {
|
||||
eventsTopic pubsub.TopicClient
|
||||
}
|
||||
|
||||
var _ runtime.ServiceState = (*serviceStatus)(nil)
|
||||
|
||||
func (s *serviceStatus) Healthy(ctx context.Context) error {
|
||||
if err := s.eventsTopic.Ping(ctx); err != nil {
|
||||
return errors.Wrap(err, "eventsPubSubClient.Ping")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -1,43 +0,0 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "shared",
|
||||
srcs = [
|
||||
"config.go",
|
||||
"main.go",
|
||||
"metrics.go",
|
||||
"service.go",
|
||||
"tracing.go",
|
||||
],
|
||||
importpath = "github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/shared",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//cmd/telemetry-gateway/internal/diagnosticsserver",
|
||||
"//cmd/telemetry-gateway/internal/server",
|
||||
"//internal/debugserver",
|
||||
"//internal/env",
|
||||
"//internal/goroutine",
|
||||
"//internal/grpc",
|
||||
"//internal/grpc/defaults",
|
||||
"//internal/httpserver",
|
||||
"//internal/observation",
|
||||
"//internal/pubsub",
|
||||
"//internal/service",
|
||||
"//internal/telemetrygateway/v1:telemetrygateway",
|
||||
"//internal/trace/policy",
|
||||
"//internal/tracer/oteldefaults",
|
||||
"//internal/tracer/oteldefaults/exporters",
|
||||
"//internal/version",
|
||||
"//lib/errors",
|
||||
"@com_github_googlecloudplatform_opentelemetry_operations_go_exporter_metric//:metric",
|
||||
"@com_github_googlecloudplatform_opentelemetry_operations_go_exporter_trace//:trace",
|
||||
"@com_github_sourcegraph_conc//:conc",
|
||||
"@com_github_sourcegraph_log//:log",
|
||||
"@io_opentelemetry_go_contrib_detectors_gcp//:gcp",
|
||||
"@io_opentelemetry_go_otel//:otel",
|
||||
"@io_opentelemetry_go_otel//semconv/v1.4.0:v1_4_0",
|
||||
"@io_opentelemetry_go_otel_sdk//resource",
|
||||
"@io_opentelemetry_go_otel_sdk//trace",
|
||||
"@io_opentelemetry_go_otel_sdk_metric//:metric",
|
||||
],
|
||||
)
|
||||
@ -1,54 +0,0 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/env"
|
||||
"github.com/sourcegraph/sourcegraph/internal/trace/policy"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
env.BaseConfig
|
||||
|
||||
Port int
|
||||
DiagnosticsSecret string
|
||||
|
||||
Events struct {
|
||||
PubSub struct {
|
||||
Enabled bool
|
||||
ProjectID string
|
||||
TopicID string
|
||||
}
|
||||
}
|
||||
|
||||
OpenTelemetry OpenTelemetryConfig
|
||||
}
|
||||
|
||||
type OpenTelemetryConfig struct {
|
||||
TracePolicy policy.TracePolicy
|
||||
GCPProjectID string
|
||||
}
|
||||
|
||||
func (c *Config) Load() {
|
||||
c.Port = c.GetInt("PORT", "10080", "Port to serve Telemetry Gateway service on, generally injected by Cloud Run.")
|
||||
c.DiagnosticsSecret = c.Get("DIAGNOSTICS_SECRET", "", "Secret for accessing diagnostics - "+
|
||||
"should be used as 'Authorization: Bearer $secret' header when accessing diagnostics endpoints.")
|
||||
|
||||
c.Events.PubSub.Enabled = c.GetBool("TELEMETRY_GATEWAY_EVENTS_PUBSUB_ENABLED", "true",
|
||||
"If false, logs Pub/Sub messages instead of actually sending them")
|
||||
c.Events.PubSub.ProjectID = c.GetOptional("TELEMETRY_GATEWAY_EVENTS_PUBSUB_PROJECT_ID",
|
||||
"The project ID for the Pub/Sub.")
|
||||
c.Events.PubSub.TopicID = c.GetOptional("TELEMETRY_GATEWAY_EVENTS_PUBSUB_TOPIC_ID",
|
||||
"The topic ID for the Pub/Sub.")
|
||||
|
||||
c.OpenTelemetry.TracePolicy = policy.TracePolicy(c.Get("TELEMETRY_GATEWAY_TRACE_POLICY", "all", "Trace policy, one of 'all', 'selective', 'none'."))
|
||||
c.OpenTelemetry.GCPProjectID = c.GetOptional("TELEMETRY_GATEWAY_OTEL_GCP_PROJECT_ID", "Google Cloud Traces project ID.")
|
||||
if c.OpenTelemetry.GCPProjectID == "" {
|
||||
c.OpenTelemetry.GCPProjectID = os.Getenv("GOOGLE_CLOUD_PROJECT")
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) GetListenAdress() string {
|
||||
return fmt.Sprintf(":%d", c.Port)
|
||||
}
|
||||
@ -1,133 +0,0 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/sourcegraph/conc"
|
||||
"github.com/sourcegraph/log"
|
||||
"go.opentelemetry.io/contrib/detectors/gcp"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/goroutine"
|
||||
internalgrpc "github.com/sourcegraph/sourcegraph/internal/grpc"
|
||||
"github.com/sourcegraph/sourcegraph/internal/grpc/defaults"
|
||||
"github.com/sourcegraph/sourcegraph/internal/httpserver"
|
||||
"github.com/sourcegraph/sourcegraph/internal/observation"
|
||||
"github.com/sourcegraph/sourcegraph/internal/pubsub"
|
||||
"github.com/sourcegraph/sourcegraph/internal/service"
|
||||
"github.com/sourcegraph/sourcegraph/internal/version"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/diagnosticsserver"
|
||||
"github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway/internal/server"
|
||||
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
|
||||
)
|
||||
|
||||
func Main(ctx context.Context, obctx *observation.Context, ready service.ReadyFunc, config *Config) error {
|
||||
shutdownOtel, err := initOpenTelemetry(ctx, obctx.Logger, config.OpenTelemetry)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "initOpenTelemetry")
|
||||
}
|
||||
defer shutdownOtel()
|
||||
|
||||
var eventsTopic pubsub.TopicClient
|
||||
if !config.Events.PubSub.Enabled {
|
||||
obctx.Logger.Warn("pub/sub events publishing disabled, logging messages instead")
|
||||
eventsTopic = pubsub.NewLoggingTopicClient(obctx.Logger)
|
||||
} else {
|
||||
eventsTopic, err = pubsub.NewTopicClient(config.Events.PubSub.ProjectID, config.Events.PubSub.TopicID)
|
||||
if err != nil {
|
||||
return errors.Errorf("create Events Pub/Sub client: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize our gRPC server
|
||||
// TODO(@bobheadxi): Maybe don't use defaults.NewServer, which is geared
|
||||
// towards in-Sourcegraph services.
|
||||
grpcServer := defaults.NewServer(obctx.Logger)
|
||||
defer grpcServer.GracefulStop()
|
||||
telemetryGatewayServer, err := server.New(obctx.Logger, eventsTopic)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "init telemetry gateway server")
|
||||
}
|
||||
telemetrygatewayv1.RegisterTelemeteryGatewayServiceServer(grpcServer, telemetryGatewayServer)
|
||||
|
||||
// Start up the service
|
||||
addr := config.GetListenAdress()
|
||||
server := httpserver.NewFromAddr(
|
||||
addr,
|
||||
&http.Server{
|
||||
ReadTimeout: 2 * time.Minute,
|
||||
WriteTimeout: 2 * time.Minute,
|
||||
Handler: internalgrpc.MultiplexHandlers(
|
||||
grpcServer,
|
||||
diagnosticsserver.NewDiagnosticsHandler(
|
||||
obctx.Logger,
|
||||
config.DiagnosticsSecret,
|
||||
func(ctx context.Context) error {
|
||||
if err := eventsTopic.Ping(ctx); err != nil {
|
||||
return errors.Wrap(err, "eventsPubSubClient.Ping")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
),
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
// Mark health server as ready and go!
|
||||
ready()
|
||||
obctx.Logger.Info("service ready", log.String("address", addr))
|
||||
|
||||
// Block until done
|
||||
goroutine.MonitorBackgroundRoutines(ctx, server)
|
||||
return nil
|
||||
}
|
||||
|
||||
func initOpenTelemetry(ctx context.Context, logger log.Logger, config OpenTelemetryConfig) (func(), error) {
|
||||
res, err := getOpenTelemetryResource(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Enable tracing, at this point tracing wouldn't have been enabled yet because
|
||||
// we run without conf which means Sourcegraph tracing is not enabled.
|
||||
shutdownTracing, err := maybeEnableTracing(ctx,
|
||||
logger.Scoped("tracing"),
|
||||
config, res)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "maybeEnableTracing")
|
||||
}
|
||||
|
||||
shutdownMetrics, err := maybeEnableMetrics(ctx,
|
||||
logger.Scoped("metrics"),
|
||||
config, res)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "maybeEnableMetrics")
|
||||
}
|
||||
|
||||
return func() {
|
||||
var wg conc.WaitGroup
|
||||
wg.Go(shutdownTracing)
|
||||
wg.Go(shutdownMetrics)
|
||||
wg.Wait()
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getOpenTelemetryResource(ctx context.Context) (*resource.Resource, error) {
|
||||
// Identify your application using resource detection
|
||||
return resource.New(ctx,
|
||||
// Use the GCP resource detector to detect information about the GCP platform
|
||||
resource.WithDetectors(gcp.NewDetector()),
|
||||
// Keep the default detectors
|
||||
resource.WithTelemetrySDK(),
|
||||
// Add your own custom attributes to identify your application
|
||||
resource.WithAttributes(
|
||||
semconv.ServiceNameKey.String("telemetry-gateway"),
|
||||
semconv.ServiceVersionKey.String(version.Version()),
|
||||
),
|
||||
)
|
||||
}
|
||||
@ -1,58 +0,0 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
gcpmetricexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
|
||||
"github.com/sourcegraph/log"
|
||||
"go.opentelemetry.io/otel"
|
||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/tracer/oteldefaults/exporters"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
)
|
||||
|
||||
func maybeEnableMetrics(_ context.Context, logger log.Logger, config OpenTelemetryConfig, otelResource *resource.Resource) (func(), error) {
|
||||
var reader sdkmetric.Reader
|
||||
if config.GCPProjectID != "" {
|
||||
logger.Info("initializing GCP trace exporter", log.String("projectID", config.GCPProjectID))
|
||||
exporter, err := gcpmetricexporter.New(
|
||||
gcpmetricexporter.WithProjectID(config.GCPProjectID))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "gcpmetricexporter.New")
|
||||
}
|
||||
reader = sdkmetric.NewPeriodicReader(exporter,
|
||||
sdkmetric.WithInterval(30*time.Second))
|
||||
} else {
|
||||
logger.Info("initializing Prometheus exporter")
|
||||
var err error
|
||||
reader, err = exporters.NewPrometheusExporter()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "exporters.NewPrometheusExporter")
|
||||
}
|
||||
}
|
||||
|
||||
// Create and set global tracer
|
||||
provider := sdkmetric.NewMeterProvider(
|
||||
sdkmetric.WithReader(reader),
|
||||
sdkmetric.WithResource(otelResource))
|
||||
otel.SetMeterProvider(provider)
|
||||
|
||||
logger.Info("metrics configured")
|
||||
return func() {
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
start := time.Now()
|
||||
logger.Info("Shutting down metrics")
|
||||
if err := provider.ForceFlush(shutdownCtx); err != nil {
|
||||
logger.Warn("error occurred force-flushing metrics", log.Error(err))
|
||||
}
|
||||
if err := provider.Shutdown(shutdownCtx); err != nil {
|
||||
logger.Warn("error occurred shutting down metrics", log.Error(err))
|
||||
}
|
||||
logger.Info("metrics shut down", log.Duration("elapsed", time.Since(start)))
|
||||
}, nil
|
||||
}
|
||||
@ -1,31 +0,0 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/debugserver"
|
||||
"github.com/sourcegraph/sourcegraph/internal/env"
|
||||
"github.com/sourcegraph/sourcegraph/internal/observation"
|
||||
"github.com/sourcegraph/sourcegraph/internal/service"
|
||||
)
|
||||
|
||||
// Service is the shared ping service.
|
||||
var Service service.Service = svc{}
|
||||
|
||||
type svc struct{}
|
||||
|
||||
func (svc) Name() string { return "telemetry-gateway" }
|
||||
|
||||
func (svc) Configure() (env.Config, []debugserver.Endpoint) {
|
||||
c := &Config{}
|
||||
c.Load()
|
||||
return c, []debugserver.Endpoint{
|
||||
// Requires GRPC_WEB_UI_ENABLED to be set to enable - only use in local
|
||||
// development!
|
||||
debugserver.NewGRPCWebUIEndpoint("telemetry-gateway", c.GetListenAdress()),
|
||||
}
|
||||
}
|
||||
|
||||
func (svc) Start(ctx context.Context, observationCtx *observation.Context, ready service.ReadyFunc, config env.Config) error {
|
||||
return Main(ctx, observationCtx, ready, config.(*Config))
|
||||
}
|
||||
@ -1,77 +0,0 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/sourcegraph/log"
|
||||
|
||||
gcptraceexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/trace/policy"
|
||||
"github.com/sourcegraph/sourcegraph/internal/tracer/oteldefaults"
|
||||
"github.com/sourcegraph/sourcegraph/internal/tracer/oteldefaults/exporters"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
)
|
||||
|
||||
// maybeEnableTracing configures OpenTelemetry tracing if the GOOGLE_CLOUD_PROJECT is set.
|
||||
// It differs from Sourcegraph's default tracing because we need to export directly to GCP,
|
||||
// and the use case is more niche as a standalone service.
|
||||
//
|
||||
// Based on https://cloud.google.com/trace/docs/setup/go-ot
|
||||
func maybeEnableTracing(ctx context.Context, logger log.Logger, config OpenTelemetryConfig, otelResource *resource.Resource) (func(), error) {
|
||||
// Set globals
|
||||
policy.SetTracePolicy(config.TracePolicy)
|
||||
otel.SetTextMapPropagator(oteldefaults.Propagator())
|
||||
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
|
||||
logger.Debug("OpenTelemetry error", log.Error(err))
|
||||
}))
|
||||
|
||||
// Initialize exporter
|
||||
var exporter sdktrace.SpanExporter
|
||||
if config.GCPProjectID != "" {
|
||||
logger.Info("initializing GCP trace exporter", log.String("projectID", config.GCPProjectID))
|
||||
var err error
|
||||
exporter, err = gcptraceexporter.New(
|
||||
gcptraceexporter.WithProjectID(config.GCPProjectID),
|
||||
gcptraceexporter.WithErrorHandler(otel.ErrorHandlerFunc(func(err error) {
|
||||
logger.Warn("gcptraceexporter error", log.Error(err))
|
||||
})),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "gcptraceexporter.New")
|
||||
}
|
||||
} else {
|
||||
logger.Info("initializing OTLP exporter")
|
||||
var err error
|
||||
exporter, err = exporters.NewOTLPTraceExporter(ctx, logger)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "exporters.NewOTLPExporter")
|
||||
}
|
||||
}
|
||||
|
||||
// Create and set global tracer
|
||||
provider := sdktrace.NewTracerProvider(
|
||||
sdktrace.WithBatcher(exporter),
|
||||
sdktrace.WithResource(otelResource))
|
||||
otel.SetTracerProvider(provider)
|
||||
|
||||
logger.Info("tracing configured")
|
||||
return func() {
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
start := time.Now()
|
||||
logger.Info("Shutting down tracing")
|
||||
if err := provider.ForceFlush(shutdownCtx); err != nil {
|
||||
logger.Warn("error occurred force-flushing traces", log.Error(err))
|
||||
}
|
||||
if err := provider.Shutdown(shutdownCtx); err != nil {
|
||||
logger.Warn("error occured shutting down tracing", log.Error(err))
|
||||
}
|
||||
logger.Info("Tracing shut down", log.Duration("elapsed", time.Since(start)))
|
||||
}, nil
|
||||
}
|
||||
@ -76,7 +76,6 @@
|
||||
- labels:
|
||||
job: telemetry-gateway
|
||||
targets:
|
||||
# cody gateway
|
||||
- host.docker.internal:6080
|
||||
- labels:
|
||||
job: msp-example
|
||||
|
||||
@ -36,6 +36,7 @@ go_library(
|
||||
"//cmd/frontend/internal/telemetry/resolvers:__pkg__",
|
||||
"//cmd/telemetry-gateway/internal/events:__pkg__",
|
||||
"//cmd/telemetry-gateway/internal/server:__pkg__",
|
||||
"//cmd/telemetry-gateway/service:__pkg__",
|
||||
"//cmd/telemetry-gateway/shared:__pkg__",
|
||||
"//cmd/telemetrygateway/server:__pkg__",
|
||||
"//cmd/telemetrygateway/shared:__pkg__",
|
||||
|
||||
@ -104,12 +104,25 @@ func (r CombinedRoutine) Stop() {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
type noopRoutine struct{}
|
||||
|
||||
func (r noopRoutine) Start() {}
|
||||
func (r noopRoutine) Stop() {}
|
||||
|
||||
// NoopRoutine does nothing for start or stop.
|
||||
func NoopRoutine() Routine {
|
||||
return noopRoutine{}
|
||||
return CallbackRoutine{}
|
||||
}
|
||||
|
||||
// CallbackRoutine calls the StartFunc and StopFunc callbacks to implement a
|
||||
// Routine. Each callback may be nil.
|
||||
type CallbackRoutine struct {
|
||||
StartFunc func()
|
||||
StopFunc func()
|
||||
}
|
||||
|
||||
func (r CallbackRoutine) Start() {
|
||||
if r.StartFunc != nil {
|
||||
r.StartFunc()
|
||||
}
|
||||
}
|
||||
func (r CallbackRoutine) Stop() {
|
||||
if r.StopFunc != nil {
|
||||
r.StopFunc()
|
||||
}
|
||||
}
|
||||
|
||||
@ -69,8 +69,10 @@ func Start[
|
||||
contract := newContract(log.Scoped("msp.contract"), env, service)
|
||||
|
||||
// Enable Sentry error log reporting
|
||||
var sentryEnabled bool
|
||||
if contract.internal.sentryDSN != nil {
|
||||
liblog.Update(func() log.SinksConfig {
|
||||
sentryEnabled = true
|
||||
return log.SinksConfig{
|
||||
Sentry: &log.SentrySink{
|
||||
ClientOptions: sentry.ClientOptions{
|
||||
@ -105,6 +107,10 @@ func Start[
|
||||
}
|
||||
|
||||
// Start service routine, and block until it stops.
|
||||
logger.Info("starting service",
|
||||
log.Int("port", contract.Port),
|
||||
log.Bool("msp", contract.MSP),
|
||||
log.Bool("sentry", sentryEnabled))
|
||||
background.Monitor(ctx, routine)
|
||||
logger.Info("service stopped")
|
||||
}
|
||||
|
||||
@ -132,7 +132,7 @@ env:
|
||||
GRPC_INTERNAL_ERROR_LOGGING_LOG_PROTOBUF_MESSAGES_JSON_TRUNCATION_SIZE_BYTES: '1KB'
|
||||
GRPC_INTERNAL_ERROR_LOGGING_LOG_PROTOBUF_MESSAGES_HANDLING_MAX_MESSAGE_SIZE_BYTES: '100MB'
|
||||
|
||||
TELEMETRY_GATEWAY_EXPORTER_EXPORT_ADDR: 'http://127.0.0.1:10080'
|
||||
TELEMETRY_GATEWAY_EXPORTER_EXPORT_ADDR: 'http://127.0.0.1:6080'
|
||||
SRC_TELEMETRY_EVENTS_EXPORT_ALL: 'true'
|
||||
|
||||
# By default, allow temporary edits to external services.
|
||||
@ -332,12 +332,10 @@ commands:
|
||||
go build -gcflags="$GCFLAGS" -o .bin/telemetry-gateway github.com/sourcegraph/sourcegraph/cmd/telemetry-gateway
|
||||
checkBinary: .bin/telemetry-gateway
|
||||
env:
|
||||
PORT: '10080'
|
||||
PORT: '6080'
|
||||
DIAGNOSTICS_SECRET: sekret
|
||||
TELEMETRY_GATEWAY_EVENTS_PUBSUB_ENABLED: false
|
||||
SRC_LOG_LEVEL: info
|
||||
# Enables metrics in dev via debugserver
|
||||
SRC_PROF_HTTP: '127.0.0.1:6080'
|
||||
GRPC_WEB_UI_ENABLED: true
|
||||
watch:
|
||||
- lib
|
||||
|
||||
Loading…
Reference in New Issue
Block a user