build-tracker: emit agent state-change webhook events to BigQuery (#62598)

Track when agents come online and go offline in build-tracker.

Closes https://github.com/sourcegraph/sourcegraph/issues/61275

## Test plan

Added unit test, the rest will be decided by the Prod Gods
This commit is contained in:
Noah S-C 2024-05-12 15:20:04 +01:00 committed by GitHub
parent 74f2e25a0c
commit 0be15f8983
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 303 additions and 24 deletions

View File

@ -123,5 +123,6 @@ write_source_files(
"//lib/background:generate_mocks",
"//cmd/gitserver/internal/gitserverfs:generate_mocks",
"//cmd/repo-updater/internal/gitserver:generate_mocks",
"//dev/build-tracker:generate_mocks",
],
)

View File

@ -1,3 +1,4 @@
load("//dev:go_mockgen.bzl", "go_mockgen")
load("//dev:go_defs.bzl", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
load("//dev:oci_defs.bzl", "image_repository", "oci_image", "oci_push", "oci_tarball")
@ -9,6 +10,7 @@ go_library(
name = "build-tracker_lib",
srcs = [
"background.go",
"bigquery.go",
"main.go",
],
importpath = "github.com/sourcegraph/sourcegraph/dev/build-tracker",
@ -27,6 +29,7 @@ go_library(
"@com_github_buildkite_go_buildkite_v3//buildkite",
"@com_github_gorilla_mux//:mux",
"@com_github_sourcegraph_log//:log",
"@com_google_cloud_go_bigquery//:bigquery",
"@org_golang_x_exp//maps",
],
)
@ -43,6 +46,7 @@ go_test(
srcs = [
"integration_test.go",
"main_test.go",
"mocks_test.go",
"server_test.go",
],
embed = [":build-tracker_lib"],
@ -57,6 +61,7 @@ go_test(
"@com_github_gorilla_mux//:mux",
"@com_github_sourcegraph_log//logtest",
"@com_github_stretchr_testify//require",
"@com_google_cloud_go_bigquery//:bigquery",
],
)
@ -107,3 +112,14 @@ msp_delivery(
msp_service_id = "build-tracker",
repository = "us.gcr.io/sourcegraph-dev/build-tracker",
)
# Generates mocks_test.go for this package. The interfaces to mock are
# selected by the repository-root mockgen manifests listed below; the target
# depends on the build-tracker library.
go_mockgen(
name = "generate_mocks",
out = "mocks_test.go",
manifests = [
"//:mockgen.yaml",
"//:mockgen.test.yaml",
"//:mockgen.temp.yaml",
],
deps = [":build-tracker_lib"],
)

View File

@ -0,0 +1,47 @@
package main
import (
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"strings"

	"cloud.google.com/go/bigquery"
	"github.com/buildkite/go-buildkite/v3/buildkite"
)
// BigQueryWriter is the write-side dependency the server uses to persist
// rows. It is declared as an interface so that tests can substitute a mock
// for the real writer.
type BigQueryWriter interface {
// Write persists the given values and reports any failure as an error.
Write(ctx context.Context, values ...bigquery.ValueSaver) error
}
// BuildkiteAgentEvent pairs a Buildkite agent with the name of the event
// that reported it, so both can be saved together as a single row (see Save).
type BuildkiteAgentEvent struct {
// event is the event name; Save strips a leading "agent." from it.
event string
// Agent is embedded, so its fields (ID, Name, Hostname, Metadata, ...) are
// promoted onto BuildkiteAgentEvent.
buildkite.Agent
}
// Save implements bigquery.ValueSaver. It flattens the agent event into a
// single row keyed by column name.
//
// Column derivation:
//   - "event" is the event name without its "agent." prefix.
//   - "queues" is a comma-separated list built from every metadata entry of
//     the form "queue=<name>".
//   - "uuid" is the agent ID after base64-decoding it and stripping the
//     "Agent---" prefix.
//
// The returned insertID is always empty, so no caller-chosen de-duplication
// key is supplied for the row. An error is returned, and no row is produced,
// when the agent has no ID or when the ID is not valid standard base64.
func (b *BuildkiteAgentEvent) Save() (row map[string]bigquery.Value, insertID string, err error) {
	// Guard the dereference below: an event whose agent payload is empty
	// leaves ID nil, and dereferencing it would panic instead of failing
	// the write cleanly.
	if b.ID == nil {
		return nil, "", errors.New("buildkite agent event has no agent ID")
	}

	decodedID, err := base64.StdEncoding.DecodeString(*b.ID)
	if err != nil {
		return nil, "", fmt.Errorf("decoding agent ID %q: %w", *b.ID, err)
	}

	var queues []string
	for _, meta := range b.Metadata {
		if strings.HasPrefix(meta, "queue=") {
			queues = append(queues, strings.TrimPrefix(meta, "queue="))
		}
	}

	return map[string]bigquery.Value{
		"event":      strings.TrimPrefix(b.event, "agent."),
		"name":       b.Name,
		"hostname":   b.Hostname,
		"version":    b.Version,
		"ip_address": b.IPAddress,
		"queues":     strings.Join(queues, ","),
		"user_agent": b.UserAgent,
		"uuid":       strings.TrimPrefix(string(decodedID), "Agent---"),
	}, "", nil
}

// Compile-time check that *BuildkiteAgentEvent satisfies bigquery.ValueSaver.
var _ bigquery.ValueSaver = (*BuildkiteAgentEvent)(nil)

View File

@ -174,6 +174,8 @@ type Event struct {
Pipeline buildkite.Pipeline `json:"pipeline,omitempty"`
// Job is the current job being executed by the Build. When the event is not a job event variant, then this job will be empty
Job buildkite.Job `json:"job,omitempty"`
// Agent is the agent that is running the job that triggered this event. When the event is not an agent event variant, then this will be empty
Agent buildkite.Agent `json:"agent,omitempty"`
}
func (b *Event) WrappedBuild() *Build {

View File

@ -436,7 +436,7 @@ func TestServerNotify(t *testing.T) {
SlackChannel: os.Getenv("SLACK_CHANNEL"),
}
server := NewServer(":8080", logger, conf)
server := NewServer(":8080", logger, conf, nil)
num := 160000
url := "http://www.google.com"

View File

@ -42,11 +42,12 @@ type Server struct {
config *config.Config
notifyClient notify.NotificationClient
bkClient *buildkite.Client
bqWriter BigQueryWriter
http *http.Server
}
// NewServer creates a new server to listen for Buildkite webhook events.
func NewServer(addr string, logger log.Logger, c config.Config) *Server {
func NewServer(addr string, logger log.Logger, c config.Config, bqWriter BigQueryWriter) *Server {
logger = logger.Scoped("server")
if testutil.IsTest && c.BuildkiteToken == "" {
@ -64,6 +65,7 @@ func NewServer(addr string, logger log.Logger, c config.Config) *Server {
store: build.NewBuildStore(logger),
config: &c,
notifyClient: notify.NewClient(logger, c.SlackToken, c.SlackChannel),
bqWriter: bqWriter,
bkClient: buildkite.NewClient(bk.Client()),
}
@ -288,18 +290,27 @@ func (s *Server) triggerMetricsPipeline(b *build.Build) error {
// full build which includes all recorded jobs for the build and send a notification.
// processEvent delegates the decision to actually send a notification
func (s *Server) processEvent(event *build.Event) {
s.logger.Info("processing event", log.String("eventName", event.Name), log.Int("buildNumber", event.GetBuildNumber()), log.String("jobName", event.GetJobName()))
s.store.Add(event)
b := s.store.GetByBuildNumber(event.GetBuildNumber())
if event.IsBuildFinished() {
if *event.Build.Branch == "main" {
if err := s.notifyIfFailed(b); err != nil {
s.logger.Error("failed to send notification for build", log.Int("buildNumber", event.GetBuildNumber()), log.Error(err))
if event.Build.Number != nil {
s.logger.Info("processing event", log.String("eventName", event.Name), log.Int("buildNumber", event.GetBuildNumber()), log.String("jobName", event.GetJobName()))
s.store.Add(event)
b := s.store.GetByBuildNumber(event.GetBuildNumber())
if event.IsBuildFinished() {
if *event.Build.Branch == "main" {
if err := s.notifyIfFailed(b); err != nil {
s.logger.Error("failed to send notification for build", log.Int("buildNumber", event.GetBuildNumber()), log.Error(err))
}
}
if err := s.triggerMetricsPipeline(b); err != nil {
s.logger.Error("failed to trigger metrics pipeline for build", log.Int("buildNumber", event.GetBuildNumber()), log.Error(err))
}
}
if err := s.triggerMetricsPipeline(b); err != nil {
s.logger.Error("failed to trigger metrics pipeline for build", log.Int("buildNumber", event.GetBuildNumber()), log.Error(err))
} else {
if err := s.bqWriter.Write(context.Background(), &BuildkiteAgentEvent{
event: event.Name,
Agent: event.Agent,
}); err != nil {
s.logger.Error("failed to write agent event to BigQuery", log.String("event", event.Name), log.String("agentID", *event.Agent.ID))
}
}
}
@ -362,7 +373,7 @@ func getDevxServiceLatestCommit(token string) (string, error) {
}
req.Header.Add("Authorization", "Bearer "+token)
req.Header.Add("Accept", "application/vnd.github+json")
// I d as the docs command https://docs.github.com/en/rest/branches/branches#get-a-branch
// I do as the docs command https://docs.github.com/en/rest/branches/branches#get-a-branch
req.Header.Add("X-GitHub-Api-Version", "2022-11-28")
resp, err := http.DefaultClient.Do(req)
@ -398,7 +409,12 @@ type Service struct{}
func (s Service) Initialize(ctx context.Context, logger log.Logger, contract runtime.Contract, config config.Config) (background.Routine, error) {
logger.Info("config loaded from environment", log.Object("config", log.String("SlackChannel", config.SlackChannel), log.Bool("Production", config.Production)))
server := NewServer(fmt.Sprintf(":%d", contract.Port), logger, config)
bqWriter, err := contract.BigQuery.GetTableWriter(context.Background(), "agent_status")
if err != nil {
return nil, err
}
server := NewServer(fmt.Sprintf(":%d", contract.Port), logger, config, bqWriter)
return background.CombinedRoutine{
server,

171
dev/build-tracker/mocks_test.go generated Normal file
View File

@ -0,0 +1,171 @@
// Code generated by go-mockgen 1.3.7; DO NOT EDIT.
//
// This file was generated by running `sg generate` (or `go-mockgen`) at the root of
// this repository. To add additional mocks to this or another package, add a new entry
// to the mockgen.yaml file in the root of this repository.
package main
import (
"context"
"sync"
bigquery "cloud.google.com/go/bigquery"
)
// MockBigQueryWriter is a mock implementation of the BigQueryWriter
// interface (from the package
// github.com/sourcegraph/sourcegraph/dev/build-tracker) used for unit
// testing.
//
// Build it with NewMockBigQueryWriter, NewStrictMockBigQueryWriter, or
// NewMockBigQueryWriterFrom: Write calls through WriteFunc, so a zero-value
// MockBigQueryWriter (nil WriteFunc) is not usable.
type MockBigQueryWriter struct {
// WriteFunc is an instance of a mock function object controlling the
// behavior of the method Write. It also records every call made to Write.
WriteFunc *BigQueryWriterWriteFunc
}
// NewMockBigQueryWriter returns a mock BigQueryWriter whose Write method
// returns a nil error until a different hook is installed.
func NewMockBigQueryWriter() *MockBigQueryWriter {
	// The default hook ignores its arguments and reports success.
	succeed := func(context.Context, ...bigquery.ValueSaver) error {
		return nil
	}
	return &MockBigQueryWriter{
		WriteFunc: &BigQueryWriterWriteFunc{defaultHook: succeed},
	}
}
// NewStrictMockBigQueryWriter returns a mock BigQueryWriter whose Write
// method panics when invoked, unless a hook has been installed to override
// that behavior.
func NewStrictMockBigQueryWriter() *MockBigQueryWriter {
	// The default hook fails loudly so that unexpected calls surface in tests.
	reject := func(context.Context, ...bigquery.ValueSaver) error {
		panic("unexpected invocation of MockBigQueryWriter.Write")
	}
	return &MockBigQueryWriter{
		WriteFunc: &BigQueryWriterWriteFunc{defaultHook: reject},
	}
}
// NewMockBigQueryWriterFrom returns a mock BigQueryWriter that forwards
// Write to the given implementation by default. Installed hooks take
// precedence over the forwarding behavior.
func NewMockBigQueryWriterFrom(i BigQueryWriter) *MockBigQueryWriter {
	writeFunc := &BigQueryWriterWriteFunc{defaultHook: i.Write}
	return &MockBigQueryWriter{WriteFunc: writeFunc}
}
// BigQueryWriterWriteFunc describes the behavior when the Write method of
// the parent MockBigQueryWriter instance is invoked.
type BigQueryWriterWriteFunc struct {
// defaultHook is used by nextHook whenever the hooks queue is empty.
defaultHook func(context.Context, ...bigquery.ValueSaver) error
// hooks is a first-in, first-out queue; nextHook removes and returns the
// front entry on each call.
hooks []func(context.Context, ...bigquery.ValueSaver) error
// history holds one entry per recorded invocation, in call order.
history []BigQueryWriterWriteFuncCall
// mutex is held while hooks or history are read or modified.
mutex sync.Mutex
}
// Write runs the next queued hook (or the default hook when the queue is
// empty) with the given arguments, records the call together with its
// result, and returns that result.
func (m *MockBigQueryWriter) Write(v0 context.Context, v1 ...bigquery.ValueSaver) error {
	hook := m.WriteFunc.nextHook()
	result := hook(v0, v1...)
	m.WriteFunc.appendCall(BigQueryWriterWriteFuncCall{
		Arg0:    v0,
		Arg1:    v1,
		Result0: result,
	})
	return result
}
// SetDefaultHook sets the function that is called when the Write method of
// the parent MockBigQueryWriter instance is invoked and the hook queue is
// empty.
//
// Unlike PushHook, this assignment is made without holding the mutex.
func (f *BigQueryWriterWriteFunc) SetDefaultHook(hook func(context.Context, ...bigquery.ValueSaver) error) {
f.defaultHook = hook
}
// PushHook appends a function to the back of the hook queue. Every call to
// the Write method of the parent MockBigQueryWriter consumes the hook at the
// front of the queue; once the queue has drained, the default hook handles
// all further calls.
func (f *BigQueryWriterWriteFunc) PushHook(hook func(context.Context, ...bigquery.ValueSaver) error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.hooks = append(f.hooks, hook)
}
// SetDefaultReturn installs, via SetDefaultHook, a default hook that ignores
// its arguments and always returns r0.
func (f *BigQueryWriterWriteFunc) SetDefaultReturn(r0 error) {
	constant := func(context.Context, ...bigquery.ValueSaver) error {
		return r0
	}
	f.SetDefaultHook(constant)
}
// PushReturn queues, via PushHook, a hook that ignores its arguments and
// returns r0.
func (f *BigQueryWriterWriteFunc) PushReturn(r0 error) {
	constant := func(context.Context, ...bigquery.ValueSaver) error {
		return r0
	}
	f.PushHook(constant)
}
// nextHook removes and returns the hook at the front of the queue. When the
// queue is empty it returns the default hook and leaves the queue untouched.
func (f *BigQueryWriterWriteFunc) nextHook() func(context.Context, ...bigquery.ValueSaver) error {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.hooks) > 0 {
		front := f.hooks[0]
		f.hooks = f.hooks[1:]
		return front
	}
	return f.defaultHook
}
// appendCall records one invocation at the end of the call history.
func (f *BigQueryWriterWriteFunc) appendCall(r0 BigQueryWriterWriteFuncCall) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.history = append(f.history, r0)
}
// History returns the recorded invocations of this function in call order.
// The result is a copy, so the caller may modify it without affecting the
// mock's own record.
func (f *BigQueryWriterWriteFunc) History() []BigQueryWriterWriteFuncCall {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	snapshot := make([]BigQueryWriterWriteFuncCall, len(f.history))
	copy(snapshot, f.history)
	return snapshot
}
// BigQueryWriterWriteFuncCall is an object that describes an invocation of
// method Write on an instance of MockBigQueryWriter. It captures both the
// arguments the method received and the result it returned.
type BigQueryWriterWriteFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is a slice containing the values of the variadic arguments
// passed to this method invocation.
Arg1 []bigquery.ValueSaver
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 error
}
// Args returns the arguments of this invocation as a flat interface slice:
// the context first, followed by each variadic argument as its own element.
// One positional argument plus three variadic arguments therefore yields a
// slice of four elements, not two.
func (c BigQueryWriterWriteFuncCall) Args() []interface{} {
	args := make([]interface{}, 0, len(c.Arg1)+1)
	args = append(args, c.Arg0)
	for _, saver := range c.Arg1 {
		args = append(args, saver)
	}
	return args
}
// Results returns an interface slice containing the results of this
// invocation. Write has a single result, so the slice always has exactly one
// element: the returned error (which may be nil).
func (c BigQueryWriterWriteFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}

View File

@ -26,7 +26,7 @@ func TestGetBuild(t *testing.T) {
req, _ := http.NewRequest(http.MethodGet, "/-/debug/1234", nil)
req = mux.SetURLVars(req, map[string]string{"buildNumber": "1234"})
t.Run("401 Unauthorized when in production mode and incorrect credentials", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"})
server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"}, nil)
rec := httptest.NewRecorder()
server.handleGetBuild(rec, req)
@ -39,7 +39,7 @@ func TestGetBuild(t *testing.T) {
})
t.Run("404 for build that does not exist", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{})
server := NewServer(":8080", logger, config.Config{}, nil)
rec := httptest.NewRecorder()
server.handleGetBuild(rec, req)
@ -47,7 +47,7 @@ func TestGetBuild(t *testing.T) {
})
t.Run("get marshalled json for build", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{})
server := NewServer(":8080", logger, config.Config{}, nil)
rec := httptest.NewRecorder()
num := 1234
@ -100,7 +100,7 @@ func TestGetBuild(t *testing.T) {
})
t.Run("200 with valid credentials in production mode", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"})
server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"}, nil)
rec := httptest.NewRecorder()
req.SetBasicAuth("devx", server.config.DebugPassword)
@ -132,7 +132,7 @@ func TestOldBuildsGetDeleted(t *testing.T) {
}
t.Run("All old builds get removed", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{})
server := NewServer(":8080", logger, config.Config{}, nil)
b := finishedBuild(1, "passed", time.Now().AddDate(-1, 0, 0))
server.store.Set(b)
@ -154,7 +154,7 @@ func TestOldBuildsGetDeleted(t *testing.T) {
}
})
t.Run("1 build left after old builds are removed", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{})
server := NewServer(":8080", logger, config.Config{}, nil)
b := finishedBuild(1, "canceled", time.Now().AddDate(-1, 0, 0))
server.store.Set(b)
@ -237,7 +237,7 @@ func TestProcessEvent(t *testing.T) {
return &build.Event{Name: build.EventBuildFinished, Build: buildkite.Build{State: &state, Branch: &branch, Number: &buildNumber, Pipeline: pipeline}, Job: job.Job, Pipeline: *pipeline}
}
t.Run("no send notification on unfinished builds", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{})
server := NewServer(":8080", logger, config.Config{}, nil)
mockNotifyClient := &MockNotificationClient{}
server.notifyClient = mockNotifyClient
buildNumber := 1234
@ -254,7 +254,7 @@ func TestProcessEvent(t *testing.T) {
})
t.Run("failed build sends notification", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{})
server := NewServer(":8080", logger, config.Config{}, nil)
mockNotifyClient := &MockNotificationClient{}
server.notifyClient = mockNotifyClient
buildNumber := 1234
@ -270,7 +270,7 @@ func TestProcessEvent(t *testing.T) {
})
t.Run("passed build sends notification", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{})
server := NewServer(":8080", logger, config.Config{}, nil)
mockNotifyClient := &MockNotificationClient{}
server.notifyClient = mockNotifyClient
buildNumber := 1234
@ -286,7 +286,7 @@ func TestProcessEvent(t *testing.T) {
})
t.Run("failed build, then passed build sends fixed notification", func(t *testing.T) {
server := NewServer(":8080", logger, config.Config{})
server := NewServer(":8080", logger, config.Config{}, nil)
mockNotifyClient := &MockNotificationClient{}
server.notifyClient = mockNotifyClient
buildNumber := 1234
@ -312,4 +312,25 @@ func TestProcessEvent(t *testing.T) {
// fixed notification
require.Equal(t, 2, mockNotifyClient.sendCalled)
})
t.Run("agent webhooks", func(t *testing.T) {
mockBq := NewMockBigQueryWriter()
server := NewServer(":8080", logger, config.Config{}, mockBq)
server.processEvent(&build.Event{
Name: "agent.connected",
Agent: buildkite.Agent{
ID: pointers.Ptr("QWdlbnQtLS0wMThmNjI4Yy1jY2M0LTRhMmEtOTJjOS1kN2NjODE5MDZiNzc="),
Name: pointers.Ptr("banana-agent-default"),
ConnectedState: pointers.Ptr("connected"),
Hostname: pointers.Ptr("banana-agent-deadbeef"),
IPAddress: pointers.Ptr("10.0.0.5"),
UserAgent: pointers.Ptr("buildkite-agent/4.2.0 (linux; amd64)"),
Version: pointers.Ptr("4.2.0"),
Metadata: []string{"queue=express"},
},
})
require.Equal(t, 1, len(mockBq.WriteFunc.History()))
})
}

View File

@ -423,3 +423,8 @@
- service
- RepositoryLocker
- RepositoryLock
- filename: dev/build-tracker/mocks_test.go
path: github.com/sourcegraph/sourcegraph/dev/build-tracker
package: main
interfaces:
- BigQueryWriter