From 9d0e310ee2376d8f948dabd40f89985a8a8ef676 Mon Sep 17 00:00:00 2001 From: Joe Chen Date: Wed, 20 Sep 2023 20:17:37 -0400 Subject: [PATCH] pings: expose OpenTelemetry metrics (#56875) --- cmd/pings/shared/BUILD.bazel | 10 ++++ cmd/pings/shared/config.go | 13 +++++ cmd/pings/shared/main.go | 41 +++++++++++++- cmd/pings/shared/metrics.go | 97 ++++++++++++++++++++++++++++++++ internal/updatecheck/BUILD.bazel | 2 +- internal/updatecheck/handler.go | 46 ++++++--------- sg.config.yaml | 2 + 7 files changed, 180 insertions(+), 31 deletions(-) create mode 100644 cmd/pings/shared/metrics.go diff --git a/cmd/pings/shared/BUILD.bazel b/cmd/pings/shared/BUILD.bazel index 1fb1b0c77c6..8aa2da06bc4 100644 --- a/cmd/pings/shared/BUILD.bazel +++ b/cmd/pings/shared/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "config.go", "main.go", + "metrics.go", "service.go", ], importpath = "github.com/sourcegraph/sourcegraph/cmd/pings/shared", @@ -19,10 +20,19 @@ go_library( "//internal/profiler", "//internal/pubsub", "//internal/service", + "//internal/tracer/oteldefaults/exporters", "//internal/updatecheck", "//internal/version", "//lib/errors", + "@com_github_googlecloudplatform_opentelemetry_operations_go_exporter_metric//:metric", "@com_github_gorilla_mux//:mux", + "@com_github_sourcegraph_conc//:conc", "@com_github_sourcegraph_log//:log", + "@io_opentelemetry_go_contrib_detectors_gcp//:gcp", + "@io_opentelemetry_go_otel//:otel", + "@io_opentelemetry_go_otel//semconv/v1.4.0:v1_4_0", + "@io_opentelemetry_go_otel_metric//:metric", + "@io_opentelemetry_go_otel_sdk//resource", + "@io_opentelemetry_go_otel_sdk_metric//:metric", ], ) diff --git a/cmd/pings/shared/config.go b/cmd/pings/shared/config.go index d576f9d1c9a..69362ecf7d4 100644 --- a/cmd/pings/shared/config.go +++ b/cmd/pings/shared/config.go @@ -1,6 +1,8 @@ package shared import ( + "os" + "github.com/sourcegraph/sourcegraph/internal/env" ) @@ -14,6 +16,12 @@ type Config struct { ProjectID string TopicID string } + + OpenTelemetry OpenTelemetryConfig +} + +type OpenTelemetryConfig struct { + GCPProjectID string } func (c *Config) Load() { @@ -23,4 +31,9 @@ func (c *Config) Load() { c.PubSub.ProjectID = c.Get("PINGS_PUBSUB_PROJECT_ID", "", "The project ID for the Pub/Sub.") c.PubSub.TopicID = c.Get("PINGS_PUBSUB_TOPIC_ID", "", "The topic ID for the Pub/Sub.") + + c.OpenTelemetry.GCPProjectID = c.GetOptional("PINGS_OTEL_GCP_PROJECT_ID", "Google Cloud Traces project ID.") + if c.OpenTelemetry.GCPProjectID == "" { + c.OpenTelemetry.GCPProjectID = os.Getenv("GOOGLE_CLOUD_PROJECT") + } } diff --git a/cmd/pings/shared/main.go b/cmd/pings/shared/main.go index fe4515b673d..512914a5a7a 100644 --- a/cmd/pings/shared/main.go +++ b/cmd/pings/shared/main.go @@ -10,6 +10,8 @@ import ( "github.com/gorilla/mux" "github.com/sourcegraph/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" "github.com/sourcegraph/sourcegraph/cmd/frontend/hubspot/hubspotutil" "github.com/sourcegraph/sourcegraph/internal/goroutine" @@ -26,7 +28,13 @@ import ( func Main(ctx context.Context, obctx *observation.Context, ready service.ReadyFunc, config *Config) error { profiler.Init() - // Initialize our server + shutdownOtel, err := initOpenTelemetry(ctx, obctx.Logger, config.OpenTelemetry) + if err != nil { + return errors.Wrap(err, "initOpenTelemetry") + } + defer shutdownOtel() + + // Initialize HTTP server serverHandler, err := newServerHandler(obctx.Logger, config) if err != nil { return errors.Errorf("create server handler: %v", err) @@ -51,6 +59,8 @@ func Main(ctx context.Context, obctx *observation.Context, ready service.ReadyFu return nil } +var meter = otel.GetMeterProvider().Meter("pings/shared") + func newServerHandler(logger log.Logger, config *Config) (http.Handler, error) { r := mux.NewRouter() @@ -115,10 +125,37 @@ func newServerHandler(logger log.Logger, config *Config) (http.Handler, error) { } return }) + + requestCounter, err := meter.Int64Counter( + "pings.request_count", + metric.WithDescription("number of requests to the update check handler"), + ) + if err != nil { + return nil, errors.Errorf("create request counter: %v", err) + } + requestHasUpdateCounter, err := meter.Int64Counter( + "pings.request_has_update_count", + metric.WithDescription("number of requests to the update check handler where an update is available"), + ) + if err != nil { + return nil, errors.Errorf("create request has update counter: %v", err) + } + errorCounter, err := meter.Int64Counter( + "pings.error_count", + metric.WithDescription("number of errors that occur while publishing server pings"), + ) + if err != nil { + return nil, errors.Errorf("create request counter: %v", err) + } + meter := &updatecheck.Meter{ + RequestCounter: requestCounter, + RequestHasUpdateCounter: requestHasUpdateCounter, + ErrorCounter: errorCounter, + } r.Path("/updates"). Methods(http.MethodGet, http.MethodPost). HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - updatecheck.Handle(logger, pubsubClient, w, r) + updatecheck.Handle(logger, pubsubClient, meter, w, r) }) return r, nil } diff --git a/cmd/pings/shared/metrics.go b/cmd/pings/shared/metrics.go new file mode 100644 index 00000000000..b452bbbf83f --- /dev/null +++ b/cmd/pings/shared/metrics.go @@ -0,0 +1,97 @@ +package shared + +import ( + "context" + "time" + + gcpmetricexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric" + "github.com/sourcegraph/conc" + "github.com/sourcegraph/log" + "go.opentelemetry.io/contrib/detectors/gcp" + "go.opentelemetry.io/otel" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + + "github.com/sourcegraph/sourcegraph/internal/tracer/oteldefaults/exporters" + "github.com/sourcegraph/sourcegraph/internal/version" + "github.com/sourcegraph/sourcegraph/lib/errors" +) + +func initOpenTelemetry(ctx context.Context, logger log.Logger, config OpenTelemetryConfig) (func(), error) { + res, err := getOpenTelemetryResource(ctx) + if err != nil { + return nil, err + } + + shutdownMetrics, err := maybeEnableMetrics(ctx, + logger.Scoped("metrics", "OpenTelemetry metrics"), + config, res) + if err != nil { + return nil, errors.Wrap(err, "maybeEnableMetrics") + } + + return func() { + var wg conc.WaitGroup + wg.Go(shutdownMetrics) + wg.Wait() + }, nil +} + +func getOpenTelemetryResource(ctx context.Context) (*resource.Resource, error) { + // Identify your application using resource detection + return resource.New(ctx, + // Use the GCP resource detector to detect information about the GCP platform + resource.WithDetectors(gcp.NewDetector()), + // Keep the default detectors + resource.WithTelemetrySDK(), + // Add your own custom attributes to identify your application + resource.WithAttributes( + semconv.ServiceNameKey.String("pings"), + semconv.ServiceVersionKey.String(version.Version()), + ), + ) +} + +func maybeEnableMetrics(_ context.Context, logger log.Logger, config OpenTelemetryConfig, otelResource *resource.Resource) (func(), error) { + var reader sdkmetric.Reader + if config.GCPProjectID != "" { + logger.Info("initializing GCP trace exporter", log.String("projectID", config.GCPProjectID)) + exporter, err := gcpmetricexporter.New( + gcpmetricexporter.WithProjectID(config.GCPProjectID)) + if err != nil { + return nil, errors.Wrap(err, "gcpmetricexporter.New") + } + reader = sdkmetric.NewPeriodicReader(exporter, + sdkmetric.WithInterval(30*time.Second)) + } else { + logger.Info("initializing Prometheus exporter") + var err error + reader, err = exporters.NewPrometheusExporter() + if err != nil { + return nil, errors.Wrap(err, "exporters.NewPrometheusExporter") + } + } + + // Create and set global tracer + provider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(reader), + sdkmetric.WithResource(otelResource)) + otel.SetMeterProvider(provider) + + logger.Info("metrics configured") + return func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + start := time.Now() + logger.Info("Shutting down metrics") + if err := provider.ForceFlush(shutdownCtx); err != nil { + logger.Warn("error occurred force-flushing metrics", log.Error(err)) + } + if err := provider.Shutdown(shutdownCtx); err != nil { + logger.Warn("error occurred shutting down metrics", log.Error(err)) + } + logger.Info("metrics shut down", log.Duration("elapsed", time.Since(start))) + }, nil +} diff --git a/internal/updatecheck/BUILD.bazel b/internal/updatecheck/BUILD.bazel index 6df38f54da0..1e20cef59b0 100644 --- a/internal/updatecheck/BUILD.bazel +++ b/internal/updatecheck/BUILD.bazel @@ -36,8 +36,8 @@ go_library( "@com_github_coreos_go_semver//semver", "@com_github_gomodule_redigo//redis", "@com_github_prometheus_client_golang//prometheus", - "@com_github_prometheus_client_golang//prometheus/promauto", "@com_github_sourcegraph_log//:log", + "@io_opentelemetry_go_otel_metric//:metric", ], ) diff --git a/internal/updatecheck/handler.go b/internal/updatecheck/handler.go index 1058237120a..65d37c11ed0 100644 --- a/internal/updatecheck/handler.go +++ b/internal/updatecheck/handler.go @@ -14,8 +14,7 @@ import ( "time" "github.com/coreos/go-semver/semver" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel/metric" "github.com/sourcegraph/log" @@ -87,10 +86,16 @@ func ForwardHandler() (http.HandlerFunc, error) { }, nil } +type Meter struct { + RequestCounter metric.Int64Counter + RequestHasUpdateCounter metric.Int64Counter + ErrorCounter metric.Int64Counter +} + // Handle handles the ping requests and responds with information about software // updates for Sourcegraph. -func Handle(logger log.Logger, pubsubClient pubsub.TopicClient, w http.ResponseWriter, r *http.Request) { - requestCounter.Inc() +func Handle(logger log.Logger, pubsubClient pubsub.TopicClient, meter *Meter, w http.ResponseWriter, r *http.Request) { + meter.RequestCounter.Add(r.Context(), 1) pr, err := readPingRequest(r) if err != nil { @@ -119,7 +124,7 @@ func Handle(logger log.Logger, pubsubClient pubsub.TopicClient, w http.ResponseW hasUpdate, err := canUpdate(pr.ClientVersionString, pingResponse, pr.DeployType) // Always log, even on malformed version strings - logPing(logger, pubsubClient, r, pr, hasUpdate) + logPing(logger, pubsubClient, meter, r, pr, hasUpdate) if err != nil { http.Error(w, pr.ClientVersionString+" is a bad version string: "+err.Error(), http.StatusBadRequest) @@ -140,7 +145,7 @@ func Handle(logger log.Logger, pubsubClient pubsub.TopicClient, w http.ResponseW // the user's instance may have unseen notification messages. if deploy.IsDeployTypeApp(pr.DeployType) { if hasUpdate { - requestHasUpdateCounter.Inc() + meter.RequestHasUpdateCounter.Add(r.Context(), 1) } w.Header().Set("content-type", "application/json; charset=utf-8") _, _ = w.Write(body) @@ -153,7 +158,7 @@ func Handle(logger log.Logger, pubsubClient pubsub.TopicClient, w http.ResponseW return } w.Header().Set("content-type", "application/json; charset=utf-8") - requestHasUpdateCounter.Inc() + meter.RequestHasUpdateCounter.Add(r.Context(), 1) _, _ = w.Write(body) } @@ -387,12 +392,12 @@ type pingPayload struct { RepoMetadataUsage json.RawMessage `json:"repo_metadata_usage"` } -func logPing(logger log.Logger, pubsubClient pubsub.TopicClient, r *http.Request, pr *pingRequest, hasUpdate bool) { +func logPing(logger log.Logger, pubsubClient pubsub.TopicClient, meter *Meter, r *http.Request, pr *pingRequest, hasUpdate bool) { logger = logger.Scoped("logPing", "logs ping requests") defer func() { - if r := recover(); r != nil { - logger.Warn("panic", log.String("recover", fmt.Sprintf("%+v", r))) - errorCounter.Inc() + if err := recover(); err != nil { + logger.Warn("panic", log.String("recover", fmt.Sprintf("%+v", err))) + meter.ErrorCounter.Add(r.Context(), 1) } }() @@ -414,14 +419,14 @@ func logPing(logger log.Logger, pubsubClient pubsub.TopicClient, r *http.Request message, err := marshalPing(pr, hasUpdate, clientAddr, time.Now()) if err != nil { - errorCounter.Inc() + meter.ErrorCounter.Add(r.Context(), 1) logger.Error("failed to marshal payload", log.Error(err)) return } err = pubsubClient.Publish(context.Background(), message) if err != nil { - errorCounter.Inc() + meter.ErrorCounter.Add(r.Context(), 1) logger.Error("failed to publish", log.String("message", string(message)), log.Error(err)) return } @@ -782,18 +787,3 @@ func reserializeCodyUsage(payload json.RawMessage) (json.RawMessage, error) { return json.Marshal(singlePeriodUsage) } - -var ( - requestCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "src_updatecheck_server_requests", - Help: "Number of requests to the update check handler.", - }) - requestHasUpdateCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "src_updatecheck_server_requests_has_update", - Help: "Number of requests to the update check handler where an update is available.", - }) - errorCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "src_updatecheck_server_errors", - Help: "Number of errors that occur while publishing server pings.", - }) -) diff --git a/sg.config.yaml b/sg.config.yaml index 7e8379d0ee6..7dfd78e6660 100644 --- a/sg.config.yaml +++ b/sg.config.yaml @@ -355,6 +355,8 @@ commands: PINGS_PUBSUB_PROJECT_ID: 'telligentsourcegraph' PINGS_PUBSUB_TOPIC_ID: 'server-update-checks-test' HUBSPOT_ACCESS_TOKEN: '' + # Enables metrics in dev via debugserver + SRC_PROF_HTTP: "127.0.0.1:7011" watch: - lib - internal