mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 19:21:50 +00:00
gitserver: Introduce pool_repo_id and make AddrForRepo stateful (#55001)
Part of #54741. ## What At a high level this commit does three things: 1. Add a `repositories: { deduplicateForks: [] }` property to the site config and add a thread safe cache in gitserver and repo-updater to maintain (and update with conf.Watch) a map of the deduplicateForks list because iterating over a list is inefficent 2. Add a column `pool_repo_id (int, null)` to `gitserver_repos` table and adds API methods to update the `pool_repo_id` for a given repo (`UpdatePoolRepoID`) and retrieve the pool repo for a given repo (`GetPoolRepo`) 3 Updates the `AddrForRepo` method in gitserver to: 1. Take into account the site config added in step 1 and the `pool_repo_id` added in step 2 above 2. Cache the address of a repo for 15 mins at a time if it involved a DB lookup This means that `AddrForRepo` is no longer stateless and in the worst case scenario will make 2 DB calls. ## Why We want to be able to support deduplication of git objects and this is the underlying site config and DB schema changes that is required for this feature. ## What changes Ideally nothing should change with this commit because the updated code path for `AddrForRepo` will only come into effect if their is at least one entry in the site config. We will test this initially in dogfood and scaletesting. Additionally, this commit does not change any cloning / fetch related behaviour so even adding entries to the site config property will not change anything specific to how the repos are cloned and stored at the moment. Co-authored-by: Thorsten Ball <mrnugget@gmail.com> Co-authored-by: Alex Ostrikov <alex.ostrikov@sourcegraph.com> Co-authored-by: Erik Seliger <erikseliger@me.com>
This commit is contained in:
parent
bd83e139e0
commit
4049ecb98c
@ -1,6 +1,7 @@
|
||||
package httpapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@ -106,7 +107,7 @@ func serveSendEmail(_ http.ResponseWriter, r *http.Request) error {
|
||||
// gitserver for the repo.
|
||||
type gitServiceHandler struct {
|
||||
Gitserver interface {
|
||||
AddrForRepo(api.RepoName) string
|
||||
AddrForRepo(context.Context, api.RepoName) string
|
||||
}
|
||||
}
|
||||
|
||||
@ -125,7 +126,7 @@ func (s *gitServiceHandler) serveGitUploadPack() func(http.ResponseWriter, *http
|
||||
func (s *gitServiceHandler) redirectToGitServer(w http.ResponseWriter, r *http.Request, gitPath string) error {
|
||||
repo := mux.Vars(r)["RepoName"]
|
||||
|
||||
addrForRepo := s.Gitserver.AddrForRepo(api.RepoName(repo))
|
||||
addrForRepo := s.Gitserver.AddrForRepo(r.Context(), api.RepoName(repo))
|
||||
u := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: addrForRepo,
|
||||
|
||||
@ -59,7 +59,7 @@ func TestGitServiceHandlers(t *testing.T) {
|
||||
|
||||
type mockAddrForRepo struct{}
|
||||
|
||||
func (mockAddrForRepo) AddrForRepo(name api.RepoName) string {
|
||||
func (mockAddrForRepo) AddrForRepo(ctx context.Context, name api.RepoName) string {
|
||||
return strings.ReplaceAll(string(name), "/", ".") + ".gitserver"
|
||||
}
|
||||
|
||||
|
||||
@ -226,7 +226,7 @@ func (s *Server) cleanupRepos(ctx context.Context, gitServerAddrs gitserver.Gits
|
||||
|
||||
// Record the number and disk usage used of repos that should
|
||||
// not belong on this instance and remove up to SRC_WRONG_SHARD_DELETE_LIMIT in a single Janitor run.
|
||||
addr := s.addrForRepo(name, gitServerAddrs)
|
||||
addr := s.addrForRepo(ctx, name, gitServerAddrs)
|
||||
|
||||
if !s.hostnameMatch(addr) {
|
||||
wrongShardRepoCount++
|
||||
|
||||
@ -297,6 +297,11 @@ type Server struct {
|
||||
|
||||
// Perforce is a plugin-like service attached to Server for all things Perforce.
|
||||
Perforce *perforce.Service
|
||||
|
||||
// DeduplicatedForksSet is a set of all repos added to the deduplicateForks site config
|
||||
// property. It exists only to aid in fast lookups instead of having to iterate through the list
|
||||
// each time.
|
||||
DeduplicatedForksSet *types.RepoURISet
|
||||
}
|
||||
|
||||
type locks struct {
|
||||
@ -507,7 +512,7 @@ func (s *Server) Janitor(ctx context.Context, interval time.Duration) {
|
||||
}
|
||||
|
||||
for {
|
||||
gitserverAddrs := gitserver.NewGitserverAddressesFromConf(conf.Get())
|
||||
gitserverAddrs := gitserver.NewGitserverAddresses(s.DB, conf.Get())
|
||||
s.cleanupRepos(actor.WithInternalActor(ctx), gitserverAddrs)
|
||||
time.Sleep(interval)
|
||||
}
|
||||
@ -521,7 +526,7 @@ func (s *Server) SyncRepoState(interval time.Duration, batchSize, perSecond int)
|
||||
var previousAddrs string
|
||||
var previousPinned string
|
||||
for {
|
||||
gitServerAddrs := gitserver.NewGitserverAddressesFromConf(conf.Get())
|
||||
gitServerAddrs := gitserver.NewGitserverAddresses(s.DB, conf.Get())
|
||||
addrs := gitServerAddrs.Addresses
|
||||
// We turn addrs into a string here for easy comparison and storage of previous
|
||||
// addresses since we'd need to take a copy of the slice anyway.
|
||||
@ -548,8 +553,8 @@ func (s *Server) SyncRepoState(interval time.Duration, batchSize, perSecond int)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) addrForRepo(repoName api.RepoName, gitServerAddrs gitserver.GitserverAddresses) string {
|
||||
return gitServerAddrs.AddrForRepo(filepath.Base(os.Args[0]), repoName)
|
||||
func (s *Server) addrForRepo(ctx context.Context, repoName api.RepoName, gitServerAddrs gitserver.GitserverAddresses) string {
|
||||
return gitServerAddrs.AddrForRepo(ctx, s.Logger, filepath.Base(os.Args[0]), repoName)
|
||||
}
|
||||
|
||||
// StartClonePipeline clones repos asynchronously. It creates a producer-consumer
|
||||
@ -760,7 +765,7 @@ func (s *Server) syncRepoState(gitServerAddrs gitserver.GitserverAddresses, batc
|
||||
repo.Name = api.UndeletedRepoName(repo.Name)
|
||||
|
||||
// Ensure we're only dealing with repos we are responsible for.
|
||||
addr := s.addrForRepo(repo.Name, gitServerAddrs)
|
||||
addr := s.addrForRepo(ctx, repo.Name, gitServerAddrs)
|
||||
if !s.hostnameMatch(addr) {
|
||||
repoSyncStateCounter.WithLabelValues("other_shard").Inc()
|
||||
continue
|
||||
|
||||
@ -727,6 +727,7 @@ func makeTestServer(ctx context.Context, t *testing.T, repoDir, remote string, d
|
||||
rpsLimiter: ratelimit.NewInstrumentedLimiter("GitserverTest", rate.NewLimiter(rate.Inf, 10)),
|
||||
RecordingCommandFactory: wrexec.NewRecordingCommandFactory(nil, 0),
|
||||
Perforce: perforce.NewService(ctx, obctx, logger, db, list.New()),
|
||||
DeduplicatedForksSet: types.NewRepoURICache(nil),
|
||||
}
|
||||
|
||||
s.StartClonePipeline(ctx)
|
||||
|
||||
@ -46,6 +46,7 @@ go_library(
|
||||
"//internal/requestclient",
|
||||
"//internal/service",
|
||||
"//internal/trace",
|
||||
"//internal/types",
|
||||
"//internal/wrexec",
|
||||
"//lib/errors",
|
||||
"//schema",
|
||||
|
||||
@ -55,6 +55,7 @@ import (
|
||||
"github.com/sourcegraph/sourcegraph/internal/requestclient"
|
||||
"github.com/sourcegraph/sourcegraph/internal/service"
|
||||
"github.com/sourcegraph/sourcegraph/internal/trace"
|
||||
"github.com/sourcegraph/sourcegraph/internal/types"
|
||||
"github.com/sourcegraph/sourcegraph/internal/wrexec"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
"github.com/sourcegraph/sourcegraph/schema"
|
||||
@ -139,17 +140,6 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
|
||||
}
|
||||
|
||||
recordingCommandFactory := wrexec.NewRecordingCommandFactory(nil, 0)
|
||||
conf.Watch(func() {
|
||||
// We update the factory with a predicate func. Each subsequent recordable command will use this predicate
|
||||
// to determine whether a command should be recorded or not.
|
||||
recordingConf := conf.Get().SiteConfig().GitRecorder
|
||||
if recordingConf == nil {
|
||||
recordingCommandFactory.Disable()
|
||||
return
|
||||
}
|
||||
recordingCommandFactory.Update(recordCommandsOnRepos(recordingConf.Repos, recordingConf.IgnoredGitCommands), recordingConf.Size)
|
||||
})
|
||||
|
||||
gitserver := server.Server{
|
||||
Logger: logger,
|
||||
ObservationCtx: observationCtx,
|
||||
@ -175,8 +165,22 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
|
||||
GlobalBatchLogSemaphore: semaphore.NewWeighted(int64(batchLogGlobalConcurrencyLimit)),
|
||||
Perforce: perforce.NewService(ctx, observationCtx, logger, db, list.New()),
|
||||
RecordingCommandFactory: recordingCommandFactory,
|
||||
DeduplicatedForksSet: types.NewRepoURICache(conf.GetDeduplicatedForksIndex()),
|
||||
}
|
||||
|
||||
conf.Watch(func() {
|
||||
gitserver.DeduplicatedForksSet.Overwrite(conf.GetDeduplicatedForksIndex())
|
||||
|
||||
// We update the factory with a predicate func. Each subsequent recordable command will use this predicate
|
||||
// to determine whether a command should be recorded or not.
|
||||
recordingConf := conf.Get().SiteConfig().GitRecorder
|
||||
if recordingConf == nil {
|
||||
recordingCommandFactory.Disable()
|
||||
return
|
||||
}
|
||||
recordingCommandFactory.Update(recordCommandsOnRepos(recordingConf.Repos, recordingConf.IgnoredGitCommands), recordingConf.Size)
|
||||
})
|
||||
|
||||
configurationWatcher := conf.DefaultClient()
|
||||
|
||||
var additionalServerOptions []grpc.ServerOption
|
||||
@ -672,5 +676,4 @@ func recordCommandsOnRepos(repos []string, ignoredGitCommands []string) wrexec.S
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -146,11 +146,16 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
|
||||
Store: store,
|
||||
// We always want to listen on the Synced channel since external service syncing
|
||||
// happens on both Cloud and non Cloud instances.
|
||||
Synced: make(chan repos.Diff),
|
||||
Now: clock,
|
||||
ObsvCtx: observation.ContextWithLogger(logger.Scoped("syncer", "repo syncer"), observationCtx),
|
||||
Synced: make(chan repos.Diff),
|
||||
Now: clock,
|
||||
ObsvCtx: observation.ContextWithLogger(logger.Scoped("syncer", "repo syncer"), observationCtx),
|
||||
DeduplicatedForksSet: types.NewRepoURICache(conf.GetDeduplicatedForksIndex()),
|
||||
}
|
||||
|
||||
conf.Watch(func() {
|
||||
syncer.DeduplicatedForksSet.Overwrite(conf.GetDeduplicatedForksIndex())
|
||||
})
|
||||
|
||||
server.Syncer = syncer
|
||||
|
||||
// All dependencies ready
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package executorqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
@ -17,7 +18,7 @@ import (
|
||||
|
||||
type GitserverClient interface {
|
||||
// AddrForRepo returns the gitserver address to use for the given repo name.
|
||||
AddrForRepo(api.RepoName) string
|
||||
AddrForRepo(context.Context, api.RepoName) string
|
||||
}
|
||||
|
||||
// gitserverProxy creates an HTTP handler that will proxy requests to the correct
|
||||
@ -26,7 +27,7 @@ func gitserverProxy(logger log.Logger, gitserverClient GitserverClient, gitPath
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
repo := getRepoName(r)
|
||||
|
||||
addrForRepo := gitserverClient.AddrForRepo(api.RepoName(repo))
|
||||
addrForRepo := gitserverClient.AddrForRepo(r.Context(), api.RepoName(repo))
|
||||
|
||||
p := httputil.ReverseProxy{
|
||||
Director: func(r *http.Request) {
|
||||
|
||||
@ -7,6 +7,7 @@
|
||||
package executorqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
api "github.com/sourcegraph/sourcegraph/internal/api"
|
||||
@ -28,7 +29,7 @@ type MockGitserverClient struct {
|
||||
func NewMockGitserverClient() *MockGitserverClient {
|
||||
return &MockGitserverClient{
|
||||
AddrForRepoFunc: &GitserverClientAddrForRepoFunc{
|
||||
defaultHook: func(api.RepoName) (r0 string) {
|
||||
defaultHook: func(context.Context, api.RepoName) (r0 string) {
|
||||
return
|
||||
},
|
||||
},
|
||||
@ -40,7 +41,7 @@ func NewMockGitserverClient() *MockGitserverClient {
|
||||
func NewStrictMockGitserverClient() *MockGitserverClient {
|
||||
return &MockGitserverClient{
|
||||
AddrForRepoFunc: &GitserverClientAddrForRepoFunc{
|
||||
defaultHook: func(api.RepoName) string {
|
||||
defaultHook: func(context.Context, api.RepoName) string {
|
||||
panic("unexpected invocation of MockGitserverClient.AddrForRepo")
|
||||
},
|
||||
},
|
||||
@ -61,24 +62,24 @@ func NewMockGitserverClientFrom(i GitserverClient) *MockGitserverClient {
|
||||
// GitserverClientAddrForRepoFunc describes the behavior when the
|
||||
// AddrForRepo method of the parent MockGitserverClient instance is invoked.
|
||||
type GitserverClientAddrForRepoFunc struct {
|
||||
defaultHook func(api.RepoName) string
|
||||
hooks []func(api.RepoName) string
|
||||
defaultHook func(context.Context, api.RepoName) string
|
||||
hooks []func(context.Context, api.RepoName) string
|
||||
history []GitserverClientAddrForRepoFuncCall
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// AddrForRepo delegates to the next hook function in the queue and stores
|
||||
// the parameter and result values of this invocation.
|
||||
func (m *MockGitserverClient) AddrForRepo(v0 api.RepoName) string {
|
||||
r0 := m.AddrForRepoFunc.nextHook()(v0)
|
||||
m.AddrForRepoFunc.appendCall(GitserverClientAddrForRepoFuncCall{v0, r0})
|
||||
func (m *MockGitserverClient) AddrForRepo(v0 context.Context, v1 api.RepoName) string {
|
||||
r0 := m.AddrForRepoFunc.nextHook()(v0, v1)
|
||||
m.AddrForRepoFunc.appendCall(GitserverClientAddrForRepoFuncCall{v0, v1, r0})
|
||||
return r0
|
||||
}
|
||||
|
||||
// SetDefaultHook sets function that is called when the AddrForRepo method
|
||||
// of the parent MockGitserverClient instance is invoked and the hook queue
|
||||
// is empty.
|
||||
func (f *GitserverClientAddrForRepoFunc) SetDefaultHook(hook func(api.RepoName) string) {
|
||||
func (f *GitserverClientAddrForRepoFunc) SetDefaultHook(hook func(context.Context, api.RepoName) string) {
|
||||
f.defaultHook = hook
|
||||
}
|
||||
|
||||
@ -86,7 +87,7 @@ func (f *GitserverClientAddrForRepoFunc) SetDefaultHook(hook func(api.RepoName)
|
||||
// AddrForRepo method of the parent MockGitserverClient instance invokes the
|
||||
// hook at the front of the queue and discards it. After the queue is empty,
|
||||
// the default hook function is invoked for any future action.
|
||||
func (f *GitserverClientAddrForRepoFunc) PushHook(hook func(api.RepoName) string) {
|
||||
func (f *GitserverClientAddrForRepoFunc) PushHook(hook func(context.Context, api.RepoName) string) {
|
||||
f.mutex.Lock()
|
||||
f.hooks = append(f.hooks, hook)
|
||||
f.mutex.Unlock()
|
||||
@ -95,19 +96,19 @@ func (f *GitserverClientAddrForRepoFunc) PushHook(hook func(api.RepoName) string
|
||||
// SetDefaultReturn calls SetDefaultHook with a function that returns the
|
||||
// given values.
|
||||
func (f *GitserverClientAddrForRepoFunc) SetDefaultReturn(r0 string) {
|
||||
f.SetDefaultHook(func(api.RepoName) string {
|
||||
f.SetDefaultHook(func(context.Context, api.RepoName) string {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
// PushReturn calls PushHook with a function that returns the given values.
|
||||
func (f *GitserverClientAddrForRepoFunc) PushReturn(r0 string) {
|
||||
f.PushHook(func(api.RepoName) string {
|
||||
f.PushHook(func(context.Context, api.RepoName) string {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
func (f *GitserverClientAddrForRepoFunc) nextHook() func(api.RepoName) string {
|
||||
func (f *GitserverClientAddrForRepoFunc) nextHook() func(context.Context, api.RepoName) string {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
@ -142,7 +143,10 @@ func (f *GitserverClientAddrForRepoFunc) History() []GitserverClientAddrForRepoF
|
||||
type GitserverClientAddrForRepoFuncCall struct {
|
||||
// Arg0 is the value of the 1st argument passed to this method
|
||||
// invocation.
|
||||
Arg0 api.RepoName
|
||||
Arg0 context.Context
|
||||
// Arg1 is the value of the 2nd argument passed to this method
|
||||
// invocation.
|
||||
Arg1 api.RepoName
|
||||
// Result0 is the value of the 1st result returned from this method
|
||||
// invocation.
|
||||
Result0 string
|
||||
@ -151,7 +155,7 @@ type GitserverClientAddrForRepoFuncCall struct {
|
||||
// Args returns an interface slice containing the arguments of this
|
||||
// invocation.
|
||||
func (c GitserverClientAddrForRepoFuncCall) Args() []interface{} {
|
||||
return []interface{}{c.Arg0}
|
||||
return []interface{}{c.Arg0, c.Arg1}
|
||||
}
|
||||
|
||||
// Results returns an interface slice containing the results of this
|
||||
|
||||
31
internal/batches/sources/mocks_test.go
generated
31
internal/batches/sources/mocks_test.go
generated
@ -10893,7 +10893,7 @@ type MockGitserverClient struct {
|
||||
func NewMockGitserverClient() *MockGitserverClient {
|
||||
return &MockGitserverClient{
|
||||
AddrForRepoFunc: &GitserverClientAddrForRepoFunc{
|
||||
defaultHook: func(api.RepoName) (r0 string) {
|
||||
defaultHook: func(context.Context, api.RepoName) (r0 string) {
|
||||
return
|
||||
},
|
||||
},
|
||||
@ -11160,7 +11160,7 @@ func NewMockGitserverClient() *MockGitserverClient {
|
||||
func NewStrictMockGitserverClient() *MockGitserverClient {
|
||||
return &MockGitserverClient{
|
||||
AddrForRepoFunc: &GitserverClientAddrForRepoFunc{
|
||||
defaultHook: func(api.RepoName) string {
|
||||
defaultHook: func(context.Context, api.RepoName) string {
|
||||
panic("unexpected invocation of MockGitserverClient.AddrForRepo")
|
||||
},
|
||||
},
|
||||
@ -11589,24 +11589,24 @@ func NewMockGitserverClientFrom(i gitserver.Client) *MockGitserverClient {
|
||||
// GitserverClientAddrForRepoFunc describes the behavior when the
|
||||
// AddrForRepo method of the parent MockGitserverClient instance is invoked.
|
||||
type GitserverClientAddrForRepoFunc struct {
|
||||
defaultHook func(api.RepoName) string
|
||||
hooks []func(api.RepoName) string
|
||||
defaultHook func(context.Context, api.RepoName) string
|
||||
hooks []func(context.Context, api.RepoName) string
|
||||
history []GitserverClientAddrForRepoFuncCall
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// AddrForRepo delegates to the next hook function in the queue and stores
|
||||
// the parameter and result values of this invocation.
|
||||
func (m *MockGitserverClient) AddrForRepo(v0 api.RepoName) string {
|
||||
r0 := m.AddrForRepoFunc.nextHook()(v0)
|
||||
m.AddrForRepoFunc.appendCall(GitserverClientAddrForRepoFuncCall{v0, r0})
|
||||
func (m *MockGitserverClient) AddrForRepo(v0 context.Context, v1 api.RepoName) string {
|
||||
r0 := m.AddrForRepoFunc.nextHook()(v0, v1)
|
||||
m.AddrForRepoFunc.appendCall(GitserverClientAddrForRepoFuncCall{v0, v1, r0})
|
||||
return r0
|
||||
}
|
||||
|
||||
// SetDefaultHook sets function that is called when the AddrForRepo method
|
||||
// of the parent MockGitserverClient instance is invoked and the hook queue
|
||||
// is empty.
|
||||
func (f *GitserverClientAddrForRepoFunc) SetDefaultHook(hook func(api.RepoName) string) {
|
||||
func (f *GitserverClientAddrForRepoFunc) SetDefaultHook(hook func(context.Context, api.RepoName) string) {
|
||||
f.defaultHook = hook
|
||||
}
|
||||
|
||||
@ -11614,7 +11614,7 @@ func (f *GitserverClientAddrForRepoFunc) SetDefaultHook(hook func(api.RepoName)
|
||||
// AddrForRepo method of the parent MockGitserverClient instance invokes the
|
||||
// hook at the front of the queue and discards it. After the queue is empty,
|
||||
// the default hook function is invoked for any future action.
|
||||
func (f *GitserverClientAddrForRepoFunc) PushHook(hook func(api.RepoName) string) {
|
||||
func (f *GitserverClientAddrForRepoFunc) PushHook(hook func(context.Context, api.RepoName) string) {
|
||||
f.mutex.Lock()
|
||||
f.hooks = append(f.hooks, hook)
|
||||
f.mutex.Unlock()
|
||||
@ -11623,19 +11623,19 @@ func (f *GitserverClientAddrForRepoFunc) PushHook(hook func(api.RepoName) string
|
||||
// SetDefaultReturn calls SetDefaultHook with a function that returns the
|
||||
// given values.
|
||||
func (f *GitserverClientAddrForRepoFunc) SetDefaultReturn(r0 string) {
|
||||
f.SetDefaultHook(func(api.RepoName) string {
|
||||
f.SetDefaultHook(func(context.Context, api.RepoName) string {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
// PushReturn calls PushHook with a function that returns the given values.
|
||||
func (f *GitserverClientAddrForRepoFunc) PushReturn(r0 string) {
|
||||
f.PushHook(func(api.RepoName) string {
|
||||
f.PushHook(func(context.Context, api.RepoName) string {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
func (f *GitserverClientAddrForRepoFunc) nextHook() func(api.RepoName) string {
|
||||
func (f *GitserverClientAddrForRepoFunc) nextHook() func(context.Context, api.RepoName) string {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
@ -11670,7 +11670,10 @@ func (f *GitserverClientAddrForRepoFunc) History() []GitserverClientAddrForRepoF
|
||||
type GitserverClientAddrForRepoFuncCall struct {
|
||||
// Arg0 is the value of the 1st argument passed to this method
|
||||
// invocation.
|
||||
Arg0 api.RepoName
|
||||
Arg0 context.Context
|
||||
// Arg1 is the value of the 2nd argument passed to this method
|
||||
// invocation.
|
||||
Arg1 api.RepoName
|
||||
// Result0 is the value of the 1st result returned from this method
|
||||
// invocation.
|
||||
Result0 string
|
||||
@ -11679,7 +11682,7 @@ type GitserverClientAddrForRepoFuncCall struct {
|
||||
// Args returns an interface slice containing the arguments of this
|
||||
// invocation.
|
||||
func (c GitserverClientAddrForRepoFuncCall) Args() []interface{} {
|
||||
return []interface{}{c.Arg0}
|
||||
return []interface{}{c.Arg0, c.Arg1}
|
||||
}
|
||||
|
||||
// Results returns an interface slice containing the results of this
|
||||
|
||||
@ -26,6 +26,7 @@ go_library(
|
||||
"//cmd/frontend/envvar",
|
||||
"//internal/accesstoken",
|
||||
"//internal/api/internalapi",
|
||||
"//internal/collections",
|
||||
"//internal/conf/confdefaults",
|
||||
"//internal/conf/conftypes",
|
||||
"//internal/conf/deploy",
|
||||
@ -65,6 +66,7 @@ go_test(
|
||||
"//cmd/frontend/envvar",
|
||||
"//internal/accesstoken",
|
||||
"//internal/api/internalapi",
|
||||
"//internal/collections",
|
||||
"//internal/conf/conftypes",
|
||||
"//internal/conf/deploy",
|
||||
"//lib/errors",
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
licensing "github.com/sourcegraph/sourcegraph/internal/accesstoken"
|
||||
"github.com/sourcegraph/sourcegraph/internal/api/internalapi"
|
||||
"github.com/sourcegraph/sourcegraph/internal/collections"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/confdefaults"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/deploy"
|
||||
@ -580,6 +581,18 @@ func GitMaxConcurrentClones() int {
|
||||
return v
|
||||
}
|
||||
|
||||
func GetDeduplicatedForksIndex() collections.Set[string] {
|
||||
index := collections.NewSet[string]()
|
||||
|
||||
repoConf := Get().Repositories
|
||||
if repoConf == nil {
|
||||
return index
|
||||
}
|
||||
|
||||
index.Add(repoConf.DeduplicateForks...)
|
||||
return index
|
||||
}
|
||||
|
||||
// GetCompletionsConfig evaluates a complete completions configuration based on
|
||||
// site configuration. The configuration may be nil if completions is disabled.
|
||||
func GetCompletionsConfig(siteConfig schema.SiteConfiguration) (c *conftypes.CompletionsConfig) {
|
||||
|
||||
@ -8,6 +8,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
licensing "github.com/sourcegraph/sourcegraph/internal/accesstoken"
|
||||
"github.com/sourcegraph/sourcegraph/internal/collections"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/deploy"
|
||||
"github.com/sourcegraph/sourcegraph/lib/pointers"
|
||||
@ -313,6 +314,53 @@ func TestCodyEnabled(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetDeduplicatedForksIndex(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
haveConfig *schema.Repositories
|
||||
wantIndex collections.Set[string]
|
||||
}{
|
||||
{
|
||||
name: "config not set",
|
||||
wantIndex: map[string]struct{}{},
|
||||
},
|
||||
{
|
||||
name: "repositories set, but deduplicated forks is empty",
|
||||
haveConfig: &schema.Repositories{},
|
||||
wantIndex: map[string]struct{}{},
|
||||
},
|
||||
{
|
||||
name: "deduplicated forks is not empty",
|
||||
haveConfig: &schema.Repositories{
|
||||
DeduplicateForks: []string{
|
||||
"abc",
|
||||
"def",
|
||||
"abc", // a duplicate
|
||||
},
|
||||
},
|
||||
wantIndex: map[string]struct{}{
|
||||
"abc": {},
|
||||
"def": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
Mock(&Unified{
|
||||
SiteConfiguration: schema.SiteConfiguration{
|
||||
Repositories: tc.haveConfig,
|
||||
},
|
||||
})
|
||||
|
||||
gotIndex := GetDeduplicatedForksIndex()
|
||||
if diff := cmp.Diff(gotIndex, tc.wantIndex); diff != "" {
|
||||
t.Errorf("mismatched deduplicated repos index: (-want, +got)\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetCompletionsConfig(t *testing.T) {
|
||||
licenseKey := "theasdfkey"
|
||||
licenseAccessToken := licensing.GenerateLicenseKeyBasedAccessToken(licenseKey)
|
||||
|
||||
@ -70,6 +70,12 @@ type GitserverRepoStore interface {
|
||||
UpdateRepoSizes(ctx context.Context, shardID string, repos map[api.RepoName]int64) (int, error)
|
||||
// SetCloningProgress updates a piece of text description from how cloning proceeds.
|
||||
SetCloningProgress(context.Context, api.RepoName, string) error
|
||||
|
||||
// UpdatePoolRepoID updates the pool_repo_id column of a gitserver_repo.
|
||||
UpdatePoolRepoID(context.Context, api.RepoName, api.RepoName) error
|
||||
|
||||
// GetPoolRepo will return the PoolRepo of a repository matching the repo name if it exists.
|
||||
GetPoolRepo(context.Context, api.RepoName) (*types.PoolRepo, error)
|
||||
}
|
||||
|
||||
var _ GitserverRepoStore = (*gitserverRepoStore)(nil)
|
||||
@ -117,6 +123,83 @@ func (s *gitserverRepoStore) Update(ctx context.Context, repos ...*types.Gitserv
|
||||
return errors.Wrap(err, "updating GitserverRepo")
|
||||
}
|
||||
|
||||
const updatePoolRepoIDQueryFmtStr = `
|
||||
-- find the repo that matches the poolRepoName
|
||||
WITH pool AS (
|
||||
SELECT
|
||||
id
|
||||
FROM
|
||||
repo
|
||||
WHERE
|
||||
name = %s::citext
|
||||
),
|
||||
-- find the repo that matches repoName
|
||||
repo AS (
|
||||
SELECT
|
||||
id
|
||||
FROM
|
||||
repo
|
||||
WHERE
|
||||
name = %s::citext
|
||||
)
|
||||
-- now update the pool_repo_id of repo with the repo.id of the pool
|
||||
UPDATE
|
||||
gitserver_repos
|
||||
SET
|
||||
pool_repo_id = (SELECT id FROM pool)
|
||||
WHERE
|
||||
repo_id = (SELECT id FROM repo)
|
||||
`
|
||||
|
||||
// UpdatePoolRepoID updates the repo matching `repoName` with the repo ID of the repo matching
|
||||
// `poolRepoName`.
|
||||
func (s *gitserverRepoStore) UpdatePoolRepoID(ctx context.Context, poolRepoName, repoName api.RepoName) error {
|
||||
err := s.Exec(ctx, sqlf.Sprintf(updatePoolRepoIDQueryFmtStr, poolRepoName, repoName))
|
||||
return errors.Wrap(err, "UpdatePoolRepoID: failed to add pool_repo_id to gitserver_repos row")
|
||||
}
|
||||
|
||||
const getPoolRepoQueryFmtStr = `
|
||||
WITH gs AS (
|
||||
SELECT
|
||||
gitserver_repos.pool_repo_id
|
||||
FROM
|
||||
gitserver_repos
|
||||
JOIN repo AS r ON repo_id = r.id
|
||||
WHERE
|
||||
name = %s::citext
|
||||
)
|
||||
SELECT
|
||||
name,
|
||||
uri
|
||||
FROM
|
||||
repo
|
||||
WHERE
|
||||
id = (SELECT pool_repo_id FROM gs)
|
||||
`
|
||||
|
||||
func (s *gitserverRepoStore) GetPoolRepo(ctx context.Context, repoURI api.RepoName) (*types.PoolRepo, error) {
|
||||
row := s.QueryRow(ctx, sqlf.Sprintf(getPoolRepoQueryFmtStr, repoURI))
|
||||
|
||||
poolRepo, err := scanPoolRepo(row)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, errors.Wrap(err, "GetPoolRepoURI failed")
|
||||
}
|
||||
|
||||
return poolRepo, nil
|
||||
}
|
||||
|
||||
func scanPoolRepo(scanner dbutil.Scanner) (*types.PoolRepo, error) {
|
||||
var poolRepo types.PoolRepo
|
||||
err := scanner.Scan(&poolRepo.RepoName, &poolRepo.RepoURI)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to scan poolRepo")
|
||||
}
|
||||
return &poolRepo, nil
|
||||
}
|
||||
|
||||
const updateGitserverReposQueryFmtstr = `
|
||||
UPDATE gitserver_repos AS gr
|
||||
SET
|
||||
@ -316,6 +399,7 @@ SELECT
|
||||
gr.updated_at,
|
||||
gr.corrupted_at,
|
||||
gr.corruption_logs,
|
||||
gr.pool_repo_id,
|
||||
go.last_output
|
||||
FROM gitserver_repos gr
|
||||
JOIN repo ON gr.repo_id = repo.id
|
||||
@ -351,6 +435,7 @@ SELECT
|
||||
gr.updated_at,
|
||||
gr.corrupted_at,
|
||||
gr.corruption_logs,
|
||||
gr.pool_repo_id,
|
||||
go.last_output
|
||||
FROM gitserver_repos gr
|
||||
LEFT OUTER JOIN gitserver_repos_sync_output go ON gr.repo_id = go.repo_id
|
||||
@ -383,6 +468,7 @@ SELECT
|
||||
gr.updated_at,
|
||||
gr.corrupted_at,
|
||||
gr.corruption_logs,
|
||||
gr.pool_repo_id,
|
||||
go.last_output
|
||||
FROM gitserver_repos gr
|
||||
JOIN repo r ON r.id = gr.repo_id
|
||||
@ -409,6 +495,7 @@ SELECT
|
||||
gr.updated_at,
|
||||
gr.corrupted_at,
|
||||
gr.corruption_logs,
|
||||
gr.pool_repo_id,
|
||||
go.last_output
|
||||
FROM gitserver_repos gr
|
||||
JOIN repo r on r.id = gr.repo_id
|
||||
@ -441,6 +528,7 @@ func scanGitserverRepo(scanner dbutil.Scanner) (*types.GitserverRepo, api.RepoNa
|
||||
var rawLogs []byte
|
||||
var cloneStatus string
|
||||
var repoName api.RepoName
|
||||
var poolRepoID int32
|
||||
err := scanner.Scan(
|
||||
&gr.RepoID,
|
||||
&repoName,
|
||||
@ -454,13 +542,19 @@ func scanGitserverRepo(scanner dbutil.Scanner) (*types.GitserverRepo, api.RepoNa
|
||||
&gr.UpdatedAt,
|
||||
&dbutil.NullTime{Time: &gr.CorruptedAt},
|
||||
&rawLogs,
|
||||
&dbutil.NullInt32{N: &poolRepoID},
|
||||
&dbutil.NullString{S: &gr.LastSyncOutput},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, "", errors.Wrap(err, "scanning GitserverRepo")
|
||||
}
|
||||
|
||||
gr.CloneStatus = types.ParseCloneStatus(cloneStatus)
|
||||
|
||||
if poolRepoID != 0 {
|
||||
gr.PoolRepoID = api.RepoID(poolRepoID)
|
||||
}
|
||||
|
||||
err = json.Unmarshal(rawLogs, &gr.CorruptionLogs)
|
||||
if err != nil {
|
||||
return nil, repoName, errors.Wrap(err, "unmarshal of corruption_logs failed")
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/keegancsmith/sqlf"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/sourcegraph/log/logtest"
|
||||
|
||||
@ -1198,10 +1199,78 @@ func TestGitserverUpdateRepoSizes(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGitserverRepos_UpdatePoolRepoID_And_GetPoolRepo(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
logger := logtest.Scoped(t)
|
||||
db := NewDB(logger, dbtest.NewDB(logger, t))
|
||||
ctx := context.Background()
|
||||
|
||||
// Create one test poolRepo
|
||||
poolRepo, _ := createTestRepo(ctx, t, db, &createTestRepoPayload{
|
||||
Name: "internal.github.com/sourcegraph/repo",
|
||||
URI: "github.com/sourcegraph/repo",
|
||||
CloneStatus: types.CloneStatusNotCloned,
|
||||
RepoSizeBytes: 100,
|
||||
})
|
||||
|
||||
// Create one test repo
|
||||
forkedRepo, _ := createTestRepo(ctx, t, db, &createTestRepoPayload{
|
||||
Name: "internal.github.com/forked/repo",
|
||||
URI: "github.com/forked/repo",
|
||||
CloneStatus: types.CloneStatusNotCloned,
|
||||
RepoSizeBytes: 100,
|
||||
})
|
||||
|
||||
t.Run("pool repo does not exist", func(t *testing.T) {
|
||||
err := db.GitserverRepos().UpdatePoolRepoID(ctx, api.RepoName("foo"), forkedRepo.Name)
|
||||
require.NoError(t, err)
|
||||
|
||||
gotPoolRepo, err := db.GitserverRepos().GetPoolRepo(ctx, forkedRepo.Name)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, gotPoolRepo)
|
||||
|
||||
})
|
||||
|
||||
t.Run("forked repo does not exist", func(t *testing.T) {
|
||||
err := db.GitserverRepos().UpdatePoolRepoID(ctx, poolRepo.Name, api.RepoName("foo"))
|
||||
require.NoError(t, err)
|
||||
|
||||
gotPoolRepo, err := db.GitserverRepos().GetPoolRepo(ctx, api.RepoName("foo"))
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, gotPoolRepo)
|
||||
})
|
||||
|
||||
t.Run("both pool and forked repo do not exist", func(t *testing.T) {
|
||||
err := db.GitserverRepos().UpdatePoolRepoID(ctx, api.RepoName("foo"), api.RepoName("bar"))
|
||||
require.NoError(t, err)
|
||||
|
||||
gotPoolRepo, err := db.GitserverRepos().GetPoolRepo(ctx, api.RepoName("bar"))
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, gotPoolRepo)
|
||||
})
|
||||
|
||||
t.Run("both pool and forked repo exist", func(t *testing.T) {
|
||||
err := db.GitserverRepos().UpdatePoolRepoID(ctx, poolRepo.Name, forkedRepo.Name)
|
||||
require.NoError(t, err)
|
||||
gotPoolRepo, err := db.GitserverRepos().GetPoolRepo(ctx, forkedRepo.Name)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, gotPoolRepo)
|
||||
|
||||
wantPoolRepo := types.PoolRepo{RepoName: poolRepo.Name, RepoURI: poolRepo.URI}
|
||||
if diff := cmp.Diff(wantPoolRepo, *gotPoolRepo); diff != "" {
|
||||
t.Fatalf("mismatched pool repo got, (-want, +got):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func createTestRepo(ctx context.Context, t *testing.T, db DB, payload *createTestRepoPayload) (*types.Repo, *types.GitserverRepo) {
|
||||
t.Helper()
|
||||
|
||||
repo := &types.Repo{Name: payload.Name}
|
||||
repo := &types.Repo{Name: payload.Name, URI: payload.URI}
|
||||
|
||||
// Create Repo
|
||||
err := db.Repos().Create(ctx, repo)
|
||||
@ -1237,6 +1306,8 @@ type createTestRepoPayload struct {
|
||||
// Previously, this was called RepoURI.
|
||||
Name api.RepoName
|
||||
|
||||
URI string
|
||||
|
||||
// Gitserver related properties
|
||||
|
||||
// Size of the repository in bytes.
|
||||
|
||||
@ -35618,6 +35618,9 @@ type MockGitserverRepoStore struct {
|
||||
// GetByNamesFunc is an instance of a mock function object controlling
|
||||
// the behavior of the method GetByNames.
|
||||
GetByNamesFunc *GitserverRepoStoreGetByNamesFunc
|
||||
// GetPoolRepoFunc is an instance of a mock function object controlling
|
||||
// the behavior of the method GetPoolRepo.
|
||||
GetPoolRepoFunc *GitserverRepoStoreGetPoolRepoFunc
|
||||
// HandleFunc is an instance of a mock function object controlling the
|
||||
// behavior of the method Handle.
|
||||
HandleFunc *GitserverRepoStoreHandleFunc
|
||||
@ -35662,6 +35665,9 @@ type MockGitserverRepoStore struct {
|
||||
// UpdateFunc is an instance of a mock function object controlling the
|
||||
// behavior of the method Update.
|
||||
UpdateFunc *GitserverRepoStoreUpdateFunc
|
||||
// UpdatePoolRepoIDFunc is an instance of a mock function object
|
||||
// controlling the behavior of the method UpdatePoolRepoID.
|
||||
UpdatePoolRepoIDFunc *GitserverRepoStoreUpdatePoolRepoIDFunc
|
||||
// UpdateRepoSizesFunc is an instance of a mock function object
|
||||
// controlling the behavior of the method UpdateRepoSizes.
|
||||
UpdateRepoSizesFunc *GitserverRepoStoreUpdateRepoSizesFunc
|
||||
@ -35690,6 +35696,11 @@ func NewMockGitserverRepoStore() *MockGitserverRepoStore {
|
||||
return
|
||||
},
|
||||
},
|
||||
GetPoolRepoFunc: &GitserverRepoStoreGetPoolRepoFunc{
|
||||
defaultHook: func(context.Context, api.RepoName) (r0 *types.PoolRepo, r1 error) {
|
||||
return
|
||||
},
|
||||
},
|
||||
HandleFunc: &GitserverRepoStoreHandleFunc{
|
||||
defaultHook: func() (r0 basestore.TransactableHandle) {
|
||||
return
|
||||
@ -35760,6 +35771,11 @@ func NewMockGitserverRepoStore() *MockGitserverRepoStore {
|
||||
return
|
||||
},
|
||||
},
|
||||
UpdatePoolRepoIDFunc: &GitserverRepoStoreUpdatePoolRepoIDFunc{
|
||||
defaultHook: func(context.Context, api.RepoName, api.RepoName) (r0 error) {
|
||||
return
|
||||
},
|
||||
},
|
||||
UpdateRepoSizesFunc: &GitserverRepoStoreUpdateRepoSizesFunc{
|
||||
defaultHook: func(context.Context, string, map[api.RepoName]int64) (r0 int, r1 error) {
|
||||
return
|
||||
@ -35793,6 +35809,11 @@ func NewStrictMockGitserverRepoStore() *MockGitserverRepoStore {
|
||||
panic("unexpected invocation of MockGitserverRepoStore.GetByNames")
|
||||
},
|
||||
},
|
||||
GetPoolRepoFunc: &GitserverRepoStoreGetPoolRepoFunc{
|
||||
defaultHook: func(context.Context, api.RepoName) (*types.PoolRepo, error) {
|
||||
panic("unexpected invocation of MockGitserverRepoStore.GetPoolRepo")
|
||||
},
|
||||
},
|
||||
HandleFunc: &GitserverRepoStoreHandleFunc{
|
||||
defaultHook: func() basestore.TransactableHandle {
|
||||
panic("unexpected invocation of MockGitserverRepoStore.Handle")
|
||||
@ -35863,6 +35884,11 @@ func NewStrictMockGitserverRepoStore() *MockGitserverRepoStore {
|
||||
panic("unexpected invocation of MockGitserverRepoStore.Update")
|
||||
},
|
||||
},
|
||||
UpdatePoolRepoIDFunc: &GitserverRepoStoreUpdatePoolRepoIDFunc{
|
||||
defaultHook: func(context.Context, api.RepoName, api.RepoName) error {
|
||||
panic("unexpected invocation of MockGitserverRepoStore.UpdatePoolRepoID")
|
||||
},
|
||||
},
|
||||
UpdateRepoSizesFunc: &GitserverRepoStoreUpdateRepoSizesFunc{
|
||||
defaultHook: func(context.Context, string, map[api.RepoName]int64) (int, error) {
|
||||
panic("unexpected invocation of MockGitserverRepoStore.UpdateRepoSizes")
|
||||
@ -35890,6 +35916,9 @@ func NewMockGitserverRepoStoreFrom(i GitserverRepoStore) *MockGitserverRepoStore
|
||||
GetByNamesFunc: &GitserverRepoStoreGetByNamesFunc{
|
||||
defaultHook: i.GetByNames,
|
||||
},
|
||||
GetPoolRepoFunc: &GitserverRepoStoreGetPoolRepoFunc{
|
||||
defaultHook: i.GetPoolRepo,
|
||||
},
|
||||
HandleFunc: &GitserverRepoStoreHandleFunc{
|
||||
defaultHook: i.Handle,
|
||||
},
|
||||
@ -35932,6 +35961,9 @@ func NewMockGitserverRepoStoreFrom(i GitserverRepoStore) *MockGitserverRepoStore
|
||||
UpdateFunc: &GitserverRepoStoreUpdateFunc{
|
||||
defaultHook: i.Update,
|
||||
},
|
||||
UpdatePoolRepoIDFunc: &GitserverRepoStoreUpdatePoolRepoIDFunc{
|
||||
defaultHook: i.UpdatePoolRepoID,
|
||||
},
|
||||
UpdateRepoSizesFunc: &GitserverRepoStoreUpdateRepoSizesFunc{
|
||||
defaultHook: i.UpdateRepoSizes,
|
||||
},
|
||||
@ -36273,6 +36305,116 @@ func (c GitserverRepoStoreGetByNamesFuncCall) Results() []interface{} {
|
||||
return []interface{}{c.Result0, c.Result1}
|
||||
}
|
||||
|
||||
// GitserverRepoStoreGetPoolRepoFunc describes the behavior when the
|
||||
// GetPoolRepo method of the parent MockGitserverRepoStore instance is
|
||||
// invoked.
|
||||
type GitserverRepoStoreGetPoolRepoFunc struct {
|
||||
defaultHook func(context.Context, api.RepoName) (*types.PoolRepo, error)
|
||||
hooks []func(context.Context, api.RepoName) (*types.PoolRepo, error)
|
||||
history []GitserverRepoStoreGetPoolRepoFuncCall
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// GetPoolRepo delegates to the next hook function in the queue and stores
|
||||
// the parameter and result values of this invocation.
|
||||
func (m *MockGitserverRepoStore) GetPoolRepo(v0 context.Context, v1 api.RepoName) (*types.PoolRepo, error) {
|
||||
r0, r1 := m.GetPoolRepoFunc.nextHook()(v0, v1)
|
||||
m.GetPoolRepoFunc.appendCall(GitserverRepoStoreGetPoolRepoFuncCall{v0, v1, r0, r1})
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// SetDefaultHook sets function that is called when the GetPoolRepo method
|
||||
// of the parent MockGitserverRepoStore instance is invoked and the hook
|
||||
// queue is empty.
|
||||
func (f *GitserverRepoStoreGetPoolRepoFunc) SetDefaultHook(hook func(context.Context, api.RepoName) (*types.PoolRepo, error)) {
|
||||
f.defaultHook = hook
|
||||
}
|
||||
|
||||
// PushHook adds a function to the end of hook queue. Each invocation of the
|
||||
// GetPoolRepo method of the parent MockGitserverRepoStore instance invokes
|
||||
// the hook at the front of the queue and discards it. After the queue is
|
||||
// empty, the default hook function is invoked for any future action.
|
||||
func (f *GitserverRepoStoreGetPoolRepoFunc) PushHook(hook func(context.Context, api.RepoName) (*types.PoolRepo, error)) {
|
||||
f.mutex.Lock()
|
||||
f.hooks = append(f.hooks, hook)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// SetDefaultReturn calls SetDefaultHook with a function that returns the
|
||||
// given values.
|
||||
func (f *GitserverRepoStoreGetPoolRepoFunc) SetDefaultReturn(r0 *types.PoolRepo, r1 error) {
|
||||
f.SetDefaultHook(func(context.Context, api.RepoName) (*types.PoolRepo, error) {
|
||||
return r0, r1
|
||||
})
|
||||
}
|
||||
|
||||
// PushReturn calls PushHook with a function that returns the given values.
|
||||
func (f *GitserverRepoStoreGetPoolRepoFunc) PushReturn(r0 *types.PoolRepo, r1 error) {
|
||||
f.PushHook(func(context.Context, api.RepoName) (*types.PoolRepo, error) {
|
||||
return r0, r1
|
||||
})
|
||||
}
|
||||
|
||||
func (f *GitserverRepoStoreGetPoolRepoFunc) nextHook() func(context.Context, api.RepoName) (*types.PoolRepo, error) {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
if len(f.hooks) == 0 {
|
||||
return f.defaultHook
|
||||
}
|
||||
|
||||
hook := f.hooks[0]
|
||||
f.hooks = f.hooks[1:]
|
||||
return hook
|
||||
}
|
||||
|
||||
func (f *GitserverRepoStoreGetPoolRepoFunc) appendCall(r0 GitserverRepoStoreGetPoolRepoFuncCall) {
|
||||
f.mutex.Lock()
|
||||
f.history = append(f.history, r0)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// History returns a sequence of GitserverRepoStoreGetPoolRepoFuncCall
|
||||
// objects describing the invocations of this function.
|
||||
func (f *GitserverRepoStoreGetPoolRepoFunc) History() []GitserverRepoStoreGetPoolRepoFuncCall {
|
||||
f.mutex.Lock()
|
||||
history := make([]GitserverRepoStoreGetPoolRepoFuncCall, len(f.history))
|
||||
copy(history, f.history)
|
||||
f.mutex.Unlock()
|
||||
|
||||
return history
|
||||
}
|
||||
|
||||
// GitserverRepoStoreGetPoolRepoFuncCall is an object that describes an
|
||||
// invocation of method GetPoolRepo on an instance of
|
||||
// MockGitserverRepoStore.
|
||||
type GitserverRepoStoreGetPoolRepoFuncCall struct {
|
||||
// Arg0 is the value of the 1st argument passed to this method
|
||||
// invocation.
|
||||
Arg0 context.Context
|
||||
// Arg1 is the value of the 2nd argument passed to this method
|
||||
// invocation.
|
||||
Arg1 api.RepoName
|
||||
// Result0 is the value of the 1st result returned from this method
|
||||
// invocation.
|
||||
Result0 *types.PoolRepo
|
||||
// Result1 is the value of the 2nd result returned from this method
|
||||
// invocation.
|
||||
Result1 error
|
||||
}
|
||||
|
||||
// Args returns an interface slice containing the arguments of this
|
||||
// invocation.
|
||||
func (c GitserverRepoStoreGetPoolRepoFuncCall) Args() []interface{} {
|
||||
return []interface{}{c.Arg0, c.Arg1}
|
||||
}
|
||||
|
||||
// Results returns an interface slice containing the results of this
|
||||
// invocation.
|
||||
func (c GitserverRepoStoreGetPoolRepoFuncCall) Results() []interface{} {
|
||||
return []interface{}{c.Result0, c.Result1}
|
||||
}
|
||||
|
||||
// GitserverRepoStoreHandleFunc describes the behavior when the Handle
|
||||
// method of the parent MockGitserverRepoStore instance is invoked.
|
||||
type GitserverRepoStoreHandleFunc struct {
|
||||
@ -37826,6 +37968,117 @@ func (c GitserverRepoStoreUpdateFuncCall) Results() []interface{} {
|
||||
return []interface{}{c.Result0}
|
||||
}
|
||||
|
||||
// GitserverRepoStoreUpdatePoolRepoIDFunc describes the behavior when the
|
||||
// UpdatePoolRepoID method of the parent MockGitserverRepoStore instance is
|
||||
// invoked.
|
||||
type GitserverRepoStoreUpdatePoolRepoIDFunc struct {
|
||||
defaultHook func(context.Context, api.RepoName, api.RepoName) error
|
||||
hooks []func(context.Context, api.RepoName, api.RepoName) error
|
||||
history []GitserverRepoStoreUpdatePoolRepoIDFuncCall
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// UpdatePoolRepoID delegates to the next hook function in the queue and
|
||||
// stores the parameter and result values of this invocation.
|
||||
func (m *MockGitserverRepoStore) UpdatePoolRepoID(v0 context.Context, v1 api.RepoName, v2 api.RepoName) error {
|
||||
r0 := m.UpdatePoolRepoIDFunc.nextHook()(v0, v1, v2)
|
||||
m.UpdatePoolRepoIDFunc.appendCall(GitserverRepoStoreUpdatePoolRepoIDFuncCall{v0, v1, v2, r0})
|
||||
return r0
|
||||
}
|
||||
|
||||
// SetDefaultHook sets function that is called when the UpdatePoolRepoID
|
||||
// method of the parent MockGitserverRepoStore instance is invoked and the
|
||||
// hook queue is empty.
|
||||
func (f *GitserverRepoStoreUpdatePoolRepoIDFunc) SetDefaultHook(hook func(context.Context, api.RepoName, api.RepoName) error) {
|
||||
f.defaultHook = hook
|
||||
}
|
||||
|
||||
// PushHook adds a function to the end of hook queue. Each invocation of the
|
||||
// UpdatePoolRepoID method of the parent MockGitserverRepoStore instance
|
||||
// invokes the hook at the front of the queue and discards it. After the
|
||||
// queue is empty, the default hook function is invoked for any future
|
||||
// action.
|
||||
func (f *GitserverRepoStoreUpdatePoolRepoIDFunc) PushHook(hook func(context.Context, api.RepoName, api.RepoName) error) {
|
||||
f.mutex.Lock()
|
||||
f.hooks = append(f.hooks, hook)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// SetDefaultReturn calls SetDefaultHook with a function that returns the
|
||||
// given values.
|
||||
func (f *GitserverRepoStoreUpdatePoolRepoIDFunc) SetDefaultReturn(r0 error) {
|
||||
f.SetDefaultHook(func(context.Context, api.RepoName, api.RepoName) error {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
// PushReturn calls PushHook with a function that returns the given values.
|
||||
func (f *GitserverRepoStoreUpdatePoolRepoIDFunc) PushReturn(r0 error) {
|
||||
f.PushHook(func(context.Context, api.RepoName, api.RepoName) error {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
func (f *GitserverRepoStoreUpdatePoolRepoIDFunc) nextHook() func(context.Context, api.RepoName, api.RepoName) error {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
if len(f.hooks) == 0 {
|
||||
return f.defaultHook
|
||||
}
|
||||
|
||||
hook := f.hooks[0]
|
||||
f.hooks = f.hooks[1:]
|
||||
return hook
|
||||
}
|
||||
|
||||
func (f *GitserverRepoStoreUpdatePoolRepoIDFunc) appendCall(r0 GitserverRepoStoreUpdatePoolRepoIDFuncCall) {
|
||||
f.mutex.Lock()
|
||||
f.history = append(f.history, r0)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// History returns a sequence of GitserverRepoStoreUpdatePoolRepoIDFuncCall
|
||||
// objects describing the invocations of this function.
|
||||
func (f *GitserverRepoStoreUpdatePoolRepoIDFunc) History() []GitserverRepoStoreUpdatePoolRepoIDFuncCall {
|
||||
f.mutex.Lock()
|
||||
history := make([]GitserverRepoStoreUpdatePoolRepoIDFuncCall, len(f.history))
|
||||
copy(history, f.history)
|
||||
f.mutex.Unlock()
|
||||
|
||||
return history
|
||||
}
|
||||
|
||||
// GitserverRepoStoreUpdatePoolRepoIDFuncCall is an object that describes an
|
||||
// invocation of method UpdatePoolRepoID on an instance of
|
||||
// MockGitserverRepoStore.
|
||||
type GitserverRepoStoreUpdatePoolRepoIDFuncCall struct {
|
||||
// Arg0 is the value of the 1st argument passed to this method
|
||||
// invocation.
|
||||
Arg0 context.Context
|
||||
// Arg1 is the value of the 2nd argument passed to this method
|
||||
// invocation.
|
||||
Arg1 api.RepoName
|
||||
// Arg2 is the value of the 3rd argument passed to this method
|
||||
// invocation.
|
||||
Arg2 api.RepoName
|
||||
// Result0 is the value of the 1st result returned from this method
|
||||
// invocation.
|
||||
Result0 error
|
||||
}
|
||||
|
||||
// Args returns an interface slice containing the arguments of this
|
||||
// invocation.
|
||||
func (c GitserverRepoStoreUpdatePoolRepoIDFuncCall) Args() []interface{} {
|
||||
return []interface{}{c.Arg0, c.Arg1, c.Arg2}
|
||||
}
|
||||
|
||||
// Results returns an interface slice containing the results of this
|
||||
// invocation.
|
||||
func (c GitserverRepoStoreUpdatePoolRepoIDFuncCall) Results() []interface{} {
|
||||
return []interface{}{c.Result0}
|
||||
}
|
||||
|
||||
// GitserverRepoStoreUpdateRepoSizesFunc describes the behavior when the
|
||||
// UpdateRepoSizes method of the parent MockGitserverRepoStore instance is
|
||||
// invoked.
|
||||
|
||||
@ -13445,6 +13445,19 @@
|
||||
"GenerationExpression": "",
|
||||
"Comment": ""
|
||||
},
|
||||
{
|
||||
"Name": "pool_repo_id",
|
||||
"Index": 13,
|
||||
"TypeName": "integer",
|
||||
"IsNullable": true,
|
||||
"Default": "",
|
||||
"CharacterMaximumLength": 0,
|
||||
"IsIdentity": false,
|
||||
"IdentityGeneration": "",
|
||||
"IsGenerated": "NEVER",
|
||||
"GenerationExpression": "",
|
||||
"Comment": "This is used to refer to the pool repository for deduplicated repos"
|
||||
},
|
||||
{
|
||||
"Name": "repo_id",
|
||||
"Index": 1,
|
||||
@ -13591,6 +13604,13 @@
|
||||
}
|
||||
],
|
||||
"Constraints": [
|
||||
{
|
||||
"Name": "gitserver_repos_pool_repo_id_fkey",
|
||||
"ConstraintType": "f",
|
||||
"RefTableName": "repo",
|
||||
"IsDeferrable": false,
|
||||
"ConstraintDefinition": "FOREIGN KEY (pool_repo_id) REFERENCES repo(id)"
|
||||
},
|
||||
{
|
||||
"Name": "gitserver_repos_repo_id_fkey",
|
||||
"ConstraintType": "f",
|
||||
|
||||
@ -1831,6 +1831,7 @@ Indexes:
|
||||
corrupted_at | timestamp with time zone | | |
|
||||
corruption_logs | jsonb | | not null | '[]'::jsonb
|
||||
cloning_progress | text | | | ''::text
|
||||
pool_repo_id | integer | | |
|
||||
Indexes:
|
||||
"gitserver_repos_pkey" PRIMARY KEY, btree (repo_id)
|
||||
"gitserver_repo_size_bytes" btree (repo_size_bytes)
|
||||
@ -1842,6 +1843,7 @@ Indexes:
|
||||
"gitserver_repos_not_explicitly_cloned_idx" btree (repo_id) WHERE clone_status <> 'cloned'::text
|
||||
"gitserver_repos_shard_id" btree (shard_id, repo_id)
|
||||
Foreign-key constraints:
|
||||
"gitserver_repos_pool_repo_id_fkey" FOREIGN KEY (pool_repo_id) REFERENCES repo(id)
|
||||
"gitserver_repos_repo_id_fkey" FOREIGN KEY (repo_id) REFERENCES repo(id) ON DELETE CASCADE
|
||||
Triggers:
|
||||
trig_recalc_gitserver_repos_statistics_on_delete AFTER DELETE ON gitserver_repos REFERENCING OLD TABLE AS oldtab FOR EACH STATEMENT EXECUTE FUNCTION recalc_gitserver_repos_statistics_on_delete()
|
||||
@ -1854,6 +1856,8 @@ Triggers:
|
||||
|
||||
**corruption_logs**: Log output of repo corruptions that have been detected - encoded as json
|
||||
|
||||
**pool_repo_id**: This is used to refer to the pool repository for deduplicated repos
|
||||
|
||||
# Table "public.gitserver_repos_statistics"
|
||||
```
|
||||
Column | Type | Collation | Nullable | Default
|
||||
@ -3455,6 +3459,7 @@ Referenced by:
|
||||
TABLE "codeowners" CONSTRAINT "codeowners_repo_id_fkey" FOREIGN KEY (repo_id) REFERENCES repo(id) ON DELETE CASCADE
|
||||
TABLE "discussion_threads_target_repo" CONSTRAINT "discussion_threads_target_repo_repo_id_fkey" FOREIGN KEY (repo_id) REFERENCES repo(id) ON DELETE CASCADE
|
||||
TABLE "external_service_repos" CONSTRAINT "external_service_repos_repo_id_fkey" FOREIGN KEY (repo_id) REFERENCES repo(id) ON DELETE CASCADE DEFERRABLE
|
||||
TABLE "gitserver_repos" CONSTRAINT "gitserver_repos_pool_repo_id_fkey" FOREIGN KEY (pool_repo_id) REFERENCES repo(id)
|
||||
TABLE "gitserver_repos" CONSTRAINT "gitserver_repos_repo_id_fkey" FOREIGN KEY (repo_id) REFERENCES repo(id) ON DELETE CASCADE
|
||||
TABLE "gitserver_repos_sync_output" CONSTRAINT "gitserver_repos_sync_output_repo_id_fkey" FOREIGN KEY (repo_id) REFERENCES repo(id) ON DELETE CASCADE
|
||||
TABLE "lsif_index_configuration" CONSTRAINT "lsif_index_configuration_repository_id_fkey" FOREIGN KEY (repository_id) REFERENCES repo(id) ON DELETE CASCADE
|
||||
|
||||
@ -1,11 +1,14 @@
|
||||
package gitserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/binary"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
@ -17,6 +20,7 @@ import (
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/gitserver/protocol"
|
||||
proto "github.com/sourcegraph/sourcegraph/internal/gitserver/v1"
|
||||
@ -26,15 +30,28 @@ import (
|
||||
|
||||
const maxMessageSizeBytes = 64 * 1024 * 1024 // 64MiB
|
||||
|
||||
var addrForRepoInvoked = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "src_gitserver_addr_for_repo_invoked",
|
||||
Help: "Number of times gitserver.AddrForRepo was invoked",
|
||||
}, []string{"user_agent"})
|
||||
var (
|
||||
addrForRepoInvoked = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "src_gitserver_addr_for_repo_invoked",
|
||||
Help: "Number of times gitserver.AddrForRepo was invoked",
|
||||
}, []string{"user_agent"})
|
||||
|
||||
// NewGitserverAddressesFromConf fetches the current set of gitserver addresses
|
||||
addrForRepoCacheHit = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "src_gitserver_addr_for_repo_cache_hit",
|
||||
Help: "Number of cache hits of the repoAddressCache managed by GitserverAddresses",
|
||||
}, []string{"user_agent"})
|
||||
|
||||
addrForRepoCacheMiss = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "src_gitserver_addr_for_repo_cache_miss",
|
||||
Help: "Number of cache misses of the repoAddressCache managed by GitserverAddresses",
|
||||
}, []string{"user_agent"})
|
||||
)
|
||||
|
||||
// NewGitserverAddresses fetches the current set of gitserver addresses
|
||||
// and pinned repos for gitserver.
|
||||
func NewGitserverAddressesFromConf(cfg *conf.Unified) GitserverAddresses {
|
||||
func NewGitserverAddresses(db database.DB, cfg *conf.Unified) GitserverAddresses {
|
||||
addrs := GitserverAddresses{
|
||||
db: db,
|
||||
Addresses: cfg.ServiceConnectionConfig.GitServers,
|
||||
}
|
||||
if cfg.ExperimentalFeatures != nil {
|
||||
@ -53,7 +70,7 @@ type TestClientSourceOptions struct {
|
||||
Logger log.Logger
|
||||
}
|
||||
|
||||
func NewTestClientSource(t *testing.T, addrs []string, options ...func(o *TestClientSourceOptions)) ClientSource {
|
||||
func NewTestClientSource(t *testing.T, db database.DB, addrs []string, options ...func(o *TestClientSourceOptions)) ClientSource {
|
||||
logger := logtest.Scoped(t)
|
||||
opts := TestClientSourceOptions{
|
||||
ClientFunc: func(conn *grpc.ClientConn) proto.GitserverServiceClient {
|
||||
@ -81,8 +98,11 @@ func NewTestClientSource(t *testing.T, addrs []string, options ...func(o *TestCl
|
||||
}
|
||||
|
||||
source := testGitserverConns{
|
||||
logger: logger,
|
||||
conns: &GitserverConns{
|
||||
logger: logger,
|
||||
GitserverAddresses: GitserverAddresses{
|
||||
db: db,
|
||||
Addresses: addrs,
|
||||
},
|
||||
grpcConns: conns,
|
||||
@ -96,6 +116,7 @@ func NewTestClientSource(t *testing.T, addrs []string, options ...func(o *TestCl
|
||||
}
|
||||
|
||||
type testGitserverConns struct {
|
||||
logger log.Logger
|
||||
conns *GitserverConns
|
||||
testAddresses []AddressWithClient
|
||||
|
||||
@ -103,8 +124,8 @@ type testGitserverConns struct {
|
||||
}
|
||||
|
||||
// AddrForRepo returns the gitserver address to use for the given repo name.
|
||||
func (c *testGitserverConns) AddrForRepo(userAgent string, repo api.RepoName) string {
|
||||
return c.conns.AddrForRepo(userAgent, repo)
|
||||
func (c *testGitserverConns) AddrForRepo(ctx context.Context, userAgent string, repo api.RepoName) string {
|
||||
return c.conns.AddrForRepo(ctx, c.logger, userAgent, repo)
|
||||
}
|
||||
|
||||
// Addresses returns the current list of gitserver addresses.
|
||||
@ -113,8 +134,8 @@ func (c *testGitserverConns) Addresses() []AddressWithClient {
|
||||
}
|
||||
|
||||
// ClientForRepo returns a client or host for the given repo name.
|
||||
func (c *testGitserverConns) ClientForRepo(userAgent string, repo api.RepoName) (proto.GitserverServiceClient, error) {
|
||||
conn, err := c.conns.ConnForRepo(userAgent, repo)
|
||||
func (c *testGitserverConns) ClientForRepo(ctx context.Context, userAgent string, repo api.RepoName) (proto.GitserverServiceClient, error) {
|
||||
conn, err := c.conns.ConnForRepo(ctx, userAgent, repo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -122,8 +143,8 @@ func (c *testGitserverConns) ClientForRepo(userAgent string, repo api.RepoName)
|
||||
return c.clientFunc(conn), nil
|
||||
}
|
||||
|
||||
func (c *testGitserverConns) ConnForRepo(userAgent string, repo api.RepoName) (*grpc.ClientConn, error) {
|
||||
return c.conns.ConnForRepo(userAgent, repo)
|
||||
func (c *testGitserverConns) ConnForRepo(ctx context.Context, userAgent string, repo api.RepoName) (*grpc.ClientConn, error) {
|
||||
return c.conns.ConnForRepo(ctx, userAgent, repo)
|
||||
}
|
||||
|
||||
type testConnAndErr struct {
|
||||
@ -146,7 +167,66 @@ func (t *testConnAndErr) GRPCClient() (proto.GitserverServiceClient, error) {
|
||||
var _ ClientSource = &testGitserverConns{}
|
||||
var _ AddressWithClient = &testConnAndErr{}
|
||||
|
||||
const repoAddressCacheTTL = 15 * time.Minute
|
||||
|
||||
var ttlJitterGenerator = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
type repoAddressCachedItem struct {
|
||||
// address is the gitserver address of the repository.
|
||||
address string
|
||||
|
||||
// expiresAt is the time beyond which this item is considered stale.
|
||||
expiresAt time.Time
|
||||
}
|
||||
|
||||
// repoAddressCache is used to cache the gitserver shard address of a repo. It is safe for
|
||||
// concurrent access.
|
||||
//
|
||||
// but ultimately leaves the decision of invalidating the cache to the consumer.
|
||||
type repoAddressCache struct {
|
||||
mu sync.RWMutex
|
||||
cache map[api.RepoName]repoAddressCachedItem
|
||||
}
|
||||
|
||||
// Read returns the item from cache or else returns nil if it does not exist.
|
||||
func (rc *repoAddressCache) Read(name api.RepoName) *repoAddressCachedItem {
|
||||
// We might have to wait for the lock, so get the current timestamp first.
|
||||
now := time.Now()
|
||||
|
||||
rc.mu.RLock()
|
||||
defer rc.mu.RUnlock()
|
||||
|
||||
item, ok := rc.cache[name]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if now.After(item.expiresAt) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &item
|
||||
}
|
||||
|
||||
// Write inserts a new repoAddressCachedItem in the cache for the given repo name. It will overwrite
|
||||
// the cache if an entry already exists in the cache.
|
||||
func (rc *repoAddressCache) Write(name api.RepoName, address string) {
|
||||
rc.mu.Lock()
|
||||
defer rc.mu.Unlock()
|
||||
|
||||
if rc.cache == nil {
|
||||
rc.cache = make(map[api.RepoName]repoAddressCachedItem)
|
||||
}
|
||||
|
||||
// Add a jitter of ± 30 seconds around the repoAddressCacheTTL to avoid a spike of DB reads when
|
||||
// the cache expires for workload types that process repositories in bulk.
|
||||
jitter := time.Duration(ttlJitterGenerator.Int63n(2*30) - 30)
|
||||
expiresAt := time.Now().Add(repoAddressCacheTTL + (jitter * time.Second))
|
||||
rc.cache[name] = repoAddressCachedItem{address: address, expiresAt: expiresAt}
|
||||
}
|
||||
|
||||
type GitserverAddresses struct {
|
||||
db database.DB
|
||||
// The current list of gitserver addresses
|
||||
Addresses []string
|
||||
|
||||
@ -154,20 +234,91 @@ type GitserverAddresses struct {
|
||||
// ensures that, even if the number of gitservers changes, these repos will
|
||||
// not be moved.
|
||||
PinnedServers map[string]string
|
||||
|
||||
repoAddressCache *repoAddressCache
|
||||
}
|
||||
|
||||
// AddrForRepo returns the gitserver address to use for the given repo name.
|
||||
func (g GitserverAddresses) AddrForRepo(userAgent string, repo api.RepoName) string {
|
||||
addrForRepoInvoked.WithLabelValues(userAgent).Inc()
|
||||
|
||||
repo = protocol.NormalizeRepo(repo) // in case the caller didn't already normalize it
|
||||
rs := string(repo)
|
||||
|
||||
if pinnedAddr, ok := g.PinnedServers[rs]; ok {
|
||||
return pinnedAddr
|
||||
// TODO: Insert link to doc with decision tree once the PR is merged. For the time being see
|
||||
// decision tree in the PR description.
|
||||
func (g *GitserverAddresses) AddrForRepo(ctx context.Context, logger log.Logger, userAgent string, repoName api.RepoName) string {
|
||||
if logger == nil {
|
||||
logger = log.Scoped("GitserverAddresses.AddrForRepo", "a logger scoped to GitserverAddresses.AddrForRepo")
|
||||
logger.Warn("a nil logger being passed in the args, but handled gracefully, please investigate source of nil logger")
|
||||
}
|
||||
|
||||
return addrForKey(rs, g.Addresses)
|
||||
logger = logger.With(log.String("repoName", string(repoName)))
|
||||
|
||||
addrForRepoInvoked.WithLabelValues(userAgent).Inc()
|
||||
|
||||
getRepoAddress := func(repoName api.RepoName) string {
|
||||
// Normalizing the name in case the caller didn't.
|
||||
name := string(protocol.NormalizeRepo(repoName))
|
||||
|
||||
if pinnedAddr, ok := g.PinnedServers[name]; ok {
|
||||
return pinnedAddr
|
||||
}
|
||||
|
||||
return addrForKey(name, g.Addresses)
|
||||
}
|
||||
|
||||
repoConf := conf.Get().Repositories
|
||||
if repoConf == nil || len(repoConf.DeduplicateForks) == 0 {
|
||||
return getRepoAddress(repoName)
|
||||
}
|
||||
|
||||
if addr := g.getCachedRepoAddress(repoName); addr != "" {
|
||||
addrForRepoCacheHit.WithLabelValues(userAgent).Inc()
|
||||
return addr
|
||||
}
|
||||
|
||||
addrForRepoCacheMiss.WithLabelValues(userAgent).Inc()
|
||||
|
||||
repo, err := g.db.Repos().GetByName(ctx, repoName)
|
||||
// Maybe the repo was not found or the repo is not a fork. The repo is also not in the
|
||||
// deduplicateforks list, so we do not need to look up a pool repo for this.
|
||||
//
|
||||
// Or in the worst case a SQL error occurred while looking up the repo. Either way, fallback to
|
||||
// regular name based hashing.
|
||||
if err != nil || (repo != nil && !repo.Fork) {
|
||||
return g.withUpdateCache(repoName, getRepoAddress(repoName))
|
||||
}
|
||||
|
||||
poolRepo, err := g.db.GitserverRepos().GetPoolRepo(ctx, repo.Name)
|
||||
if err != nil {
|
||||
logger.Warn("failed to get pool repo (if fork deduplication is enabled this repo may not be colocated on the same shard as the parent/other forks)", log.Error(err))
|
||||
return g.withUpdateCache(repoName, getRepoAddress(repoName))
|
||||
}
|
||||
|
||||
if poolRepo != nil {
|
||||
return g.withUpdateCache(poolRepo.RepoName, getRepoAddress(poolRepo.RepoName))
|
||||
}
|
||||
|
||||
return getRepoAddress(repoName)
|
||||
}
|
||||
|
||||
func (g *GitserverAddresses) withUpdateCache(repoName api.RepoName, address string) string {
|
||||
if g.repoAddressCache == nil {
|
||||
g.repoAddressCache = &repoAddressCache{cache: make(map[api.RepoName]repoAddressCachedItem)}
|
||||
}
|
||||
|
||||
g.repoAddressCache.Write(repoName, address)
|
||||
return address
|
||||
}
|
||||
|
||||
func (g *GitserverAddresses) getCachedRepoAddress(repoName api.RepoName) string {
|
||||
if g.repoAddressCache == nil {
|
||||
g.repoAddressCache = &repoAddressCache{cache: make(map[api.RepoName]repoAddressCachedItem)}
|
||||
return ""
|
||||
}
|
||||
|
||||
item := g.repoAddressCache.Read(repoName)
|
||||
if item == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return item.address
|
||||
}
|
||||
|
||||
// addrForKey returns the gitserver address to use for the given string key,
|
||||
@ -180,12 +331,14 @@ func addrForKey(key string, addrs []string) string {
|
||||
|
||||
type GitserverConns struct {
|
||||
GitserverAddresses
|
||||
|
||||
logger log.Logger
|
||||
// invariant: there is one conn for every gitserver address
|
||||
grpcConns map[string]connAndErr
|
||||
}
|
||||
|
||||
func (g *GitserverConns) ConnForRepo(userAgent string, repo api.RepoName) (*grpc.ClientConn, error) {
|
||||
addr := g.AddrForRepo(userAgent, repo)
|
||||
func (g *GitserverConns) ConnForRepo(ctx context.Context, userAgent string, repo api.RepoName) (*grpc.ClientConn, error) {
|
||||
addr := g.AddrForRepo(ctx, g.logger, userAgent, repo)
|
||||
ce, ok := g.grpcConns[addr]
|
||||
if !ok {
|
||||
return nil, errors.Newf("no gRPC connection found for address %q", addr)
|
||||
@ -214,24 +367,26 @@ func (c *connAndErr) GRPCClient() (proto.GitserverServiceClient, error) {
|
||||
}
|
||||
|
||||
type atomicGitServerConns struct {
|
||||
db database.DB
|
||||
logger log.Logger
|
||||
conns atomic.Pointer[GitserverConns]
|
||||
watchOnce sync.Once
|
||||
}
|
||||
|
||||
func (a *atomicGitServerConns) AddrForRepo(userAgent string, repo api.RepoName) string {
|
||||
return a.get().AddrForRepo(userAgent, repo)
|
||||
func (a *atomicGitServerConns) AddrForRepo(ctx context.Context, userAgent string, repo api.RepoName) string {
|
||||
return a.get().AddrForRepo(ctx, a.logger, userAgent, repo)
|
||||
}
|
||||
|
||||
func (a *atomicGitServerConns) ClientForRepo(userAgent string, repo api.RepoName) (proto.GitserverServiceClient, error) {
|
||||
conn, err := a.get().ConnForRepo(userAgent, repo)
|
||||
func (a *atomicGitServerConns) ClientForRepo(ctx context.Context, userAgent string, repo api.RepoName) (proto.GitserverServiceClient, error) {
|
||||
conn, err := a.get().ConnForRepo(ctx, userAgent, repo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return proto.NewGitserverServiceClient(conn), nil
|
||||
}
|
||||
|
||||
func (a *atomicGitServerConns) ConnForRepo(userAgent string, repo api.RepoName) (*grpc.ClientConn, error) {
|
||||
return a.get().ConnForRepo(userAgent, repo)
|
||||
func (a *atomicGitServerConns) ConnForRepo(ctx context.Context, userAgent string, repo api.RepoName) (*grpc.ClientConn, error) {
|
||||
return a.get().ConnForRepo(ctx, userAgent, repo)
|
||||
}
|
||||
|
||||
func (a *atomicGitServerConns) Addresses() []AddressWithClient {
|
||||
@ -263,7 +418,8 @@ func (a *atomicGitServerConns) initOnce() {
|
||||
|
||||
func (a *atomicGitServerConns) update(cfg *conf.Unified) {
|
||||
after := GitserverConns{
|
||||
GitserverAddresses: NewGitserverAddressesFromConf(cfg),
|
||||
logger: a.logger,
|
||||
GitserverAddresses: NewGitserverAddresses(a.db, cfg),
|
||||
grpcConns: nil, // to be filled in
|
||||
}
|
||||
|
||||
|
||||
@ -1,52 +1,227 @@
|
||||
package gitserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sourcegraph/log/logtest"
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/types"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
"github.com/sourcegraph/sourcegraph/schema"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAddrForRepo(t *testing.T) {
|
||||
db := database.NewMockDB()
|
||||
ga := GitserverAddresses{
|
||||
db: db,
|
||||
Addresses: []string{"gitserver-1", "gitserver-2", "gitserver-3"},
|
||||
PinnedServers: map[string]string{
|
||||
"repo2": "gitserver-1",
|
||||
},
|
||||
}
|
||||
ctx := context.Background()
|
||||
logger := logtest.Scoped(t)
|
||||
|
||||
t.Run("no deduplicated forks", func(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
repo api.RepoName
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "repo1",
|
||||
repo: api.RepoName("repo1"),
|
||||
want: "gitserver-3",
|
||||
},
|
||||
{
|
||||
name: "check we normalise",
|
||||
repo: api.RepoName("repo1.git"),
|
||||
want: "gitserver-3",
|
||||
},
|
||||
{
|
||||
name: "another repo",
|
||||
repo: api.RepoName("github.com/sourcegraph/sourcegraph.git"),
|
||||
want: "gitserver-2",
|
||||
},
|
||||
{
|
||||
name: "pinned repo", // different server address that the hashing function would normally yield
|
||||
repo: api.RepoName("repo2"),
|
||||
want: "gitserver-1",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := ga.AddrForRepo(ctx, logger, "gitserver", tc.repo)
|
||||
if got != tc.want {
|
||||
t.Fatalf("Want %q, got %q", tc.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("deduplicated forks", func(t *testing.T) {
|
||||
parentRepo := api.RepoName("github.com/sourcegraph/sourcegraph")
|
||||
forkedRepo := api.RepoName("github.com/forked/sourcegraph")
|
||||
|
||||
// At this point no additional config has been set so we expect to get the hashed names
|
||||
// directly.
|
||||
//
|
||||
// This serves both as a test and a test documentation on what shard to expect for which
|
||||
// repo.
|
||||
shardParentRepo := ga.AddrForRepo(ctx, logger, "gitserver", parentRepo)
|
||||
require.Equal(t, "gitserver-2", shardParentRepo)
|
||||
|
||||
shardForkedRepo := ga.AddrForRepo(ctx, logger, "gitserver", forkedRepo)
|
||||
require.Equal(t, "gitserver-3", shardForkedRepo)
|
||||
|
||||
conf.Mock(&conf.Unified{
|
||||
SiteConfiguration: schema.SiteConfiguration{
|
||||
Repositories: &schema.Repositories{
|
||||
DeduplicateForks: []string{
|
||||
string(parentRepo),
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
getPoolRepoFuncDefaultReturn func() (*types.PoolRepo, error)
|
||||
expectedShardParentRepo string
|
||||
expectedShardForkedRepo string
|
||||
}{
|
||||
{
|
||||
name: "valid pool repo",
|
||||
getPoolRepoFuncDefaultReturn: func() (*types.PoolRepo, error) { return &types.PoolRepo{RepoName: parentRepo}, nil },
|
||||
expectedShardParentRepo: shardParentRepo,
|
||||
expectedShardForkedRepo: shardParentRepo,
|
||||
},
|
||||
{
|
||||
name: "no pool repo",
|
||||
getPoolRepoFuncDefaultReturn: func() (*types.PoolRepo, error) { return nil, nil },
|
||||
expectedShardParentRepo: shardParentRepo,
|
||||
expectedShardForkedRepo: shardForkedRepo,
|
||||
},
|
||||
{
|
||||
name: "get pool repo returns an error",
|
||||
getPoolRepoFuncDefaultReturn: func() (*types.PoolRepo, error) { return nil, errors.New("mocked error") },
|
||||
expectedShardParentRepo: shardParentRepo,
|
||||
expectedShardForkedRepo: shardForkedRepo,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
repos := database.NewMockRepoStore()
|
||||
repos.GetByNameFunc.PushReturn(
|
||||
&types.Repo{
|
||||
ID: api.RepoID(1),
|
||||
Name: parentRepo,
|
||||
Fork: false,
|
||||
}, nil,
|
||||
)
|
||||
repos.GetByNameFunc.PushReturn(
|
||||
&types.Repo{
|
||||
ID: api.RepoID(2),
|
||||
Name: forkedRepo,
|
||||
Fork: true,
|
||||
}, nil,
|
||||
)
|
||||
db.ReposFunc.SetDefaultReturn(repos)
|
||||
|
||||
gs := database.NewMockGitserverRepoStore()
|
||||
gs.GetPoolRepoFunc.SetDefaultReturn(tc.getPoolRepoFuncDefaultReturn())
|
||||
db.GitserverReposFunc.SetDefaultReturn(gs)
|
||||
|
||||
require.Equal(t, tc.expectedShardParentRepo, ga.AddrForRepo(ctx, logger, "gitserver", parentRepo))
|
||||
require.Equal(t, tc.expectedShardForkedRepo, ga.AddrForRepo(ctx, logger, "gitserver", forkedRepo))
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestRepoAddressCache(t *testing.T) {
|
||||
repoAddrCache := repoAddressCache{}
|
||||
|
||||
// Read an object that does not exist in the cache.
|
||||
item := repoAddrCache.Read("foo")
|
||||
require.Nil(t, item)
|
||||
|
||||
// Now insert an item to the cache.
|
||||
repoName := api.RepoName("github.com/foo/bar")
|
||||
addr := "127.0.0.1:3080"
|
||||
repoAddrCache.Write(repoName, addr)
|
||||
|
||||
cachedItem := repoAddrCache.cache[repoName]
|
||||
|
||||
item = repoAddrCache.Read(repoName)
|
||||
require.NotNil(t, item)
|
||||
require.Equal(t, addr, item.address)
|
||||
require.Equal(t, cachedItem.expiresAt, item.expiresAt)
|
||||
|
||||
// Now verify that the item in the cache when read again is the same, that is we did not update
|
||||
// the cached on the Read path.
|
||||
//
|
||||
// The following test may seem unnecessary looking at the current design of the cache, but the
|
||||
// first version of this cache during development was updating the timestamp of the cached item
|
||||
// on the read path. This test is to ensure that is not happening anymore.
|
||||
item2 := repoAddrCache.cache[repoName]
|
||||
require.NotNil(t, item2)
|
||||
require.Equal(t, item.address, item2.address)
|
||||
require.Equal(t, item.expiresAt, item2.expiresAt)
|
||||
|
||||
// Mock now to be 17 minutes in the past such that the next time the item is read, it will be
|
||||
// expired.
|
||||
now := time.Now().Add(-17 * time.Minute)
|
||||
repoAddrCache.cache[repoName] = repoAddressCachedItem{
|
||||
address: addr,
|
||||
expiresAt: now,
|
||||
}
|
||||
|
||||
require.Nil(t, repoAddrCache.Read(repoName))
|
||||
}
|
||||
|
||||
func TestGitserverAddresses_withUpdateCache(t *testing.T) {
|
||||
ga := GitserverAddresses{}
|
||||
|
||||
// Ensures that a nil repoAddressCache will not cause a panic if consumers of GitserverAddresses
|
||||
// do not initialise a cache.
|
||||
require.Nil(t, ga.repoAddressCache)
|
||||
|
||||
repo := api.RepoName("repo1")
|
||||
addr := "address1"
|
||||
gotAddress := ga.withUpdateCache(repo, addr)
|
||||
require.Equal(t, addr, gotAddress)
|
||||
|
||||
item := ga.repoAddressCache.Read(repo)
|
||||
require.Equal(t, addr, item.address)
|
||||
}
|
||||
|
||||
func TestGitserverAddresses_getCachedRepoAddress(t *testing.T) {
|
||||
db := database.NewMockDB()
|
||||
ga := &GitserverAddresses{
|
||||
db: db,
|
||||
Addresses: []string{"gitserver-1", "gitserver-2", "gitserver-3"},
|
||||
PinnedServers: map[string]string{
|
||||
"repo2": "gitserver-1",
|
||||
},
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
repo api.RepoName
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "repo1",
|
||||
repo: api.RepoName("repo1"),
|
||||
want: "gitserver-3",
|
||||
},
|
||||
{
|
||||
name: "check we normalise",
|
||||
repo: api.RepoName("repo1.git"),
|
||||
want: "gitserver-3",
|
||||
},
|
||||
{
|
||||
name: "another repo",
|
||||
repo: api.RepoName("github.com/sourcegraph/sourcegraph.git"),
|
||||
want: "gitserver-2",
|
||||
},
|
||||
{
|
||||
name: "pinned repo", // different server address that the hashing function would normally yield
|
||||
repo: api.RepoName("repo2"),
|
||||
want: "gitserver-1",
|
||||
},
|
||||
}
|
||||
require.Nil(t, ga.repoAddressCache)
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := ga.AddrForRepo("gitserver", tc.repo)
|
||||
if got != tc.want {
|
||||
t.Fatalf("Want %q, got %q", tc.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
repo := api.RepoName("repo1")
|
||||
require.Equal(t, "", ga.getCachedRepoAddress(repo))
|
||||
|
||||
require.NotNil(t, ga.repoAddressCache)
|
||||
require.Equal(t, "", ga.getCachedRepoAddress(repo))
|
||||
|
||||
addr := "address1"
|
||||
ga.repoAddressCache.Write(repo, addr)
|
||||
require.Equal(t, addr, ga.getCachedRepoAddress(repo))
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@ -51,7 +52,6 @@ var (
|
||||
clientFactory = httpcli.NewInternalClientFactory("gitserver")
|
||||
defaultDoer, _ = clientFactory.Doer()
|
||||
defaultLimiter = limiter.New(500)
|
||||
conns = &atomicGitServerConns{}
|
||||
)
|
||||
|
||||
var ClientMocks, emptyClientMocks struct {
|
||||
@ -61,6 +61,23 @@ var ClientMocks, emptyClientMocks struct {
|
||||
LocalGitCommandReposDir string
|
||||
}
|
||||
|
||||
// initConnsOnce is used internally in getAtomicGitServerConns. Only use it there.
|
||||
var initConnsOnce sync.Once
|
||||
|
||||
// conns is the global variable holding a reference to the gitserver connections.
|
||||
//
|
||||
// WARNING: Do not use it directly. Instead use getAtomicGitServerConns to ensure conns is
|
||||
// initialised correctly.
|
||||
var conns *atomicGitServerConns
|
||||
|
||||
func getAtomicGitserverConns(logger sglog.Logger, db database.DB) *atomicGitServerConns {
|
||||
initConnsOnce.Do(func() {
|
||||
conns = &atomicGitServerConns{db: db, logger: logger}
|
||||
})
|
||||
|
||||
return conns
|
||||
}
|
||||
|
||||
// ResetClientMocks clears the mock functions set on Mocks (so that subsequent
|
||||
// tests don't inadvertently use them).
|
||||
func ResetClientMocks() {
|
||||
@ -73,19 +90,20 @@ var _ Client = &clientImplementor{}
|
||||
// It allows for mocking out the client source in tests.
|
||||
type ClientSource interface {
|
||||
// ClientForRepo returns a Client for the given repo.
|
||||
ClientForRepo(userAgent string, repo api.RepoName) (proto.GitserverServiceClient, error)
|
||||
ClientForRepo(ctx context.Context, userAgent string, repo api.RepoName) (proto.GitserverServiceClient, error)
|
||||
// ConnForRepo returns a grpc.ClientConn for the given repo.
|
||||
ConnForRepo(userAgent string, repo api.RepoName) (*grpc.ClientConn, error)
|
||||
ConnForRepo(ctx context.Context, userAgent string, repo api.RepoName) (*grpc.ClientConn, error)
|
||||
// AddrForRepo returns the address of the gitserver for the given repo.
|
||||
AddrForRepo(userAgent string, repo api.RepoName) string
|
||||
AddrForRepo(ctx context.Context, userAgent string, repo api.RepoName) string
|
||||
// Address the current list of gitserver addresses.
|
||||
Addresses() []AddressWithClient
|
||||
}
|
||||
|
||||
// NewClient returns a new gitserver.Client.
|
||||
func NewClient(_ database.DB) Client {
|
||||
func NewClient(db database.DB) Client {
|
||||
logger := sglog.Scoped("GitserverClient", "Client to talk from other services to Gitserver")
|
||||
return &clientImplementor{
|
||||
logger: sglog.Scoped("NewClient", "returns a new gitserver.Client"),
|
||||
logger: logger,
|
||||
httpClient: defaultDoer,
|
||||
HTTPLimiter: defaultLimiter,
|
||||
// Use the binary name for userAgent. This should effectively identify
|
||||
@ -93,7 +111,7 @@ func NewClient(_ database.DB) Client {
|
||||
// frontend internal API)
|
||||
userAgent: filepath.Base(os.Args[0]),
|
||||
operations: getOperations(),
|
||||
clientSource: conns,
|
||||
clientSource: getAtomicGitserverConns(logger, db),
|
||||
}
|
||||
}
|
||||
|
||||
@ -229,7 +247,7 @@ type CommitLog struct {
|
||||
|
||||
type Client interface {
|
||||
// AddrForRepo returns the gitserver address to use for the given repo name.
|
||||
AddrForRepo(api.RepoName) string
|
||||
AddrForRepo(ctx context.Context, repoName api.RepoName) string
|
||||
|
||||
// ArchiveReader streams back the file contents of an archived git repo.
|
||||
ArchiveReader(ctx context.Context, checker authz.SubRepoPermissionChecker, repo api.RepoName, options ArchiveOptions) (io.ReadCloser, error)
|
||||
@ -457,16 +475,17 @@ func (c *clientImplementor) Addrs() []string {
|
||||
return addrs
|
||||
}
|
||||
|
||||
func (c *clientImplementor) AddrForRepo(repo api.RepoName) string {
|
||||
return c.clientSource.AddrForRepo(c.userAgent, repo)
|
||||
func (c *clientImplementor) AddrForRepo(ctx context.Context, repo api.RepoName) string {
|
||||
return c.clientSource.AddrForRepo(ctx, c.userAgent, repo)
|
||||
}
|
||||
|
||||
func (c *clientImplementor) ClientForRepo(repo api.RepoName) (proto.GitserverServiceClient, error) {
|
||||
return c.clientSource.ClientForRepo(c.userAgent, repo)
|
||||
func (c *clientImplementor) ClientForRepo(ctx context.Context, repo api.RepoName) (proto.GitserverServiceClient, error) {
|
||||
return c.clientSource.ClientForRepo(ctx, c.userAgent, repo)
|
||||
}
|
||||
|
||||
func (c *clientImplementor) ConnForRepo(repo api.RepoName) (*grpc.ClientConn, error) {
|
||||
return c.clientSource.ConnForRepo(c.userAgent, repo)
|
||||
// TODO: Looks like this is not used anywhere.
|
||||
func (c *clientImplementor) ConnForRepo(ctx context.Context, repo api.RepoName) (*grpc.ClientConn, error) {
|
||||
return c.clientSource.ConnForRepo(ctx, c.userAgent, repo)
|
||||
}
|
||||
|
||||
// ArchiveOptions contains options for the Archive func.
|
||||
@ -554,7 +573,7 @@ func (a *archiveReader) Close() error {
|
||||
|
||||
// archiveURL returns a URL from which an archive of the given Git repository can
|
||||
// be downloaded from.
|
||||
func (c *clientImplementor) archiveURL(repo api.RepoName, opt ArchiveOptions) *url.URL {
|
||||
func (c *clientImplementor) archiveURL(ctx context.Context, repo api.RepoName, opt ArchiveOptions) *url.URL {
|
||||
q := url.Values{
|
||||
"repo": {string(repo)},
|
||||
"treeish": {opt.Treeish},
|
||||
@ -565,7 +584,7 @@ func (c *clientImplementor) archiveURL(repo api.RepoName, opt ArchiveOptions) *u
|
||||
q.Add("path", string(pathspec))
|
||||
}
|
||||
|
||||
addrForRepo := c.AddrForRepo(repo)
|
||||
addrForRepo := c.AddrForRepo(ctx, repo)
|
||||
return &url.URL{
|
||||
Scheme: "http",
|
||||
Host: addrForRepo,
|
||||
@ -603,7 +622,7 @@ func (c *RemoteGitCommand) sendExec(ctx context.Context) (_ io.ReadCloser, err e
|
||||
}
|
||||
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
client, err := c.execer.ClientForRepo(repoName)
|
||||
client, err := c.execer.ClientForRepo(ctx, repoName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -760,7 +779,7 @@ func (c *clientImplementor) Search(ctx context.Context, args *protocol.SearchReq
|
||||
repoName := protocol.NormalizeRepo(args.Repo)
|
||||
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
client, err := c.ClientForRepo(repoName)
|
||||
client, err := c.ClientForRepo(ctx, repoName)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -788,7 +807,7 @@ func (c *clientImplementor) Search(ctx context.Context, args *protocol.SearchReq
|
||||
}
|
||||
}
|
||||
|
||||
addrForRepo := c.AddrForRepo(repoName)
|
||||
addrForRepo := c.AddrForRepo(ctx, repoName)
|
||||
|
||||
protocol.RegisterGob()
|
||||
var buf bytes.Buffer
|
||||
@ -881,7 +900,7 @@ func (c *clientImplementor) P4Exec(ctx context.Context, host, user, password str
|
||||
Args: args,
|
||||
}
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
client, err := c.ClientForRepo("")
|
||||
client, err := c.ClientForRepo(ctx, "")
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -1074,7 +1093,7 @@ func (c *clientImplementor) BatchLog(ctx context.Context, opts BatchLogOptions,
|
||||
for _, repoCommit := range opts.RepoCommits {
|
||||
addr, ok := addrsByName[repoCommit.Repo]
|
||||
if !ok {
|
||||
addr = c.AddrForRepo(repoCommit.Repo)
|
||||
addr = c.AddrForRepo(ctx, repoCommit.Repo)
|
||||
addrsByName[repoCommit.Repo] = addr
|
||||
}
|
||||
|
||||
@ -1109,7 +1128,7 @@ func (c *clientImplementor) BatchLog(ctx context.Context, opts BatchLogOptions,
|
||||
return err
|
||||
}
|
||||
|
||||
client, err := c.ClientForRepo(repoCommits[0].Repo)
|
||||
client, err := c.ClientForRepo(ctx, repoCommits[0].Repo)
|
||||
if err != nil {
|
||||
err = errors.Wrapf(err, "getting gRPC client for repository %q", repoCommits[0].Repo)
|
||||
}
|
||||
@ -1165,7 +1184,7 @@ func (c *clientImplementor) RequestRepoUpdate(ctx context.Context, repo api.Repo
|
||||
}
|
||||
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
client, err := c.ClientForRepo(repo)
|
||||
client, err := c.ClientForRepo(ctx, repo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1203,7 +1222,7 @@ func (c *clientImplementor) RequestRepoUpdate(ctx context.Context, repo api.Repo
|
||||
// RequestRepoClone requests that the gitserver does an asynchronous clone of the repository.
|
||||
func (c *clientImplementor) RequestRepoClone(ctx context.Context, repo api.RepoName) (*protocol.RepoCloneResponse, error) {
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
client, err := c.ClientForRepo(repo)
|
||||
client, err := c.ClientForRepo(ctx, repo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1256,7 +1275,7 @@ func (c *clientImplementor) IsRepoCloneable(ctx context.Context, repo api.RepoNa
|
||||
var resp protocol.IsRepoCloneableResponse
|
||||
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
client, err := c.ClientForRepo(repo)
|
||||
client, err := c.ClientForRepo(ctx, repo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1336,7 +1355,7 @@ func (c *clientImplementor) RepoCloneProgress(ctx context.Context, repos ...api.
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
shards := make(map[proto.GitserverServiceClient]*proto.RepoCloneProgressRequest, (len(repos)/numPossibleShards)*2) // 2x because it may not be a perfect division
|
||||
for _, r := range repos {
|
||||
client, err := c.ClientForRepo(r)
|
||||
client, err := c.ClientForRepo(ctx, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1386,7 +1405,7 @@ func (c *clientImplementor) RepoCloneProgress(ctx context.Context, repos ...api.
|
||||
shards := make(map[string]*protocol.RepoCloneProgressRequest, (len(repos)/numPossibleShards)*2) // 2x because it may not be a perfect division
|
||||
|
||||
for _, r := range repos {
|
||||
addr := c.AddrForRepo(r)
|
||||
addr := c.AddrForRepo(ctx, r)
|
||||
shard := shards[addr]
|
||||
|
||||
if shard == nil {
|
||||
@ -1508,7 +1527,7 @@ func (c *clientImplementor) Remove(ctx context.Context, repo api.RepoName) error
|
||||
// the old name in order to land on the correct gitserver instance
|
||||
repo = api.UndeletedRepoName(repo)
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
client, err := c.ClientForRepo(repo)
|
||||
client, err := c.ClientForRepo(ctx, repo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1517,7 +1536,7 @@ func (c *clientImplementor) Remove(ctx context.Context, repo api.RepoName) error
|
||||
})
|
||||
return err
|
||||
} else {
|
||||
addr := c.AddrForRepo(repo)
|
||||
addr := c.AddrForRepo(ctx, repo)
|
||||
return c.RemoveFrom(ctx, repo, addr)
|
||||
}
|
||||
}
|
||||
@ -1555,7 +1574,7 @@ func (c *clientImplementor) httpPost(ctx context.Context, repo api.RepoName, op
|
||||
return nil, err
|
||||
}
|
||||
|
||||
addrForRepo := c.AddrForRepo(repo)
|
||||
addrForRepo := c.AddrForRepo(ctx, repo)
|
||||
uri := "http://" + addrForRepo + "/" + op
|
||||
return c.do(ctx, repo, "POST", uri, b)
|
||||
}
|
||||
@ -1599,7 +1618,7 @@ func (c *clientImplementor) do(ctx context.Context, repoForTracing api.RepoName,
|
||||
|
||||
func (c *clientImplementor) CreateCommitFromPatch(ctx context.Context, req protocol.CreateCommitFromPatchRequest) (*protocol.CreateCommitFromPatchResponse, error) {
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
client, err := c.ClientForRepo(req.Repo)
|
||||
client, err := c.ClientForRepo(ctx, req.Repo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1654,7 +1673,7 @@ func (c *clientImplementor) GetObject(ctx context.Context, repo api.RepoName, ob
|
||||
ObjectName: objectName,
|
||||
}
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
client, err := c.ClientForRepo(req.Repo)
|
||||
client, err := c.ClientForRepo(ctx, req.Repo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -373,12 +373,13 @@ func TestClient_ListGitolite_ProtoRoundTrip(t *testing.T) {
|
||||
|
||||
func TestClient_Remove(t *testing.T) {
|
||||
test := func(t *testing.T, called *bool) {
|
||||
db := database.NewMockDB()
|
||||
repo := api.RepoName("github.com/sourcegraph/sourcegraph")
|
||||
addrs := []string{"172.16.8.1:8080", "172.16.8.2:8080"}
|
||||
|
||||
expected := "http://172.16.8.1:8080"
|
||||
|
||||
source := gitserver.NewTestClientSource(t, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
source := gitserver.NewTestClientSource(t, db, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
o.ClientFunc = func(cc *grpc.ClientConn) proto.GitserverServiceClient {
|
||||
mockRepoDelete := func(ctx context.Context, in *proto.RepoDeleteRequest, opts ...grpc.CallOption) (*proto.RepoDeleteResponse, error) {
|
||||
*called = true
|
||||
@ -608,13 +609,14 @@ func TestClient_ArchiveReader(t *testing.T) {
|
||||
},
|
||||
})
|
||||
for _, test := range tests {
|
||||
db := database.NewMockDB()
|
||||
repoName := api.RepoName(test.name)
|
||||
called := false
|
||||
|
||||
mkClient := func(t *testing.T, addrs []string) gitserver.Client {
|
||||
t.Helper()
|
||||
|
||||
source := gitserver.NewTestClientSource(t, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
source := gitserver.NewTestClientSource(t, db, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
o.ClientFunc = func(cc *grpc.ClientConn) proto.GitserverServiceClient {
|
||||
base := proto.NewGitserverServiceClient(cc)
|
||||
|
||||
@ -654,13 +656,14 @@ func TestClient_ArchiveReader(t *testing.T) {
|
||||
})
|
||||
|
||||
for _, test := range tests {
|
||||
db := database.NewMockDB()
|
||||
repoName := api.RepoName(test.name)
|
||||
called := false
|
||||
|
||||
mkClient := func(t *testing.T, addrs []string) gitserver.Client {
|
||||
t.Helper()
|
||||
|
||||
source := gitserver.NewTestClientSource(t, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
source := gitserver.NewTestClientSource(t, db, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
o.ClientFunc = func(cc *grpc.ClientConn) proto.GitserverServiceClient {
|
||||
mockArchive := func(ctx context.Context, in *proto.ArchiveRequest, opts ...grpc.CallOption) (proto.GitserverService_ArchiveClient, error) {
|
||||
called = true
|
||||
@ -875,6 +878,7 @@ func TestClient_P4ExecGRPC(t *testing.T) {
|
||||
|
||||
t.Run("GRPC", func(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
db := database.NewMockDB()
|
||||
conf.Mock(&conf.Unified{
|
||||
SiteConfiguration: schema.SiteConfiguration{
|
||||
ExperimentalFeatures: &schema.ExperimentalFeatures{
|
||||
@ -889,7 +893,7 @@ func TestClient_P4ExecGRPC(t *testing.T) {
|
||||
addrs := []string{gitserverAddr}
|
||||
called := false
|
||||
|
||||
source := gitserver.NewTestClientSource(t, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
source := gitserver.NewTestClientSource(t, db, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
o.ClientFunc = func(cc *grpc.ClientConn) proto.GitserverServiceClient {
|
||||
mockP4Exec := func(ctx context.Context, in *proto.P4ExecRequest, opts ...grpc.CallOption) (proto.GitserverService_P4ExecClient, error) {
|
||||
called = true
|
||||
@ -999,6 +1003,7 @@ func TestClient_P4Exec(t *testing.T) {
|
||||
|
||||
}
|
||||
t.Run("HTTP", func(t *testing.T) {
|
||||
db := database.NewMockDB()
|
||||
for _, test := range tests {
|
||||
conf.Mock(&conf.Unified{
|
||||
SiteConfiguration: schema.SiteConfiguration{
|
||||
@ -1013,7 +1018,7 @@ func TestClient_P4Exec(t *testing.T) {
|
||||
|
||||
u, _ := url.Parse(testServer.URL)
|
||||
addrs := []string{u.Host}
|
||||
source := gitserver.NewTestClientSource(t, addrs)
|
||||
source := gitserver.NewTestClientSource(t, db, addrs)
|
||||
called := false
|
||||
|
||||
cli := gitserver.NewTestClient(&http.Client{}, source)
|
||||
@ -1090,7 +1095,7 @@ func TestClient_ResolveRevisions(t *testing.T) {
|
||||
|
||||
u, _ := url.Parse(srv.URL)
|
||||
addrs := []string{u.Host}
|
||||
source := gitserver.NewTestClientSource(t, addrs)
|
||||
source := gitserver.NewTestClientSource(t, db, addrs)
|
||||
|
||||
cli := gitserver.NewTestClient(&http.Client{}, source)
|
||||
|
||||
@ -1125,7 +1130,8 @@ func TestClient_BatchLogGRPC(t *testing.T) {
|
||||
|
||||
called := false
|
||||
|
||||
source := gitserver.NewTestClientSource(t, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
o.ClientFunc = func(cc *grpc.ClientConn) proto.GitserverServiceClient {
|
||||
mockBatchLog := func(ctx context.Context, in *proto.BatchLogRequest, opts ...grpc.CallOption) (*proto.BatchLogResponse, error) {
|
||||
called = true
|
||||
@ -1210,8 +1216,9 @@ func TestClient_BatchLogGRPC(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestClient_BatchLog(t *testing.T) {
|
||||
db := database.NewMockDB()
|
||||
addrs := []string{"172.16.8.1:8080", "172.16.8.2:8080", "172.16.8.3:8080"}
|
||||
source := gitserver.NewTestClientSource(t, addrs)
|
||||
source := gitserver.NewTestClientSource(t, db, addrs)
|
||||
|
||||
cli := gitserver.NewTestClient(
|
||||
httpcli.DoerFunc(func(r *http.Request) (*http.Response, error) {
|
||||
@ -1347,7 +1354,8 @@ func TestClient_ReposStats(t *testing.T) {
|
||||
GitDirBytes: 1337,
|
||||
}
|
||||
|
||||
source := gitserver.NewTestClientSource(t, addrs)
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, addrs)
|
||||
cli := gitserver.NewTestClient(
|
||||
httpcli.DoerFunc(func(r *http.Request) (*http.Response, error) {
|
||||
switch r.URL.String() {
|
||||
@ -1388,7 +1396,8 @@ func TestClient_ReposStatsGRPC(t *testing.T) {
|
||||
GitDirBytes: 1337,
|
||||
}
|
||||
called := false
|
||||
source := gitserver.NewTestClientSource(t, []string{gitserverAddr}, func(o *gitserver.TestClientSourceOptions) {
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, []string{gitserverAddr}, func(o *gitserver.TestClientSourceOptions) {
|
||||
o.ClientFunc = func(cc *grpc.ClientConn) proto.GitserverServiceClient {
|
||||
mockRepoStats := func(ctx context.Context, in *proto.ReposStatsRequest, opts ...grpc.CallOption) (*proto.ReposStatsResponse, error) {
|
||||
called = true
|
||||
@ -1482,7 +1491,8 @@ func TestClient_IsRepoCloneableGRPC(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
|
||||
called := false
|
||||
source := gitserver.NewTestClientSource(t, []string{gitserverAddr}, func(o *gitserver.TestClientSourceOptions) {
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, []string{gitserverAddr}, func(o *gitserver.TestClientSourceOptions) {
|
||||
o.ClientFunc = func(cc *grpc.ClientConn) proto.GitserverServiceClient {
|
||||
mockIsRepoCloneable := func(ctx context.Context, in *proto.IsRepoCloneableRequest, opts ...grpc.CallOption) (*proto.IsRepoCloneableResponse, error) {
|
||||
called = true
|
||||
@ -1517,7 +1527,8 @@ func TestClient_IsRepoCloneableGRPC(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
|
||||
called := false
|
||||
source := gitserver.NewTestClientSource(t, []string{gitserverAddr}, func(o *gitserver.TestClientSourceOptions) {
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, []string{gitserverAddr}, func(o *gitserver.TestClientSourceOptions) {
|
||||
o.ClientFunc = func(cc *grpc.ClientConn) proto.GitserverServiceClient {
|
||||
mockIsRepoCloneable := func(ctx context.Context, in *proto.IsRepoCloneableRequest, opts ...grpc.CallOption) (*proto.IsRepoCloneableResponse, error) {
|
||||
called = true
|
||||
@ -1594,7 +1605,7 @@ func TestGitserverClient_RepoClone(t *testing.T) {
|
||||
u, _ := url.Parse(srv.URL)
|
||||
addrs := []string{u.Host}
|
||||
called := false
|
||||
source := gitserver.NewTestClientSource(t, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
source := gitserver.NewTestClientSource(t, db, addrs, func(o *gitserver.TestClientSourceOptions) {
|
||||
o.ClientFunc = func(cc *grpc.ClientConn) proto.GitserverServiceClient {
|
||||
mockRepoClone := func(ctx context.Context, in *proto.RepoCloneRequest, opts ...grpc.CallOption) (*proto.RepoCloneResponse, error) {
|
||||
base := proto.NewGitserverServiceClient(cc)
|
||||
|
||||
@ -2507,7 +2507,7 @@ func (c *clientImplementor) ArchiveReader(
|
||||
}
|
||||
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
client, err := c.clientSource.ClientForRepo(c.userAgent, repo)
|
||||
client, err := c.clientSource.ClientForRepo(ctx, c.userAgent, repo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -2585,7 +2585,7 @@ func (c *clientImplementor) ArchiveReader(
|
||||
|
||||
} else {
|
||||
// Fall back to http request
|
||||
u := c.archiveURL(repo, options)
|
||||
u := c.archiveURL(ctx, repo, options)
|
||||
resp, err := c.do(ctx, repo, "POST", u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -176,8 +176,8 @@ type RemoteGitCommand struct {
|
||||
|
||||
type execer interface {
|
||||
httpPost(ctx context.Context, repo api.RepoName, op string, payload any) (resp *http.Response, err error)
|
||||
AddrForRepo(repo api.RepoName) string
|
||||
ClientForRepo(repo api.RepoName) (proto.GitserverServiceClient, error)
|
||||
AddrForRepo(ctx context.Context, repo api.RepoName) string
|
||||
ClientForRepo(ctx context.Context, repo api.RepoName) (proto.GitserverServiceClient, error)
|
||||
}
|
||||
|
||||
// DividedOutput runs the command and returns its standard output and standard error.
|
||||
|
||||
@ -8,6 +8,8 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/sourcegraph/log"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/extsvc/gitolite"
|
||||
proto "github.com/sourcegraph/sourcegraph/internal/gitserver/v1"
|
||||
internalgrpc "github.com/sourcegraph/sourcegraph/internal/grpc"
|
||||
@ -21,13 +23,16 @@ type GitoliteLister struct {
|
||||
userAgent string
|
||||
}
|
||||
|
||||
func NewGitoliteLister(cli httpcli.Doer) *GitoliteLister {
|
||||
func NewGitoliteLister(db database.DB, cli httpcli.Doer) *GitoliteLister {
|
||||
logger := log.Scoped("GitoliteLister", "logger scoped to a GitoliteLister")
|
||||
atomicConns := getAtomicGitserverConns(logger, db)
|
||||
|
||||
return &GitoliteLister{
|
||||
httpClient: cli,
|
||||
addrs: func() []string {
|
||||
return conns.get().Addresses
|
||||
return atomicConns.get().Addresses
|
||||
},
|
||||
grpcClient: conns,
|
||||
grpcClient: atomicConns,
|
||||
userAgent: filepath.Base(os.Args[0]),
|
||||
}
|
||||
}
|
||||
@ -39,7 +44,7 @@ func (c *GitoliteLister) ListRepos(ctx context.Context, gitoliteHost string) (li
|
||||
}
|
||||
if internalgrpc.IsGRPCEnabled(ctx) {
|
||||
|
||||
client, err := c.grpcClient.ClientForRepo(c.userAgent, "")
|
||||
client, err := c.grpcClient.ClientForRepo(ctx, c.userAgent, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/sourcegraph/log/logtest"
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
|
||||
@ -20,13 +21,25 @@ import (
|
||||
)
|
||||
|
||||
func TestClientSource_AddrMatchesTarget(t *testing.T) {
|
||||
source := NewTestClientSource(t, []string{"localhost:1234", "localhost:4321"})
|
||||
db := database.NewMockDB()
|
||||
repos := database.NewMockRepoStore()
|
||||
repos.GetByNameFunc.SetDefaultReturn(nil, nil)
|
||||
|
||||
gs := database.NewMockGitserverRepoStore()
|
||||
gs.GetPoolRepoFunc.SetDefaultReturn(nil, nil)
|
||||
|
||||
db.ReposFunc.SetDefaultReturn(repos)
|
||||
db.GitserverReposFunc.SetDefaultReturn(gs)
|
||||
|
||||
source := NewTestClientSource(t, db, []string{"localhost:1234", "localhost:4321"})
|
||||
testGitserverConns := source.(*testGitserverConns)
|
||||
conns := GitserverConns(*testGitserverConns.conns)
|
||||
conns.logger = logtest.Scoped(t)
|
||||
|
||||
ctx := context.Background()
|
||||
for _, repo := range []api.RepoName{"a", "b", "c", "d"} {
|
||||
addr := source.AddrForRepo("test", repo)
|
||||
conn, err := conns.ConnForRepo("test", repo)
|
||||
addr := source.AddrForRepo(ctx, "test", repo)
|
||||
conn, err := conns.ConnForRepo(ctx, "test", repo)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -93,15 +106,22 @@ func TestClient_GRPCRouting(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestClient_AddrForRepo_UsesConfToRead_PinnedRepos(t *testing.T) {
|
||||
client := NewClient(database.NewMockDB())
|
||||
db := database.NewMockDB()
|
||||
client := NewClient(db)
|
||||
|
||||
cfg := newConfig(
|
||||
[]string{"gitserver1", "gitserver2"},
|
||||
map[string]string{"repo1": "gitserver2"},
|
||||
)
|
||||
conns.update(cfg)
|
||||
|
||||
addr := client.AddrForRepo("repo1")
|
||||
logger := logtest.NoOp(t)
|
||||
|
||||
atomicConns := getAtomicGitserverConns(logger, db)
|
||||
|
||||
atomicConns.update(cfg)
|
||||
|
||||
ctx := context.Background()
|
||||
addr := client.AddrForRepo(ctx, "repo1")
|
||||
require.Equal(t, "gitserver2", addr)
|
||||
|
||||
// simulate config change - site admin manually changes the pinned repo config
|
||||
@ -109,9 +129,9 @@ func TestClient_AddrForRepo_UsesConfToRead_PinnedRepos(t *testing.T) {
|
||||
[]string{"gitserver1", "gitserver2"},
|
||||
map[string]string{"repo1": "gitserver1"},
|
||||
)
|
||||
conns.update(cfg)
|
||||
atomicConns.update(cfg)
|
||||
|
||||
require.Equal(t, "gitserver1", client.AddrForRepo("repo1"))
|
||||
require.Equal(t, "gitserver1", client.AddrForRepo(ctx, "repo1"))
|
||||
}
|
||||
|
||||
func newConfig(addrs []string, pinned map[string]string) *conf.Unified {
|
||||
|
||||
@ -79,7 +79,7 @@ func TestGetCommits(t *testing.T) {
|
||||
nil,
|
||||
}
|
||||
|
||||
source := gitserver.NewTestClientSource(t, GitserverAddresses)
|
||||
source := gitserver.NewTestClientSource(t, db, GitserverAddresses)
|
||||
commits, err := gitserver.NewTestClient(http.DefaultClient, source).GetCommits(ctx, nil, repoCommits, true)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error calling getCommits: %s", err)
|
||||
@ -117,7 +117,7 @@ func TestGetCommits(t *testing.T) {
|
||||
nil,
|
||||
nil,
|
||||
}
|
||||
source := gitserver.NewTestClientSource(t, GitserverAddresses)
|
||||
source := gitserver.NewTestClientSource(t, db, GitserverAddresses)
|
||||
|
||||
commits, err := gitserver.NewTestClient(http.DefaultClient, source).GetCommits(ctx, getTestSubRepoPermsChecker("file1", "file3"), repoCommits, true)
|
||||
if err != nil {
|
||||
@ -150,7 +150,8 @@ func mustParseDate(s string, t *testing.T) *time.Time {
|
||||
}
|
||||
|
||||
func TestHead(t *testing.T) {
|
||||
source := gitserver.NewTestClientSource(t, GitserverAddresses)
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, GitserverAddresses)
|
||||
client := gitserver.NewTestClient(http.DefaultClient, source)
|
||||
t.Run("basic", func(t *testing.T) {
|
||||
gitCommands := []string{
|
||||
|
||||
@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/gitserver"
|
||||
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
|
||||
"github.com/sourcegraph/sourcegraph/schema"
|
||||
@ -61,7 +62,8 @@ func TestGetObject(t *testing.T) {
|
||||
},
|
||||
})
|
||||
for label, test := range tests {
|
||||
source := gitserver.NewTestClientSource(t, GitserverAddresses)
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, GitserverAddresses)
|
||||
cli := gitserver.NewTestClient(http.DefaultClient, source)
|
||||
runTest(t, label, test, cli)
|
||||
}
|
||||
@ -76,7 +78,8 @@ func TestGetObject(t *testing.T) {
|
||||
},
|
||||
})
|
||||
for label, test := range tests {
|
||||
source := gitserver.NewTestClientSource(t, GitserverAddresses)
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, GitserverAddresses)
|
||||
cli := gitserver.NewTestClient(http.DefaultClient, source)
|
||||
runTest(t, label, test, cli)
|
||||
}
|
||||
|
||||
@ -99,7 +99,7 @@ func InitGitserver() {
|
||||
}()
|
||||
|
||||
serverAddress := l.Addr().String()
|
||||
source := gitserver.NewTestClientSource(&t, []string{serverAddress})
|
||||
source := gitserver.NewTestClientSource(&t, db, []string{serverAddress})
|
||||
testGitserverClient = gitserver.NewTestClient(httpcli.InternalDoer, source)
|
||||
GitserverAddresses = []string{serverAddress}
|
||||
}
|
||||
|
||||
@ -60,7 +60,9 @@ func TestRepository_FileSystem(t *testing.T) {
|
||||
third: "ba3c51080ed4a5b870952ecd7f0e15f255b24cca",
|
||||
},
|
||||
}
|
||||
source := gitserver.NewTestClientSource(t, GitserverAddresses)
|
||||
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, GitserverAddresses)
|
||||
client := gitserver.NewTestClient(http.DefaultClient, source)
|
||||
for label, test := range tests {
|
||||
// notafile should not exist.
|
||||
@ -87,7 +89,7 @@ func TestRepository_FileSystem(t *testing.T) {
|
||||
if got, want := "ab771ba54f5571c99ffdae54f44acc7993d9f115", dir1Info.Sys().(gitdomain.ObjectInfo).OID().String(); got != want {
|
||||
t.Errorf("%s: got dir1 OID %q, want %q", label, got, want)
|
||||
}
|
||||
source := gitserver.NewTestClientSource(t, GitserverAddresses)
|
||||
source := gitserver.NewTestClientSource(t, db, GitserverAddresses)
|
||||
client := gitserver.NewTestClient(http.DefaultClient, source)
|
||||
|
||||
// dir1 should contain one entry: file1.
|
||||
@ -253,7 +255,9 @@ func TestRepository_FileSystem_quoteChars(t *testing.T) {
|
||||
repo: MakeGitRepository(t, append([]string{"git config core.quotepath off"}, gitCommands...)...),
|
||||
},
|
||||
}
|
||||
source := gitserver.NewTestClientSource(t, GitserverAddresses)
|
||||
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, GitserverAddresses)
|
||||
client := gitserver.NewTestClient(http.DefaultClient, source)
|
||||
for label, test := range tests {
|
||||
commitID, err := client.ResolveRevision(ctx, test.repo, "master", gitserver.ResolveRevisionOptions{})
|
||||
@ -313,7 +317,9 @@ func TestRepository_FileSystem_gitSubmodules(t *testing.T) {
|
||||
repo: MakeGitRepository(t, gitCommands...),
|
||||
},
|
||||
}
|
||||
source := gitserver.NewTestClientSource(t, GitserverAddresses)
|
||||
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, GitserverAddresses)
|
||||
client := gitserver.NewTestClient(http.DefaultClient, source)
|
||||
for label, test := range tests {
|
||||
commitID, err := client.ResolveRevision(ctx, test.repo, "master", gitserver.ResolveRevisionOptions{})
|
||||
@ -411,7 +417,8 @@ func TestReadDir_SubRepoFiltering(t *testing.T) {
|
||||
t.Fatalf("unexpected error creating sub-repo perms client: %s", err)
|
||||
}
|
||||
|
||||
source := gitserver.NewTestClientSource(t, GitserverAddresses)
|
||||
db := database.NewMockDB()
|
||||
source := gitserver.NewTestClientSource(t, db, GitserverAddresses)
|
||||
client := gitserver.NewTestClient(http.DefaultClient, source)
|
||||
files, err := client.ReadDir(ctx, checker, repo, commitID, "", false)
|
||||
if err != nil {
|
||||
|
||||
@ -188,7 +188,7 @@ type MockClient struct {
|
||||
func NewMockClient() *MockClient {
|
||||
return &MockClient{
|
||||
AddrForRepoFunc: &ClientAddrForRepoFunc{
|
||||
defaultHook: func(api.RepoName) (r0 string) {
|
||||
defaultHook: func(context.Context, api.RepoName) (r0 string) {
|
||||
return
|
||||
},
|
||||
},
|
||||
@ -455,7 +455,7 @@ func NewMockClient() *MockClient {
|
||||
func NewStrictMockClient() *MockClient {
|
||||
return &MockClient{
|
||||
AddrForRepoFunc: &ClientAddrForRepoFunc{
|
||||
defaultHook: func(api.RepoName) string {
|
||||
defaultHook: func(context.Context, api.RepoName) string {
|
||||
panic("unexpected invocation of MockClient.AddrForRepo")
|
||||
},
|
||||
},
|
||||
@ -883,23 +883,23 @@ func NewMockClientFrom(i Client) *MockClient {
|
||||
// ClientAddrForRepoFunc describes the behavior when the AddrForRepo method
|
||||
// of the parent MockClient instance is invoked.
|
||||
type ClientAddrForRepoFunc struct {
|
||||
defaultHook func(api.RepoName) string
|
||||
hooks []func(api.RepoName) string
|
||||
defaultHook func(context.Context, api.RepoName) string
|
||||
hooks []func(context.Context, api.RepoName) string
|
||||
history []ClientAddrForRepoFuncCall
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// AddrForRepo delegates to the next hook function in the queue and stores
|
||||
// the parameter and result values of this invocation.
|
||||
func (m *MockClient) AddrForRepo(v0 api.RepoName) string {
|
||||
r0 := m.AddrForRepoFunc.nextHook()(v0)
|
||||
m.AddrForRepoFunc.appendCall(ClientAddrForRepoFuncCall{v0, r0})
|
||||
func (m *MockClient) AddrForRepo(v0 context.Context, v1 api.RepoName) string {
|
||||
r0 := m.AddrForRepoFunc.nextHook()(v0, v1)
|
||||
m.AddrForRepoFunc.appendCall(ClientAddrForRepoFuncCall{v0, v1, r0})
|
||||
return r0
|
||||
}
|
||||
|
||||
// SetDefaultHook sets function that is called when the AddrForRepo method
|
||||
// of the parent MockClient instance is invoked and the hook queue is empty.
|
||||
func (f *ClientAddrForRepoFunc) SetDefaultHook(hook func(api.RepoName) string) {
|
||||
func (f *ClientAddrForRepoFunc) SetDefaultHook(hook func(context.Context, api.RepoName) string) {
|
||||
f.defaultHook = hook
|
||||
}
|
||||
|
||||
@ -907,7 +907,7 @@ func (f *ClientAddrForRepoFunc) SetDefaultHook(hook func(api.RepoName) string) {
|
||||
// AddrForRepo method of the parent MockClient instance invokes the hook at
|
||||
// the front of the queue and discards it. After the queue is empty, the
|
||||
// default hook function is invoked for any future action.
|
||||
func (f *ClientAddrForRepoFunc) PushHook(hook func(api.RepoName) string) {
|
||||
func (f *ClientAddrForRepoFunc) PushHook(hook func(context.Context, api.RepoName) string) {
|
||||
f.mutex.Lock()
|
||||
f.hooks = append(f.hooks, hook)
|
||||
f.mutex.Unlock()
|
||||
@ -916,19 +916,19 @@ func (f *ClientAddrForRepoFunc) PushHook(hook func(api.RepoName) string) {
|
||||
// SetDefaultReturn calls SetDefaultHook with a function that returns the
|
||||
// given values.
|
||||
func (f *ClientAddrForRepoFunc) SetDefaultReturn(r0 string) {
|
||||
f.SetDefaultHook(func(api.RepoName) string {
|
||||
f.SetDefaultHook(func(context.Context, api.RepoName) string {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
// PushReturn calls PushHook with a function that returns the given values.
|
||||
func (f *ClientAddrForRepoFunc) PushReturn(r0 string) {
|
||||
f.PushHook(func(api.RepoName) string {
|
||||
f.PushHook(func(context.Context, api.RepoName) string {
|
||||
return r0
|
||||
})
|
||||
}
|
||||
|
||||
func (f *ClientAddrForRepoFunc) nextHook() func(api.RepoName) string {
|
||||
func (f *ClientAddrForRepoFunc) nextHook() func(context.Context, api.RepoName) string {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
@ -963,7 +963,10 @@ func (f *ClientAddrForRepoFunc) History() []ClientAddrForRepoFuncCall {
|
||||
type ClientAddrForRepoFuncCall struct {
|
||||
// Arg0 is the value of the 1st argument passed to this method
|
||||
// invocation.
|
||||
Arg0 api.RepoName
|
||||
Arg0 context.Context
|
||||
// Arg1 is the value of the 2nd argument passed to this method
|
||||
// invocation.
|
||||
Arg1 api.RepoName
|
||||
// Result0 is the value of the 1st result returned from this method
|
||||
// invocation.
|
||||
Result0 string
|
||||
@ -972,7 +975,7 @@ type ClientAddrForRepoFuncCall struct {
|
||||
// Args returns an interface slice containing the arguments of this
|
||||
// invocation.
|
||||
func (c ClientAddrForRepoFuncCall) Args() []interface{} {
|
||||
return []interface{}{c.Arg0}
|
||||
return []interface{}{c.Arg0, c.Arg1}
|
||||
}
|
||||
|
||||
// Results returns an interface slice containing the results of this
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/reposource"
|
||||
database "github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/extsvc/gitolite"
|
||||
"github.com/sourcegraph/sourcegraph/internal/gitserver"
|
||||
"github.com/sourcegraph/sourcegraph/internal/httpcli"
|
||||
@ -29,7 +30,7 @@ type GitoliteSource struct {
|
||||
}
|
||||
|
||||
// NewGitoliteSource returns a new GitoliteSource from the given external service.
|
||||
func NewGitoliteSource(ctx context.Context, svc *types.ExternalService, cf *httpcli.Factory) (*GitoliteSource, error) {
|
||||
func NewGitoliteSource(ctx context.Context, db database.DB, svc *types.ExternalService, cf *httpcli.Factory) (*GitoliteSource, error) {
|
||||
rawConfig, err := svc.Config.Decrypt(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Errorf("external service id=%d config error: %s", svc.ID, err)
|
||||
@ -60,7 +61,7 @@ func NewGitoliteSource(ctx context.Context, svc *types.ExternalService, cf *http
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lister := gitserver.NewGitoliteLister(gitoliteDoer)
|
||||
lister := gitserver.NewGitoliteLister(db, gitoliteDoer)
|
||||
|
||||
return &GitoliteSource{
|
||||
svc: svc,
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
database "github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/extsvc"
|
||||
"github.com/sourcegraph/sourcegraph/internal/httpcli"
|
||||
"github.com/sourcegraph/sourcegraph/internal/types"
|
||||
@ -20,7 +21,8 @@ func TestGitoliteSource(t *testing.T) {
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
_, err := NewGitoliteSource(ctx, svc, cf)
|
||||
db := database.NewMockDB()
|
||||
_, err := NewGitoliteSource(ctx, db, svc, cf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -56,7 +56,7 @@ func NewSource(ctx context.Context, logger log.Logger, db database.DB, svc *type
|
||||
case extsvc.KindBitbucketCloud:
|
||||
return NewBitbucketCloudSource(ctx, logger.Scoped("BitbucketCloudSource", "bitbucket cloud repo source"), svc, cf)
|
||||
case extsvc.KindGitolite:
|
||||
return NewGitoliteSource(ctx, svc, cf)
|
||||
return NewGitoliteSource(ctx, db, svc, cf)
|
||||
case extsvc.KindPhabricator:
|
||||
return NewPhabricatorSource(ctx, logger.Scoped("PhabricatorSource", "phabricator repo source"), svc, cf)
|
||||
case extsvc.KindAWSCodeCommit:
|
||||
|
||||
@ -47,6 +47,11 @@ type Syncer struct {
|
||||
// Hooks for enterprise specific functionality. Ignored in OSS
|
||||
EnterpriseCreateRepoHook func(context.Context, Store, *types.Repo) error
|
||||
EnterpriseUpdateRepoHook func(context.Context, Store, *types.Repo, *types.Repo) error
|
||||
|
||||
// DeduplicatedForksSet is a set of all repos added to the deduplicateForks site config
|
||||
// property. It exists only to aid in fast lookups instead of having to iterate through the list
|
||||
// each time.
|
||||
DeduplicatedForksSet *types.RepoURISet
|
||||
}
|
||||
|
||||
// RunOptions contains options customizing Run behaviour.
|
||||
|
||||
@ -12,6 +12,7 @@ go_library(
|
||||
"outbound_webhook_jobs.go",
|
||||
"outbound_webhook_logs.go",
|
||||
"outbound_webhooks.go",
|
||||
"repos.go",
|
||||
"saved_searches.go",
|
||||
"secret.go",
|
||||
"types.go",
|
||||
@ -21,6 +22,7 @@ go_library(
|
||||
visibility = ["//:__subpackages__"],
|
||||
deps = [
|
||||
"//internal/api",
|
||||
"//internal/collections",
|
||||
"//internal/database/dbutil",
|
||||
"//internal/encryption",
|
||||
"//internal/executor",
|
||||
@ -38,11 +40,16 @@ go_library(
|
||||
go_test(
|
||||
name = "types_test",
|
||||
timeout = "short",
|
||||
srcs = ["secret_test.go"],
|
||||
srcs = [
|
||||
"repos_test.go",
|
||||
"secret_test.go",
|
||||
],
|
||||
embed = [":types"],
|
||||
deps = [
|
||||
"//internal/collections",
|
||||
"//internal/extsvc",
|
||||
"//schema",
|
||||
"@com_github_stretchr_testify//assert",
|
||||
"@com_github_stretchr_testify//require",
|
||||
],
|
||||
)
|
||||
|
||||
36
internal/types/repos.go
Normal file
36
internal/types/repos.go
Normal file
@ -0,0 +1,36 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/collections"
|
||||
)
|
||||
|
||||
type RepoURISet struct {
|
||||
mu sync.RWMutex
|
||||
index collections.Set[string]
|
||||
}
|
||||
|
||||
func NewRepoURICache(index collections.Set[string]) *RepoURISet {
|
||||
if index == nil {
|
||||
index = make(collections.Set[string])
|
||||
}
|
||||
|
||||
return &RepoURISet{
|
||||
index: index,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *RepoURISet) Contains(name string) bool {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
_, ok := c.index[name]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (c *RepoURISet) Overwrite(index collections.Set[string]) {
|
||||
c.mu.Lock()
|
||||
c.index = index
|
||||
c.mu.Unlock()
|
||||
}
|
||||
21
internal/types/repos_test.go
Normal file
21
internal/types/repos_test.go
Normal file
@ -0,0 +1,21 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/collections"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRepoURICache(t *testing.T) {
|
||||
index := collections.NewSet[string]("abc", "def")
|
||||
cache := NewRepoURICache(index)
|
||||
|
||||
require.True(t, cache.Contains("abc"))
|
||||
require.False(t, cache.Contains("ghi"))
|
||||
require.Equal(t, index, cache.index)
|
||||
|
||||
index2 := collections.NewSet[string]("xyz", "pqr")
|
||||
cache.Overwrite(index2)
|
||||
require.Equal(t, index2, cache.index)
|
||||
}
|
||||
@ -577,7 +577,7 @@ func ParseCloneStatusFromGraphQL(s string) CloneStatus {
|
||||
return ParseCloneStatus(strings.ToLower(s))
|
||||
}
|
||||
|
||||
// GitserverRepo represents the data gitserver knows about a repo
|
||||
// GitserverRepo represents the data gitserver knows about a repo
|
||||
type GitserverRepo struct {
|
||||
RepoID api.RepoID
|
||||
// Usually represented by a gitserver hostname
|
||||
@ -600,6 +600,54 @@ type GitserverRepo struct {
|
||||
// A log of the different types of corruption that was detected on this repo. The order of the log entries are
|
||||
// stored from most recent to least recent and capped at 10 entries. See LogCorruption on Gitserverrepo store.
|
||||
CorruptionLogs []RepoCorruptionLog
|
||||
|
||||
// PoolRepoID is the repo_id of the parent repo of which this repo is a fork. This is referenced
|
||||
// for deduplicated storage of the repo itself on disk.
|
||||
//
|
||||
// It is an optional value so consumers must check for non-zero value before using it.
|
||||
PoolRepoID api.RepoID
|
||||
}
|
||||
|
||||
// PoolRepo is used for using deduplicated storage for a repository and all its forks. If a
|
||||
// repository exists in the database as:
|
||||
//
|
||||
// github.com/sourcegraph/sourcegraph with ID: 1
|
||||
//
|
||||
// And a fork of this repository exists in the database as:
|
||||
//
|
||||
// github.com/forked/sourcegraph with ID: 2
|
||||
//
|
||||
// Then the PoolRepo for github.com/forked/sourcegraph will be:
|
||||
//
|
||||
// PoolRepo{
|
||||
// RepoName: "github.com/sourcegraph/sourcegraph",
|
||||
// RepoURI: "github.com/sourcegraph/sourcegraph",
|
||||
// }
|
||||
//
|
||||
// A parent repository does not have a poolrepo.
|
||||
//
|
||||
// The pool repo is stored in $REPODIR/.pool/ by default and the repositories (both the parent and
|
||||
// the forks) are stored in $REPODIR configured as git-alternates of the repository stored at
|
||||
// $REPODIR/.pool. So for the above example we expect to have the following directory structure:
|
||||
//
|
||||
// $REPODIR
|
||||
// |_ .pool
|
||||
// | |_ github.com
|
||||
// | |_ sourcegraph
|
||||
// | |_ sourcegraph
|
||||
// |
|
||||
// |_ github.com
|
||||
//
|
||||
// |_ forked
|
||||
// |_ sourcegraph
|
||||
// |_ sourcegraph
|
||||
// |_sourcegraph
|
||||
//
|
||||
// The PoolRepo type here is used to identify the fork -> parent repo relationship since the pool
|
||||
// repository in the disk is stored with the name of the parent.
|
||||
type PoolRepo struct {
|
||||
RepoName api.RepoName
|
||||
RepoURI string
|
||||
}
|
||||
|
||||
// RepoCorruptionLog represents a corruption event that has been detected on a repo.
|
||||
|
||||
@ -1096,6 +1096,9 @@ go_library(
|
||||
"frontend/1688649829_user_completed_post_signup/down.sql",
|
||||
"frontend/1688649829_user_completed_post_signup/metadata.yaml",
|
||||
"frontend/1688649829_user_completed_post_signup/up.sql",
|
||||
"frontend/1689692530_add_column_pool_repo_id_to_gitserver_repos/down.sql",
|
||||
"frontend/1689692530_add_column_pool_repo_id_to_gitserver_repos/metadata.yaml",
|
||||
"frontend/1689692530_add_column_pool_repo_id_to_gitserver_repos/up.sql",
|
||||
],
|
||||
importpath = "github.com/sourcegraph/sourcegraph/migrations",
|
||||
visibility = ["//visibility:public"],
|
||||
|
||||
@ -0,0 +1,2 @@
|
||||
ALTER TABLE gitserver_repos
|
||||
DROP COLUMN IF EXISTS pool_repo_id;
|
||||
@ -0,0 +1,2 @@
|
||||
name: add column pool_repo_id to gitserver_repos
|
||||
parents: [1688649829]
|
||||
@ -0,0 +1,4 @@
|
||||
ALTER TABLE gitserver_repos
|
||||
ADD COLUMN IF NOT EXISTS pool_repo_id INTEGER DEFAULT NULL REFERENCES repo (id);
|
||||
|
||||
COMMENT ON COLUMN gitserver_repos.pool_repo_id IS 'This is used to refer to the pool repository for deduplicated repos';
|
||||
@ -2693,13 +2693,16 @@ CREATE TABLE gitserver_repos (
|
||||
repo_size_bytes bigint,
|
||||
corrupted_at timestamp with time zone,
|
||||
corruption_logs jsonb DEFAULT '[]'::jsonb NOT NULL,
|
||||
cloning_progress text DEFAULT ''::text
|
||||
cloning_progress text DEFAULT ''::text,
|
||||
pool_repo_id integer
|
||||
);
|
||||
|
||||
COMMENT ON COLUMN gitserver_repos.corrupted_at IS 'Timestamp of when repo corruption was detected';
|
||||
|
||||
COMMENT ON COLUMN gitserver_repos.corruption_logs IS 'Log output of repo corruptions that have been detected - encoded as json';
|
||||
|
||||
COMMENT ON COLUMN gitserver_repos.pool_repo_id IS 'This is used to refer to the pool repository for deduplicated repos';
|
||||
|
||||
CREATE TABLE gitserver_repos_statistics (
|
||||
shard_id text NOT NULL,
|
||||
total bigint DEFAULT 0 NOT NULL,
|
||||
@ -6539,6 +6542,9 @@ ALTER TABLE ONLY github_app_installs
|
||||
ALTER TABLE ONLY github_apps
|
||||
ADD CONSTRAINT github_apps_webhook_id_fkey FOREIGN KEY (webhook_id) REFERENCES webhooks(id) ON DELETE SET NULL;
|
||||
|
||||
ALTER TABLE ONLY gitserver_repos
|
||||
ADD CONSTRAINT gitserver_repos_pool_repo_id_fkey FOREIGN KEY (pool_repo_id) REFERENCES repo(id);
|
||||
|
||||
ALTER TABLE ONLY gitserver_repos
|
||||
ADD CONSTRAINT gitserver_repos_repo_id_fkey FOREIGN KEY (repo_id) REFERENCES repo(id) ON DELETE CASCADE;
|
||||
|
||||
|
||||
@ -1912,6 +1912,12 @@ type Repos struct {
|
||||
Path string `json:"path"`
|
||||
}
|
||||
|
||||
// Repositories description: Top level configuration key for all things repositories.
|
||||
type Repositories struct {
|
||||
// DeduplicateForks description: EXPERIMENTAL: A list of absolute paths of repositories whose forks will use deduplicated storage in gitserver
|
||||
DeduplicateForks []string `json:"deduplicateForks,omitempty"`
|
||||
}
|
||||
|
||||
// Repository description: The repository to get the latest version of.
|
||||
type Repository struct {
|
||||
// Name description: The repository name.
|
||||
@ -2639,6 +2645,8 @@ type SiteConfiguration struct {
|
||||
RepoListUpdateInterval int `json:"repoListUpdateInterval,omitempty"`
|
||||
// RepoPurgeWorker description: Configuration for repository purge worker.
|
||||
RepoPurgeWorker *RepoPurgeWorker `json:"repoPurgeWorker,omitempty"`
|
||||
// Repositories description: Top level configuration key for all things repositories.
|
||||
Repositories *Repositories `json:"repositories,omitempty"`
|
||||
// ScimAuthToken description: The SCIM auth token is used to authenticate SCIM requests. If not set, SCIM is disabled.
|
||||
ScimAuthToken string `json:"scim.authToken,omitempty"`
|
||||
// ScimIdentityProvider description: Identity provider used for SCIM support. "STANDARD" should be used unless a more specific value is available
|
||||
@ -2809,6 +2817,7 @@ func (v *SiteConfiguration) UnmarshalJSON(data []byte) error {
|
||||
delete(m, "repoConcurrentExternalServiceSyncers")
|
||||
delete(m, "repoListUpdateInterval")
|
||||
delete(m, "repoPurgeWorker")
|
||||
delete(m, "repositories")
|
||||
delete(m, "scim.authToken")
|
||||
delete(m, "scim.identityProvider")
|
||||
delete(m, "search.index.symbols.enabled")
|
||||
|
||||
@ -860,6 +860,21 @@
|
||||
"default": -1,
|
||||
"group": "External services"
|
||||
},
|
||||
"repositories": {
|
||||
"description": "Top level configuration key for all things repositories.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"deduplicateForks": {
|
||||
"type": "array",
|
||||
"description": "EXPERIMENTAL: A list of absolute paths of repositories whose forks will use deduplicated storage in gitserver",
|
||||
"items": {
|
||||
"type": "string"
|
||||
},
|
||||
"examples": [["github.com/sourcegraph/sourcegraph"]],
|
||||
"uniqueItems": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"syntaxHighlighting": {
|
||||
"title": "SyntaxHighlighting",
|
||||
"description": "Syntax highlighting configuration",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user