worker: add SAMS notifications subscriber (#63051)

Part of CORE-92

This PR add a new worker for subscribing to [SAMS notifications](https://www.notion.so/sourcegraph/SAMS-notifications-distribution-system-0d174480e0044b05b545d37d24263d5a). The current use case is to automatically (hard-)delete users on Sourcegraph.com when the corresponding user is deleted from SAMS. 

This worker is only started when running in the Sourcegraph.com mode and the credentials file (`service_account.json`) is provided, which has been configured since https://github.com/sourcegraph/deploy-sourcegraph-cloud/pull/18591.

Co-authored-by: Robert Lin <robert@bobheadxi.dev>
This commit is contained in:
Joe Chen 2024-06-03 18:01:19 -04:00 committed by GitHub
parent eb5d334b03
commit dd8ff6013f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 850 additions and 5 deletions

View File

@ -0,0 +1,65 @@
load("//dev:go_mockgen.bzl", "go_mockgen")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//dev:go_defs.bzl", "go_test")
go_library(
name = "sourcegraphaccounts",
srcs = [
"notifications_subscriber.go",
"notifications_subscriber_handlers.go",
"notifications_subscriber_store.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/worker/internal/sourcegraphaccounts",
visibility = ["//cmd/worker:__subpackages__"],
deps = [
"//cmd/worker/job",
"//cmd/worker/shared/init/db",
"//internal/conf",
"//internal/database",
"//internal/dotcom",
"//internal/env",
"//internal/extsvc",
"//internal/goroutine",
"//internal/observation",
"//lib/errors",
"//schema",
"@com_github_sourcegraph_log//:log",
"@com_github_sourcegraph_sourcegraph_accounts_sdk_go//:sourcegraph-accounts-sdk-go",
"@com_github_sourcegraph_sourcegraph_accounts_sdk_go//clients/v1:clients",
"@com_github_sourcegraph_sourcegraph_accounts_sdk_go//notifications/v1:notifications",
"@com_github_sourcegraph_sourcegraph_accounts_sdk_go//scopes",
"@com_google_cloud_go_pubsub//:pubsub",
"@org_golang_x_oauth2//google",
],
)
go_test(
name = "sourcegraphaccounts_test",
srcs = [
"mocks_test.go",
"notifications_subscriber_handlers_test.go",
],
embed = [":sourcegraphaccounts"],
deps = [
"//internal/database",
"//internal/extsvc",
"//schema",
"@com_github_derision_test_go_mockgen_v2//testutil/require",
"@com_github_sourcegraph_log//logtest",
"@com_github_sourcegraph_sourcegraph_accounts_sdk_go//:sourcegraph-accounts-sdk-go",
"@com_github_sourcegraph_sourcegraph_accounts_sdk_go//clients/v1:clients",
"@com_github_sourcegraph_sourcegraph_accounts_sdk_go//notifications/v1:notifications",
"@com_github_stretchr_testify//require",
],
)
go_mockgen(
name = "generate_mocks",
out = "mocks_test.go",
manifests = [
"//:mockgen.yaml",
"//:mockgen.test.yaml",
"//:mockgen.temp.yaml",
],
deps = [":sourcegraphaccounts"],
)

View File

@ -0,0 +1,439 @@
// Code generated by go-mockgen 1.3.7; DO NOT EDIT.
//
// This file was generated by running `sg generate` (or `go-mockgen`) at the root of
// this repository. To add additional mocks to this or another package, add a new entry
// to the mockgen.yaml file in the root of this repository.
package sourcegraphaccounts
import (
"context"
"sync"
v1 "github.com/sourcegraph/sourcegraph-accounts-sdk-go/clients/v1"
database "github.com/sourcegraph/sourcegraph/internal/database"
extsvc "github.com/sourcegraph/sourcegraph/internal/extsvc"
)
// MockNotificationsSubscriberStore is a mock implementation of the
// notificationsSubscriberStore interface (from the package
// github.com/sourcegraph/sourcegraph/cmd/worker/internal/sourcegraphaccounts)
// used for unit testing.
type MockNotificationsSubscriberStore struct {
// GetSAMSUserByIDFunc is an instance of a mock function object
// controlling the behavior of the method GetSAMSUserByID.
GetSAMSUserByIDFunc *NotificationsSubscriberStoreGetSAMSUserByIDFunc
// HardDeleteUsersFunc is an instance of a mock function object
// controlling the behavior of the method HardDeleteUsers.
HardDeleteUsersFunc *NotificationsSubscriberStoreHardDeleteUsersFunc
// ListUserExternalAccountsFunc is an instance of a mock function object
// controlling the behavior of the method ListUserExternalAccounts.
ListUserExternalAccountsFunc *NotificationsSubscriberStoreListUserExternalAccountsFunc
}
// NewMockNotificationsSubscriberStore creates a new mock of the
// notificationsSubscriberStore interface. All methods return zero values
// for all results, unless overwritten.
func NewMockNotificationsSubscriberStore() *MockNotificationsSubscriberStore {
return &MockNotificationsSubscriberStore{
GetSAMSUserByIDFunc: &NotificationsSubscriberStoreGetSAMSUserByIDFunc{
defaultHook: func(context.Context, string) (r0 *v1.User, r1 error) {
return
},
},
HardDeleteUsersFunc: &NotificationsSubscriberStoreHardDeleteUsersFunc{
defaultHook: func(context.Context, []int32) (r0 error) {
return
},
},
ListUserExternalAccountsFunc: &NotificationsSubscriberStoreListUserExternalAccountsFunc{
defaultHook: func(context.Context, database.ExternalAccountsListOptions) (r0 []*extsvc.Account, r1 error) {
return
},
},
}
}
// NewStrictMockNotificationsSubscriberStore creates a new mock of the
// notificationsSubscriberStore interface. All methods panic on invocation,
// unless overwritten.
func NewStrictMockNotificationsSubscriberStore() *MockNotificationsSubscriberStore {
return &MockNotificationsSubscriberStore{
GetSAMSUserByIDFunc: &NotificationsSubscriberStoreGetSAMSUserByIDFunc{
defaultHook: func(context.Context, string) (*v1.User, error) {
panic("unexpected invocation of MockNotificationsSubscriberStore.GetSAMSUserByID")
},
},
HardDeleteUsersFunc: &NotificationsSubscriberStoreHardDeleteUsersFunc{
defaultHook: func(context.Context, []int32) error {
panic("unexpected invocation of MockNotificationsSubscriberStore.HardDeleteUsers")
},
},
ListUserExternalAccountsFunc: &NotificationsSubscriberStoreListUserExternalAccountsFunc{
defaultHook: func(context.Context, database.ExternalAccountsListOptions) ([]*extsvc.Account, error) {
panic("unexpected invocation of MockNotificationsSubscriberStore.ListUserExternalAccounts")
},
},
}
}
// surrogateMockNotificationsSubscriberStore is a copy of the
// notificationsSubscriberStore interface (from the package
// github.com/sourcegraph/sourcegraph/cmd/worker/internal/sourcegraphaccounts).
// It is redefined here as it is unexported in the source package.
type surrogateMockNotificationsSubscriberStore interface {
GetSAMSUserByID(context.Context, string) (*v1.User, error)
HardDeleteUsers(context.Context, []int32) error
ListUserExternalAccounts(context.Context, database.ExternalAccountsListOptions) ([]*extsvc.Account, error)
}
// NewMockNotificationsSubscriberStoreFrom creates a new mock of the
// MockNotificationsSubscriberStore interface. All methods delegate to the
// given implementation, unless overwritten.
func NewMockNotificationsSubscriberStoreFrom(i surrogateMockNotificationsSubscriberStore) *MockNotificationsSubscriberStore {
return &MockNotificationsSubscriberStore{
GetSAMSUserByIDFunc: &NotificationsSubscriberStoreGetSAMSUserByIDFunc{
defaultHook: i.GetSAMSUserByID,
},
HardDeleteUsersFunc: &NotificationsSubscriberStoreHardDeleteUsersFunc{
defaultHook: i.HardDeleteUsers,
},
ListUserExternalAccountsFunc: &NotificationsSubscriberStoreListUserExternalAccountsFunc{
defaultHook: i.ListUserExternalAccounts,
},
}
}
// NotificationsSubscriberStoreGetSAMSUserByIDFunc describes the behavior
// when the GetSAMSUserByID method of the parent
// MockNotificationsSubscriberStore instance is invoked.
type NotificationsSubscriberStoreGetSAMSUserByIDFunc struct {
defaultHook func(context.Context, string) (*v1.User, error)
hooks []func(context.Context, string) (*v1.User, error)
history []NotificationsSubscriberStoreGetSAMSUserByIDFuncCall
mutex sync.Mutex
}
// GetSAMSUserByID delegates to the next hook function in the queue and
// stores the parameter and result values of this invocation.
func (m *MockNotificationsSubscriberStore) GetSAMSUserByID(v0 context.Context, v1 string) (*v1.User, error) {
r0, r1 := m.GetSAMSUserByIDFunc.nextHook()(v0, v1)
m.GetSAMSUserByIDFunc.appendCall(NotificationsSubscriberStoreGetSAMSUserByIDFuncCall{v0, v1, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the GetSAMSUserByID
// method of the parent MockNotificationsSubscriberStore instance is invoked
// and the hook queue is empty.
func (f *NotificationsSubscriberStoreGetSAMSUserByIDFunc) SetDefaultHook(hook func(context.Context, string) (*v1.User, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// GetSAMSUserByID method of the parent MockNotificationsSubscriberStore
// instance invokes the hook at the front of the queue and discards it.
// After the queue is empty, the default hook function is invoked for any
// future action.
func (f *NotificationsSubscriberStoreGetSAMSUserByIDFunc) PushHook(hook func(context.Context, string) (*v1.User, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *NotificationsSubscriberStoreGetSAMSUserByIDFunc) SetDefaultReturn(r0 *v1.User, r1 error) {
f.SetDefaultHook(func(context.Context, string) (*v1.User, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *NotificationsSubscriberStoreGetSAMSUserByIDFunc) PushReturn(r0 *v1.User, r1 error) {
f.PushHook(func(context.Context, string) (*v1.User, error) {
return r0, r1
})
}
func (f *NotificationsSubscriberStoreGetSAMSUserByIDFunc) nextHook() func(context.Context, string) (*v1.User, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *NotificationsSubscriberStoreGetSAMSUserByIDFunc) appendCall(r0 NotificationsSubscriberStoreGetSAMSUserByIDFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of
// NotificationsSubscriberStoreGetSAMSUserByIDFuncCall objects describing
// the invocations of this function.
func (f *NotificationsSubscriberStoreGetSAMSUserByIDFunc) History() []NotificationsSubscriberStoreGetSAMSUserByIDFuncCall {
f.mutex.Lock()
history := make([]NotificationsSubscriberStoreGetSAMSUserByIDFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// NotificationsSubscriberStoreGetSAMSUserByIDFuncCall is an object that
// describes an invocation of method GetSAMSUserByID on an instance of
// MockNotificationsSubscriberStore.
type NotificationsSubscriberStoreGetSAMSUserByIDFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 string
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 *v1.User
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c NotificationsSubscriberStoreGetSAMSUserByIDFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c NotificationsSubscriberStoreGetSAMSUserByIDFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// NotificationsSubscriberStoreHardDeleteUsersFunc describes the behavior
// when the HardDeleteUsers method of the parent
// MockNotificationsSubscriberStore instance is invoked.
type NotificationsSubscriberStoreHardDeleteUsersFunc struct {
defaultHook func(context.Context, []int32) error
hooks []func(context.Context, []int32) error
history []NotificationsSubscriberStoreHardDeleteUsersFuncCall
mutex sync.Mutex
}
// HardDeleteUsers delegates to the next hook function in the queue and
// stores the parameter and result values of this invocation.
func (m *MockNotificationsSubscriberStore) HardDeleteUsers(v0 context.Context, v1 []int32) error {
r0 := m.HardDeleteUsersFunc.nextHook()(v0, v1)
m.HardDeleteUsersFunc.appendCall(NotificationsSubscriberStoreHardDeleteUsersFuncCall{v0, v1, r0})
return r0
}
// SetDefaultHook sets function that is called when the HardDeleteUsers
// method of the parent MockNotificationsSubscriberStore instance is invoked
// and the hook queue is empty.
func (f *NotificationsSubscriberStoreHardDeleteUsersFunc) SetDefaultHook(hook func(context.Context, []int32) error) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// HardDeleteUsers method of the parent MockNotificationsSubscriberStore
// instance invokes the hook at the front of the queue and discards it.
// After the queue is empty, the default hook function is invoked for any
// future action.
func (f *NotificationsSubscriberStoreHardDeleteUsersFunc) PushHook(hook func(context.Context, []int32) error) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *NotificationsSubscriberStoreHardDeleteUsersFunc) SetDefaultReturn(r0 error) {
f.SetDefaultHook(func(context.Context, []int32) error {
return r0
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *NotificationsSubscriberStoreHardDeleteUsersFunc) PushReturn(r0 error) {
f.PushHook(func(context.Context, []int32) error {
return r0
})
}
func (f *NotificationsSubscriberStoreHardDeleteUsersFunc) nextHook() func(context.Context, []int32) error {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *NotificationsSubscriberStoreHardDeleteUsersFunc) appendCall(r0 NotificationsSubscriberStoreHardDeleteUsersFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of
// NotificationsSubscriberStoreHardDeleteUsersFuncCall objects describing
// the invocations of this function.
func (f *NotificationsSubscriberStoreHardDeleteUsersFunc) History() []NotificationsSubscriberStoreHardDeleteUsersFuncCall {
f.mutex.Lock()
history := make([]NotificationsSubscriberStoreHardDeleteUsersFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// NotificationsSubscriberStoreHardDeleteUsersFuncCall is an object that
// describes an invocation of method HardDeleteUsers on an instance of
// MockNotificationsSubscriberStore.
type NotificationsSubscriberStoreHardDeleteUsersFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 []int32
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c NotificationsSubscriberStoreHardDeleteUsersFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c NotificationsSubscriberStoreHardDeleteUsersFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}
// NotificationsSubscriberStoreListUserExternalAccountsFunc describes the
// behavior when the ListUserExternalAccounts method of the parent
// MockNotificationsSubscriberStore instance is invoked.
type NotificationsSubscriberStoreListUserExternalAccountsFunc struct {
defaultHook func(context.Context, database.ExternalAccountsListOptions) ([]*extsvc.Account, error)
hooks []func(context.Context, database.ExternalAccountsListOptions) ([]*extsvc.Account, error)
history []NotificationsSubscriberStoreListUserExternalAccountsFuncCall
mutex sync.Mutex
}
// ListUserExternalAccounts delegates to the next hook function in the queue
// and stores the parameter and result values of this invocation.
func (m *MockNotificationsSubscriberStore) ListUserExternalAccounts(v0 context.Context, v1 database.ExternalAccountsListOptions) ([]*extsvc.Account, error) {
r0, r1 := m.ListUserExternalAccountsFunc.nextHook()(v0, v1)
m.ListUserExternalAccountsFunc.appendCall(NotificationsSubscriberStoreListUserExternalAccountsFuncCall{v0, v1, r0, r1})
return r0, r1
}
// SetDefaultHook sets function that is called when the
// ListUserExternalAccounts method of the parent
// MockNotificationsSubscriberStore instance is invoked and the hook queue
// is empty.
func (f *NotificationsSubscriberStoreListUserExternalAccountsFunc) SetDefaultHook(hook func(context.Context, database.ExternalAccountsListOptions) ([]*extsvc.Account, error)) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// ListUserExternalAccounts method of the parent
// MockNotificationsSubscriberStore instance invokes the hook at the front
// of the queue and discards it. After the queue is empty, the default hook
// function is invoked for any future action.
func (f *NotificationsSubscriberStoreListUserExternalAccountsFunc) PushHook(hook func(context.Context, database.ExternalAccountsListOptions) ([]*extsvc.Account, error)) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *NotificationsSubscriberStoreListUserExternalAccountsFunc) SetDefaultReturn(r0 []*extsvc.Account, r1 error) {
f.SetDefaultHook(func(context.Context, database.ExternalAccountsListOptions) ([]*extsvc.Account, error) {
return r0, r1
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *NotificationsSubscriberStoreListUserExternalAccountsFunc) PushReturn(r0 []*extsvc.Account, r1 error) {
f.PushHook(func(context.Context, database.ExternalAccountsListOptions) ([]*extsvc.Account, error) {
return r0, r1
})
}
func (f *NotificationsSubscriberStoreListUserExternalAccountsFunc) nextHook() func(context.Context, database.ExternalAccountsListOptions) ([]*extsvc.Account, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *NotificationsSubscriberStoreListUserExternalAccountsFunc) appendCall(r0 NotificationsSubscriberStoreListUserExternalAccountsFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of
// NotificationsSubscriberStoreListUserExternalAccountsFuncCall objects
// describing the invocations of this function.
func (f *NotificationsSubscriberStoreListUserExternalAccountsFunc) History() []NotificationsSubscriberStoreListUserExternalAccountsFuncCall {
f.mutex.Lock()
history := make([]NotificationsSubscriberStoreListUserExternalAccountsFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// NotificationsSubscriberStoreListUserExternalAccountsFuncCall is an object
// that describes an invocation of method ListUserExternalAccounts on an
// instance of MockNotificationsSubscriberStore.
type NotificationsSubscriberStoreListUserExternalAccountsFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 database.ExternalAccountsListOptions
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 []*extsvc.Account
// Result1 is the value of the 2nd result returned from this method
// invocation.
Result1 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c NotificationsSubscriberStoreListUserExternalAccountsFuncCall) Args() []interface{} {
return []interface{}{c.Arg0, c.Arg1}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c NotificationsSubscriberStoreListUserExternalAccountsFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}

View File

@ -0,0 +1,150 @@
package sourcegraphaccounts
import (
"context"
"os"
"strings"
"cloud.google.com/go/pubsub"
"github.com/sourcegraph/log"
sams "github.com/sourcegraph/sourcegraph-accounts-sdk-go"
notificationsv1 "github.com/sourcegraph/sourcegraph-accounts-sdk-go/notifications/v1"
"github.com/sourcegraph/sourcegraph-accounts-sdk-go/scopes"
"golang.org/x/oauth2/google"
"github.com/sourcegraph/sourcegraph/cmd/worker/job"
workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/dotcom"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/schema"
)
var _ job.Job = (*notificationsSubscriber)(nil)
// notificationsSubscriber is a worker responsible for receiving notifications
// from Sourcegraph Accounts.
type notificationsSubscriber struct {
config *notificationsSubscriberConfig
}
func NewNotificationsSubscriber() job.Job {
return &notificationsSubscriber{
config: &notificationsSubscriberConfig{},
}
}
func (s *notificationsSubscriber) Description() string {
return "Receives notifications from Sourcegraph Accounts."
}
func (s *notificationsSubscriber) Config() []env.Config {
return []env.Config{s.config}
}
func (s *notificationsSubscriber) Routines(ctx context.Context, observationCtx *observation.Context) ([]goroutine.BackgroundRoutine, error) {
if !dotcom.SourcegraphDotComMode() {
return nil, nil // Not relevant
}
logger := observationCtx.Logger
if s.config.GCP.CredentialsFile == "" {
logger.Info("worker disabled because SOURCEGRAPH_ACCOUNTS_CREDENTIALS_FILE is not set")
return nil, nil
}
// NOTE: Theoretically, we could have multiple SAMS providers configured, but in
// practice, we should ever only have one in production. Otherwise, we might
// have a bigger problem to address than just this worker.
var samsProvider *schema.OpenIDConnectAuthProvider
authProviders := conf.Get().SiteConfig().AuthProviders
for _, p := range authProviders {
if p.Openidconnect != nil && strings.HasPrefix(p.Openidconnect.ClientID, "sams_cid_") {
samsProvider = p.Openidconnect
break
}
}
if samsProvider == nil {
logger.Info("worker disabled because SAMS provider is not configured")
return nil, nil
}
logger.Info("worker enabled",
log.String("samsProvider.Issuer", samsProvider.Issuer),
log.String("samsProvider.ClientID", samsProvider.ClientID),
)
connConfig := sams.ConnConfig{
ExternalURL: samsProvider.Issuer,
}
samsClient, err := sams.NewClientV1(sams.ClientV1Config{
ConnConfig: connConfig,
TokenSource: sams.ClientCredentialsTokenSource(
connConfig,
samsProvider.ClientID,
samsProvider.ClientSecret,
[]scopes.Scope{
scopes.OpenID,
scopes.Profile,
scopes.Email,
},
),
})
if err != nil {
return nil, errors.Wrap(err, "create SAMS client")
}
db, err := workerdb.InitDB(observationCtx)
if err != nil {
return nil, errors.Wrap(err, "init DB")
}
store := newNotificationsSubscriberStore(samsClient, db)
handlers := newNotificationsSubscriberHandlers(logger, store, samsProvider)
credentialsJSON, err := os.ReadFile(s.config.GCP.CredentialsFile)
if err != nil {
return nil, errors.Wrap(err, "read GCP credentials file")
}
credentials, err := google.CredentialsFromJSON(ctx, credentialsJSON, pubsub.ScopePubSub)
if err != nil {
return nil, errors.Wrap(err, "parse GCP credentials JSON")
}
subscriber, err := sams.NewNotificationsV1Subscriber(
logger,
notificationsv1.SubscriberOptions{
ProjectID: s.config.GCP.ProjectID,
SubscriptionID: s.config.GCP.SubscriptionID,
ReceiveSettings: notificationsv1.DefaultReceiveSettings,
Handlers: notificationsv1.SubscriberHandlers{
OnUserDeleted: handlers.onUserDeleted(),
},
Credentials: credentials,
},
)
if err != nil {
return nil, errors.Wrap(err, "create notifications subscriber")
}
return []goroutine.BackgroundRoutine{
subscriber,
}, nil
}
type notificationsSubscriberConfig struct {
env.BaseConfig
GCP struct {
CredentialsFile string
ProjectID string
SubscriptionID string
}
}
func (c *notificationsSubscriberConfig) Load() {
c.GCP.CredentialsFile = c.Get("SOURCEGRAPH_ACCOUNTS_CREDENTIALS_FILE", "", "Path to the Google Cloud credentials file")
c.GCP.ProjectID = c.Get("SOURCEGRAPH_ACCOUNTS_NOTIFICATIONS_PROJECT", "sourcegraph-dev", "The GCP project that the service is running in")
c.GCP.SubscriptionID = c.Get("SOURCEGRAPH_ACCOUNTS_NOTIFICATIONS_SUBSCRIPTION", "sams-notifications", "GCP Pub/Sub subscription ID to receive SAMS notifications from")
}

View File

@ -0,0 +1,75 @@
package sourcegraphaccounts
import (
"context"
"github.com/sourcegraph/log"
sams "github.com/sourcegraph/sourcegraph-accounts-sdk-go"
notificationsv1 "github.com/sourcegraph/sourcegraph-accounts-sdk-go/notifications/v1"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/schema"
)
type notificationsSubscriberHandlers struct {
logger log.Logger
store notificationsSubscriberStore
samsProvider *schema.OpenIDConnectAuthProvider
}
func newNotificationsSubscriberHandlers(
logger log.Logger,
store notificationsSubscriberStore,
samsProvider *schema.OpenIDConnectAuthProvider,
) *notificationsSubscriberHandlers {
return &notificationsSubscriberHandlers{
logger: logger,
store: store,
samsProvider: samsProvider,
}
}
func (hs *notificationsSubscriberHandlers) onUserDeleted() func(ctx context.Context, data *notificationsv1.UserDeletedData) error {
return func(ctx context.Context, data *notificationsv1.UserDeletedData) error {
// Double check with SAMS that the user is deleted
_, err := hs.store.GetSAMSUserByID(ctx, data.AccountID)
if err == nil {
hs.logger.Error("Received user deleted notification for a user still exists", log.String("accountID", data.AccountID))
return nil
} else if !errors.Is(err, sams.ErrNotFound) {
return errors.Wrap(err, "get user by ID")
}
// NOTE: Do not query with "ClientID" nor limit to 1 record because they are
// irrelevant in this context as long as the SAMS instance matches
// ("ServiceID"). No matter how many times we have rotated the client ID, we
// should still delete all the user records with the matching account ID.
extAccts, err := hs.store.ListUserExternalAccounts(
ctx,
database.ExternalAccountsListOptions{
ServiceType: hs.samsProvider.Type,
ServiceID: hs.samsProvider.Issuer,
AccountID: data.AccountID,
},
)
if err != nil {
return errors.Wrap(err, "list user external accounts")
}
userIDs := make([]int32, 0, len(extAccts))
for _, extAcct := range extAccts {
userIDs = append(userIDs, extAcct.UserID)
}
err = hs.store.HardDeleteUsers(ctx, userIDs)
if err != nil {
return errors.Wrap(err, "hard-delete users")
}
hs.logger.Debug("User hard-deleted",
log.String("accountID", data.AccountID),
log.Int("recordsCount", len(userIDs)),
)
return nil
}
}

View File

@ -0,0 +1,55 @@
package sourcegraphaccounts
import (
"context"
"strings"
"testing"
mockrequire "github.com/derision-test/go-mockgen/v2/testutil/require"
"github.com/sourcegraph/log/logtest"
sams "github.com/sourcegraph/sourcegraph-accounts-sdk-go"
clientsv1 "github.com/sourcegraph/sourcegraph-accounts-sdk-go/clients/v1"
notificationsv1 "github.com/sourcegraph/sourcegraph-accounts-sdk-go/notifications/v1"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/sourcegraph/internal/extsvc"
"github.com/sourcegraph/sourcegraph/schema"
)
func TestNotificationsSubscriberHandlers_onUserDeleted(t *testing.T) {
ctx := context.Background()
samsProvider := &schema.OpenIDConnectAuthProvider{}
t.Run("user still exists", func(t *testing.T) {
logger, exportLogs := logtest.Captured(t)
store := NewMockNotificationsSubscriberStore()
store.GetSAMSUserByIDFunc.SetDefaultReturn(&clientsv1.User{}, nil)
hs := newNotificationsSubscriberHandlers(logger, store, samsProvider)
err := hs.onUserDeleted()(ctx, &notificationsv1.UserDeletedData{AccountID: "018d21f2-14d7-7c3b-8714-10cdd32dd1ab"})
require.NoError(t, err)
foundLog := false
for _, log := range exportLogs() {
if strings.Contains(log.Message, "deleted notification for a user still exists") {
foundLog = true
break
}
}
require.True(t, foundLog)
})
t.Run("perform user deletion", func(t *testing.T) {
store := NewMockNotificationsSubscriberStore()
store.GetSAMSUserByIDFunc.SetDefaultReturn(nil, sams.ErrNotFound)
store.ListUserExternalAccountsFunc.SetDefaultReturn(
[]*extsvc.Account{
{UserID: 1},
},
nil,
)
hs := newNotificationsSubscriberHandlers(logtest.NoOp(t), store, samsProvider)
err := hs.onUserDeleted()(ctx, &notificationsv1.UserDeletedData{AccountID: "018d21f2-14d7-7c3b-8714-10cdd32dd1ab"})
require.NoError(t, err)
mockrequire.Called(t, store.ListUserExternalAccountsFunc)
mockrequire.Called(t, store.HardDeleteUsersFunc)
})
}

View File

@ -0,0 +1,52 @@
package sourcegraphaccounts
import (
"context"
sams "github.com/sourcegraph/sourcegraph-accounts-sdk-go"
clientsv1 "github.com/sourcegraph/sourcegraph-accounts-sdk-go/clients/v1"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/extsvc"
)
// notificationsSubscriberStore is the data layer carrier for notifications
// subscriber. This interface is meant to abstract away and limit the exposure
// of the underlying data layer to the handler through a thin-wrapper.
type notificationsSubscriberStore interface {
// GetSAMSUserByID returns the SAMS user with the given ID. It returns
// sams.ErrNotFound if no such user exists.
//
// Required scope: profile
GetSAMSUserByID(ctx context.Context, accountID string) (*clientsv1.User, error)
// ListUserExternalAccounts returns the external accounts satisfying the given
// options.
ListUserExternalAccounts(ctx context.Context, opts database.ExternalAccountsListOptions) ([]*extsvc.Account, error)
// HardDeleteUsers hard-deletes the users with the given IDs.
HardDeleteUsers(ctx context.Context, userIDs []int32) error
}
type notificationsSubscriberStoreImpl struct {
samsClient *sams.ClientV1
db database.DB
}
func newNotificationsSubscriberStore(samsClient *sams.ClientV1, db database.DB) notificationsSubscriberStore {
return &notificationsSubscriberStoreImpl{
samsClient: samsClient,
db: db,
}
}
func (s *notificationsSubscriberStoreImpl) GetSAMSUserByID(ctx context.Context, accountID string) (*clientsv1.User, error) {
return s.samsClient.Users().GetUserByID(ctx, accountID)
}
func (s *notificationsSubscriberStoreImpl) ListUserExternalAccounts(ctx context.Context, opts database.ExternalAccountsListOptions) ([]*extsvc.Account, error) {
return s.db.UserExternalAccounts().List(ctx, opts)
}
func (s *notificationsSubscriberStoreImpl) HardDeleteUsers(ctx context.Context, userIDs []int32) error {
return s.db.Users().HardDeleteList(ctx, userIDs)
}

View File

@ -32,6 +32,7 @@ go_library(
"//cmd/worker/internal/ratelimit",
"//cmd/worker/internal/repostatistics",
"//cmd/worker/internal/search",
"//cmd/worker/internal/sourcegraphaccounts",
"//cmd/worker/internal/telemetry",
"//cmd/worker/internal/telemetrygatewayexporter",
"//cmd/worker/internal/webhooks",

View File

@ -33,6 +33,7 @@ import (
"github.com/sourcegraph/sourcegraph/cmd/worker/internal/ratelimit"
"github.com/sourcegraph/sourcegraph/cmd/worker/internal/repostatistics"
"github.com/sourcegraph/sourcegraph/cmd/worker/internal/search"
"github.com/sourcegraph/sourcegraph/cmd/worker/internal/sourcegraphaccounts"
"github.com/sourcegraph/sourcegraph/cmd/worker/internal/telemetry"
"github.com/sourcegraph/sourcegraph/cmd/worker/internal/telemetrygatewayexporter"
"github.com/sourcegraph/sourcegraph/cmd/worker/internal/webhooks"
@ -132,6 +133,8 @@ func LoadConfig(registerEnterpriseMigrators oobmigration.RegisterMigratorsFunc)
"repo-perms-syncer": workerauthz.NewPermsSyncerJob(),
"perforce-changelist-mapper": perforce.NewPerforceChangelistMappingJob(),
"sourcegraph-accounts-notifications-subscriber": sourcegraphaccounts.NewNotificationsSubscriber(),
}
var config Config

View File

@ -5707,8 +5707,8 @@ def go_dependencies():
],
build_file_proto_mode = "disable_global",
importpath = "github.com/sourcegraph/sourcegraph-accounts-sdk-go",
sum = "h1:55o/Oo+gFRmE5tmFod6M/koth7RFtgRxfApjBxxtORI=",
version = "v0.0.0-20240524154739-87189364d07f",
sum = "h1:+7J5NMA9FJDaf0IhNpIcTEg+Gzu/GN5dRT40wdFU10I=",
version = "v0.0.0-20240531163352-fe74c17cf0d1",
)
go_repository(
name = "com_github_sourcegraph_zoekt",

View File

@ -123,5 +123,6 @@ write_source_files(
"//cmd/gitserver/internal/gitserverfs:generate_mocks",
"//cmd/repo-updater/internal/gitserver:generate_mocks",
"//dev/build-tracker:generate_mocks",
"//cmd/worker/internal/sourcegraphaccounts:generate_mocks",
],
)

2
go.mod
View File

@ -301,7 +301,7 @@ require (
github.com/sourcegraph/managed-services-platform-cdktf/gen/tfe v0.0.0-20240513203650-e2b1273f1c1a
github.com/sourcegraph/notionreposync v0.0.0-20240510213306-87052870048d
github.com/sourcegraph/scip v0.3.3
github.com/sourcegraph/sourcegraph-accounts-sdk-go v0.0.0-20240524154739-87189364d07f
github.com/sourcegraph/sourcegraph-accounts-sdk-go v0.0.0-20240531163352-fe74c17cf0d1
github.com/sourcegraph/sourcegraph/lib v0.0.0-20240524140455-2589fef13ea8
github.com/sourcegraph/sourcegraph/lib/managedservicesplatform v0.0.0-00010101000000-000000000000
github.com/sourcegraph/sourcegraph/monitoring v0.0.0-00010101000000-000000000000

4
go.sum
View File

@ -1749,8 +1749,8 @@ github.com/sourcegraph/run v0.12.0 h1:3A8w5e8HIYPfafHekvmdmmh42RHKGVhmiTZAPJclg7
github.com/sourcegraph/run v0.12.0/go.mod h1:PwaP936BTnAJC1cqR5rSbG5kOs/EWStTK3lqvMX5GUA=
github.com/sourcegraph/scip v0.3.3 h1:3EOkChYOntwHl0pPSAju7rj0oRuujh8owC4vjGDEr0s=
github.com/sourcegraph/scip v0.3.3/go.mod h1:Q67VaoTpftINIy/CLrkYQOMwlsx67h8ys+ligmdUcqM=
github.com/sourcegraph/sourcegraph-accounts-sdk-go v0.0.0-20240524154739-87189364d07f h1:55o/Oo+gFRmE5tmFod6M/koth7RFtgRxfApjBxxtORI=
github.com/sourcegraph/sourcegraph-accounts-sdk-go v0.0.0-20240524154739-87189364d07f/go.mod h1:+yFgPzr01Ks+pcvFlStxKtUp4Wq//BqQEJDnZczD3h0=
github.com/sourcegraph/sourcegraph-accounts-sdk-go v0.0.0-20240531163352-fe74c17cf0d1 h1:+7J5NMA9FJDaf0IhNpIcTEg+Gzu/GN5dRT40wdFU10I=
github.com/sourcegraph/sourcegraph-accounts-sdk-go v0.0.0-20240531163352-fe74c17cf0d1/go.mod h1:/MWl0sFvn6w26Y067CkEJgklfxx8gCzbEJ3q6cBzDro=
github.com/sourcegraph/yaml v1.0.1-0.20200714132230-56936252f152 h1:z/MpntplPaW6QW95pzcAR/72Z5TWDyDnSo0EOcyij9o=
github.com/sourcegraph/yaml v1.0.1-0.20200714132230-56936252f152/go.mod h1:GIjDIg/heH5DOkXY3YJ/wNhfHsQHoXGjl8G8amsYQ1I=
github.com/sourcegraph/zoekt v0.0.0-20240528215134-640102a4a30e h1:p2PMQkXe01gWBLgYqpQKnbg6OzR62/Py1CK2YDbaRCU=

View File

@ -347,6 +347,10 @@
path: github.com/sourcegraph/sourcegraph/cmd/worker/internal/telemetry
interfaces:
- bookmarkStore
- filename: cmd/worker/internal/sourcegraphaccounts/mocks_test.go
path: github.com/sourcegraph/sourcegraph/cmd/worker/internal/sourcegraphaccounts
interfaces:
- notificationsSubscriberStore
- filename: internal/codeintel/ranking/mocks_test.go
sources:
- path: github.com/sourcegraph/sourcegraph/internal/codeintel/ranking/internal/store