pings: expose OpenTelemetry metrics (#56875)

This commit is contained in:
Joe Chen 2023-09-20 20:17:37 -04:00 committed by GitHub
parent ebb3b6ca4a
commit 9d0e310ee2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 180 additions and 31 deletions

View File

@ -5,6 +5,7 @@ go_library(
srcs = [
"config.go",
"main.go",
"metrics.go",
"service.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/pings/shared",
@ -19,10 +20,19 @@ go_library(
"//internal/profiler",
"//internal/pubsub",
"//internal/service",
"//internal/tracer/oteldefaults/exporters",
"//internal/updatecheck",
"//internal/version",
"//lib/errors",
"@com_github_googlecloudplatform_opentelemetry_operations_go_exporter_metric//:metric",
"@com_github_gorilla_mux//:mux",
"@com_github_sourcegraph_conc//:conc",
"@com_github_sourcegraph_log//:log",
"@io_opentelemetry_go_contrib_detectors_gcp//:gcp",
"@io_opentelemetry_go_otel//:otel",
"@io_opentelemetry_go_otel//semconv/v1.4.0:v1_4_0",
"@io_opentelemetry_go_otel_metric//:metric",
"@io_opentelemetry_go_otel_sdk//resource",
"@io_opentelemetry_go_otel_sdk_metric//:metric",
],
)

View File

@ -1,6 +1,8 @@
package shared
import (
"os"
"github.com/sourcegraph/sourcegraph/internal/env"
)
@ -14,6 +16,12 @@ type Config struct {
ProjectID string
TopicID string
}
OpenTelemetry OpenTelemetryConfig
}
type OpenTelemetryConfig struct {
GCPProjectID string
}
func (c *Config) Load() {
@ -23,4 +31,9 @@ func (c *Config) Load() {
c.PubSub.ProjectID = c.Get("PINGS_PUBSUB_PROJECT_ID", "", "The project ID for the Pub/Sub.")
c.PubSub.TopicID = c.Get("PINGS_PUBSUB_TOPIC_ID", "", "The topic ID for the Pub/Sub.")
c.OpenTelemetry.GCPProjectID = c.GetOptional("PINGS_OTEL_GCP_PROJECT_ID", "Google Cloud Traces project ID.")
if c.OpenTelemetry.GCPProjectID == "" {
c.OpenTelemetry.GCPProjectID = os.Getenv("GOOGLE_CLOUD_PROJECT")
}
}

View File

@ -10,6 +10,8 @@ import (
"github.com/gorilla/mux"
"github.com/sourcegraph/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"github.com/sourcegraph/sourcegraph/cmd/frontend/hubspot/hubspotutil"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
@ -26,7 +28,13 @@ import (
func Main(ctx context.Context, obctx *observation.Context, ready service.ReadyFunc, config *Config) error {
profiler.Init()
// Initialize our server
shutdownOtel, err := initOpenTelemetry(ctx, obctx.Logger, config.OpenTelemetry)
if err != nil {
return errors.Wrap(err, "initOpenTelemetry")
}
defer shutdownOtel()
// Initialize HTTP server
serverHandler, err := newServerHandler(obctx.Logger, config)
if err != nil {
return errors.Errorf("create server handler: %v", err)
@ -51,6 +59,8 @@ func Main(ctx context.Context, obctx *observation.Context, ready service.ReadyFu
return nil
}
var meter = otel.GetMeterProvider().Meter("pings/shared")
func newServerHandler(logger log.Logger, config *Config) (http.Handler, error) {
r := mux.NewRouter()
@ -115,10 +125,37 @@ func newServerHandler(logger log.Logger, config *Config) (http.Handler, error) {
}
return
})
requestCounter, err := meter.Int64Counter(
"pings.request_count",
metric.WithDescription("number of requests to the update check handler"),
)
if err != nil {
return nil, errors.Errorf("create request counter: %v", err)
}
requestHasUpdateCounter, err := meter.Int64Counter(
"pings.request_has_update_count",
metric.WithDescription("number of requests to the update check handler where an update is available"),
)
if err != nil {
return nil, errors.Errorf("create request has update counter: %v", err)
}
errorCounter, err := meter.Int64Counter(
"pings.error_count",
metric.WithDescription("number of errors that occur while publishing server pings"),
)
if err != nil {
return nil, errors.Errorf("create request counter: %v", err)
}
meter := &updatecheck.Meter{
RequestCounter: requestCounter,
RequestHasUpdateCounter: requestHasUpdateCounter,
ErrorCounter: errorCounter,
}
r.Path("/updates").
Methods(http.MethodGet, http.MethodPost).
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
updatecheck.Handle(logger, pubsubClient, w, r)
updatecheck.Handle(logger, pubsubClient, meter, w, r)
})
return r, nil
}

View File

@ -0,0 +1,97 @@
package shared
import (
"context"
"time"
gcpmetricexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"github.com/sourcegraph/conc"
"github.com/sourcegraph/log"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"github.com/sourcegraph/sourcegraph/internal/tracer/oteldefaults/exporters"
"github.com/sourcegraph/sourcegraph/internal/version"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func initOpenTelemetry(ctx context.Context, logger log.Logger, config OpenTelemetryConfig) (func(), error) {
res, err := getOpenTelemetryResource(ctx)
if err != nil {
return nil, err
}
shutdownMetrics, err := maybeEnableMetrics(ctx,
logger.Scoped("metrics", "OpenTelemetry metrics"),
config, res)
if err != nil {
return nil, errors.Wrap(err, "maybeEnableMetrics")
}
return func() {
var wg conc.WaitGroup
wg.Go(shutdownMetrics)
wg.Wait()
}, nil
}
func getOpenTelemetryResource(ctx context.Context) (*resource.Resource, error) {
// Identify your application using resource detection
return resource.New(ctx,
// Use the GCP resource detector to detect information about the GCP platform
resource.WithDetectors(gcp.NewDetector()),
// Keep the default detectors
resource.WithTelemetrySDK(),
// Add your own custom attributes to identify your application
resource.WithAttributes(
semconv.ServiceNameKey.String("pings"),
semconv.ServiceVersionKey.String(version.Version()),
),
)
}
func maybeEnableMetrics(_ context.Context, logger log.Logger, config OpenTelemetryConfig, otelResource *resource.Resource) (func(), error) {
var reader sdkmetric.Reader
if config.GCPProjectID != "" {
logger.Info("initializing GCP trace exporter", log.String("projectID", config.GCPProjectID))
exporter, err := gcpmetricexporter.New(
gcpmetricexporter.WithProjectID(config.GCPProjectID))
if err != nil {
return nil, errors.Wrap(err, "gcpmetricexporter.New")
}
reader = sdkmetric.NewPeriodicReader(exporter,
sdkmetric.WithInterval(30*time.Second))
} else {
logger.Info("initializing Prometheus exporter")
var err error
reader, err = exporters.NewPrometheusExporter()
if err != nil {
return nil, errors.Wrap(err, "exporters.NewPrometheusExporter")
}
}
// Create and set global tracer
provider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(reader),
sdkmetric.WithResource(otelResource))
otel.SetMeterProvider(provider)
logger.Info("metrics configured")
return func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
start := time.Now()
logger.Info("Shutting down metrics")
if err := provider.ForceFlush(shutdownCtx); err != nil {
logger.Warn("error occurred force-flushing metrics", log.Error(err))
}
if err := provider.Shutdown(shutdownCtx); err != nil {
logger.Warn("error occurred shutting down metrics", log.Error(err))
}
logger.Info("metrics shut down", log.Duration("elapsed", time.Since(start)))
}, nil
}

View File

@ -36,8 +36,8 @@ go_library(
"@com_github_coreos_go_semver//semver",
"@com_github_gomodule_redigo//redis",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_golang//prometheus/promauto",
"@com_github_sourcegraph_log//:log",
"@io_opentelemetry_go_otel_metric//:metric",
],
)

View File

@ -14,8 +14,7 @@ import (
"time"
"github.com/coreos/go-semver/semver"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/metric"
"github.com/sourcegraph/log"
@ -87,10 +86,16 @@ func ForwardHandler() (http.HandlerFunc, error) {
}, nil
}
type Meter struct {
RequestCounter metric.Int64Counter
RequestHasUpdateCounter metric.Int64Counter
ErrorCounter metric.Int64Counter
}
// Handle handles the ping requests and responds with information about software
// updates for Sourcegraph.
func Handle(logger log.Logger, pubsubClient pubsub.TopicClient, w http.ResponseWriter, r *http.Request) {
requestCounter.Inc()
func Handle(logger log.Logger, pubsubClient pubsub.TopicClient, meter *Meter, w http.ResponseWriter, r *http.Request) {
meter.RequestCounter.Add(r.Context(), 1)
pr, err := readPingRequest(r)
if err != nil {
@ -119,7 +124,7 @@ func Handle(logger log.Logger, pubsubClient pubsub.TopicClient, w http.ResponseW
hasUpdate, err := canUpdate(pr.ClientVersionString, pingResponse, pr.DeployType)
// Always log, even on malformed version strings
logPing(logger, pubsubClient, r, pr, hasUpdate)
logPing(logger, pubsubClient, meter, r, pr, hasUpdate)
if err != nil {
http.Error(w, pr.ClientVersionString+" is a bad version string: "+err.Error(), http.StatusBadRequest)
@ -140,7 +145,7 @@ func Handle(logger log.Logger, pubsubClient pubsub.TopicClient, w http.ResponseW
// the user's instance may have unseen notification messages.
if deploy.IsDeployTypeApp(pr.DeployType) {
if hasUpdate {
requestHasUpdateCounter.Inc()
meter.RequestHasUpdateCounter.Add(r.Context(), 1)
}
w.Header().Set("content-type", "application/json; charset=utf-8")
_, _ = w.Write(body)
@ -153,7 +158,7 @@ func Handle(logger log.Logger, pubsubClient pubsub.TopicClient, w http.ResponseW
return
}
w.Header().Set("content-type", "application/json; charset=utf-8")
requestHasUpdateCounter.Inc()
meter.RequestHasUpdateCounter.Add(r.Context(), 1)
_, _ = w.Write(body)
}
@ -387,12 +392,12 @@ type pingPayload struct {
RepoMetadataUsage json.RawMessage `json:"repo_metadata_usage"`
}
func logPing(logger log.Logger, pubsubClient pubsub.TopicClient, r *http.Request, pr *pingRequest, hasUpdate bool) {
func logPing(logger log.Logger, pubsubClient pubsub.TopicClient, meter *Meter, r *http.Request, pr *pingRequest, hasUpdate bool) {
logger = logger.Scoped("logPing", "logs ping requests")
defer func() {
if r := recover(); r != nil {
logger.Warn("panic", log.String("recover", fmt.Sprintf("%+v", r)))
errorCounter.Inc()
if err := recover(); err != nil {
logger.Warn("panic", log.String("recover", fmt.Sprintf("%+v", err)))
meter.ErrorCounter.Add(r.Context(), 1)
}
}()
@ -414,14 +419,14 @@ func logPing(logger log.Logger, pubsubClient pubsub.TopicClient, r *http.Request
message, err := marshalPing(pr, hasUpdate, clientAddr, time.Now())
if err != nil {
errorCounter.Inc()
meter.ErrorCounter.Add(r.Context(), 1)
logger.Error("failed to marshal payload", log.Error(err))
return
}
err = pubsubClient.Publish(context.Background(), message)
if err != nil {
errorCounter.Inc()
meter.ErrorCounter.Add(r.Context(), 1)
logger.Error("failed to publish", log.String("message", string(message)), log.Error(err))
return
}
@ -782,18 +787,3 @@ func reserializeCodyUsage(payload json.RawMessage) (json.RawMessage, error) {
return json.Marshal(singlePeriodUsage)
}
var (
requestCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "src_updatecheck_server_requests",
Help: "Number of requests to the update check handler.",
})
requestHasUpdateCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "src_updatecheck_server_requests_has_update",
Help: "Number of requests to the update check handler where an update is available.",
})
errorCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "src_updatecheck_server_errors",
Help: "Number of errors that occur while publishing server pings.",
})
)

View File

@ -355,6 +355,8 @@ commands:
PINGS_PUBSUB_PROJECT_ID: 'telligentsourcegraph'
PINGS_PUBSUB_TOPIC_ID: 'server-update-checks-test'
HUBSPOT_ACCESS_TOKEN: ''
# Enables metrics in dev via debugserver
SRC_PROF_HTTP: "127.0.0.1:7011"
watch:
- lib
- internal