diff --git a/dev/BUILD.bazel b/dev/BUILD.bazel index 09d7b817411..a318899969f 100644 --- a/dev/BUILD.bazel +++ b/dev/BUILD.bazel @@ -123,5 +123,6 @@ write_source_files( "//lib/background:generate_mocks", "//cmd/gitserver/internal/gitserverfs:generate_mocks", "//cmd/repo-updater/internal/gitserver:generate_mocks", + "//dev/build-tracker:generate_mocks", ], ) diff --git a/dev/build-tracker/BUILD.bazel b/dev/build-tracker/BUILD.bazel index f53eed1b9c3..59492bf023d 100644 --- a/dev/build-tracker/BUILD.bazel +++ b/dev/build-tracker/BUILD.bazel @@ -1,3 +1,4 @@ +load("//dev:go_mockgen.bzl", "go_mockgen") load("//dev:go_defs.bzl", "go_test") load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") load("//dev:oci_defs.bzl", "image_repository", "oci_image", "oci_push", "oci_tarball") @@ -9,6 +10,7 @@ go_library( name = "build-tracker_lib", srcs = [ "background.go", + "bigquery.go", "main.go", ], importpath = "github.com/sourcegraph/sourcegraph/dev/build-tracker", @@ -27,6 +29,7 @@ go_library( "@com_github_buildkite_go_buildkite_v3//buildkite", "@com_github_gorilla_mux//:mux", "@com_github_sourcegraph_log//:log", + "@com_google_cloud_go_bigquery//:bigquery", "@org_golang_x_exp//maps", ], ) @@ -43,6 +46,7 @@ go_test( srcs = [ "integration_test.go", "main_test.go", + "mocks_test.go", "server_test.go", ], embed = [":build-tracker_lib"], @@ -57,6 +61,7 @@ go_test( "@com_github_gorilla_mux//:mux", "@com_github_sourcegraph_log//logtest", "@com_github_stretchr_testify//require", + "@com_google_cloud_go_bigquery//:bigquery", ], ) @@ -107,3 +112,14 @@ msp_delivery( msp_service_id = "build-tracker", repository = "us.gcr.io/sourcegraph-dev/build-tracker", ) + +go_mockgen( + name = "generate_mocks", + out = "mocks_test.go", + manifests = [ + "//:mockgen.yaml", + "//:mockgen.test.yaml", + "//:mockgen.temp.yaml", + ], + deps = [":build-tracker_lib"], +) diff --git a/dev/build-tracker/bigquery.go b/dev/build-tracker/bigquery.go new file mode 100644 index 00000000000..8edc09538b3 --- /dev/null +++ b/dev/build-tracker/bigquery.go @@ -0,0 +1,47 @@ +package main + +import ( + "context" + "encoding/base64" + "strings" + + "cloud.google.com/go/bigquery" + "github.com/buildkite/go-buildkite/v3/buildkite" +) + +type BigQueryWriter interface { + Write(ctx context.Context, values ...bigquery.ValueSaver) error +} + +type BuildkiteAgentEvent struct { + event string + buildkite.Agent +} + +// Save implements bigquery.ValueSaver. +func (b *BuildkiteAgentEvent) Save() (row map[string]bigquery.Value, insertID string, err error) { + var queues []string + for _, meta := range b.Metadata { + if strings.HasPrefix(meta, "queue=") { + queues = append(queues, strings.TrimPrefix(meta, "queue=")) + } + } + + uuid, err := base64.StdEncoding.DecodeString(*b.ID) + if err != nil { + return nil, "", err + } + + return map[string]bigquery.Value{ + "event": strings.TrimPrefix(b.event, "agent."), + "name": b.Name, + "hostname": b.Hostname, + "version": b.Version, + "ip_address": b.IPAddress, + "queues": strings.Join(queues, ","), + "user_agent": b.UserAgent, + "uuid": strings.TrimPrefix(string(uuid), "Agent---"), + }, "", nil +} + +var _ (bigquery.ValueSaver) = (*BuildkiteAgentEvent)(nil) diff --git a/dev/build-tracker/build/build.go b/dev/build-tracker/build/build.go index d63d0a0f851..18c1f3b0c84 100644 --- a/dev/build-tracker/build/build.go +++ b/dev/build-tracker/build/build.go @@ -174,6 +174,8 @@ type Event struct { Pipeline buildkite.Pipeline `json:"pipeline,omitempty"` // Job is the current job being executed by the Build. When the event is not a job event variant, then this job will be empty Job buildkite.Job `json:"job,omitempty"` + // Agent is the agent that is running the job that triggered this event. When the event is not an agent event variant, then this will be empty + Agent buildkite.Agent `json:"agent,omitempty"` } func (b *Event) WrappedBuild() *Build { diff --git a/dev/build-tracker/integration_test.go b/dev/build-tracker/integration_test.go index 3357e62df69..c85c2309e2b 100644 --- a/dev/build-tracker/integration_test.go +++ b/dev/build-tracker/integration_test.go @@ -436,7 +436,7 @@ func TestServerNotify(t *testing.T) { SlackChannel: os.Getenv("SLACK_CHANNEL"), } - server := NewServer(":8080", logger, conf) + server := NewServer(":8080", logger, conf, nil) num := 160000 url := "http://www.google.com" diff --git a/dev/build-tracker/main.go b/dev/build-tracker/main.go index 2761f751a25..c3b0f3725fc 100644 --- a/dev/build-tracker/main.go +++ b/dev/build-tracker/main.go @@ -42,11 +42,12 @@ type Server struct { config *config.Config notifyClient notify.NotificationClient bkClient *buildkite.Client + bqWriter BigQueryWriter http *http.Server } // NewServer creatse a new server to listen for Buildkite webhook events. -func NewServer(addr string, logger log.Logger, c config.Config) *Server { +func NewServer(addr string, logger log.Logger, c config.Config, bqWriter BigQueryWriter) *Server { logger = logger.Scoped("server") if testutil.IsTest && c.BuildkiteToken == "" { @@ -64,6 +65,7 @@ func NewServer(addr string, logger log.Logger, c config.Config) *Server { store: build.NewBuildStore(logger), config: &c, notifyClient: notify.NewClient(logger, c.SlackToken, c.SlackChannel), + bqWriter: bqWriter, bkClient: buildkite.NewClient(bk.Client()), } @@ -288,18 +290,27 @@ func (s *Server) triggerMetricsPipeline(b *build.Build) error { // full build which includes all recorded jobs for the build and send a notification. // processEvent delegates the decision to actually send a notifcation func (s *Server) processEvent(event *build.Event) { - s.logger.Info("processing event", log.String("eventName", event.Name), log.Int("buildNumber", event.GetBuildNumber()), log.String("jobName", event.GetJobName())) - s.store.Add(event) - b := s.store.GetByBuildNumber(event.GetBuildNumber()) - if event.IsBuildFinished() { - if *event.Build.Branch == "main" { - if err := s.notifyIfFailed(b); err != nil { - s.logger.Error("failed to send notification for build", log.Int("buildNumber", event.GetBuildNumber()), log.Error(err)) + if event.Build.Number != nil { + s.logger.Info("processing event", log.String("eventName", event.Name), log.Int("buildNumber", event.GetBuildNumber()), log.String("jobName", event.GetJobName())) + s.store.Add(event) + b := s.store.GetByBuildNumber(event.GetBuildNumber()) + if event.IsBuildFinished() { + if *event.Build.Branch == "main" { + if err := s.notifyIfFailed(b); err != nil { + s.logger.Error("failed to send notification for build", log.Int("buildNumber", event.GetBuildNumber()), log.Error(err)) + } + } + + if err := s.triggerMetricsPipeline(b); err != nil { + s.logger.Error("failed to trigger metrics pipeline for build", log.Int("buildNumber", event.GetBuildNumber()), log.Error(err)) } } - - if err := s.triggerMetricsPipeline(b); err != nil { - s.logger.Error("failed to trigger metrics pipeline for build", log.Int("buildNumber", event.GetBuildNumber()), log.Error(err)) + } else { + if err := s.bqWriter.Write(context.Background(), &BuildkiteAgentEvent{ + event: event.Name, + Agent: event.Agent, + }); err != nil { + s.logger.Error("failed to write agent event to BigQuery", log.String("event", event.Name), log.String("agentID", *event.Agent.ID)) } } } @@ -362,7 +373,7 @@ func getDevxServiceLatestCommit(token string) (string, error) { } req.Header.Add("Authorization", "Bearer "+token) req.Header.Add("Accept", "application/vnd.github+json") - // I d as the docs command https://docs.github.com/en/rest/branches/branches#get-a-branch + // I do as the docs command https://docs.github.com/en/rest/branches/branches#get-a-branch req.Header.Add("X-GitHub-Api-Version", "2022-11-28") resp, err := http.DefaultClient.Do(req) @@ -398,7 +409,12 @@ type Service struct{} func (s Service) Initialize(ctx context.Context, logger log.Logger, contract runtime.Contract, config config.Config) (background.Routine, error) { logger.Info("config loaded from environment", log.Object("config", log.String("SlackChannel", config.SlackChannel), log.Bool("Production", config.Production))) - server := NewServer(fmt.Sprintf(":%d", contract.Port), logger, config) + bqWriter, err := contract.BigQuery.GetTableWriter(context.Background(), "agent_status") + if err != nil { + return nil, err + } + + server := NewServer(fmt.Sprintf(":%d", contract.Port), logger, config, bqWriter) return background.CombinedRoutine{ server, diff --git a/dev/build-tracker/mocks_test.go b/dev/build-tracker/mocks_test.go new file mode 100644 index 00000000000..406b8ebad19 --- /dev/null +++ b/dev/build-tracker/mocks_test.go @@ -0,0 +1,171 @@ +// Code generated by go-mockgen 1.3.7; DO NOT EDIT. +// +// This file was generated by running `sg generate` (or `go-mockgen`) at the root of +// this repository. To add additional mocks to this or another package, add a new entry +// to the mockgen.yaml file in the root of this repository. + +package main + +import ( + "context" + "sync" + + bigquery "cloud.google.com/go/bigquery" +) + +// MockBigQueryWriter is a mock implementation of the BigQueryWriter +// interface (from the package +// github.com/sourcegraph/sourcegraph/dev/build-tracker) used for unit +// testing. +type MockBigQueryWriter struct { + // WriteFunc is an instance of a mock function object controlling the + // behavior of the method Write. + WriteFunc *BigQueryWriterWriteFunc +} + +// NewMockBigQueryWriter creates a new mock of the BigQueryWriter interface. +// All methods return zero values for all results, unless overwritten. +func NewMockBigQueryWriter() *MockBigQueryWriter { + return &MockBigQueryWriter{ + WriteFunc: &BigQueryWriterWriteFunc{ + defaultHook: func(context.Context, ...bigquery.ValueSaver) (r0 error) { + return + }, + }, + } +} + +// NewStrictMockBigQueryWriter creates a new mock of the BigQueryWriter +// interface. All methods panic on invocation, unless overwritten. +func NewStrictMockBigQueryWriter() *MockBigQueryWriter { + return &MockBigQueryWriter{ + WriteFunc: &BigQueryWriterWriteFunc{ + defaultHook: func(context.Context, ...bigquery.ValueSaver) error { + panic("unexpected invocation of MockBigQueryWriter.Write") + }, + }, + } +} + +// NewMockBigQueryWriterFrom creates a new mock of the MockBigQueryWriter +// interface. All methods delegate to the given implementation, unless +// overwritten. +func NewMockBigQueryWriterFrom(i BigQueryWriter) *MockBigQueryWriter { + return &MockBigQueryWriter{ + WriteFunc: &BigQueryWriterWriteFunc{ + defaultHook: i.Write, + }, + } +} + +// BigQueryWriterWriteFunc describes the behavior when the Write method of +// the parent MockBigQueryWriter instance is invoked. +type BigQueryWriterWriteFunc struct { + defaultHook func(context.Context, ...bigquery.ValueSaver) error + hooks []func(context.Context, ...bigquery.ValueSaver) error + history []BigQueryWriterWriteFuncCall + mutex sync.Mutex +} + +// Write delegates to the next hook function in the queue and stores the +// parameter and result values of this invocation. +func (m *MockBigQueryWriter) Write(v0 context.Context, v1 ...bigquery.ValueSaver) error { + r0 := m.WriteFunc.nextHook()(v0, v1...) + m.WriteFunc.appendCall(BigQueryWriterWriteFuncCall{v0, v1, r0}) + return r0 +} + +// SetDefaultHook sets function that is called when the Write method of the +// parent MockBigQueryWriter instance is invoked and the hook queue is +// empty. +func (f *BigQueryWriterWriteFunc) SetDefaultHook(hook func(context.Context, ...bigquery.ValueSaver) error) { + f.defaultHook = hook +} + +// PushHook adds a function to the end of hook queue. Each invocation of the +// Write method of the parent MockBigQueryWriter instance invokes the hook +// at the front of the queue and discards it. After the queue is empty, the +// default hook function is invoked for any future action. +func (f *BigQueryWriterWriteFunc) PushHook(hook func(context.Context, ...bigquery.ValueSaver) error) { + f.mutex.Lock() + f.hooks = append(f.hooks, hook) + f.mutex.Unlock() +} + +// SetDefaultReturn calls SetDefaultHook with a function that returns the +// given values. +func (f *BigQueryWriterWriteFunc) SetDefaultReturn(r0 error) { + f.SetDefaultHook(func(context.Context, ...bigquery.ValueSaver) error { + return r0 + }) +} + +// PushReturn calls PushHook with a function that returns the given values. +func (f *BigQueryWriterWriteFunc) PushReturn(r0 error) { + f.PushHook(func(context.Context, ...bigquery.ValueSaver) error { + return r0 + }) +} + +func (f *BigQueryWriterWriteFunc) nextHook() func(context.Context, ...bigquery.ValueSaver) error { + f.mutex.Lock() + defer f.mutex.Unlock() + + if len(f.hooks) == 0 { + return f.defaultHook + } + + hook := f.hooks[0] + f.hooks = f.hooks[1:] + return hook +} + +func (f *BigQueryWriterWriteFunc) appendCall(r0 BigQueryWriterWriteFuncCall) { + f.mutex.Lock() + f.history = append(f.history, r0) + f.mutex.Unlock() +} + +// History returns a sequence of BigQueryWriterWriteFuncCall objects +// describing the invocations of this function. +func (f *BigQueryWriterWriteFunc) History() []BigQueryWriterWriteFuncCall { + f.mutex.Lock() + history := make([]BigQueryWriterWriteFuncCall, len(f.history)) + copy(history, f.history) + f.mutex.Unlock() + + return history +} + +// BigQueryWriterWriteFuncCall is an object that describes an invocation of +// method Write on an instance of MockBigQueryWriter. +type BigQueryWriterWriteFuncCall struct { + // Arg0 is the value of the 1st argument passed to this method + // invocation. + Arg0 context.Context + // Arg1 is a slice containing the values of the variadic arguments + // passed to this method invocation. + Arg1 []bigquery.ValueSaver + // Result0 is the value of the 1st result returned from this method + // invocation. + Result0 error +} + +// Args returns an interface slice containing the arguments of this +// invocation. The variadic slice argument is flattened in this array such +// that one positional argument and three variadic arguments would result in +// a slice of four, not two. +func (c BigQueryWriterWriteFuncCall) Args() []interface{} { + trailing := []interface{}{} + for _, val := range c.Arg1 { + trailing = append(trailing, val) + } + + return append([]interface{}{c.Arg0}, trailing...) +} + +// Results returns an interface slice containing the results of this +// invocation. +func (c BigQueryWriterWriteFuncCall) Results() []interface{} { + return []interface{}{c.Result0} +} diff --git a/dev/build-tracker/server_test.go b/dev/build-tracker/server_test.go index e9eab786af9..184d5715ee5 100644 --- a/dev/build-tracker/server_test.go +++ b/dev/build-tracker/server_test.go @@ -26,7 +26,7 @@ func TestGetBuild(t *testing.T) { req, _ := http.NewRequest(http.MethodGet, "/-/debug/1234", nil) req = mux.SetURLVars(req, map[string]string{"buildNumber": "1234"}) t.Run("401 Unauthorized when in production mode and incorrect credentials", func(t *testing.T) { - server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"}) + server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"}, nil) rec := httptest.NewRecorder() server.handleGetBuild(rec, req) @@ -39,7 +39,7 @@ func TestGetBuild(t *testing.T) { }) t.Run("404 for build that does not exist", func(t *testing.T) { - server := NewServer(":8080", logger, config.Config{}) + server := NewServer(":8080", logger, config.Config{}, nil) rec := httptest.NewRecorder() server.handleGetBuild(rec, req) @@ -47,7 +47,7 @@ func TestGetBuild(t *testing.T) { }) t.Run("get marshalled json for build", func(t *testing.T) { - server := NewServer(":8080", logger, config.Config{}) + server := NewServer(":8080", logger, config.Config{}, nil) rec := httptest.NewRecorder() num := 1234 @@ -100,7 +100,7 @@ func TestGetBuild(t *testing.T) { }) t.Run("200 with valid credentials in production mode", func(t *testing.T) { - server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"}) + server := NewServer(":8080", logger, config.Config{Production: true, DebugPassword: "this is a test"}, nil) rec := httptest.NewRecorder() req.SetBasicAuth("devx", server.config.DebugPassword) @@ -132,7 +132,7 @@ func TestOldBuildsGetDeleted(t *testing.T) { } t.Run("All old builds get removed", func(t *testing.T) { - server := NewServer(":8080", logger, config.Config{}) + server := NewServer(":8080", logger, config.Config{}, nil) b := finishedBuild(1, "passed", time.Now().AddDate(-1, 0, 0)) server.store.Set(b) @@ -154,7 +154,7 @@ func TestOldBuildsGetDeleted(t *testing.T) { } }) t.Run("1 build left after old builds are removed", func(t *testing.T) { - server := NewServer(":8080", logger, config.Config{}) + server := NewServer(":8080", logger, config.Config{}, nil) b := finishedBuild(1, "canceled", time.Now().AddDate(-1, 0, 0)) server.store.Set(b) @@ -237,7 +237,7 @@ func TestProcessEvent(t *testing.T) { return &build.Event{Name: build.EventBuildFinished, Build: buildkite.Build{State: &state, Branch: &branch, Number: &buildNumber, Pipeline: pipeline}, Job: job.Job, Pipeline: *pipeline} } t.Run("no send notification on unfinished builds", func(t *testing.T) { - server := NewServer(":8080", logger, config.Config{}) + server := NewServer(":8080", logger, config.Config{}, nil) mockNotifyClient := &MockNotificationClient{} server.notifyClient = mockNotifyClient buildNumber := 1234 @@ -254,7 +254,7 @@ func TestProcessEvent(t *testing.T) { }) t.Run("failed build sends notification", func(t *testing.T) { - server := NewServer(":8080", logger, config.Config{}) + server := NewServer(":8080", logger, config.Config{}, nil) mockNotifyClient := &MockNotificationClient{} server.notifyClient = mockNotifyClient buildNumber := 1234 @@ -270,7 +270,7 @@ func TestProcessEvent(t *testing.T) { }) t.Run("passed build sends notification", func(t *testing.T) { - server := NewServer(":8080", logger, config.Config{}) + server := NewServer(":8080", logger, config.Config{}, nil) mockNotifyClient := &MockNotificationClient{} server.notifyClient = mockNotifyClient buildNumber := 1234 @@ -286,7 +286,7 @@ func TestProcessEvent(t *testing.T) { }) t.Run("failed build, then passed build sends fixed notification", func(t *testing.T) { - server := NewServer(":8080", logger, config.Config{}) + server := NewServer(":8080", logger, config.Config{}, nil) mockNotifyClient := &MockNotificationClient{} server.notifyClient = mockNotifyClient buildNumber := 1234 @@ -312,4 +312,25 @@ func TestProcessEvent(t *testing.T) { // fixed notification require.Equal(t, 2, mockNotifyClient.sendCalled) }) + + t.Run("agent webhooks", func(t *testing.T) { + mockBq := NewMockBigQueryWriter() + server := NewServer(":8080", logger, config.Config{}, mockBq) + + server.processEvent(&build.Event{ + Name: "agent.connected", + Agent: buildkite.Agent{ + ID: pointers.Ptr("QWdlbnQtLS0wMThmNjI4Yy1jY2M0LTRhMmEtOTJjOS1kN2NjODE5MDZiNzc="), + Name: pointers.Ptr("banana-agent-default"), + ConnectedState: pointers.Ptr("connected"), + Hostname: pointers.Ptr("banana-agent-deadbeef"), + IPAddress: pointers.Ptr("10.0.0.5"), + UserAgent: pointers.Ptr("buildkite-agent/4.2.0 (linux; amd64)"), + Version: pointers.Ptr("4.2.0"), + Metadata: []string{"queue=express"}, + }, + }) + + require.Equal(t, 1, len(mockBq.WriteFunc.History())) + }) } diff --git a/mockgen.test.yaml b/mockgen.test.yaml index c4c64ddb008..2748912f644 100644 --- a/mockgen.test.yaml +++ b/mockgen.test.yaml @@ -423,3 +423,8 @@ - service - RepositoryLocker - RepositoryLock +- filename: dev/build-tracker/mocks_test.go + path: github.com/sourcegraph/sourcegraph/dev/build-tracker + package: main + interfaces: + - BigQueryWriter