tracing: set worker jobs trace sample probability to 1 in 2 by default (configurable) (#42901)

Noah S-C 2022-10-13 14:25:54 +01:00 committed by GitHub
parent a53a4ecb68
commit 740a5701e6
14 changed files with 55 additions and 34 deletions


@@ -180,14 +180,14 @@ func endpointOptions(c *config.Config, pathPrefix string) apiclient.EndpointOpti
 	}
 }

-func makeWorkerMetrics(queueName string) workerutil.WorkerMetrics {
+func makeWorkerMetrics(queueName string) workerutil.WorkerObservability {
 	observationContext := &observation.Context{
 		Logger:     log.Scoped("executor_processor", "executor worker processor"),
 		Tracer:     &trace.Tracer{TracerProvider: otel.GetTracerProvider()},
 		Registerer: prometheus.DefaultRegisterer,
 	}

-	return workerutil.NewMetrics(observationContext, "executor_processor",
+	return workerutil.NewMetrics(observationContext, "executor_processor", workerutil.WithSampler(func(job workerutil.Record) bool { return true }),
 		// derived from historic data, ideally we will use spare high-res histograms once they're a reality
 		// 30s 1m 2.5m 5m 7.5m 10m 15m 20m 30m 45m 1hr
 		workerutil.WithDurationBuckets([]float64{30, 60, 150, 300, 450, 600, 900, 1200, 1800, 2700, 3600}),
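The executor processor opts out of the new default sampling by passing an always-true sampler: every one of its jobs gets traced. Any function with that shape can be plugged in instead; as a sketch (the sampleOneIn helper below is illustrative, not part of this commit), a noisier worker could trace only a fixed fraction of jobs:

package samplerexample

import (
	"math/rand"

	"github.com/sourcegraph/sourcegraph/internal/workerutil"
)

// sampleOneIn returns a sampler that traces roughly 1 in n jobs.
// Hypothetical helper shown for illustration; only workerutil.WithSampler
// and workerutil.Record come from this commit.
func sampleOneIn(n int32) func(job workerutil.Record) bool {
	return func(job workerutil.Record) bool {
		return rand.Int31n(n) == 0
	}
}

// Usage at a call site like the one above:
//	workerutil.NewMetrics(observationContext, "executor_processor",
//		workerutil.WithSampler(sampleOneIn(10)), ...)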


@@ -403,7 +403,7 @@ func createBitbucketProjectPermissionsStore(logger log.Logger, s basestore.Share
 // These are the metrics that are used by the worker and resetter.
 // They are required by the workerutil package for automatic metrics collection.
 type bitbucketProjectPermissionsMetrics struct {
-	workerMetrics workerutil.WorkerMetrics
+	workerMetrics workerutil.WorkerObservability
 	resets        prometheus.Counter
 	resetFailures prometheus.Counter
 	errors        prometheus.Counter


@@ -12,7 +12,7 @@ import (
 )

 type codeMonitorsMetrics struct {
-	workerMetrics workerutil.WorkerMetrics
+	workerMetrics workerutil.WorkerObservability
 	resets        prometheus.Counter
 	resetFailures prometheus.Counter
 	errors        prometheus.Counter


@@ -121,7 +121,7 @@ func GetBackgroundQueryRunnerJob(ctx context.Context, logger log.Logger, mainApp
 //
 // Individual insights workers may then _also_ want to register their own metrics, if desired, in
 // their NewWorker functions.
-func newWorkerMetrics(observationContext *observation.Context, workerName string) (workerutil.WorkerMetrics, dbworker.ResetterMetrics) {
+func newWorkerMetrics(observationContext *observation.Context, workerName string) (workerutil.WorkerObservability, dbworker.ResetterMetrics) {
 	workerMetrics := workerutil.NewMetrics(observationContext, workerName+"_processor")
 	resetterMetrics := dbworker.NewMetrics(observationContext, workerName)
 	return workerMetrics, *resetterMetrics


@@ -37,7 +37,7 @@ import (
 // NewWorker returns a worker that will execute search queries and insert information about the
 // results into the code insights database.
-func NewWorker(ctx context.Context, logger log.Logger, workerStore dbworkerstore.Store, insightsStore *store.Store, repoStore discovery.RepoStore, metrics workerutil.WorkerMetrics) *workerutil.Worker {
+func NewWorker(ctx context.Context, logger log.Logger, workerStore dbworkerstore.Store, insightsStore *store.Store, repoStore discovery.RepoStore, metrics workerutil.WorkerObservability) *workerutil.Worker {
 	numHandlers := conf.Get().InsightsQueryWorkerConcurrency
 	if numHandlers <= 0 {
 		// Default concurrency is set to 5.
@@ -235,7 +235,6 @@ func PurgeJobsForSeries(ctx context.Context, workerBaseStore *basestore.Store, s
 	err = tx.Exec(ctx, sqlf.Sprintf(purgeJobsForSeriesFmtStr, seriesID))
 	return err
 }
-
 const purgeJobsForSeriesFmtStr = `
@@ -333,6 +332,7 @@ func QueryAllSeriesStatus(ctx context.Context, workerBaseStore *basestore.Store)
 	query, err := workerBaseStore.Query(ctx, q)
 	return scanAllSeriesStatusRows(query, err)
 }
+
 func scanAllSeriesStatusRows(rows *sql.Rows, queryErr error) (_ []types.InsightSeriesStatus, err error) {
 	if queryErr != nil {
 		return nil, queryErr


@@ -16,6 +16,7 @@ import (
 	"github.com/lib/pq"
 	log "github.com/sourcegraph/log"

 	"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
+	dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"


@@ -60,8 +60,8 @@ type backgroundJob struct {
 	metrics        *resetterMetrics
 	janitorMetrics *janitorMetrics

-	depencencySyncMetrics  workerutil.WorkerMetrics
-	depencencyIndexMetrics workerutil.WorkerMetrics
+	depencencySyncMetrics  workerutil.WorkerObservability
+	depencencyIndexMetrics workerutil.WorkerObservability
 }

 func New(


@@ -34,7 +34,7 @@ type Service struct {
 	expirationMetrics *expirationMetrics
 	resetterMetrics   *resetterMetrics
 	janitorMetrics    *janitorMetrics
-	workerMetrics     workerutil.WorkerMetrics
+	workerMetrics     workerutil.WorkerObservability
 	policyMatcher     PolicyMatcher
 	locker            Locker
 	logger            logger.Logger
@@ -67,7 +67,7 @@ func newService(
 		expirationMetrics: newExpirationMetrics(observationContext),
 		resetterMetrics:   newResetterMetrics(observationContext),
 		janitorMetrics:    newJanitorMetrics(observationContext),
-		workerMetrics:     workerutil.NewMetrics(observationContext, "codeintel_upload_processor"),
+		workerMetrics:     workerutil.NewMetrics(observationContext, "codeintel_upload_processor", workerutil.WithSampler(func(job workerutil.Record) bool { return true })),
 		policyMatcher:     policyMatcher,
 		locker:            locker,
 		logger:            observationContext.Logger,


@@ -88,7 +88,7 @@ func NewSyncWorker(ctx context.Context, logger log.Logger, dbHandle basestore.Tr
 	return worker, resetter
 }

-func newWorkerMetrics(r prometheus.Registerer) workerutil.WorkerMetrics {
+func newWorkerMetrics(r prometheus.Registerer) workerutil.WorkerObservability {
 	var observationContext *observation.Context

 	if r == nil {


@@ -1280,7 +1280,6 @@ func testOrphanedRepo(store repos.Store) func(*testing.T) {
 		// Sync first service
 		syncer := &repos.Syncer{
 			Logger: logtest.Scoped(t),
-
 			Sourcer: func(ctx context.Context, service *types.ExternalService) (repos.Source, error) {
 				s := repos.NewFakeSource(svc1, nil, githubRepo)
@@ -2415,7 +2414,8 @@ func testSyncReposWithLastErrorsHitsRateLimiter(s repos.Store) func(*testing.T)
 	}

 func setupSyncErroredTest(ctx context.Context, s repos.Store, t *testing.T,
-	serviceType string, externalSvcError error, config, serviceID string, repoNames ...api.RepoName) (*repos.Syncer, types.Repos) {
+	serviceType string, externalSvcError error, config, serviceID string, repoNames ...api.RepoName,
+) (*repos.Syncer, types.Repos) {
 	t.Helper()
 	now := time.Now()
 	dbRepos := types.Repos{}


@@ -23,8 +23,7 @@ import (
 // webhookBuildJob implements the Job interface
 // from package job
-type webhookBuildJob struct {
-}
+type webhookBuildJob struct{}

 func NewWebhookBuildJob() *webhookBuildJob {
 	return &webhookBuildJob{}
 }
@@ -69,7 +68,7 @@ func (w *webhookBuildJob) Routines(_ context.Context, logger log.Logger) ([]goro
 	}, nil
 }

-func newWebhookBuildWorkerMetrics(observationContext *observation.Context, workerName string) (workerutil.WorkerMetrics, dbworker.ResetterMetrics) {
+func newWebhookBuildWorkerMetrics(observationContext *observation.Context, workerName string) (workerutil.WorkerObservability, dbworker.ResetterMetrics) {
 	workerMetrics := workerutil.NewMetrics(observationContext, fmt.Sprintf("%s_processor", workerName))
 	resetterMetrics := dbworker.NewMetrics(observationContext, workerName)
 	return workerMetrics, *resetterMetrics


@@ -16,7 +16,7 @@ import (
 	workerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
 )

-func NewWorker(ctx context.Context, handler workerutil.Handler, workerStore workerstore.Store, metrics workerutil.WorkerMetrics) *workerutil.Worker {
+func NewWorker(ctx context.Context, handler workerutil.Handler, workerStore workerstore.Store, metrics workerutil.WorkerObservability) *workerutil.Worker {
 	options := workerutil.WorkerOptions{
 		Name:        "webhook_build_worker",
 		NumHandlers: 3,


@@ -2,6 +2,7 @@ package workerutil

 import (
 	"fmt"
+	"math/rand"
 	"time"

 	"github.com/prometheus/client_golang/prometheus"
@@ -12,11 +13,15 @@ import (
 	"github.com/sourcegraph/sourcegraph/internal/observation"
 )

-type WorkerMetrics struct {
+type WorkerObservability struct {
 	// logger is the root logger provided for observability. Prefer to use a more granular
 	// logger provided by operations where relevant.
 	logger log.Logger

+	// temporary solution to have configurable trace ahead-of-time sample for worker jobs
+	// to avoid swamping sinks with traces.
+	traceSampler func(job Record) bool
+
 	operations *operations
 	numJobs    Gauge
 }
@@ -32,19 +37,26 @@ type operations struct {
 	preHandle *observation.Operation
 }

-type metricOptions struct {
+type observabilityOptions struct {
 	labels          map[string]string
 	durationBuckets []float64
+	// temporary solution to have configurable trace ahead-of-time sample for worker jobs
+	// to avoid swamping sinks with traces.
+	traceSampler func(job Record) bool
 }

-type MetricOption func(o *metricOptions)
+type ObservabilityOption func(o *observabilityOptions)

-func WithLabels(labels map[string]string) MetricOption {
-	return func(o *metricOptions) { o.labels = labels }
+func WithSampler(fn func(job Record) bool) func(*observabilityOptions) {
+	return func(o *observabilityOptions) { o.traceSampler = fn }
 }

-func WithDurationBuckets(buckets []float64) MetricOption {
-	return func(o *metricOptions) { o.durationBuckets = buckets }
+func WithLabels(labels map[string]string) ObservabilityOption {
+	return func(o *observabilityOptions) { o.labels = labels }
+}
+
+func WithDurationBuckets(buckets []float64) ObservabilityOption {
+	return func(o *observabilityOptions) { o.durationBuckets = buckets }
 }

 // NewMetrics creates and registers the following metrics for a generic worker instance.
@@ -54,10 +66,14 @@ func WithDurationBuckets(buckets []float64) MetricOption {
 // - {prefix}_error_total: number of handler operations resulting in an error
 // - {prefix}_handlers: the number of active handler routines
 //
-// The given labels are emitted on each metric.
-func NewMetrics(observationContext *observation.Context, prefix string, opts ...MetricOption) WorkerMetrics {
-	options := &metricOptions{
+// The given labels are emitted on each metric. If WithSampler option is not passed,
+// traces will have a 1 in 2 probability of being sampled.
+func NewMetrics(observationContext *observation.Context, prefix string, opts ...ObservabilityOption) WorkerObservability {
+	options := &observabilityOptions{
 		durationBuckets: prometheus.DefBuckets,
+		traceSampler: func(job Record) bool {
+			return rand.Int31()%2 == 0
+		},
 	}

 	for _, fn := range opts {
@@ -86,10 +102,11 @@ func NewMetrics(observationContext *observation.Context, prefix string, opts ...
 		"The number of active handlers.",
 	)

-	return WorkerMetrics{
-		logger:     observationContext.Logger,
-		operations: newOperations(observationContext, prefix, keys, values, options.durationBuckets),
-		numJobs:    newLenientConcurrencyGauge(numJobs, time.Second*5),
+	return WorkerObservability{
+		logger:       observationContext.Logger,
+		traceSampler: options.traceSampler,
+		operations:   newOperations(observationContext, prefix, keys, values, options.durationBuckets),
+		numJobs:      newLenientConcurrencyGauge(numJobs, time.Second*5),
 	}
 }
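The default sampler above flips a cheap coin per job: rand.Int31()%2 == 0 traces roughly half of all jobs. Because the sampler receives the whole Record, deterministic policies are also possible; a sketch assuming Record's RecordID() int accessor, which workerutil records expose (the helper itself is illustrative, not part of this commit):

package samplerexample

import "github.com/sourcegraph/sourcegraph/internal/workerutil"

// deterministicSampler traces every nth record by ID, so retries of the
// same job make the same sampling decision instead of reflipping a coin.
// Assumes the Record interface exposes RecordID() int.
func deterministicSampler(n int) func(job workerutil.Record) bool {
	return func(job workerutil.Record) bool {
		return job.RecordID()%n == 0
	}
}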


@@ -86,7 +86,7 @@ type WorkerOptions struct {
 	MaximumRuntimePerJob time.Duration

 	// Metrics configures logging, tracing, and metrics for the work loop.
-	Metrics WorkerMetrics
+	Metrics WorkerObservability
 }

 func NewWorker(ctx context.Context, store Store, handler Handler, options WorkerOptions) *Worker {
@@ -312,7 +312,11 @@ func (w *Worker) dequeueAndHandle() (dequeued bool, err error) {
 	}

 	// Create context and span based on the root context
-	workerSpan, workerCtxWithSpan := ot.StartSpanFromContext(policy.WithShouldTrace(w.rootCtx, true), w.options.Name)
+	workerSpan, workerCtxWithSpan := ot.StartSpanFromContext(
+		// TODO tail-based sampling once its a thing, until then, we can configure on a per-job basis
+		policy.WithShouldTrace(w.rootCtx, w.options.Metrics.traceSampler(record)),
+		w.options.Name,
+	)
 	handleCtx, cancel := context.WithCancel(workerCtxWithSpan)

 	processLog := trace.Logger(workerCtxWithSpan, w.options.Metrics.logger)
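The sampling decision is made ahead of time, once per dequeued record: policy.WithShouldTrace stamps the sampler's verdict onto the root context, and the tracing layer reads it back before recording the span. A trimmed-down model of that gate, built only on the standard library (withShouldTrace and shouldTrace here are local stand-ins for the policy package's real pair, not its implementation):

package policyexample

import "context"

// shouldTraceKey is the private context key for the per-job decision.
type shouldTraceKey struct{}

// withShouldTrace records the sampler's verdict on the context, the way
// the worker does once per dequeued record in the diff above.
func withShouldTrace(ctx context.Context, b bool) context.Context {
	return context.WithValue(ctx, shouldTraceKey{}, b)
}

// shouldTrace reads the verdict back; span creation would consult this
// before recording and exporting anything.
func shouldTrace(ctx context.Context) bool {
	v, ok := ctx.Value(shouldTraceKey{}).(bool)
	return ok && v
}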