From 03c05e5fda522c71019ac869b00cb21665feb017 Mon Sep 17 00:00:00 2001 From: Erik Seliger Date: Tue, 4 Jun 2024 19:00:23 +0200 Subject: [PATCH] repo-updater: Hydrate schedule on startup (#62891) Currently, when repo-updater restarts it loses all intel it collected over time. That causes a large flood of git fetch requests after it restarts. Every repo will be enqueued for an immediate update. This PR fixes that by populating the scheduler with an initial delay per repo that is calculated with the same heuristic that the scheduler uses when it's fully warmed up. This should avoid fetching git repos that are very stale (most likely the majority on instances with many repos). Test plan: Ran it locally, verified the scheduler state using the instrumentation tool for it, the schedule looks as expected and most repos aren't scheduled for the next 8h. --- .../internal/scheduler/BUILD.bazel | 1 + .../internal/scheduler/scheduler.go | 78 +++++++++++++++++++ .../internal/scheduler/scheduler_test.go | 59 ++++++++++++++ cmd/repo-updater/shared/main.go | 4 + 4 files changed, 142 insertions(+) diff --git a/cmd/repo-updater/internal/scheduler/BUILD.bazel b/cmd/repo-updater/internal/scheduler/BUILD.bazel index a8938430571..11dcf17e826 100644 --- a/cmd/repo-updater/internal/scheduler/BUILD.bazel +++ b/cmd/repo-updater/internal/scheduler/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//internal/api", "//internal/conf", "//internal/database", + "//internal/dotcom", "//internal/limiter", "//internal/ratelimit", "//internal/repoupdater/protocol", diff --git a/cmd/repo-updater/internal/scheduler/scheduler.go b/cmd/repo-updater/internal/scheduler/scheduler.go index 217661ac560..d83d7407382 100644 --- a/cmd/repo-updater/internal/scheduler/scheduler.go +++ b/cmd/repo-updater/internal/scheduler/scheduler.go @@ -18,6 +18,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/dotcom" "github.com/sourcegraph/sourcegraph/internal/limiter" "github.com/sourcegraph/sourcegraph/internal/ratelimit" "github.com/sourcegraph/sourcegraph/internal/repoupdater/protocol" @@ -103,10 +104,87 @@ func (s *UpdateScheduler) Start() { ctx, cancel := context.WithCancel(actor.WithInternalActor(context.Background())) s.cancelCtx = cancel + if !dotcom.SourcegraphDotComMode() { + s.logger.Info("hydrating update scheduler") + + // Hydrate the scheduler with the initial set of repos. + // This is done to preset the intervals from the database state, so that + // repos that haven't changed in a while don't need to be refetched once + // after a restart until we restore the previous schedule. + var nextCursor int + errors := 0 + for { + var ( + rs []types.RepoGitserverStatus + err error + ) + rs, nextCursor, err = s.db.GitserverRepos().IterateRepoGitserverStatus(ctx, database.IterateRepoGitserverStatusOptions{ + NextCursor: nextCursor, + BatchSize: 1000, + }) + if err != nil { + errors++ + s.logger.Error("failed to iterate gitserver repos", log.Error(err), log.Int("errors", errors)) + if errors > 5 { + s.logger.Error("too many errors, stopping initial hydration of update queue, the queue will build up lazily") + return + } + time.Sleep(time.Second) + continue + } + for _, r := range rs { + cr := configuredRepo{ + ID: r.ID, + Name: r.Name, + } + if !s.schedule.upsert(cr) { + interval := initialInterval(r) + s.schedule.updateInterval(cr, interval) + } + } + if nextCursor == 0 { + break + } + } + + s.logger.Info("hydrated update scheduler") + } + go s.runUpdateLoop(ctx) go s.runScheduleLoop(ctx) } +// initialInterval determines the initial interval used for the scheduler: +// (Any values outside of [45s, 8h] are capped) +// Last changed: 2h30m ago +// Last fetched: 2h ago +// Time since last changed: 2:30h +// Interval between last fetch and last change: 30 min +// The next fetch will be due at: 2h ago (last fetched) + 30min/2 +// = 1:45h ago. +// Since this time is in the past, it will be scheduled immediately. +// Another example: +// Last Changed: 2h ago +// Last fetched: 30 min ago +// Interval between last fetch and last change: 1h:30 min +// The next fetch will be due at: 30 min ago (last fetched) + 90min/2 +// = in 15 minutes. +func initialInterval(r types.RepoGitserverStatus) time.Duration { + interval := r.LastFetched.Sub(r.LastChanged) / 2 + if interval < minDelay { + interval = minDelay + } else if interval > maxDelay { + interval = maxDelay + } + interval = time.Until(r.LastFetched.Add(interval)) + if interval < minDelay { + interval = minDelay + } else if interval > maxDelay { + interval = maxDelay + } + return interval +} + func (s *UpdateScheduler) Stop(context.Context) error { if s.cancelCtx != nil { s.cancelCtx() diff --git a/cmd/repo-updater/internal/scheduler/scheduler_test.go b/cmd/repo-updater/internal/scheduler/scheduler_test.go index de1092fa29a..0b58a54fb1f 100644 --- a/cmd/repo-updater/internal/scheduler/scheduler_test.go +++ b/cmd/repo-updater/internal/scheduler/scheduler_test.go @@ -1610,3 +1610,62 @@ func TestGetCustomInterval(t *testing.T) { }) } } +func TestInitialInterval(t *testing.T) { + now := time.Now() + cases := []struct { + name string + lastChanged time.Time + lastFetched time.Time + expected time.Duration + }{ + { + name: "changed an hour ago", + lastChanged: now.Add(-1 * time.Hour), + lastFetched: now, + expected: 30 * time.Minute, + }, + { + name: "not fetched a long time", + // The interval here will be 8h, but we haven't fetched in 10h, so + // the next fetch is due immediately. + lastChanged: now.Add(-100 * time.Hour), + lastFetched: now.Add(-10 * time.Hour), + expected: minDelay, + }, + { + name: "both equal", + lastChanged: now, + lastFetched: now, + expected: minDelay, + }, + { + name: "both equal long ago", + lastChanged: now.Add(-100 * time.Hour), + lastFetched: now.Add(-100 * time.Hour), + expected: minDelay, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + r := types.RepoGitserverStatus{ + GitserverRepo: &types.GitserverRepo{ + LastChanged: tc.lastChanged, + LastFetched: tc.lastFetched, + }, + } + actual := initialInterval(r) + // Due to rounding errors, we accept up to 1 second of difference when comparing: + if diff := abs(actual - tc.expected); diff > 1*time.Second { + t.Errorf("expected %v, got %v", tc.expected, actual) + } + }) + } +} + +func abs(x time.Duration) time.Duration { + if x < 0 { + return -x + } + return x +} diff --git a/cmd/repo-updater/shared/main.go b/cmd/repo-updater/shared/main.go index 85f9a645dfe..cb999404074 100644 --- a/cmd/repo-updater/shared/main.go +++ b/cmd/repo-updater/shared/main.go @@ -360,6 +360,10 @@ func newUnclonedReposManager(ctx context.Context, logger log.Logger, isSourcegra // of the queue. managed := sched.ListRepoIDs() + if len(managed) == 0 { + return nil + } + uncloned, err := baseRepoStore.ListMinimalRepos(ctx, database.ReposListOptions{IDs: managed, NoCloned: true}) if err != nil { return errors.Wrap(err, "failed to fetch list of uncloned repositories")