chore/msp/runtime: add args as trace attributes, clean up tracing code (#63087)

This is a simplified version of what we have for in-Sourcegraph db
connections as well. They're not directly compatible because we use
`pgx` hooks.

## Test plan

n/a
This commit is contained in:
Robert Lin 2024-06-04 17:30:32 -07:00 committed by GitHub
parent f2590cbb36
commit 2630d5fc56
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 147 additions and 65 deletions

View File

@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "cloudsql",
srcs = ["cloudsql.go"],
srcs = [
"cloudsql.go",
"tracer.go",
],
importpath = "github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/cloudsql",
visibility = ["//visibility:public"],
deps = [

View File

@ -7,76 +7,12 @@ import (
"net"
"cloud.google.com/go/cloudsqlconn"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
var tracer = otel.GetTracerProvider().Tracer("msp/cloudsql/pgx")
type pgxTracer struct{}
// Select tracing hooks we want to implement.
var _ pgx.QueryTracer = pgxTracer{}
var _ pgx.ConnectTracer = pgxTracer{}
// TraceQueryStart is called at the beginning of Query, QueryRow, and Exec calls. The returned context is used for the
// rest of the call and will be passed to TraceQueryEnd.
func (pgxTracer) TraceQueryStart(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
ctx, _ = tracer.Start(ctx, "pgx.Query", trace.WithAttributes(
attribute.String("query", data.SQL),
attribute.Int("args.len", len(data.Args)),
))
return ctx
}
func (pgxTracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
span := trace.SpanFromContext(ctx)
defer span.End()
span.SetAttributes(
attribute.String("command_tag", data.CommandTag.String()),
attribute.Int64("rows_affected", data.CommandTag.RowsAffected()),
)
switch {
case data.CommandTag.Insert():
span.SetName("pgx.Query: INSERT")
case data.CommandTag.Update():
span.SetName("pgx.Query: UPDATE")
case data.CommandTag.Delete():
span.SetName("pgx.Query: DELETE")
case data.CommandTag.Select():
span.SetName("pgx.Query: SELECT")
}
if data.Err != nil {
span.SetStatus(codes.Error, data.Err.Error())
}
}
func (pgxTracer) TraceConnectStart(ctx context.Context, data pgx.TraceConnectStartData) context.Context {
ctx, _ = tracer.Start(ctx, "pgx.Connect", trace.WithAttributes(
attribute.String("database", data.ConnConfig.Database),
attribute.String("instance", fmt.Sprintf("%s:%d", data.ConnConfig.Host, data.ConnConfig.Port)),
attribute.String("user", data.ConnConfig.User)))
return ctx
}
func (pgxTracer) TraceConnectEnd(ctx context.Context, data pgx.TraceConnectEndData) {
span := trace.SpanFromContext(ctx)
defer span.End()
if data.Err != nil {
span.SetStatus(codes.Error, data.Err.Error())
}
}
type ConnConfig struct {
// ConnectionName is the CloudSQL connection name,
// e.g. '${project}:${region}:${instance}'

View File

@ -0,0 +1,143 @@
package cloudsql
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/jackc/pgx/v5"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
var tracer = otel.GetTracerProvider().Tracer("msp/cloudsql/pgx")
type pgxTracer struct{}
// Select tracing hooks we want to implement.
var (
_ pgx.QueryTracer = pgxTracer{}
_ pgx.ConnectTracer = pgxTracer{}
// Future:
// _ pgx.BatchTracer = pgxTracer{}
// _ pgx.CopyFromTracer = pgxTracer{}
// _ pgx.PrepareTracer = pgxTracer{}
)
// TraceQueryStart is called at the beginning of Query, QueryRow, and Exec calls. The returned context is used for the
// rest of the call and will be passed to TraceQueryEnd.
func (pgxTracer) TraceQueryStart(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
ctx, _ = tracer.Start(ctx, "pgx.Query",
trace.WithAttributes(
attribute.String("query", data.SQL),
attribute.Int("args.len", len(data.Args)),
),
trace.WithAttributes(argsAsAttributes(data.Args)...))
return ctx
}
func (pgxTracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
span := trace.SpanFromContext(ctx)
defer span.End()
span.SetAttributes(
attribute.String("command_tag", data.CommandTag.String()),
attribute.Int64("rows_affected", data.CommandTag.RowsAffected()),
)
switch {
case data.CommandTag.Insert():
span.SetName("pgx.Query: INSERT")
case data.CommandTag.Update():
span.SetName("pgx.Query: UPDATE")
case data.CommandTag.Delete():
span.SetName("pgx.Query: DELETE")
case data.CommandTag.Select():
span.SetName("pgx.Query: SELECT")
}
if data.Err != nil {
span.SetStatus(codes.Error, data.Err.Error())
}
}
func (pgxTracer) TraceConnectStart(ctx context.Context, data pgx.TraceConnectStartData) context.Context {
ctx, _ = tracer.Start(ctx, "pgx.Connect", trace.WithAttributes(
attribute.String("database", data.ConnConfig.Database),
attribute.String("instance", fmt.Sprintf("%s:%d", data.ConnConfig.Host, data.ConnConfig.Port)),
attribute.String("user", data.ConnConfig.User)))
return ctx
}
func (pgxTracer) TraceConnectEnd(ctx context.Context, data pgx.TraceConnectEndData) {
span := trace.SpanFromContext(ctx)
defer span.End()
if data.Err != nil {
span.SetStatus(codes.Error, data.Err.Error())
}
}
// Number of SQL arguments allowed to enable argument instrumentation
const argsAttributesCountLimit = 24
// Maximum size to use in heuristics of SQL arguments size in argument
// instrumentation before they are truncated. This is NOT a hard cap on bytes
// size of values.
const argsAttributesValueLimit = 240
func argsAsAttributes(args []any) []attribute.KeyValue {
if len(args) > argsAttributesCountLimit {
return []attribute.KeyValue{
attribute.String("db.args", "too many args"),
}
}
attrs := make([]attribute.KeyValue, len(args))
for i, arg := range args {
key := "db.args.$" + strconv.Itoa(i)
switch v := arg.(type) {
case nil:
attrs[i] = attribute.String(key, "nil")
case int:
attrs[i] = attribute.Int(key, v)
case int32:
attrs[i] = attribute.Int(key, int(v))
case int64:
attrs[i] = attribute.Int64(key, v)
case float32:
attrs[i] = attribute.Float64(key, float64(v))
case float64:
attrs[i] = attribute.Float64(key, v)
case bool:
attrs[i] = attribute.Bool(key, v)
case []byte:
attrs[i] = attribute.String(key, truncateStringValue(string(v)))
case string:
attrs[i] = attribute.String(key, truncateStringValue(v))
case time.Time:
attrs[i] = attribute.String(key, v.String())
default: // in case we miss anything
attrs[i] = attribute.String(key, fmt.Sprintf("unhandled type %T", v))
}
}
return attrs
}
// utf8Replace is the same value as used in other strings.ToValidUTF8 callsites
// in sourcegraph/sourcegraph.
const utf8Replace = "<22>"
// truncateStringValue should be used on all string attributes in the otelsql
// instrumentation. It ensures the length of v is within argsAttributesValueLimit,
// and ensures v is valid UTF8.
func truncateStringValue(v string) string {
if len(v) > argsAttributesValueLimit {
return strings.ToValidUTF8(v[:argsAttributesValueLimit], utf8Replace)
}
return strings.ToValidUTF8(v, utf8Replace)
}