Revert "chore(ci): rework build-tracker to use redis instead of in-memory store of build results" (#64436)

Reverts sourcegraph/sourcegraph#64304

A number of Redis-related issues cropped up in production.

## Test plan

CI
Noah S-C 2024-08-13 12:22:41 +01:00 committed by GitHub
parent 4d57eb1188
commit d4fa539b31
13 changed files with 134 additions and 7978 deletions


@ -132,6 +132,5 @@ write_source_files(
"//cmd/frontend/internal/backend:generate_mocks",
"//cmd/enterprise-portal/internal/codyaccessservice:generate_mocks",
"//cmd/enterprise-portal/internal/routines/licenseexpiration:generate_mocks",
"//dev/build-tracker/build:generate_mocks",
],
)


@ -28,10 +28,7 @@ go_library(
"//lib/managedservicesplatform/runtime",
"//lib/pointers",
"@com_github_buildkite_go_buildkite_v3//buildkite",
"@com_github_go_redsync_redsync_v4//:redsync",
"@com_github_go_redsync_redsync_v4//redis/goredis/v9:goredis",
"@com_github_gorilla_mux//:mux",
"@com_github_redis_go_redis_v9//:go-redis",
"@com_github_sourcegraph_log//:log",
"@com_google_cloud_go_bigquery//:bigquery",
"@org_golang_x_exp//maps",
@ -53,7 +50,6 @@ go_test(
"main_test.go",
"mocks_test.go",
"server_test.go",
"util_test.go",
],
embed = [":build-tracker_lib"],
tags = [TAG_INFRA_DEVINFRA],
@ -66,12 +62,10 @@ go_test(
"//lib/pointers",
"@com_github_buildkite_go_buildkite_v3//buildkite",
"@com_github_gorilla_mux//:mux",
"@com_github_redis_go_redis_v9//:go-redis",
"@com_github_sourcegraph_log//logtest",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_google_cloud_go_bigquery//:bigquery",
"@org_golang_x_exp//maps",
],
)


@ -1,21 +1,27 @@
# BUILD TRACKER
Build Tracker is a server that listens for build events from Buildkite, stores them in Redis and sends notifications about builds if they've failed.
Build Tracker is a server that listens for build events from Buildkite, stores them in memory, and sends notifications about builds that have failed.
The server currently listens for two events:
- `build.finished`
- `job.finished`
For each `job.finished` event that is received, the corresponding `build` is updated with the job that has finished. On receipt of a `build.finished` event, the server will determine if the build has failed by going through all the contained jobs of the build. If one or more jobs have indeed failed, a notification will be sent over Slack. In addition, the server will trigger a Buildkite job to process CI and Bazel data for the build for analytics purposes.
For each `job.finished` event that is received, the corresponding `build` is updated with the job that has finished. On receipt of a `build.finished` event, the server will determine if the build has failed by going through all the contained jobs of the build. If one or more jobs have indeed failed, a notification will be sent over Slack.
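As an illustration of the flow just described, here is a minimal sketch (not the server's actual handler): `notifySlack` is a hypothetical stand-in for the server's notification path, while the `Store` methods and event names match the restored in-memory API.
```go
// Sketch of the webhook event flow; illustrative, not repository code.
func handleWebhook(store *build.Store, event *build.Event) {
	switch event.Name {
	case build.EventJobFinished:
		// Attach the finished job to its build in the in-memory store.
		store.Add(event)
	case build.EventBuildFinished:
		store.Add(event)
		b := store.GetByBuildNumber(event.GetBuildNumber())
		// The build is considered failed if one or more of its jobs failed.
		if b != nil && b.IsFailed() {
			notifySlack(b) // hypothetical helper
		}
	}
}
```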
## Deployment infrastructure
Build Tracker is deployed in MSP. See the auto-generated [Notion doc](https://www.notion.so/sourcegraph/Build-Tracker-infrastructure-operations-bd66bf25d65d41b4875874a6f4d350cc#711a335bc7554738823293334221a18b) for details around accessing the environment and observability systems.
Build Tracker is deployed in the Buildkite Kubernetes cluster of the Sourcegraph CI project on GCP. For more information on the deployment see [infrastructure](https://github.com/sourcegraph/infrastructure/tree/main/buildkite/kubernetes).
It is fine to wipe Redis if there are any issues stemming from data inconsistencies, redsync lock problems etc.
## Build
## Notification testing
Execute the `build.sh` script, which will build the Docker container and push it to the correct GCP registry. Once the image has been pushed, the pod needs to be restarted so that it can pick up the new image!
## Test
To run the tests execute `go test .`
### Notification testing
To test the notifications that get sent over Slack, you can pass the flag `-RunSlackIntegrationTest` as part of your test invocation, with some required configuration:
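For example, a hypothetical invocation could look like `go test . -RunSlackIntegrationTest`, with Slack credentials supplied through the environment (the tests read `SLACK_CHANNEL`, for instance).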


@ -12,12 +12,9 @@ import (
func deleteOldBuilds(logger log.Logger, store *build.Store, every, window time.Duration) goroutine.BackgroundRoutine {
return goroutine.NewPeriodicGoroutine(context.Background(), goroutine.HandlerFunc(func(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
oldBuilds := make([]int, 0)
now := time.Now()
for _, b := range store.FinishedBuilds(ctx) {
for _, b := range store.FinishedBuilds() {
finishedAt := *b.FinishedAt
delta := now.Sub(finishedAt.Time)
if delta >= window {
@ -26,7 +23,7 @@ func deleteOldBuilds(logger log.Logger, store *build.Store, every, window time.D
}
}
logger.Info("deleting old builds", log.Int("oldBuildCount", len(oldBuilds)))
store.DelByBuildNumber(ctx, oldBuilds...)
store.DelByBuildNumber(oldBuilds...)
return nil
}), goroutine.WithInterval(every), goroutine.WithName("old-build-purger"))
}
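
For context, a sketch of how this janitor could be started alongside the webhook server using the `background.CombinedRoutine` pattern the service already uses; the one-hour interval and 30-day retention window are illustrative assumptions, not the actual configuration:
```go
// Illustrative wiring only; interval and window values are assumptions.
store := build.NewBuildStore(logger)
janitor := deleteOldBuilds(logger, store, time.Hour, 30*24*time.Hour)
routines := background.CombinedRoutine{
	server,  // webhook server from NewServer
	janitor, // periodically purges builds older than the window
}
```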


@ -1,4 +1,3 @@
load("//dev:go_mockgen.bzl", "go_mockgen")
load("//dev:go_defs.bzl", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
@ -6,7 +5,6 @@ go_library(
name = "build",
srcs = [
"build.go",
"mocks.go",
"steps.go",
],
importpath = "github.com/sourcegraph/sourcegraph/dev/build-tracker/build",
@ -17,7 +15,6 @@ go_library(
"//lib/errors",
"//lib/pointers",
"@com_github_buildkite_go_buildkite_v3//buildkite",
"@com_github_redis_go_redis_v9//:go-redis",
"@com_github_sourcegraph_log//:log",
],
)
@ -30,20 +27,8 @@ go_test(
tags = [TAG_INFRA_DEVINFRA],
deps = [
"@com_github_buildkite_go_buildkite_v3//buildkite",
"@com_github_redis_go_redis_v9//:go-redis",
"@com_github_sourcegraph_log//logtest",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
go_mockgen(
name = "generate_mocks",
out = "mocks.go",
manifests = [
"//:mockgen.yaml",
"//:mockgen.test.yaml",
"//:mockgen.temp.yaml",
],
deps = [":build"],
)


@ -1,13 +1,10 @@
package build
import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"github.com/buildkite/go-buildkite/v3/buildkite"
"github.com/redis/go-redis/v9"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/dev/build-tracker/notify"
@ -31,6 +28,9 @@ type Build struct {
// ConsecutiveFailure indicates whether this build is the nth consecutive failure.
ConsecutiveFailure int `json:"consecutiveFailures"`
// Mutex is used to control and stop other changes being made to the build.
sync.Mutex
}
type Step struct {
@ -226,79 +226,45 @@ func (b *Event) GetBuildNumber() int {
type Store struct {
logger log.Logger
r RedisClient
m1 Locker
builds map[int]*Build
// consecutiveFailures tracks how many consecutive failed build events have been
// received per pipeline and branch
consecutiveFailures map[string]int
// m locks all writes to BuildStore properties.
m sync.RWMutex
}
type Locker interface {
LockContext(context.Context) error
Unlock() (bool, error)
}
type RedisClient interface {
redis.StringCmdable
redis.GenericCmdable
}
func NewBuildStore(logger log.Logger, rclient RedisClient, lock Locker) *Store {
func NewBuildStore(logger log.Logger) *Store {
return &Store{
logger: logger.Scoped("store"),
r: rclient,
m1: lock,
builds: make(map[int]*Build),
consecutiveFailures: make(map[string]int),
m: sync.RWMutex{},
}
}
func (s *Store) lock(ctx context.Context) (func(), error) {
err := s.m1.LockContext(ctx)
if err != nil {
s.logger.Error("failed to acquire lock", log.Error(err))
return nil, err
}
return func() {
if _, err := s.m1.Unlock(); err != nil {
s.logger.Error("failed to unlock", log.Error(err))
}
}, nil
}
func (s *Store) Add(ctx context.Context, event *Event) {
unlock, err := s.lock(ctx)
if err != nil {
return
}
defer unlock()
buildb, err := s.r.Get(ctx, "build/"+strconv.Itoa(event.GetBuildNumber())).Bytes()
if err != nil && err != redis.Nil {
s.logger.Error("failed to get build from redis", log.Error(err))
return
}
var build *Build
if err == nil {
if err := json.Unmarshal(buildb, &build); err != nil {
s.logger.Error("failed to unmarshal build", log.Error(err))
return
}
}
func (s *Store) Add(event *Event) {
s.m.Lock()
defer s.m.Unlock()
build, ok := s.builds[event.GetBuildNumber()]
// if we don't know about this build, convert it and add it to the store
if err == redis.Nil {
if !ok {
build = event.WrappedBuild()
s.builds[event.GetBuildNumber()] = build
}
// write out the build to redis at the end, once all mutations are applied
defer func() {
buildb, _ = json.Marshal(build)
s.r.Set(ctx, "build/"+strconv.Itoa(event.GetBuildNumber()), buildb, 0)
}()
// Now that we have a build, let's make sure it isn't modified while we inspect and possibly update it
build.Lock()
defer build.Unlock()
// if the build is finished, replace the original build with the one from the event since it
// will be more up to date, and tack on some finalized data
if event.IsBuildFinished() {
build.updateFromEvent(event)
s.logger.Debug("build finished", log.Int("buildNumber", event.GetBuildNumber()),
log.Int("totalSteps", len(build.Steps)),
log.String("status", build.GetState()))
@ -309,18 +275,12 @@ func (s *Store) Add(ctx context.Context, event *Event) {
// We do this because we do not rely on the state of the build to determine if a build is "successful" or not.
// We instead depend on the state of the jobs associated with said build.
if event.Build.TriggeredFrom != nil {
parentBuildb, err := s.r.Get(ctx, "build/"+strconv.Itoa(*event.Build.TriggeredFrom.BuildNumber)).Bytes()
switch err {
case nil:
var parentBuild *Build
if err := json.Unmarshal(parentBuildb, &parentBuild); err != nil {
s.logger.Error("failed to unmarshal build", log.Error(err))
return
}
parentBuild, ok := s.builds[*event.Build.TriggeredFrom.BuildNumber]
if ok {
parentBuild.Lock()
parentBuild.AppendSteps(build.Steps)
buildb, _ = json.Marshal(parentBuild)
s.r.Set(ctx, "build/"+strconv.Itoa(event.GetBuildNumber()), buildb, 0)
case redis.Nil:
parentBuild.Unlock()
} else {
// If the triggered build doesn't exist, we'll just log a message
s.logger.Warn(
"build triggered from non-existent build",
@ -336,19 +296,17 @@ func (s *Store) Add(ctx context.Context, event *Event) {
// if we get a pass, we reset the global count of consecutiveFailures
failuresKey := fmt.Sprintf("%s/%s", build.Pipeline.GetName(), build.GetBranch())
if build.IsFailed() {
i, _ := s.r.Incr(ctx, failuresKey).Result()
build.ConsecutiveFailure = int(i)
s.consecutiveFailures[failuresKey] += 1
build.ConsecutiveFailure = s.consecutiveFailures[failuresKey]
} else {
// We got a pass, reset the global count
if _, err := s.r.Set(ctx, failuresKey, 0, 0).Result(); err != nil {
s.logger.Error("failed to reset consecutive failures count", log.Error(err))
}
s.consecutiveFailures[failuresKey] = 0
}
}
// Keep track of the job, if there is one
newJob := event.WrappedJob()
err = build.AddJob(newJob)
err := build.AddJob(newJob)
if err != nil {
s.logger.Warn("job not added",
log.Error(err),
@ -375,92 +333,35 @@ func (s *Store) Add(ctx context.Context, event *Event) {
}
}
func (s *Store) Set(ctx context.Context, build *Build) {
unlock, err := s.lock(ctx)
if err != nil {
return
}
defer unlock()
buildb, _ := json.Marshal(build)
s.r.Set(ctx, "build/"+strconv.Itoa(*build.Number), buildb, 0)
func (s *Store) Set(build *Build) {
s.m.RLock()
defer s.m.RUnlock()
s.builds[build.GetNumber()] = build
}
func (s *Store) GetByBuildNumber(ctx context.Context, num int) *Build {
unlock, err := s.lock(ctx)
if err != nil {
return nil
}
defer unlock()
func (s *Store) GetByBuildNumber(num int) *Build {
s.m.RLock()
defer s.m.RUnlock()
buildb, err := s.r.Get(ctx, "build/"+strconv.Itoa(num)).Bytes()
if err != nil && err != redis.Nil {
s.logger.Error("failed to get build from redis", log.Error(err))
return nil
}
var build *Build
if err == nil {
if err := json.Unmarshal(buildb, &build); err != nil {
s.logger.Error("failed to unmarshal build", log.Error(err))
return nil
}
}
return build
return s.builds[num]
}
func (s *Store) DelByBuildNumber(ctx context.Context, buildNumbers ...int) {
unlock, err := s.lock(ctx)
if err != nil {
return
}
defer unlock()
func (s *Store) DelByBuildNumber(buildNumbers ...int) {
s.m.Lock()
defer s.m.Unlock()
nums := make([]string, 0, len(buildNumbers))
for _, num := range buildNumbers {
nums = append(nums, "build/"+strconv.Itoa(num))
delete(s.builds, num)
}
s.r.Del(ctx, nums...)
s.logger.Info("deleted builds", log.Int("totalBuilds", len(buildNumbers)))
}
func (s *Store) FinishedBuilds(ctx context.Context) []*Build {
unlock, err := s.lock(ctx)
if err != nil {
return nil
}
defer unlock()
buildKeys, err := s.r.Keys(ctx, "build/*").Result()
if err != nil {
s.logger.Error("failed to get build keys", log.Error(err))
return nil
}
builds := make([]*Build, 0, len(buildKeys))
values, err := s.r.MGet(ctx, buildKeys...).Result()
if err != nil {
s.logger.Error("failed to get build values", log.Error(err))
return nil
}
for _, value := range values {
if value == nil {
continue
}
var build *Build
if err := json.Unmarshal([]byte(value.(string)), &build); err != nil {
s.logger.Error("failed to unmarshal build", log.Error(err))
continue
}
builds = append(builds, build)
}
func (s *Store) FinishedBuilds() []*Build {
s.m.RLock()
defer s.m.RUnlock()
finished := make([]*Build, 0)
for _, b := range builds {
for _, b := range s.builds {
if b.IsFinished() {
s.logger.Debug("build is finished", log.Int("buildNumber", b.GetNumber()), log.String("state", b.GetState()))
finished = append(finished, b)
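
Taken together, the restored `Store` exposes a small, synchronous in-memory API. A usage sketch (illustrative only, not code from the repository):
```go
store := build.NewBuildStore(logger)

// Record incoming Buildkite events.
store.Add(event)

// Fetch a build by number; nil if the store has never seen it.
if b := store.GetByBuildNumber(1234); b != nil {
	logger.Info("found build", log.String("state", b.GetState()))
}

// Enumerate finished builds, e.g. to find deletion candidates.
for _, fb := range store.FinishedBuilds() {
	logger.Info("finished", log.Int("buildNumber", fb.GetNumber()))
}

// Drop builds that are no longer needed.
store.DelByBuildNumber(1234)
```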


@ -1,13 +1,9 @@
package build
import (
"context"
"strings"
"testing"
"time"
"github.com/buildkite/go-buildkite/v3/buildkite"
"github.com/redis/go-redis/v9"
"github.com/sourcegraph/log/logtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -90,61 +86,37 @@ func TestBuildStoreAdd(t *testing.T) {
return &Event{Name: EventBuildFinished, Build: buildkite.Build{State: nil, Number: &n}, Pipeline: buildkite.Pipeline{Name: &pipeline}}
}
failureCounter := 0
builds := make(map[string][]byte)
mockredis := NewMockRedisClient()
mockredis.IncrFunc.SetDefaultHook(func(ctx context.Context, key string) *redis.IntCmd {
failureCounter++
return redis.NewIntResult(int64(failureCounter), nil)
})
mockredis.SetFunc.SetDefaultHook(func(ctx context.Context, s string, i interface{}, d time.Duration) *redis.StatusCmd {
if strings.HasPrefix(s, pipeline) {
failureCounter = 0
} else {
builds[s] = i.([]byte)
}
return redis.NewStatusCmd(ctx)
})
mockredis.GetFunc.SetDefaultHook(func(ctx context.Context, s string) *redis.StringCmd {
if strings.HasPrefix(s, "build/") {
if b, ok := builds[s]; ok {
return redis.NewStringResult(string(b), nil)
}
return redis.NewStringResult("", redis.Nil)
}
return redis.NewStringCmd(ctx)
})
store := NewBuildStore(logtest.Scoped(t), mockredis, NewMockLocker())
store := NewBuildStore(logtest.Scoped(t))
t.Run("subsequent failures should increment ConsecutiveFailure", func(t *testing.T) {
store.Add(context.Background(), eventFailed(1))
build := store.GetByBuildNumber(context.Background(), 1)
store.Add(eventFailed(1))
build := store.GetByBuildNumber(1)
assert.Equal(t, build.ConsecutiveFailure, 1)
store.Add(context.Background(), eventFailed(2))
build = store.GetByBuildNumber(context.Background(), 2)
store.Add(eventFailed(2))
build = store.GetByBuildNumber(2)
assert.Equal(t, build.ConsecutiveFailure, 2)
store.Add(context.Background(), eventFailed(3))
build = store.GetByBuildNumber(context.Background(), 3)
store.Add(eventFailed(3))
build = store.GetByBuildNumber(3)
assert.Equal(t, build.ConsecutiveFailure, 3)
})
t.Run("a pass should reset ConsecutiveFailure", func(t *testing.T) {
store.Add(context.Background(), eventFailed(4))
build := store.GetByBuildNumber(context.Background(), 4)
store.Add(eventFailed(4))
build := store.GetByBuildNumber(4)
assert.Equal(t, build.ConsecutiveFailure, 4)
store.Add(context.Background(), eventSucceeded(5))
build = store.GetByBuildNumber(context.Background(), 5)
store.Add(eventSucceeded(5))
build = store.GetByBuildNumber(5)
assert.Equal(t, build.ConsecutiveFailure, 0)
store.Add(context.Background(), eventFailed(6))
build = store.GetByBuildNumber(context.Background(), 6)
store.Add(eventFailed(6))
build = store.GetByBuildNumber(6)
assert.Equal(t, build.ConsecutiveFailure, 1)
store.Add(context.Background(), eventSucceeded(7))
build = store.GetByBuildNumber(context.Background(), 7)
store.Add(eventSucceeded(7))
build = store.GetByBuildNumber(7)
assert.Equal(t, build.ConsecutiveFailure, 0)
})
}
@ -159,33 +131,17 @@ func TestBuildFailedJobs(t *testing.T) {
Name: EventJobFinished,
Build: buildkite.Build{State: &buildState, Number: &buildNumber},
Pipeline: buildkite.Pipeline{Name: &pipeline},
Job: buildkite.Job{Name: &name, ExitStatus: &exitCode, State: &jobState},
}
Job: buildkite.Job{Name: &name, ExitStatus: &exitCode, State: &jobState}}
}
builds := make(map[string][]byte)
mockredis := NewMockRedisClient()
mockredis.SetFunc.SetDefaultHook(func(ctx context.Context, s string, i interface{}, d time.Duration) *redis.StatusCmd {
builds[s] = i.([]byte)
return redis.NewStatusCmd(ctx)
})
mockredis.GetFunc.SetDefaultHook(func(ctx context.Context, s string) *redis.StringCmd {
if strings.HasPrefix(s, "build/") {
if b, ok := builds[s]; ok {
return redis.NewStringResult(string(b), nil)
}
return redis.NewStringResult("", redis.Nil)
}
return redis.NewStringCmd(ctx)
})
store := NewBuildStore(logtest.Scoped(t), mockredis, NewMockLocker())
store := NewBuildStore(logtest.Scoped(t))
t.Run("failed jobs should contain different jobs", func(t *testing.T) {
store.Add(context.Background(), eventFailed("Test 1", 1))
store.Add(context.Background(), eventFailed("Test 2", 1))
store.Add(context.Background(), eventFailed("Test 3", 1))
store.Add(eventFailed("Test 1", 1))
store.Add(eventFailed("Test 2", 1))
store.Add(eventFailed("Test 3", 1))
build := store.GetByBuildNumber(context.Background(), 1)
build := store.GetByBuildNumber(1)
unique := make(map[string]int)
for _, s := range FindFailedSteps(build.Steps) {

File diff suppressed because it is too large.


@ -436,7 +436,7 @@ func TestServerNotify(t *testing.T) {
SlackChannel: os.Getenv("SLACK_CHANNEL"),
}
server := NewServer(":8080", logger, conf, nil, nil, nil)
server := NewServer(":8080", logger, conf, nil)
num := 160000
url := "http://www.google.com"


@ -11,10 +11,7 @@ import (
"time"
"github.com/buildkite/go-buildkite/v3/buildkite"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/gorilla/mux"
"github.com/redis/go-redis/v9"
"golang.org/x/exp/maps"
"github.com/sourcegraph/log"
@ -50,7 +47,7 @@ type Server struct {
}
// NewServer creates a new server to listen for Buildkite webhook events.
func NewServer(addr string, logger log.Logger, c config.Config, bqWriter BigQueryWriter, rclient build.RedisClient, rlock build.Locker) *Server {
func NewServer(addr string, logger log.Logger, c config.Config, bqWriter BigQueryWriter) *Server {
logger = logger.Scoped("server")
if testutil.IsTest && c.BuildkiteToken == "" {
@ -65,7 +62,7 @@ func NewServer(addr string, logger log.Logger, c config.Config, bqWriter BigQuer
server := &Server{
logger: logger,
store: build.NewBuildStore(logger, rclient, rlock),
store: build.NewBuildStore(logger),
config: &c,
notifyClient: notify.NewClient(logger, c.SlackToken, c.SlackChannel),
bqWriter: bqWriter,
@ -108,9 +105,6 @@ func (s *Server) Stop(ctx context.Context) error {
}
func (s *Server) handleGetBuild(w http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), time.Second*30)
defer cancel()
if s.config.Production {
user, pass, ok := req.BasicAuth()
if !ok {
@ -143,7 +137,7 @@ func (s *Server) handleGetBuild(w http.ResponseWriter, req *http.Request) {
}
s.logger.Info("retrieving build", log.Int("buildNumber", buildNum))
build := s.store.GetByBuildNumber(ctx, buildNum)
build := s.store.GetByBuildNumber(buildNum)
if build == nil {
s.logger.Debug("no build found", log.Int("buildNumber", buildNum))
w.WriteHeader(http.StatusNotFound)
@ -164,9 +158,6 @@ func (s *Server) handleGetBuild(w http.ResponseWriter, req *http.Request) {
// Note that if we receive an unwanted event, i.e. the event is not "job.finished" or "build.finished", we respond with a 200 OK regardless.
// Once all the conditions are met, the event is processed in a goroutine with `processEvent`.
func (s *Server) handleEvent(w http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), time.Second*30)
defer cancel()
h, ok := req.Header["X-Buildkite-Token"]
if !ok || len(h) == 0 {
w.WriteHeader(http.StatusBadRequest)
@ -202,9 +193,9 @@ func (s *Server) handleEvent(w http.ResponseWriter, req *http.Request) {
}
if testutil.IsTest {
s.processEvent(ctx, &event)
s.processEvent(&event)
} else {
go s.processEvent(ctx, &event)
go s.processEvent(&event)
}
w.WriteHeader(http.StatusOK)
}
@ -230,6 +221,10 @@ func (s *Server) notifyIfFailed(b *build.Build) error {
if info.BuildStatus == string(build.BuildFailed) || info.BuildStatus == string(build.BuildFixed) {
s.logger.Info("sending notification for build", log.Int("buildNumber", b.GetNumber()), log.String("status", string(info.BuildStatus)))
// We lock the build while we send a notification so that we can ensure any late jobs do not interfere with what
// we're about to send.
b.Lock()
defer b.Unlock()
err := s.notifyClient.Send(info)
return err
}
@ -300,11 +295,11 @@ func (s *Server) triggerMetricsPipeline(b *build.Build) error {
// processEvent processes a BuildEvent received from Buildkite. If the event is a `build.finished` event, we get the
// full build, which includes all recorded jobs for the build, and send a notification.
// processEvent delegates the decision to actually send a notification.
func (s *Server) processEvent(ctx context.Context, event *build.Event) {
func (s *Server) processEvent(event *build.Event) {
if event.Build.Number != nil {
s.logger.Info("processing event", log.String("eventName", event.Name), log.Int("buildNumber", event.GetBuildNumber()), log.String("jobName", event.GetJobName()))
s.store.Add(ctx, event)
b := s.store.GetByBuildNumber(ctx, event.GetBuildNumber())
s.store.Add(event)
b := s.store.GetByBuildNumber(event.GetBuildNumber())
if event.IsBuildFinished() {
if *event.Build.Branch == "main" {
if err := s.notifyIfFailed(b); err != nil {
@ -428,14 +423,7 @@ func (s Service) Initialize(ctx context.Context, logger log.Logger, contract run
return nil, err
}
redisOpts, err := redis.ParseURL(*contract.RedisEndpoint)
if err != nil {
return nil, err
}
rclient := redis.NewClient(redisOpts)
rlock := redsync.New(goredis.NewPool(rclient)).NewMutex("build-tracker", redsync.WithExpiry(time.Second*30))
server := NewServer(fmt.Sprintf(":%d", contract.Port), logger, config, bqWriter, rclient, rlock)
server := NewServer(fmt.Sprintf(":%d", contract.Port), logger, config, bqWriter)
return background.CombinedRoutine{
server,


@ -12,7 +12,6 @@ import (
"cloud.google.com/go/bigquery"
"github.com/buildkite/go-buildkite/v3/buildkite"
"github.com/gorilla/mux"
"github.com/redis/go-redis/v9"
"github.com/sourcegraph/log/logtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -30,7 +29,7 @@ func TestGetBuild(t *testing.T) {
req, _ := http.NewRequest(http.MethodGet, "/-/debug/1234", nil)
req = mux.SetURLVars(req, map[string]string{"buildNumber": "1234"})
t.Run("401 Unauthorized when in production mode and incorrect credentials", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"}, nil, nil, nil)
server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"}, nil)
rec := httptest.NewRecorder()
server.handleGetBuild(rec, req)
@ -43,7 +42,7 @@ func TestGetBuild(t *testing.T) {
})
t.Run("404 for build that does not exist", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{}, nil, mockRedisClient(), build.NewMockLocker())
server := NewServer(":8080", logger, config.Config{}, nil)
rec := httptest.NewRecorder()
server.handleGetBuild(rec, req)
@ -51,7 +50,7 @@ func TestGetBuild(t *testing.T) {
})
t.Run("get marshalled json for build", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{}, nil, mockRedisClient(), build.NewMockLocker())
server := NewServer(":8080", logger, config.Config{}, nil)
rec := httptest.NewRecorder()
num := 1234
@ -91,7 +90,7 @@ func TestGetBuild(t *testing.T) {
expected := event.WrappedBuild()
expected.AddJob(event.WrappedJob())
server.store.Add(context.Background(), &event)
server.store.Add(&event)
server.handleGetBuild(rec, req)
@ -104,14 +103,12 @@ func TestGetBuild(t *testing.T) {
})
t.Run("200 with valid credentials in production mode", func(t *testing.T) {
rclient := build.NewMockRedisClient()
rclient.GetFunc.SetDefaultReturn(redis.NewStringResult("", redis.Nil))
server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"}, nil, rclient, build.NewMockLocker())
server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"}, nil)
rec := httptest.NewRecorder()
req.SetBasicAuth("devx", server.config.DebugPassword)
num := 1234
server.store.Add(context.Background(), &build.Event{
server.store.Add(&build.Event{
Name: "Fake",
Build: buildkite.Build{
Number: &num,
@ -119,7 +116,6 @@ func TestGetBuild(t *testing.T) {
Pipeline: buildkite.Pipeline{},
Job: buildkite.Job{},
})
rclient.GetFunc.PushReturn(redis.NewStringResult("{}", nil))
server.handleGetBuild(rec, req)
require.Equal(t, http.StatusOK, rec.Result().StatusCode)
@ -139,15 +135,15 @@ func TestOldBuildsGetDeleted(t *testing.T) {
}
t.Run("All old builds get removed", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{}, nil, mockRedisClient(), build.NewMockLocker())
server := NewServer(":8080", logger, config.Config{}, nil)
b := finishedBuild(1, "passed", time.Now().AddDate(-1, 0, 0))
server.store.Set(context.Background(), b)
server.store.Set(b)
b = finishedBuild(2, "canceled", time.Now().AddDate(0, -1, 0))
server.store.Set(context.Background(), b)
server.store.Set(b)
b = finishedBuild(3, "failed", time.Now().AddDate(0, 0, -1))
server.store.Set(context.Background(), b)
server.store.Set(b)
ctx, cancel := context.WithCancel(context.Background())
go func() {
@ -160,22 +156,22 @@ func TestOldBuildsGetDeleted(t *testing.T) {
time.Sleep(20 * time.Millisecond)
cancel()
builds := server.store.FinishedBuilds(context.Background())
builds := server.store.FinishedBuilds()
if len(builds) != 0 {
t.Errorf("Not all old builds removed. Got %d, wanted %d", len(builds), 0)
}
})
t.Run("1 build left after old builds are removed", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{}, nil, mockRedisClient(), build.NewMockLocker())
server := NewServer(":8080", logger, config.Config{}, nil)
b := finishedBuild(1, "canceled", time.Now().AddDate(-1, 0, 0))
server.store.Set(context.Background(), b)
server.store.Set(b)
b = finishedBuild(2, "passed", time.Now().AddDate(0, -1, 0))
server.store.Set(context.Background(), b)
server.store.Set(b)
b = finishedBuild(3, "failed", time.Now())
server.store.Set(context.Background(), b)
server.store.Set(b)
ctx, cancel := context.WithCancel(context.Background())
go func() {
@ -188,7 +184,7 @@ func TestOldBuildsGetDeleted(t *testing.T) {
time.Sleep(20 * time.Millisecond)
cancel()
builds := server.store.FinishedBuilds(context.Background())
builds := server.store.FinishedBuilds()
if len(builds) != 1 {
t.Errorf("Expected one build to be left over. Got %d, wanted %d", len(builds), 1)
@ -256,75 +252,74 @@ func TestProcessEvent(t *testing.T) {
return &build.Event{Name: build.EventBuildFinished, Build: buildkite.Build{State: &state, Branch: &branch, Number: &buildNumber, Pipeline: pipeline}, Job: job.Job, Pipeline: *pipeline}
}
t.Run("no send notification on unfinished builds", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{}, nil, mockRedisClient(), build.NewMockLocker())
server := NewServer(":8080", logger, config.Config{}, nil)
mockNotifyClient := &MockNotificationClient{}
server.notifyClient = mockNotifyClient
buildNumber := 1234
buildStartedEvent := newBuildEvent("test 2", buildNumber, "failed", "main", 1)
buildStartedEvent.Name = "build.started"
server.processEvent(context.Background(), buildStartedEvent)
server.processEvent(buildStartedEvent)
require.Equal(t, 0, mockNotifyClient.sendCalled)
server.processEvent(context.Background(), newJobEvent("test", buildNumber, 0))
server.processEvent(newJobEvent("test", buildNumber, 0))
// build is not finished so we should send nothing
require.Equal(t, 0, mockNotifyClient.sendCalled)
builds := server.store.FinishedBuilds(context.Background())
builds := server.store.FinishedBuilds()
require.Equal(t, 1, len(builds))
})
t.Run("failed build sends notification", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{}, nil, mockRedisClient(), build.NewMockLocker())
server := NewServer(":8080", logger, config.Config{}, nil)
mockNotifyClient := &MockNotificationClient{}
server.notifyClient = mockNotifyClient
buildNumber := 1234
server.processEvent(context.Background(), newJobEvent("test", buildNumber, 0))
server.processEvent(context.Background(), newBuildEvent("test 2", buildNumber, "failed", "main", 1))
server.processEvent(newJobEvent("test", buildNumber, 0))
server.processEvent(newBuildEvent("test 2", buildNumber, "failed", "main", 1))
require.Equal(t, 1, mockNotifyClient.sendCalled)
builds := server.store.FinishedBuilds(context.Background())
builds := server.store.FinishedBuilds()
require.Equal(t, 1, len(builds))
require.Equal(t, 1234, *builds[0].Number)
require.Equal(t, "failed", *builds[0].State)
})
t.Run("passed build sends notification", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{}, nil, mockRedisClient(), build.NewMockLocker())
server := NewServer(":8080", logger, config.Config{}, nil)
mockNotifyClient := &MockNotificationClient{}
server.notifyClient = mockNotifyClient
buildNumber := 1234
server.processEvent(context.Background(), newJobEvent("test", buildNumber, 0))
server.processEvent(context.Background(), newBuildEvent("test 2", buildNumber, "passed", "main", 0))
server.processEvent(newJobEvent("test", buildNumber, 0))
server.processEvent(newBuildEvent("test 2", buildNumber, "passed", "main", 0))
require.Equal(t, 0, mockNotifyClient.sendCalled)
builds := server.store.FinishedBuilds(context.Background())
builds := server.store.FinishedBuilds()
require.Equal(t, 1, len(builds))
require.Equal(t, 1234, *builds[0].Number)
require.Equal(t, "passed", *builds[0].State)
})
t.Run("failed build, then passed build sends fixed notification", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{}, nil, mockRedisClient(), build.NewMockLocker())
server := NewServer(":8080", logger, config.Config{}, nil)
mockNotifyClient := &MockNotificationClient{}
server.notifyClient = mockNotifyClient
buildNumber := 1234
server.processEvent(context.Background(), newJobEvent("test 1", buildNumber, 1))
server.processEvent(context.Background(), newBuildEvent("test 2", buildNumber, "failed", "main", 1))
server.processEvent(newJobEvent("test 1", buildNumber, 1))
server.processEvent(newBuildEvent("test 2", buildNumber, "failed", "main", 1))
require.Equal(t, 1, mockNotifyClient.sendCalled)
builds := server.store.FinishedBuilds(context.Background())
builds := server.store.FinishedBuilds()
require.Equal(t, 1, len(builds))
require.Equal(t, 1234, *builds[0].Number)
require.Equal(t, "failed", *builds[0].State)
server.processEvent(context.Background(), newJobEvent("test 1", buildNumber, 0))
server.processEvent(context.Background(), newBuildEvent("test 2", buildNumber, "passed", "main", 0))
server.processEvent(newJobEvent("test 1", buildNumber, 0))
server.processEvent(newBuildEvent("test 2", buildNumber, "passed", "main", 0))
builds = server.store.FinishedBuilds(context.Background())
builds = server.store.FinishedBuilds()
require.Equal(t, 1, len(builds))
require.Equal(t, 1234, *builds[0].Number)
require.Equal(t, "passed", *builds[0].State)
@ -346,7 +341,7 @@ func TestProcessEvent(t *testing.T) {
server := NewServer(":8080", logger, config.Config{
BuildkiteWebhookToken: "asdf",
}, mockBq, nil, nil)
}, mockBq)
rw := httptest.NewRecorder()
body := bytes.NewBufferString(`{


@ -1,53 +0,0 @@
package main
import (
"context"
"strings"
"time"
"github.com/redis/go-redis/v9"
"golang.org/x/exp/maps"
"github.com/sourcegraph/sourcegraph/dev/build-tracker/build"
)
func mockRedisClient() *build.MockRedisClient {
bjsons := make(map[string][]byte)
failureCount := 0
rclient := build.NewMockRedisClient()
rclient.SetFunc.SetDefaultHook(func(ctx context.Context, s string, i interface{}, d time.Duration) *redis.StatusCmd {
if strings.HasPrefix(s, "build/") {
bjsons[s] = i.([]byte)
} else {
failureCount = 0
}
return redis.NewStatusCmd(context.Background())
})
rclient.KeysFunc.SetDefaultHook(func(ctx context.Context, s string) *redis.StringSliceCmd {
return redis.NewStringSliceResult(maps.Keys(bjsons), nil)
})
rclient.MGetFunc.SetDefaultHook(func(ctx context.Context, s ...string) *redis.SliceCmd {
var result []interface{}
for _, key := range s {
result = append(result, string(bjsons[key]))
}
return redis.NewSliceResult(result, nil)
})
rclient.DelFunc.PushHook(func(ctx context.Context, s ...string) *redis.IntCmd {
for _, key := range s {
delete(bjsons, key)
}
return redis.NewIntCmd(ctx)
})
rclient.GetFunc.SetDefaultHook(func(ctx context.Context, s string) *redis.StringCmd {
if b, ok := bjsons[s]; strings.HasPrefix(s, "build/") && ok {
return redis.NewStringResult(string(b), nil)
}
return redis.NewStringResult("", redis.Nil)
})
rclient.IncrFunc.PushHook(func(ctx context.Context, s string) *redis.IntCmd {
failureCount++
return redis.NewIntResult(int64(failureCount), nil)
})
return rclient
}


@ -380,12 +380,6 @@
package: main
interfaces:
- BigQueryWriter
- filename: dev/build-tracker/build/mocks.go
path: github.com/sourcegraph/sourcegraph/dev/build-tracker/build
package: build
interfaces:
- Locker
- RedisClient
- filename: cmd/cody-gateway/internal/actor/productsubscription/productsubscriptiontest/mocks.go
package: productsubscriptiontest
sources: