httpcli: switch outbound request logger to FIFOList (#46576)

The use of AllKeys had a hidden cost of scanning _every_ key in redis,
not just those associated with outbound requests. So this commit ports
the outbound-requests logger to use a FIFOList, the same datastructure
used by our slow request logger.

This has the additional benefit of reducing our API surface in rcache,
since this was the only user of AllKeys and the Multi* functions.

We could likely do this better since we do things like lookup all logged
requests each time we want to read. But given the value is small (500
items) and it is only done on read (not log time), this seems like a
fine tradeoff versus doing a bigger change.

Test Plan: go test
This commit is contained in:
Keegan Carruthers-Smith 2023-01-18 12:08:10 +02:00 committed by GitHub
parent 2376163f36
commit d4e84f0306
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 139 additions and 115 deletions

View File

@ -18,7 +18,11 @@ import (
"github.com/sourcegraph/sourcegraph/lib/errors"
)
var outboundRequestsRedisCache = rcache.NewWithTTL("outbound-requests", 604800)
// outboundRequestsRedisFIFOListDefaultSize sets a default value for the FIFO list.
const outboundRequestsRedisFIFOListDefaultSize = 50
// outboundRequestsRedisFIFOList is a FIFO redis cache to store the requests.
var outboundRequestsRedisFIFOList = rcache.NewFIFOList("outbound-requests", outboundRequestsRedisFIFOListDefaultSize)
const sourcegraphPrefix = "github.com/sourcegraph/sourcegraph/"
@ -32,6 +36,9 @@ func redisLoggerMiddleware() Middleware {
limit := OutboundRequestLogLimit()
shouldRedactSensitiveHeaders := !deploy.IsDev(deploy.Type()) || RedactOutboundRequestHeaders()
// Update limit in case it changed
outboundRequestsRedisFIFOList.SetMaxSize(int(limit))
// Feature is turned off, do not log
if limit == 0 {
return resp, err
@ -106,12 +113,9 @@ func redisLoggerMiddleware() Middleware {
}
// Save new item
outboundRequestsRedisCache.Set(key, logItemJson)
// Delete excess items
if deleteErr := deleteExcessItems(outboundRequestsRedisCache, int(limit)); deleteErr != nil {
if err := outboundRequestsRedisFIFOList.Insert(logItemJson); err != nil {
middlewareErrors = errors.Append(middlewareErrors,
errors.Wrap(deleteErr, "delete excess items"))
errors.Wrap(err, "insert log item"))
}
return resp, err
@ -119,90 +123,76 @@ func redisLoggerMiddleware() Middleware {
}
}
func deleteExcessItems(c *rcache.Cache, limit int) error {
keys, err := c.ListKeys(context.Background())
if err != nil {
return errors.Wrap(err, "list keys")
}
// Delete all but the last N keys
if len(keys) > limit {
sort.Strings(keys)
c.DeleteMulti(keys[:len(keys)-limit]...)
}
return nil
}
// GetOutboundRequestLogItems returns all outbound request log items after the given key,
// in ascending order, trimmed to maximum {limit} items. Example for `after`: "2021-01-01T00_00_00.000000".
func GetOutboundRequestLogItems(ctx context.Context, after string) ([]*types.OutboundRequestLogItem, error) {
var limit = OutboundRequestLogLimit()
var limit = int(OutboundRequestLogLimit())
if limit == 0 {
return []*types.OutboundRequestLogItem{}, nil
}
// Get values from Redis
rawItems, err := getAllValuesAfter(ctx, outboundRequestsRedisCache, after, int(limit))
items, err := getOutboundRequestLogItems(ctx, func(item *types.OutboundRequestLogItem) bool {
if after == "" {
return true
} else {
return item.ID > after
}
})
if err != nil {
return nil, err
}
// Convert raw Redis store items to log items
items := make([]*types.OutboundRequestLogItem, 0, len(rawItems))
if len(items) > limit {
items = items[:limit]
}
return items, nil
}
func GetOutboundRequestLogItem(key string) (*types.OutboundRequestLogItem, error) {
items, err := getOutboundRequestLogItems(context.Background(), func(item *types.OutboundRequestLogItem) bool {
return item.ID == key
})
if err != nil {
return nil, err
}
if len(items) == 0 {
return nil, errors.New("item not found")
}
return items[0], nil
}
// getOutboundRequestLogItems returns all items where pred returns true,
// sorted by ID ascending.
func getOutboundRequestLogItems(ctx context.Context, pred func(*types.OutboundRequestLogItem) bool) ([]*types.OutboundRequestLogItem, error) {
// We fetch all values from redis, then just return those matching pred.
// Given the max size is enforced as 500, this is fine. But if we ever
// raise the limit, we likely need to think of an alternative way to do
// pagination against lists / or also store the items so we can look up by
// key
rawItems, err := outboundRequestsRedisFIFOList.All(ctx)
if err != nil {
return nil, errors.Wrap(err, "list all log items")
}
var items []*types.OutboundRequestLogItem
for _, rawItem := range rawItems {
var item types.OutboundRequestLogItem
err = json.Unmarshal(rawItem, &item)
if err != nil {
return nil, err
}
items = append(items, &item)
}
return items, nil
}
func GetOutboundRequestLogItem(key string) (*types.OutboundRequestLogItem, error) {
rawItem, ok := outboundRequestsRedisCache.Get(key)
if !ok {
return nil, errors.New("item not found")
}
var item types.OutboundRequestLogItem
err := json.Unmarshal(rawItem, &item)
if err != nil {
return nil, err
}
return &item, nil
}
// getAllValuesAfter returns all items after the given key, in ascending order, trimmed to maximum {limit} items.
func getAllValuesAfter(ctx context.Context, c *rcache.Cache, after string, limit int) ([][]byte, error) {
all, err := c.ListKeys(ctx)
if err != nil {
return nil, err
}
var keys []string
if after != "" {
for _, key := range all {
if key > after {
keys = append(keys, key)
}
if pred(&item) {
items = append(items, &item)
}
} else {
keys = all
}
// Sort ascending
sort.Strings(keys)
sort.Slice(items, func(i, j int) bool {
return items[i].ID < items[j].ID
})
// Limit to N
if len(keys) > limit {
keys = keys[len(keys)-limit:]
}
return c.GetMulti(keys...), nil
return items, nil
}
func redactSensitiveHeaders(headers http.Header) http.Header {

View File

@ -4,13 +4,12 @@ import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"k8s.io/utils/strings/slices"
"github.com/sourcegraph/sourcegraph/internal/rcache"
@ -72,9 +71,7 @@ func TestRedisLoggerMiddleware(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
// Enable the feature
old := OutboundRequestLogLimit()
SetOutboundRequestLogLimit(1)
t.Cleanup(func() { SetOutboundRequestLogLimit(old) })
setOutboundRequestLogLimit(t, 1)
// Build client with middleware
cli := redisLoggerMiddleware()(tc.cli)
@ -110,29 +107,87 @@ func TestRedisLoggerMiddleware(t *testing.T) {
}
})
}
}
func TestRedisLoggerMiddleware_getAllValuesAfter(t *testing.T) {
func TestRedisLoggerMiddleware_multiple(t *testing.T) {
// This test ensures that we correctly apply limits bigger than 1, as well
// as ensuring GetOutboundRequestLogItem works.
requests := 10
limit := requests / 2
rcache.SetupForTest(t)
c := rcache.NewWithTTL("some_prefix", 1)
ctx := context.Background()
var pairs = make([][2]string, 10)
for i := 0; i < 10; i++ {
pairs[i] = [2]string{"key" + strconv.Itoa(i), "value" + strconv.Itoa(i)}
// Enable the feature
setOutboundRequestLogLimit(t, int32(limit))
// Build client with middleware
cli := redisLoggerMiddleware()(newFakeClient(http.StatusOK, []byte(`{"responseBody":true}`), nil))
// Send requests and track the URLs we send so we can compare later to
// what was stored.
var wantURLs []string
for i := 0; i < requests; i++ {
u := fmt.Sprintf("http://dev/%d", i)
wantURLs = append(wantURLs, u)
req, _ := http.NewRequest("GET", u, strings.NewReader("horse"))
_, err := cli.Do(req)
if err != nil {
t.Fatal(err)
}
// Our keys are based on time, so we add a tiny sleep to ensure we
// don't duplicate keys.
time.Sleep(10 * time.Millisecond)
}
c.SetMulti(pairs...)
key := "key5"
got, err := getAllValuesAfter(ctx, c, key, 10)
// Updated want by what is actually kept
wantURLs = wantURLs[len(wantURLs)-limit:]
assert.Nil(t, err)
assert.Len(t, got, 4)
gotURLs := func(items []*types.OutboundRequestLogItem) []string {
var got []string
for _, item := range items {
got = append(got, item.URL)
}
return got
}
got, err = getAllValuesAfter(ctx, c, key, 2)
assert.Nil(t, err)
assert.Len(t, got, 2)
// Check logged request
logged, err := GetOutboundRequestLogItems(context.Background(), "")
if err != nil {
t.Fatalf("couldnt get logged requests: %s", err)
}
if diff := cmp.Diff(wantURLs, gotURLs(logged)); diff != "" {
t.Fatalf("unexpected logged URLs (-want, +got):\n%s", diff)
}
// Check that after works
after := logged[limit/2-1].ID
wantURLs = wantURLs[limit/2:]
afterLogged, err := GetOutboundRequestLogItems(context.Background(), after)
if err != nil {
t.Fatalf("couldnt get logged requests: %s", err)
}
if diff := cmp.Diff(wantURLs, gotURLs(afterLogged)); diff != "" {
t.Fatalf("unexpected logged with after URLs (-want, +got):\n%s", diff)
}
// Check that GetOutboundRequestLogItem works
for _, want := range logged {
got, err := GetOutboundRequestLogItem(want.ID)
if err != nil {
t.Fatalf("failed to find log item %+v", want)
}
if diff := cmp.Diff(want, got); diff != "" {
t.Fatalf("unexpected item returned via GetOutboundRequestLogItem (-want, +got):\n%s", diff)
}
}
// Finally check we return an error if the item key doesn't exist.
_, err = GetOutboundRequestLogItem("does not exist")
if got, want := fmt.Sprintf("%s", err), "item not found"; got != want {
t.Fatalf("unexpected error for GetOutboundRequestLogItem(\"does not exist\") got=%q want=%q", got, want)
}
}
func TestRedisLoggerMiddleware_redactSensitiveHeaders(t *testing.T) {
@ -168,33 +223,6 @@ func TestRedisLoggerMiddleware_redactSensitiveHeaders(t *testing.T) {
}
}
func TestRedisLoggerMiddleware_DeleteFirstN(t *testing.T) {
rcache.SetupForTest(t)
c := rcache.NewWithTTL("some_prefix", 1)
// Add 10 key-value pairs
var pairs = make([][2]string, 10)
for i := 0; i < 10; i++ {
pairs[i] = [2]string{"key" + strconv.Itoa(i), "value" + strconv.Itoa(i)}
}
c.SetMulti(pairs...)
// Delete the first 4 key-value pairs
_ = deleteExcessItems(c, 4)
got, listErr := c.ListKeys(context.Background())
assert.Nil(t, listErr)
assert.Len(t, got, 4)
assert.NotContains(t, got, "key0") // 0 through 5 should be deleted
assert.NotContains(t, got, "key5")
assert.Contains(t, got, "key6") // 6 through 9 (4 items) should be kept
assert.Contains(t, got, "key9")
}
func TestRedisLoggerMiddleware_formatStackFrame(t *testing.T) {
tests := []struct {
name string
@ -235,3 +263,9 @@ func TestRedisLoggerMiddleware_formatStackFrame(t *testing.T) {
})
}
}
func setOutboundRequestLogLimit(t *testing.T, limit int32) {
old := OutboundRequestLogLimit()
SetOutboundRequestLogLimit(limit)
t.Cleanup(func() { SetOutboundRequestLogLimit(old) })
}