mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 17:31:43 +00:00
repo-updater: Hydrate schedule on startup (#62891)
Currently, when repo-updater restarts it loses all intel it collected over time. That causes a large flood of git fetch requests after it restarts. Every repo will be enqueued for an immediate update. This PR fixes that by populating the scheduler with an initial delay per repo that is calculated with the same heuristic that the scheduler uses when it's fully warmed up. This should avoid fetching git repos that are very stale (most likely the majority on instances with many repos). Test plan: Ran it locally, verified the scheduler state using the instrumentation tool for it, the schedule looks as expected and most repos aren't scheduled for the next 8h.
This commit is contained in:
parent
b1cbbc82e5
commit
03c05e5fda
@ -18,6 +18,7 @@ go_library(
|
||||
"//internal/api",
|
||||
"//internal/conf",
|
||||
"//internal/database",
|
||||
"//internal/dotcom",
|
||||
"//internal/limiter",
|
||||
"//internal/ratelimit",
|
||||
"//internal/repoupdater/protocol",
|
||||
|
||||
@ -18,6 +18,7 @@ import (
|
||||
"github.com/sourcegraph/sourcegraph/internal/api"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database"
|
||||
"github.com/sourcegraph/sourcegraph/internal/dotcom"
|
||||
"github.com/sourcegraph/sourcegraph/internal/limiter"
|
||||
"github.com/sourcegraph/sourcegraph/internal/ratelimit"
|
||||
"github.com/sourcegraph/sourcegraph/internal/repoupdater/protocol"
|
||||
@ -103,10 +104,87 @@ func (s *UpdateScheduler) Start() {
|
||||
ctx, cancel := context.WithCancel(actor.WithInternalActor(context.Background()))
|
||||
s.cancelCtx = cancel
|
||||
|
||||
if !dotcom.SourcegraphDotComMode() {
|
||||
s.logger.Info("hydrating update scheduler")
|
||||
|
||||
// Hydrate the scheduler with the initial set of repos.
|
||||
// This is done to preset the intervals from the database state, so that
|
||||
// repos that haven't changed in a while don't need to be refetched once
|
||||
// after a restart until we restore the previous schedule.
|
||||
var nextCursor int
|
||||
errors := 0
|
||||
for {
|
||||
var (
|
||||
rs []types.RepoGitserverStatus
|
||||
err error
|
||||
)
|
||||
rs, nextCursor, err = s.db.GitserverRepos().IterateRepoGitserverStatus(ctx, database.IterateRepoGitserverStatusOptions{
|
||||
NextCursor: nextCursor,
|
||||
BatchSize: 1000,
|
||||
})
|
||||
if err != nil {
|
||||
errors++
|
||||
s.logger.Error("failed to iterate gitserver repos", log.Error(err), log.Int("errors", errors))
|
||||
if errors > 5 {
|
||||
s.logger.Error("too many errors, stopping initial hydration of update queue, the queue will build up lazily")
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
for _, r := range rs {
|
||||
cr := configuredRepo{
|
||||
ID: r.ID,
|
||||
Name: r.Name,
|
||||
}
|
||||
if !s.schedule.upsert(cr) {
|
||||
interval := initialInterval(r)
|
||||
s.schedule.updateInterval(cr, interval)
|
||||
}
|
||||
}
|
||||
if nextCursor == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
s.logger.Info("hydrated update scheduler")
|
||||
}
|
||||
|
||||
go s.runUpdateLoop(ctx)
|
||||
go s.runScheduleLoop(ctx)
|
||||
}
|
||||
|
||||
// initialInterval determines the initial interval used for the scheduler:
|
||||
// (Any values outside of [45s, 8h] are capped)
|
||||
// Last changed: 2h30m ago
|
||||
// Last fetched: 2h ago
|
||||
// Time since last changed: 2:30h
|
||||
// Interval between last fetch and last change: 30 min
|
||||
// The next fetch will be due at: 2h ago (last fetched) + 30min/2
|
||||
// = 1:45h ago.
|
||||
// Since this time is in the past, it will be scheduled immediately.
|
||||
// Another example:
|
||||
// Last Changed: 2h ago
|
||||
// Last fetched: 30 min ago
|
||||
// Interval between last fetch and last change: 1h:30 min
|
||||
// The next fetch will be due at: 30 min ago (last fetched) + 90min/2
|
||||
// = in 15 minutes.
|
||||
func initialInterval(r types.RepoGitserverStatus) time.Duration {
|
||||
interval := r.LastFetched.Sub(r.LastChanged) / 2
|
||||
if interval < minDelay {
|
||||
interval = minDelay
|
||||
} else if interval > maxDelay {
|
||||
interval = maxDelay
|
||||
}
|
||||
interval = time.Until(r.LastFetched.Add(interval))
|
||||
if interval < minDelay {
|
||||
interval = minDelay
|
||||
} else if interval > maxDelay {
|
||||
interval = maxDelay
|
||||
}
|
||||
return interval
|
||||
}
|
||||
|
||||
func (s *UpdateScheduler) Stop(context.Context) error {
|
||||
if s.cancelCtx != nil {
|
||||
s.cancelCtx()
|
||||
|
||||
@ -1610,3 +1610,62 @@ func TestGetCustomInterval(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
func TestInitialInterval(t *testing.T) {
|
||||
now := time.Now()
|
||||
cases := []struct {
|
||||
name string
|
||||
lastChanged time.Time
|
||||
lastFetched time.Time
|
||||
expected time.Duration
|
||||
}{
|
||||
{
|
||||
name: "changed an hour ago",
|
||||
lastChanged: now.Add(-1 * time.Hour),
|
||||
lastFetched: now,
|
||||
expected: 30 * time.Minute,
|
||||
},
|
||||
{
|
||||
name: "not fetched a long time",
|
||||
// The interval here will be 8h, but we haven't fetched in 10h, so
|
||||
// the next fetch is due immediately.
|
||||
lastChanged: now.Add(-100 * time.Hour),
|
||||
lastFetched: now.Add(-10 * time.Hour),
|
||||
expected: minDelay,
|
||||
},
|
||||
{
|
||||
name: "both equal",
|
||||
lastChanged: now,
|
||||
lastFetched: now,
|
||||
expected: minDelay,
|
||||
},
|
||||
{
|
||||
name: "both equal long ago",
|
||||
lastChanged: now.Add(-100 * time.Hour),
|
||||
lastFetched: now.Add(-100 * time.Hour),
|
||||
expected: minDelay,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
r := types.RepoGitserverStatus{
|
||||
GitserverRepo: &types.GitserverRepo{
|
||||
LastChanged: tc.lastChanged,
|
||||
LastFetched: tc.lastFetched,
|
||||
},
|
||||
}
|
||||
actual := initialInterval(r)
|
||||
// Due to rounding errors, we accept up to 1 second of difference when comparing:
|
||||
if diff := abs(actual - tc.expected); diff > 1*time.Second {
|
||||
t.Errorf("expected %v, got %v", tc.expected, actual)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func abs(x time.Duration) time.Duration {
|
||||
if x < 0 {
|
||||
return -x
|
||||
}
|
||||
return x
|
||||
}
|
||||
|
||||
@ -360,6 +360,10 @@ func newUnclonedReposManager(ctx context.Context, logger log.Logger, isSourcegra
|
||||
// of the queue.
|
||||
managed := sched.ListRepoIDs()
|
||||
|
||||
if len(managed) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
uncloned, err := baseRepoStore.ListMinimalRepos(ctx, database.ReposListOptions{IDs: managed, NoCloned: true})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to fetch list of uncloned repositories")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user