This reverts commit 9185da3c3e.
Noticed there are some bad callers in worker and symbols that don't
properly return a connection. Will need to investigate and fix that
first.
## Test plan
Worked before, CI passes.
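For context on the suspected leak: the stream interceptor being reverted (see `limiter.go` below) releases its concurrency slot only when a `SendMsg` or `RecvMsg` call on the wrapped stream returns an error. A minimal sketch of the failure mode against a raw grpc-go stream; the method name and message types are hypothetical stand-ins, not the actual callers in worker or symbols:

```go
import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/emptypb"
)

// leakyCall abandons a server stream without draining it. Because no
// SendMsg/RecvMsg call ever returns an error here, the interceptor's
// limitedClientStream.release is never invoked and the limiter slot
// stays occupied for the lifetime of the process.
func leakyCall(ctx context.Context, cc *grpc.ClientConn) error {
	desc := &grpc.StreamDesc{ServerStreams: true}
	// Hypothetical method name, for illustration only.
	stream, err := cc.NewStream(ctx, desc, "/gitserver.v1.GitserverService/Search")
	if err != nil {
		return err // streamer error: the interceptor itself releases the slot
	}
	if err := stream.SendMsg(&emptypb.Empty{}); err != nil {
		return err // SendMsg error: the wrapper releases the slot
	}
	var first emptypb.Empty
	if err := stream.RecvMsg(&first); err != nil {
		return err // RecvMsg error: the wrapper releases the slot
	}
	// Returning after one successful message, without reading to io.EOF:
	// no error is ever observed, so the slot leaks.
	return nil
}
```

Each such caller would pin one of the 500 slots until its process restarts, eventually starving all gitserver RPCs behind the shared limiter.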
Parent: 8f4a610c67
Commit: 9e724bc596
Changed: doc/admin/observability/dashboards.md (generated, 26 lines)
@@ -6527,32 +6527,6 @@ src_gitserver_repo_count
<br />

#### gitserver: src_gitserver_client_concurrent_requests

<p class="subtitle">Number of concurrent requests running against gitserver client</p>

This metric is only for informational purposes. It indicates the current number of concurrently running requests by process against gitserver gRPC.

It does not indicate any problems with the instance, but can give a good indication of load spikes or request throttling.

This panel has no related alerts.

To see this panel, visit `/-/debug/grafana/d/gitserver/gitserver?viewPanel=100052` on your Sourcegraph instance.

<sub>*Managed by the [Sourcegraph Source team](https://handbook.sourcegraph.com/departments/engineering/teams/source).*</sub>

<details>
<summary>Technical details</summary>

Query:

```
sum by (job, instance) (src_gitserver_client_concurrent_requests)
```
</details>

<br />

### Git Server: Gitservice for internal cloning

#### gitserver: aggregate_gitservice_request_duration
@@ -14,11 +14,8 @@ go_library(
        "//internal/api",
        "//internal/conf",
        "//internal/conf/conftypes",
        "//internal/env",
        "//internal/gitserver/protocol",
        "//internal/grpc/concurrencylimiter",
        "//internal/grpc/defaults",
        "//internal/limiter",
        "//lib/errors",
        "@com_github_prometheus_client_golang//prometheus",
        "@com_github_prometheus_client_golang//prometheus/promauto",
@@ -17,11 +17,9 @@ import (
	"github.com/sourcegraph/sourcegraph/internal/api"
	"github.com/sourcegraph/sourcegraph/internal/conf"
	"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
	"github.com/sourcegraph/sourcegraph/internal/env"

	"github.com/sourcegraph/sourcegraph/internal/gitserver/protocol"
	"github.com/sourcegraph/sourcegraph/internal/grpc/concurrencylimiter"
	"github.com/sourcegraph/sourcegraph/internal/grpc/defaults"
	"github.com/sourcegraph/sourcegraph/internal/limiter"
	"github.com/sourcegraph/sourcegraph/lib/errors"
)
@@ -206,8 +204,6 @@ func (a *atomicGitServerConns) update(cfg *conf.Unified) {
		conn, err := defaults.Dial(
			addr,
			clientLogger,
			grpc.WithChainUnaryInterceptor(concurrencylimiter.UnaryClientInterceptor(lim)),
			grpc.WithChainStreamInterceptor(concurrencylimiter.StreamClientInterceptor(lim)),
		)
		after.grpcConns[addr] = connAndErr{conn: conn, err: err}
	}
@@ -222,28 +218,4 @@ func (a *atomicGitServerConns) update(cfg *conf.Unified) {
	}
}

var (
	concurrencyLimit = env.MustGetInt("SRC_GITSERVER_CLIENT_CONCURRENCY_LIMIT", 500, "maximum number of concurrent gitserver RPC calls")
	lim              = &observedLimiter{Limiter: limiter.New(concurrencyLimit)}
)

var concurrentRequestsGauge = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "src_gitserver_client_concurrent_requests",
	Help: "Current number of concurrent requests running against gitserver client.",
})

type observedLimiter struct {
	limiter.Limiter
}

func (l *observedLimiter) Acquire() {
	l.Limiter.Acquire()
	concurrentRequestsGauge.Inc()
}

func (l *observedLimiter) Release() {
	l.Limiter.Release()
	concurrentRequestsGauge.Dec()
}

var _ AddressWithConn = &connAndErr{}
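For reference, a minimal sketch of how the two interceptors compose on a plain grpc-go dial, assuming an ordinary `grpc.Dial` rather than Sourcegraph's `defaults.Dial` wrapper, with the limit hard-coded instead of read from the environment:

```go
import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/sourcegraph/sourcegraph/internal/grpc/concurrencylimiter"
	"github.com/sourcegraph/sourcegraph/internal/limiter"
)

// dialWithLimit wires one shared limiter into both the unary and the
// stream call paths, so every RPC kind draws from the same 500-slot budget.
func dialWithLimit(addr string) (*grpc.ClientConn, error) {
	lim := limiter.New(500) // mirrors the SRC_GITSERVER_CLIENT_CONCURRENCY_LIMIT default
	return grpc.Dial(
		addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainUnaryInterceptor(concurrencylimiter.UnaryClientInterceptor(lim)),
		grpc.WithChainStreamInterceptor(concurrencylimiter.StreamClientInterceptor(lim)),
	)
}
```

Because one `lim` is shared across both paths, a leaked stream slot also throttles unary calls, which is what makes the bad callers mentioned in the commit message dangerous.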
@@ -1,21 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//dev:go_defs.bzl", "go_test")

go_library(
    name = "concurrencylimiter",
    srcs = ["limiter.go"],
    importpath = "github.com/sourcegraph/sourcegraph/internal/grpc/concurrencylimiter",
    visibility = ["//:__subpackages__"],
    deps = ["@org_golang_google_grpc//:go_default_library"],
)

go_test(
    name = "concurrencylimiter_test",
    srcs = ["limiter_test.go"],
    embed = [":concurrencylimiter"],
    deps = [
        "//internal/limiter",
        "@com_github_stretchr_testify//require",
        "@org_golang_google_grpc//:go_default_library",
    ],
)
@@ -1,64 +0,0 @@
// package concurrencylimiter provides a concurrency limiter for use with grpc.
// The limiter is used to limit the number of concurrent calls to a grpc server.
package concurrencylimiter

import (
	"context"

	"google.golang.org/grpc"
)

// Limiter is a concurrency limiter. Acquire() blocks if the limit has been reached.
type Limiter interface {
	Acquire()
	Release()
}

// UnaryClientInterceptor returns a UnaryClientInterceptor that limits the number
// of concurrent calls with the given limiter.
func UnaryClientInterceptor(limiter Limiter) func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		limiter.Acquire()
		defer limiter.Release()

		return invoker(ctx, method, req, reply, cc, opts...)
	}
}

// StreamClientInterceptor returns a StreamClientInterceptor that limits the number
// of concurrent calls with the given limiter.
func StreamClientInterceptor(limiter Limiter) func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		limiter.Acquire()

		cs, err := streamer(ctx, desc, cc, method, opts...)
		if err != nil {
			limiter.Release()
			return cs, err
		}

		return &limitedClientStream{ClientStream: cs, release: limiter.Release}, nil
	}
}

type limitedClientStream struct {
	grpc.ClientStream

	release func()
}

func (s *limitedClientStream) SendMsg(m any) error {
	err := s.ClientStream.SendMsg(m)
	if err != nil {
		s.release()
	}
	return err
}

func (s *limitedClientStream) RecvMsg(m any) error {
	err := s.ClientStream.RecvMsg(m)
	if err != nil {
		s.release()
	}
	return err
}
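Given those release semantics, a sketch of the draining discipline a well-behaved streaming caller needs; again the method name is a hypothetical placeholder:

```go
import (
	"context"
	"io"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/emptypb"
)

// drainUntilEOF keeps receiving until RecvMsg fails, normally with io.EOF.
// That failing call is the only point at which limitedClientStream invokes
// release, so looping to the end is what returns the concurrency slot.
func drainUntilEOF(ctx context.Context, cc *grpc.ClientConn) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancellation makes any later RecvMsg return an error

	desc := &grpc.StreamDesc{ServerStreams: true}
	stream, err := cc.NewStream(ctx, desc, "/example.Service/Watch") // hypothetical
	if err != nil {
		return err
	}
	if err := stream.SendMsg(&emptypb.Empty{}); err != nil {
		return err
	}
	if err := stream.CloseSend(); err != nil {
		return err
	}
	for {
		var msg emptypb.Empty
		if err := stream.RecvMsg(&msg); err != nil {
			if err == io.EOF {
				return nil // clean end of stream: slot already released
			}
			return err // error path: slot also released
		}
		// ... process msg ...
	}
}
```

Note that cancelling the context does not by itself release the slot: release only runs inside a failing `SendMsg`/`RecvMsg`, so a caller that abandons the stream outright is exactly the leak the commit message suspects.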
@@ -1,228 +0,0 @@
package concurrencylimiter

import (
	"context"
	"errors"
	"io"
	"sync"
	"sync/atomic"
	"testing"

	"google.golang.org/grpc"

	"github.com/stretchr/testify/require"

	"github.com/sourcegraph/sourcegraph/internal/limiter"
)

func TestUnaryClientInterceptor(t *testing.T) {
	t.Run("acquire and release limiter", func(t *testing.T) {
		limiter := &mockLimiter{}
		in := UnaryClientInterceptor(limiter)

		invoker := func(_ context.Context, _ string, _, _ any, _ *grpc.ClientConn, _ ...grpc.CallOption) error {
			return nil
		}

		_ = in(context.Background(), "Test", struct{}{}, struct{}{}, &grpc.ClientConn{}, invoker)

		if limiter.acquireCount != 1 {
			t.Errorf("expected acquire count to be 1, got %d", limiter.acquireCount)
		}
		if limiter.releaseCount != 1 {
			t.Errorf("expected release count to be 1, got %d", limiter.releaseCount)
		}
	})

	t.Run("invoker error propagated", func(t *testing.T) {
		limiter := &mockLimiter{}
		in := UnaryClientInterceptor(limiter)

		expectedErr := errors.New("invoker error")
		invoker := func(_ context.Context, _ string, _, _ any, _ *grpc.ClientConn, _ ...grpc.CallOption) error {
			return expectedErr
		}

		err := in(context.Background(), "Test", struct{}{}, struct{}{}, &grpc.ClientConn{}, invoker)

		if err != expectedErr {
			t.Errorf("expected error %v, got %v", expectedErr, err)
		}
		if limiter.acquireCount != 1 {
			t.Errorf("expected acquire count to be 1, got %d", limiter.acquireCount)
		}
		if limiter.releaseCount != 1 {
			t.Errorf("expected release count to be 1, got %d", limiter.releaseCount)
		}
	})

	t.Run("maximum concurrency honored", func(t *testing.T) {
		limiter := limiter.New(1)
		in := UnaryClientInterceptor(limiter)

		var wg sync.WaitGroup
		count := 3000
		wg.Add(count)

		var concurrency atomic.Int32
		var notHonored atomic.Bool

		invoker := func(_ context.Context, _ string, _, _ any, _ *grpc.ClientConn, _ ...grpc.CallOption) error {
			defer concurrency.Add(-1)

			if !concurrency.CompareAndSwap(0, 1) {
				notHonored.Store(true)
			}

			return nil
		}

		for range count {
			go func() {
				_ = in(context.Background(), "Test", struct{}{}, struct{}{}, &grpc.ClientConn{}, invoker)
				wg.Done()
			}()
		}

		wg.Wait()

		if notHonored.Load() {
			t.Fatal("concurrency limit not honored")
		}
	})
}

func TestStreamClientInterceptor(t *testing.T) {
	t.Run("acquire and release limiter", func(t *testing.T) {
		limiter := &mockLimiter{}
		in := StreamClientInterceptor(limiter)

		streamer := func(_ context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, _ string, _ ...grpc.CallOption) (grpc.ClientStream, error) {
			return &mockClientStream{err: io.EOF}, nil
		}

		cc, _ := in(context.Background(), &grpc.StreamDesc{}, &grpc.ClientConn{}, "Test", streamer)

		require.NoError(t, cc.SendMsg(nil))
		require.NoError(t, cc.CloseSend())
		require.Equal(t, io.EOF, cc.RecvMsg(nil))

		if limiter.acquireCount != 1 {
			t.Errorf("expected acquire count to be 1, got %d", limiter.acquireCount)
		}
		if limiter.releaseCount != 1 {
			t.Errorf("expected release count to be 1, got %d", limiter.releaseCount)
		}
	})

	t.Run("streamer error propagated", func(t *testing.T) {
		limiter := &mockLimiter{}
		in := StreamClientInterceptor(limiter)

		expectedErr := errors.New("streamer error")
		streamer := func(_ context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, _ string, _ ...grpc.CallOption) (grpc.ClientStream, error) {
			return nil, expectedErr
		}

		_, err := in(context.Background(), &grpc.StreamDesc{}, &grpc.ClientConn{}, "Test", streamer)

		if err != expectedErr {
			t.Errorf("expected error %v, got %v", expectedErr, err)
		}
		if limiter.acquireCount != 1 {
			t.Errorf("expected acquire count to be 1, got %d", limiter.acquireCount)
		}
		if limiter.releaseCount != 1 {
			t.Errorf("expected release count to be 1, got %d", limiter.releaseCount)
		}

		limiter = &mockLimiter{}
		in = StreamClientInterceptor(limiter)
		streamer = func(_ context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, _ string, _ ...grpc.CallOption) (grpc.ClientStream, error) {
			return &mockClientStream{err: expectedErr}, nil
		}
		cc, err := in(context.Background(), &grpc.StreamDesc{}, &grpc.ClientConn{}, "Test", streamer)
		require.NoError(t, err)

		require.NoError(t, cc.SendMsg(nil))
		require.NoError(t, cc.CloseSend())
		require.Equal(t, expectedErr, cc.RecvMsg(nil))
		if limiter.acquireCount != 1 {
			t.Errorf("expected acquire count to be 1, got %d", limiter.acquireCount)
		}
		if limiter.releaseCount != 1 {
			t.Errorf("expected release count to be 1, got %d", limiter.releaseCount)
		}
	})

	t.Run("maximum concurrency honored", func(t *testing.T) {
		limiter := limiter.New(1)
		in := StreamClientInterceptor(limiter)

		var wg sync.WaitGroup
		count := 3000
		wg.Add(count)

		var concurrency atomic.Int32
		var notHonored atomic.Bool

		streamer := func(_ context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, _ string, _ ...grpc.CallOption) (grpc.ClientStream, error) {
			if !concurrency.CompareAndSwap(0, 1) {
				notHonored.Store(true)
			}

			return &limitedClientStream{
				release: func() {
					concurrency.Add(-1)
				},
				ClientStream: &mockClientStream{err: io.EOF},
			}, nil
		}

		for range count {
			go func() {
				cc, _ := in(context.Background(), &grpc.StreamDesc{}, &grpc.ClientConn{}, "Test", streamer)
				require.NoError(t, cc.SendMsg(nil))
				require.NoError(t, cc.CloseSend())
				require.Equal(t, io.EOF, cc.RecvMsg(nil))
				wg.Done()
			}()
		}

		wg.Wait()

		if notHonored.Load() {
			t.Fatal("concurrency limit not honored")
		}
	})
}

type mockLimiter struct {
	acquireCount int
	releaseCount int
}

func (m *mockLimiter) Acquire() {
	m.acquireCount++
}

func (m *mockLimiter) Release() {
	m.releaseCount++
}

type mockClientStream struct {
	grpc.ClientStream
	err error
}

func (m *mockClientStream) CloseSend() error {
	return nil
}

func (m *mockClientStream) SendMsg(x interface{}) error {
	return nil
}

func (m *mockClientStream) RecvMsg(x interface{}) error {
	return m.err
}
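The suite above exercises the error paths; a sketch of an additional case that would pin down the abandoned-stream leak described in the commit message, reusing the `mockLimiter` and `mockClientStream` helpers from this file:

```go
// Hypothetical extra test, not part of this commit: it documents that a
// caller which abandons a stream after a successful SendMsg never returns
// its slot, because release only runs on an erroring SendMsg/RecvMsg.
func TestStreamClientInterceptorAbandonedStreamLeaks(t *testing.T) {
	limiter := &mockLimiter{}
	in := StreamClientInterceptor(limiter)

	streamer := func(_ context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, _ string, _ ...grpc.CallOption) (grpc.ClientStream, error) {
		return &mockClientStream{err: io.EOF}, nil
	}

	cc, err := in(context.Background(), &grpc.StreamDesc{}, &grpc.ClientConn{}, "Test", streamer)
	require.NoError(t, err)

	// Simulate a bad caller: one successful send, then the stream is
	// dropped without ever draining it to io.EOF.
	require.NoError(t, cc.SendMsg(nil))

	require.Equal(t, 1, limiter.acquireCount)
	require.Equal(t, 0, limiter.releaseCount) // the slot is never returned
}
```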
@@ -333,19 +333,6 @@ func GitServer() *monitoring.Dashboard {
						It does not indicate any problems with the instance.
					`,
				},
				{
					Name:        "src_gitserver_client_concurrent_requests",
					Description: "number of concurrent requests running against gitserver client",
					Query:       "sum by (job, instance) (src_gitserver_client_concurrent_requests)",
					NoAlert:     true,
					Panel:       monitoring.Panel().LegendFormat("{{job}} {{instance}}"),
					Owner:       monitoring.ObservableOwnerSource,
					Interpretation: `
						This metric is only for informational purposes. It indicates the current number of concurrently running requests by process against gitserver gRPC.

						It does not indicate any problems with the instance, but can give a good indication of load spikes or request throttling.
					`,
				},
			},
		},
	},