diff --git a/doc/admin/observability/dashboards.md b/doc/admin/observability/dashboards.md index 80e0fcf74fe..695850f704a 100644 --- a/doc/admin/observability/dashboards.md +++ b/doc/admin/observability/dashboards.md @@ -6527,32 +6527,6 @@ src_gitserver_repo_count
-#### gitserver: src_gitserver_client_concurrent_requests - -

Number of concurrent requests running against gitserver client

- -This metric is only for informational purposes. It indicates the current number of concurrently running requests by process against gitserver gRPC. - -It does not indicate any problems with the instance, but can give a good indication of load spikes or request throttling. - -This panel has no related alerts. - -To see this panel, visit `/-/debug/grafana/d/gitserver/gitserver?viewPanel=100052` on your Sourcegraph instance. - -*Managed by the [Sourcegraph Source team](https://handbook.sourcegraph.com/departments/engineering/teams/source).* - -
-Technical details - -Query: - -``` -sum by (job, instance) (src_gitserver_client_concurrent_requests) -``` -
- -
- ### Git Server: Gitservice for internal cloning #### gitserver: aggregate_gitservice_request_duration diff --git a/internal/gitserver/connection/BUILD.bazel b/internal/gitserver/connection/BUILD.bazel index 9a077e04977..48a8891a1b4 100644 --- a/internal/gitserver/connection/BUILD.bazel +++ b/internal/gitserver/connection/BUILD.bazel @@ -14,11 +14,8 @@ go_library( "//internal/api", "//internal/conf", "//internal/conf/conftypes", - "//internal/env", "//internal/gitserver/protocol", - "//internal/grpc/concurrencylimiter", "//internal/grpc/defaults", - "//internal/limiter", "//lib/errors", "@com_github_prometheus_client_golang//prometheus", "@com_github_prometheus_client_golang//prometheus/promauto", diff --git a/internal/gitserver/connection/addrs.go b/internal/gitserver/connection/addrs.go index a2f4e0b8be0..439dffeccc0 100644 --- a/internal/gitserver/connection/addrs.go +++ b/internal/gitserver/connection/addrs.go @@ -17,11 +17,9 @@ import ( "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/conf/conftypes" - "github.com/sourcegraph/sourcegraph/internal/env" + "github.com/sourcegraph/sourcegraph/internal/gitserver/protocol" - "github.com/sourcegraph/sourcegraph/internal/grpc/concurrencylimiter" "github.com/sourcegraph/sourcegraph/internal/grpc/defaults" - "github.com/sourcegraph/sourcegraph/internal/limiter" "github.com/sourcegraph/sourcegraph/lib/errors" ) @@ -206,8 +204,6 @@ func (a *atomicGitServerConns) update(cfg *conf.Unified) { conn, err := defaults.Dial( addr, clientLogger, - grpc.WithChainUnaryInterceptor(concurrencylimiter.UnaryClientInterceptor(lim)), - grpc.WithChainStreamInterceptor(concurrencylimiter.StreamClientInterceptor(lim)), ) after.grpcConns[addr] = connAndErr{conn: conn, err: err} } @@ -222,28 +218,4 @@ func (a *atomicGitServerConns) update(cfg *conf.Unified) { } } -var ( - concurrencyLimit = env.MustGetInt("SRC_GITSERVER_CLIENT_CONCURRENCY_LIMIT", 500, "maximum number of concurrent gitserver RPC calls") - lim = &observedLimiter{Limiter: limiter.New(concurrencyLimit)} -) - -var concurrentRequestsGauge = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "src_gitserver_client_concurrent_requests", - Help: "Current number of concurrent requests running against gitserver client.", -}) - -type observedLimiter struct { - limiter.Limiter -} - -func (l *observedLimiter) Acquire() { - l.Limiter.Acquire() - concurrentRequestsGauge.Inc() -} - -func (l *observedLimiter) Release() { - l.Limiter.Release() - concurrentRequestsGauge.Dec() -} - var _ AddressWithConn = &connAndErr{} diff --git a/internal/grpc/concurrencylimiter/BUILD.bazel b/internal/grpc/concurrencylimiter/BUILD.bazel deleted file mode 100644 index 473df21e7d4..00000000000 --- a/internal/grpc/concurrencylimiter/BUILD.bazel +++ /dev/null @@ -1,21 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") -load("//dev:go_defs.bzl", "go_test") - -go_library( - name = "concurrencylimiter", - srcs = ["limiter.go"], - importpath = "github.com/sourcegraph/sourcegraph/internal/grpc/concurrencylimiter", - visibility = ["//:__subpackages__"], - deps = ["@org_golang_google_grpc//:go_default_library"], -) - -go_test( - name = "concurrencylimiter_test", - srcs = ["limiter_test.go"], - embed = [":concurrencylimiter"], - deps = [ - "//internal/limiter", - "@com_github_stretchr_testify//require", - "@org_golang_google_grpc//:go_default_library", - ], -) diff --git a/internal/grpc/concurrencylimiter/limiter.go b/internal/grpc/concurrencylimiter/limiter.go deleted file mode 100644 index 96c26a97a9c..00000000000 --- a/internal/grpc/concurrencylimiter/limiter.go +++ /dev/null @@ -1,64 +0,0 @@ -// package concurrencylimiter provides a concurrency limiter for use with grpc. -// The limiter is used to limit the number of concurrent calls to a grpc server. -package concurrencylimiter - -import ( - "context" - - "google.golang.org/grpc" -) - -// Limiter is a concurrency limiter. Acquire() blocks if the limit has been reached. -type Limiter interface { - Acquire() - Release() -} - -// UnaryClientInterceptor returns a UnaryClientInterceptor that limits the number -// of concurrent calls with the given limiter. -func UnaryClientInterceptor(limiter Limiter) func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - limiter.Acquire() - defer limiter.Release() - - return invoker(ctx, method, req, reply, cc, opts...) - } -} - -// StreamClientInterceptor returns a StreamClientInterceptor that limits the number -// of concurrent calls with the given limiter. -func StreamClientInterceptor(limiter Limiter) func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - limiter.Acquire() - - cs, err := streamer(ctx, desc, cc, method, opts...) - if err != nil { - limiter.Release() - return cs, err - } - - return &limitedClientStream{ClientStream: cs, release: limiter.Release}, nil - } -} - -type limitedClientStream struct { - grpc.ClientStream - - release func() -} - -func (s *limitedClientStream) SendMsg(m any) error { - err := s.ClientStream.SendMsg(m) - if err != nil { - s.release() - } - return err -} - -func (s *limitedClientStream) RecvMsg(m any) error { - err := s.ClientStream.RecvMsg(m) - if err != nil { - s.release() - } - return err -} diff --git a/internal/grpc/concurrencylimiter/limiter_test.go b/internal/grpc/concurrencylimiter/limiter_test.go deleted file mode 100644 index 734fdf97df0..00000000000 --- a/internal/grpc/concurrencylimiter/limiter_test.go +++ /dev/null @@ -1,228 +0,0 @@ -package concurrencylimiter - -import ( - "context" - "errors" - "io" - "sync" - "sync/atomic" - "testing" - - "google.golang.org/grpc" - - "github.com/stretchr/testify/require" - - "github.com/sourcegraph/sourcegraph/internal/limiter" -) - -func TestUnaryClientInterceptor(t *testing.T) { - t.Run("acquire and release limiter", func(t *testing.T) { - limiter := &mockLimiter{} - in := UnaryClientInterceptor(limiter) - - invoker := func(_ context.Context, _ string, _, _ any, _ *grpc.ClientConn, _ ...grpc.CallOption) error { - return nil - } - - _ = in(context.Background(), "Test", struct{}{}, struct{}{}, &grpc.ClientConn{}, invoker) - - if limiter.acquireCount != 1 { - t.Errorf("expected acquire count to be 1, got %d", limiter.acquireCount) - } - if limiter.releaseCount != 1 { - t.Errorf("expected release count to be 1, got %d", limiter.releaseCount) - } - }) - - t.Run("invoker error propagated", func(t *testing.T) { - limiter := &mockLimiter{} - in := UnaryClientInterceptor(limiter) - - expectedErr := errors.New("invoker error") - invoker := func(_ context.Context, _ string, _, _ any, _ *grpc.ClientConn, _ ...grpc.CallOption) error { - return expectedErr - } - - err := in(context.Background(), "Test", struct{}{}, struct{}{}, &grpc.ClientConn{}, invoker) - - if err != expectedErr { - t.Errorf("expected error %v, got %v", expectedErr, err) - } - if limiter.acquireCount != 1 { - t.Errorf("expected acquire count to be 1, got %d", limiter.acquireCount) - } - if limiter.releaseCount != 1 { - t.Errorf("expected release count to be 1, got %d", limiter.releaseCount) - } - }) - - t.Run("maximum concurrency honored", func(t *testing.T) { - limiter := limiter.New(1) - in := UnaryClientInterceptor(limiter) - - var wg sync.WaitGroup - count := 3000 - wg.Add(count) - - var concurrency atomic.Int32 - var notHonored atomic.Bool - - invoker := func(_ context.Context, _ string, _, _ any, _ *grpc.ClientConn, _ ...grpc.CallOption) error { - defer concurrency.Add(-1) - - if !concurrency.CompareAndSwap(0, 1) { - notHonored.Store(true) - } - - return nil - } - - for range count { - go func() { - _ = in(context.Background(), "Test", struct{}{}, struct{}{}, &grpc.ClientConn{}, invoker) - wg.Done() - }() - } - - wg.Wait() - - if notHonored.Load() { - t.Fatal("concurrency limit not honored") - } - }) -} - -func TestStreamClientInterceptor(t *testing.T) { - t.Run("acquire and release limiter", func(t *testing.T) { - limiter := &mockLimiter{} - in := StreamClientInterceptor(limiter) - - streamer := func(_ context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, _ string, _ ...grpc.CallOption) (grpc.ClientStream, error) { - return &mockClientStream{err: io.EOF}, nil - } - - cc, _ := in(context.Background(), &grpc.StreamDesc{}, &grpc.ClientConn{}, "Test", streamer) - - require.NoError(t, cc.SendMsg(nil)) - require.NoError(t, cc.CloseSend()) - require.Equal(t, io.EOF, cc.RecvMsg(nil)) - - if limiter.acquireCount != 1 { - t.Errorf("expected acquire count to be 1, got %d", limiter.acquireCount) - } - if limiter.releaseCount != 1 { - t.Errorf("expected release count to be 1, got %d", limiter.releaseCount) - } - }) - - t.Run("streamer error propagated", func(t *testing.T) { - limiter := &mockLimiter{} - in := StreamClientInterceptor(limiter) - - expectedErr := errors.New("streamer error") - streamer := func(_ context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, _ string, _ ...grpc.CallOption) (grpc.ClientStream, error) { - return nil, expectedErr - } - - _, err := in(context.Background(), &grpc.StreamDesc{}, &grpc.ClientConn{}, "Test", streamer) - - if err != expectedErr { - t.Errorf("expected error %v, got %v", expectedErr, err) - } - if limiter.acquireCount != 1 { - t.Errorf("expected acquire count to be 1, got %d", limiter.acquireCount) - } - if limiter.releaseCount != 1 { - t.Errorf("expected release count to be 1, got %d", limiter.releaseCount) - } - - limiter = &mockLimiter{} - in = StreamClientInterceptor(limiter) - streamer = func(_ context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, _ string, _ ...grpc.CallOption) (grpc.ClientStream, error) { - return &mockClientStream{err: expectedErr}, nil - } - cc, err := in(context.Background(), &grpc.StreamDesc{}, &grpc.ClientConn{}, "Test", streamer) - require.NoError(t, err) - - require.NoError(t, cc.SendMsg(nil)) - require.NoError(t, cc.CloseSend()) - require.Equal(t, expectedErr, cc.RecvMsg(nil)) - if limiter.acquireCount != 1 { - t.Errorf("expected acquire count to be 1, got %d", limiter.acquireCount) - } - if limiter.releaseCount != 1 { - t.Errorf("expected release count to be 1, got %d", limiter.releaseCount) - } - }) - - t.Run("maximum concurrency honored", func(t *testing.T) { - limiter := limiter.New(1) - in := StreamClientInterceptor(limiter) - - var wg sync.WaitGroup - count := 3000 - wg.Add(count) - - var concurrency atomic.Int32 - var notHonored atomic.Bool - - streamer := func(_ context.Context, _ *grpc.StreamDesc, _ *grpc.ClientConn, _ string, _ ...grpc.CallOption) (grpc.ClientStream, error) { - if !concurrency.CompareAndSwap(0, 1) { - notHonored.Store(true) - } - - return &limitedClientStream{ - release: func() { - concurrency.Add(-1) - }, - ClientStream: &mockClientStream{err: io.EOF}, - }, nil - } - - for range count { - go func() { - cc, _ := in(context.Background(), &grpc.StreamDesc{}, &grpc.ClientConn{}, "Test", streamer) - require.NoError(t, cc.SendMsg(nil)) - require.NoError(t, cc.CloseSend()) - require.Equal(t, io.EOF, cc.RecvMsg(nil)) - wg.Done() - }() - } - - wg.Wait() - - if notHonored.Load() { - t.Fatal("concurrency limit not honored") - } - }) -} - -type mockLimiter struct { - acquireCount int - releaseCount int -} - -func (m *mockLimiter) Acquire() { - m.acquireCount++ -} - -func (m *mockLimiter) Release() { - m.releaseCount++ -} - -type mockClientStream struct { - grpc.ClientStream - err error -} - -func (m *mockClientStream) CloseSend() error { - return nil -} - -func (m *mockClientStream) SendMsg(x interface{}) error { - return nil -} - -func (m *mockClientStream) RecvMsg(x interface{}) error { - return m.err -} diff --git a/monitoring/definitions/git_server.go b/monitoring/definitions/git_server.go index a8103367142..452ad7d6f73 100644 --- a/monitoring/definitions/git_server.go +++ b/monitoring/definitions/git_server.go @@ -333,19 +333,6 @@ func GitServer() *monitoring.Dashboard { It does not indicate any problems with the instance. `, }, - { - Name: "src_gitserver_client_concurrent_requests", - Description: "number of concurrent requests running against gitserver client", - Query: "sum by (job, instance) (src_gitserver_client_concurrent_requests)", - NoAlert: true, - Panel: monitoring.Panel().LegendFormat("{{job}} {{instance}}"), - Owner: monitoring.ObservableOwnerSource, - Interpretation: ` - This metric is only for informational purposes. It indicates the current number of concurrently running requests by process against gitserver gRPC. - - It does not indicate any problems with the instance, but can give a good indication of load spikes or request throttling. - `, - }, }, }, },