From cd38adb4a779ac52b9e9395012102e1bd845c9bd Mon Sep 17 00:00:00 2001 From: Stefan Hengl Date: Thu, 1 Aug 2024 15:29:10 +0200 Subject: [PATCH] chore(search_jobs): add janitor job (#64186) Fixes SPLF-119 This adds a background job to Search Jobs that periodically scans for finished jobs to aggregate the status, upload logs, and clean up the tables. This drastically reduces the size of the tables and improves the performance of the Search Jobs GQL API. For example, with this change, a finished search job on .com only has 1 entry in the database, whereas before it could have several millions if we searched each repository. Notes: - the diff seems larger than it actually is. I left a couple of comments to help the reviewers. ## Test plan: - new unit tests - manual testing: I ran a couple of search jobs locally (with the janitor job interval set to 1 min) and checked that - logs are uploaded to `blobstore-go/buckets/search-jobs` - repo jobs are deleted from `exhaustive_repo_jobs` - logs are served from the blobstore after the janitor ran - downloading logs while the job is running still works ## Changelog The new background job drastically reduces the size of the `exhaustive_*` tables and improves performance of the Search Jobs GQL API. --- .../internal/search/httpapi/export.go | 53 +++++- cmd/worker/internal/search/BUILD.bazel | 7 + cmd/worker/internal/search/janitor.go | 168 ++++++++++++++++++ cmd/worker/internal/search/janitor_test.go | 150 ++++++++++++++++ cmd/worker/internal/search/job.go | 8 +- internal/database/schema.json | 13 ++ internal/database/schema.md | 1 + internal/search/exhaustive/service/service.go | 35 ++++ internal/search/exhaustive/store/BUILD.bazel | 5 +- .../store/exhaustive_search_jobs.go | 2 + .../store/exhaustive_search_jobs_test.go | 160 +++++------------ .../store/exhaustive_search_repo_jobs_test.go | 5 +- ...haustive_search_repo_revision_jobs_test.go | 5 +- .../search/exhaustive/store/store_test.go | 25 --- .../exhaustive/store/storetest/BUILD.bazel | 20 +++ .../exhaustive/store/storetest/store.go | 106 +++++++++++ .../exhaustive/types/exhaustive_search_job.go | 4 + internal/search/exhaustive/types/worker.go | 11 ++ .../down.sql | 1 + .../metadata.yaml | 2 + .../up.sql | 1 + migrations/frontend/squashed.sql | 3 +- 22 files changed, 624 insertions(+), 161 deletions(-) create mode 100644 cmd/worker/internal/search/janitor.go create mode 100644 cmd/worker/internal/search/janitor_test.go delete mode 100644 internal/search/exhaustive/store/store_test.go create mode 100644 internal/search/exhaustive/store/storetest/BUILD.bazel create mode 100644 internal/search/exhaustive/store/storetest/store.go create mode 100644 migrations/frontend/1722348497_exhaustive_search_jobs_add_column/down.sql create mode 100644 migrations/frontend/1722348497_exhaustive_search_jobs_add_column/metadata.yaml create mode 100644 migrations/frontend/1722348497_exhaustive_search_jobs_add_column/up.sql diff --git a/cmd/frontend/internal/search/httpapi/export.go b/cmd/frontend/internal/search/httpapi/export.go index 582993f2341..a3cc243653b 100644 --- a/cmd/frontend/internal/search/httpapi/export.go +++ b/cmd/frontend/internal/search/httpapi/export.go @@ -1,6 +1,7 @@ package httpapi import ( + "context" "fmt" "io" "net/http" @@ -8,7 +9,6 @@ import ( "time" "github.com/gorilla/mux" - "github.com/sourcegraph/log" "github.com/sourcegraph/sourcegraph/internal/auth" @@ -18,7 +18,7 @@ import ( ) // search-jobs__2020-07-01_150405 -func filenamePrefix(jobID int) string { +func filenamePrefix(jobID int64) string { return fmt.Sprintf("search-jobs_%d_%s", jobID, time.Now().Format("2006-01-02_150405")) } @@ -27,7 +27,7 @@ func ServeSearchJobDownload(logger log.Logger, svc *service.Service) http.Handle return func(w http.ResponseWriter, r *http.Request) { jobIDStr := mux.Vars(r)["id"] - jobID, err := strconv.Atoi(jobIDStr) + jobID, err := strconv.ParseInt(jobIDStr, 10, 64) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -40,7 +40,7 @@ func ServeSearchJobDownload(logger log.Logger, svc *service.Service) http.Handle } filename := filenamePrefix(jobID) + ".jsonl" - writeJSON(logger.With(log.Int("jobID", jobID)), w, filename, writerTo) + writeJSON(logger.With(log.Int64("jobID", jobID)), w, filename, writerTo) } } @@ -48,21 +48,58 @@ func ServeSearchJobLogs(logger log.Logger, svc *service.Service) http.HandlerFun logger = logger.With(log.String("handler", "ServeSearchJobLogs")) return func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + jobIDStr := mux.Vars(r)["id"] - jobID, err := strconv.Atoi(jobIDStr) + jobID, err := strconv.ParseInt(jobIDStr, 10, 64) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - csvWriterTo, err := svc.GetSearchJobLogsWriterTo(r.Context(), int64(jobID)) + filename := filenamePrefix(jobID) + ".log.csv" + + // Jobs in a terminal state are aggregated. As part of the aggregation, the logs + // are stored in the blobstore. If the job is not in a terminal state, the logs + // are still in the database. It's possible that this call races with the + // aggregation process, but the chances are slim and the user can always retry + // to download the logs. + job, err := svc.GetSearchJob(ctx, jobID) if err != nil { httpError(w, err) return } + if job.IsAggregated { + serveLogFromBlobstore(ctx, logger, svc, filename, jobID, w) + } else { + serveLogFromDB(ctx, logger, svc, filename, jobID, w) + } + } +} - filename := filenamePrefix(jobID) + ".log.csv" - writeCSV(logger.With(log.Int("jobID", jobID)), w, filename, csvWriterTo) +func serveLogFromDB(ctx context.Context, logger log.Logger, svc *service.Service, filename string, jobID int64, w http.ResponseWriter) { + csvWriterTo, err := svc.GetSearchJobLogsWriterTo(ctx, jobID) + if err != nil { + httpError(w, err) + return + } + + writeCSV(logger.With(log.Int64("jobID", jobID)), w, filename, csvWriterTo) +} + +func serveLogFromBlobstore(ctx context.Context, logger log.Logger, svc *service.Service, filenameNoQuotes string, jobID int64, w http.ResponseWriter) { + rc, err := svc.GetJobLogs(ctx, jobID) + if err != nil { + httpError(w, err) + return + } + defer rc.Close() + w.Header().Set("Content-Type", "text/csv") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filenameNoQuotes)) + w.WriteHeader(200) + n, err := io.Copy(w, rc) + if err != nil { + logger.Warn("failed while writing search job csv response", log.String("filename", filenameNoQuotes), log.Int64("bytesWritten", n), log.Error(err)) } } diff --git a/cmd/worker/internal/search/BUILD.bazel b/cmd/worker/internal/search/BUILD.bazel index 6c3bb44fce3..de4b38ccc2b 100644 --- a/cmd/worker/internal/search/BUILD.bazel +++ b/cmd/worker/internal/search/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "exhaustive_search.go", "exhaustive_search_repo.go", "exhaustive_search_repo_revision.go", + "janitor.go", "job.go", ], importpath = "github.com/sourcegraph/sourcegraph/cmd/worker/internal/search", @@ -21,6 +22,7 @@ go_library( "//internal/env", "//internal/gitserver", "//internal/goroutine", + "//internal/metrics", "//internal/object", "//internal/observation", "//internal/search", @@ -33,6 +35,8 @@ go_library( "//internal/workerutil/dbworker", "//internal/workerutil/dbworker/store", "//lib/errors", + "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_sourcegraph_conc//:conc", "@com_github_sourcegraph_log//:log", ], ) @@ -41,6 +45,7 @@ go_test( name = "search_test", srcs = [ "exhaustive_search_test.go", + "janitor_test.go", "job_test.go", ], embed = [":search"], @@ -60,10 +65,12 @@ go_test( "//internal/observation", "//internal/search/exhaustive/service", "//internal/search/exhaustive/store", + "//internal/search/exhaustive/store/storetest", "//internal/search/exhaustive/types", "//lib/iterator", "//schema", "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", ], ) diff --git a/cmd/worker/internal/search/janitor.go b/cmd/worker/internal/search/janitor.go new file mode 100644 index 00000000000..ec9845fbea1 --- /dev/null +++ b/cmd/worker/internal/search/janitor.go @@ -0,0 +1,168 @@ +package search + +import ( + "context" + "io" + "time" + + "github.com/keegancsmith/sqlf" + "github.com/sourcegraph/conc" + + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/goroutine" + "github.com/sourcegraph/sourcegraph/internal/metrics" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/service" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" + "github.com/sourcegraph/sourcegraph/lib/errors" +) + +func newJanitorJob(observationCtx *observation.Context, db database.DB, svc *service.Service) goroutine.BackgroundRoutine { + handler := goroutine.HandlerFunc(func(ctx context.Context) error { + return runJanitor(ctx, db, svc) + }) + + operation := observationCtx.Operation(observation.Op{ + Name: "search.jobs.janitor", + Metrics: metrics.NewREDMetrics( + observationCtx.Registerer, + "search_jobs_janitor", + metrics.WithCountHelp("Total number of search_jobs_janitor executions"), + ), + }) + + return goroutine.NewPeriodicGoroutine( + context.Background(), + handler, + goroutine.WithName("search_jobs_janitor"), + goroutine.WithDescription("refresh analytics cache"), + goroutine.WithInterval(8*time.Hour), + goroutine.WithOperation(operation), + ) +} + +func runJanitor(ctx context.Context, db database.DB, svc *service.Service) error { + jobs, err := listSearchJobs(ctx, db) + if err != nil { + return err + } + + var errs error + for _, job := range jobs { + // Use the initiator as the actor for this operation + ctx = actor.WithActor(ctx, actor.FromUser(job.Initiator)) + + aggStatus, err := getAggregateStatus(ctx, svc, job.ID) + if err != nil { + return err + } + if aggStatus.IsTerminal() { + err = db.WithTransact(ctx, func(tx database.DB) error { + if err := updateSearchJobStatus(ctx, tx, job.ID, aggStatus); err != nil { + return err + } + + if err := uploadLogsToBlobstore(ctx, svc, job.ID); err != nil { + return err + } + + if err := deleteRepoJobs(ctx, tx, job.ID); err != nil { + return err + } + + if err := setJobAsAggregated(ctx, tx, job.ID); err != nil { + return err + } + + return nil + }) + if err != nil { + errs = errors.Append(errs, err) + // best effort cleanup + _ = svc.DeleteJobLogs(ctx, job.ID) + continue + } + } + } + + return errs +} + +func setJobAsAggregated(ctx context.Context, db database.DB, searchJobID int64) error { + q := sqlf.Sprintf("UPDATE exhaustive_search_jobs SET is_aggregated = true WHERE id = %s", searchJobID) + _, err := db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...) + return err +} + +type job struct { + ID int64 + Initiator int32 +} + +// listSearchJobs returns a list of search jobs that haven't been aggregated +// yet. +func listSearchJobs(ctx context.Context, db database.DB) ([]job, error) { + q := sqlf.Sprintf("SELECT id, initiator_id FROM exhaustive_search_jobs WHERE is_aggregated = false") + rows, err := db.QueryContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...) + if err != nil { + return nil, err + } + defer rows.Close() + + var jobs []job + for rows.Next() { + var j job + if err := rows.Scan( + &j.ID, + &j.Initiator, + ); err != nil { + return nil, err + } + jobs = append(jobs, j) + } + + return jobs, nil +} + +func getAggregateStatus(ctx context.Context, service *service.Service, searchJobID int64) (types.JobState, error) { + job, err := service.GetSearchJob(ctx, searchJobID) + if err != nil { + return "", err + } + + return job.AggState, nil +} + +func updateSearchJobStatus(ctx context.Context, db database.DB, searchJobID int64, status types.JobState) error { + q := sqlf.Sprintf("UPDATE exhaustive_search_jobs SET state = %s WHERE id = %s", status.String(), searchJobID) + _, err := db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...) + return err +} + +func uploadLogsToBlobstore(ctx context.Context, svc *service.Service, searchJobID int64) error { + csvWriterTo, err := svc.GetSearchJobLogsWriterTo(ctx, searchJobID) + if err != nil { + return err + } + + r, w := io.Pipe() + + var g conc.WaitGroup + defer g.Wait() + + g.Go(func() { + _, err := csvWriterTo.WriteTo(w) + w.CloseWithError(err) + }) + + _, err = svc.UploadJobLogs(ctx, searchJobID, r) + + return err +} + +func deleteRepoJobs(ctx context.Context, db database.DB, searchJobID int64) error { + q := sqlf.Sprintf("DELETE FROM exhaustive_search_repo_jobs WHERE search_job_id = %s", searchJobID) + _, err := db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...) + return err +} diff --git a/cmd/worker/internal/search/janitor_test.go b/cmd/worker/internal/search/janitor_test.go new file mode 100644 index 00000000000..da7ea0b1e80 --- /dev/null +++ b/cmd/worker/internal/search/janitor_test.go @@ -0,0 +1,150 @@ +package search + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/keegancsmith/sqlf" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/database/dbtest" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/service" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" +) + +func TestJanitor(t *testing.T) { + if testing.Short() { + t.Skip() + } + + ctx, db, exhaustiveStore, svc, bucket := setupTest(t) + + cases := []struct { + name string + cascade storetest.StateCascade + wantLogUploaded bool + wantJobState types.JobState + wantColIsAggregated bool + }{ + { + name: "all jobs completed", + cascade: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{ + types.JobStateCompleted, + }, + RepoRevJobs: []types.JobState{ + types.JobStateCompleted, + types.JobStateCompleted, + }, + }, + wantLogUploaded: true, + wantJobState: types.JobStateCompleted, + wantColIsAggregated: true, + }, + { + name: "1 job failed", + cascade: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{ + types.JobStateCompleted, + }, + RepoRevJobs: []types.JobState{ + types.JobStateFailed, // failed is terminal + types.JobStateCompleted, + }, + }, + wantLogUploaded: true, + wantJobState: types.JobStateFailed, + wantColIsAggregated: true, + }, + { + name: "Still processing, don't update job state", + cascade: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{ + types.JobStateCompleted, + }, + RepoRevJobs: []types.JobState{ + types.JobStateErrored, // errored is not terminal + }, + }, + wantLogUploaded: false, + wantJobState: types.JobStateCompleted, + wantColIsAggregated: false, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + searchJobID := storetest.CreateJobCascade(t, ctx, exhaustiveStore, tt.cascade) + + // Use context.Background() to test if the janitor sets the user context + // correctly + err := runJanitor(context.Background(), db, svc) + require.NoError(t, err) + + j, err := exhaustiveStore.GetExhaustiveSearchJob(ctx, searchJobID) + require.NoError(t, err) + assert.Equal(t, tt.wantJobState, j.State) + + logs, ok := bucket[fmt.Sprintf("log-%d.csv", searchJobID)] + require.Equal(t, tt.wantLogUploaded, ok) + if tt.wantLogUploaded { + require.Equal(t, len(strings.Split(logs, "\n")), len(tt.cascade.RepoRevJobs)+2) // 2 = 1 header + 1 final newline + } + + // Ensure that the repo jobs have been deleted. + wantCount := len(tt.cascade.RepoJobs) + if tt.wantLogUploaded { + wantCount = 0 + } + require.Equal(t, wantCount, countRepoJobs(t, db, searchJobID)) + + // Ensure that the job is marked as aggregated + require.Equal(t, tt.wantColIsAggregated, j.IsAggregated) + }) + } +} + +func countRepoJobs(t *testing.T, db database.DB, searchJobID int64) int { + t.Helper() + + q := sqlf.Sprintf("SELECT COUNT(*) FROM exhaustive_search_repo_jobs WHERE search_job_id = %s", searchJobID) + var count int + err := db.QueryRowContext(context.Background(), q.Query(sqlf.PostgresBindVar), q.Args()...).Scan(&count) + require.NoError(t, err) + return count +} + +func setupTest(t *testing.T) (context.Context, database.DB, *store.Store, *service.Service, map[string]string) { + t.Helper() + + observationCtx := observation.TestContextTB(t) + logger := observationCtx.Logger + + mockUploadStore, bucket := newMockUploadStore(t) + db := database.NewDB(logger, dbtest.NewDB(t)) + stor := store.New(db, observation.TestContextTB(t)) + svc := service.New(observationCtx, stor, mockUploadStore, service.NewSearcherFake()) + + bs := basestore.NewWithHandle(db.Handle()) + userID, err := storetest.CreateUser(bs, "user1") + require.NoError(t, err) + + _, err = storetest.CreateRepo(db, "repo1") + require.NoError(t, err) + + ctx := actor.WithActor(context.Background(), actor.FromUser(userID)) + + return ctx, db, stor, svc, bucket +} diff --git a/cmd/worker/internal/search/job.go b/cmd/worker/internal/search/job.go index 4efc8d174a5..0c638420981 100644 --- a/cmd/worker/internal/search/job.go +++ b/cmd/worker/internal/search/job.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/sourcegraph/sourcegraph/cmd/worker/job" + workerjob "github.com/sourcegraph/sourcegraph/cmd/worker/job" workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db" "github.com/sourcegraph/sourcegraph/internal/actor" "github.com/sourcegraph/sourcegraph/internal/conf" @@ -44,7 +44,7 @@ type searchJob struct { workers []goroutine.BackgroundRoutine } -func NewSearchJob() job.Job { +func NewSearchJob() workerjob.Job { return &searchJob{ config: config{ WorkerInterval: 1 * time.Second, @@ -103,6 +103,8 @@ func (j *searchJob) newSearchJobRoutines( repoWorkerStore := store.NewRepoSearchJobWorkerStore(observationCtx, db.Handle()) revWorkerStore := store.NewRevSearchJobWorkerStore(observationCtx, db.Handle()) + svc := service.New(observationCtx, exhaustiveSearchStore, uploadStore, newSearcher) + j.workerStores = append(j.workerStores, searchWorkerStore, repoWorkerStore, @@ -123,6 +125,8 @@ func (j *searchJob) newSearchJobRoutines( newExhaustiveSearchWorkerResetter(observationCtx, searchWorkerStore), newExhaustiveSearchRepoWorkerResetter(observationCtx, repoWorkerStore), newExhaustiveSearchRepoRevisionWorkerResetter(observationCtx, revWorkerStore), + + newJanitorJob(observationCtx, db, svc), } }) diff --git a/internal/database/schema.json b/internal/database/schema.json index 34db3090ee5..f7370e2c912 100644 --- a/internal/database/schema.json +++ b/internal/database/schema.json @@ -11728,6 +11728,19 @@ "GenerationExpression": "", "Comment": "" }, + { + "Name": "is_aggregated", + "Index": 18, + "TypeName": "boolean", + "IsNullable": false, + "Default": "false", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, { "Name": "last_heartbeat_at", "Index": 11, diff --git a/internal/database/schema.md b/internal/database/schema.md index eb382e0ce8b..d98cdcc89c8 100644 --- a/internal/database/schema.md +++ b/internal/database/schema.md @@ -1587,6 +1587,7 @@ Referenced by: created_at | timestamp with time zone | | not null | now() updated_at | timestamp with time zone | | not null | now() queued_at | timestamp with time zone | | | now() + is_aggregated | boolean | | not null | false Indexes: "exhaustive_search_jobs_pkey" PRIMARY KEY, btree (id) "exhaustive_search_jobs_state" btree (state) diff --git a/internal/search/exhaustive/service/service.go b/internal/search/exhaustive/service/service.go index 9b544b10000..5bc35ae6916 100644 --- a/internal/search/exhaustive/service/service.go +++ b/internal/search/exhaustive/service/service.go @@ -232,6 +232,38 @@ func (s *Service) GetSearchJobLogsWriterTo(parentCtx context.Context, id int64) }), nil } +// getLogKey returns the key for the log that is stored in the blobstore. +func getLogKey(searchJobID int64) string { + return fmt.Sprintf("log-%d.csv", searchJobID) +} + +func (s *Service) UploadJobLogs(ctx context.Context, id int64, r io.Reader) (int64, error) { + // 🚨 SECURITY: only someone with access to the job may upload the logs + if err := s.store.UserHasAccess(ctx, id); err != nil { + return 0, err + } + + return s.uploadStore.Upload(ctx, getLogKey(id), r) +} + +func (s *Service) GetJobLogs(ctx context.Context, id int64) (io.ReadCloser, error) { + // 🚨 SECURITY: only someone with access to the job may download the logs + if err := s.store.UserHasAccess(ctx, id); err != nil { + return nil, err + } + + return s.uploadStore.Get(ctx, getLogKey(id)) +} + +func (s *Service) DeleteJobLogs(ctx context.Context, id int64) error { + // 🚨 SECURITY: only someone with access to the job may delete the logs + if err := s.store.UserHasAccess(ctx, id); err != nil { + return err + } + + return s.uploadStore.Delete(ctx, getLogKey(id)) +} + // JobLogsIterLimit is the number of lines the iterator will read from the // database per page. Assuming 100 bytes per line, this will be ~1MB of memory // per 10k repo-rev jobs. @@ -309,6 +341,9 @@ func (s *Service) DeleteSearchJob(ctx context.Context, id int64) (err error) { return err } + // The log file is not guaranteed to exist, so we ignore the error here. + _ = s.uploadStore.Delete(ctx, getLogKey(id)) + return s.store.DeleteExhaustiveSearchJob(ctx, id) } diff --git a/internal/search/exhaustive/store/BUILD.bazel b/internal/search/exhaustive/store/BUILD.bazel index 7e6e5eeb169..dc4448f9fb3 100644 --- a/internal/search/exhaustive/store/BUILD.bazel +++ b/internal/search/exhaustive/store/BUILD.bazel @@ -35,7 +35,6 @@ go_test( "exhaustive_search_jobs_test.go", "exhaustive_search_repo_jobs_test.go", "exhaustive_search_repo_revision_jobs_test.go", - "store_test.go", ], tags = [ TAG_PLATFORM_SEARCH, @@ -45,17 +44,15 @@ go_test( deps = [ ":store", "//internal/actor", - "//internal/api", "//internal/auth", "//internal/database", "//internal/database/basestore", "//internal/database/dbtest", "//internal/observation", + "//internal/search/exhaustive/store/storetest", "//internal/search/exhaustive/types", - "//internal/types", "//lib/errors", "@com_github_google_go_cmp//cmp", - "@com_github_keegancsmith_sqlf//:sqlf", "@com_github_sourcegraph_log//logtest", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/internal/search/exhaustive/store/exhaustive_search_jobs.go b/internal/search/exhaustive/store/exhaustive_search_jobs.go index 6aadc487b34..7740ec1305a 100644 --- a/internal/search/exhaustive/store/exhaustive_search_jobs.go +++ b/internal/search/exhaustive/store/exhaustive_search_jobs.go @@ -60,6 +60,7 @@ var exhaustiveSearchJobColumns = []*sqlf.Query{ sqlf.Sprintf("cancel"), sqlf.Sprintf("created_at"), sqlf.Sprintf("updated_at"), + sqlf.Sprintf("is_aggregated"), } func (s *Store) CreateExhaustiveSearchJob(ctx context.Context, job types.ExhaustiveSearchJob) (_ int64, err error) { @@ -549,6 +550,7 @@ func defaultScanTargets(job *types.ExhaustiveSearchJob) []any { &job.Cancel, &job.CreatedAt, &job.UpdatedAt, + &job.IsAggregated, } } diff --git a/internal/search/exhaustive/store/exhaustive_search_jobs_test.go b/internal/search/exhaustive/store/exhaustive_search_jobs_test.go index 6d28928466c..a45ea945f9d 100644 --- a/internal/search/exhaustive/store/exhaustive_search_jobs_test.go +++ b/internal/search/exhaustive/store/exhaustive_search_jobs_test.go @@ -6,7 +6,6 @@ import ( "testing" "github.com/google/go-cmp/cmp" - "github.com/keegancsmith/sqlf" "github.com/sourcegraph/log/logtest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,6 +17,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/database/dbtest" "github.com/sourcegraph/sourcegraph/internal/observation" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" "github.com/sourcegraph/sourcegraph/lib/errors" ) @@ -32,11 +32,11 @@ func TestStore_CreateExhaustiveSearchJob(t *testing.T) { bs := basestore.NewWithHandle(db.Handle()) - userID, err := createUser(bs, "alice") + userID, err := storetest.CreateUser(bs, "alice") require.NoError(t, err) - malloryID, err := createUser(bs, "mallory") + malloryID, err := storetest.CreateUser(bs, "mallory") require.NoError(t, err) - adminID, err := createUser(bs, "admin") + adminID, err := storetest.CreateUser(bs, "admin") require.NoError(t, err) s := store.New(db, observation.TestContextTB(t)) @@ -140,10 +140,10 @@ func TestStore_GetAndListSearchJobs(t *testing.T) { db := database.NewDB(logger, dbtest.NewDB(t)) bs := basestore.NewWithHandle(db.Handle()) - userID, err := createUser(bs, "alice") + userID, err := storetest.CreateUser(bs, "alice") require.NoError(t, err) - adminID, err := createUser(bs, "admin") + adminID, err := storetest.CreateUser(bs, "admin") require.NoError(t, err) ctx := actor.WithActor(context.Background(), actor.FromUser(userID)) @@ -291,31 +291,31 @@ func TestStore_AggregateStatus(t *testing.T) { db := database.NewDB(logger, dbtest.NewDB(t)) bs := basestore.NewWithHandle(db.Handle()) - _, err := createRepo(db, "repo1") + _, err := storetest.CreateRepo(db, "repo1") require.NoError(t, err) s := store.New(db, observation.TestContextTB(t)) tc := []struct { name string - c stateCascade + c storetest.StateCascade want types.JobState }{ { name: "only repo rev jobs running", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{types.JobStateProcessing}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{types.JobStateProcessing}, }, want: types.JobStateProcessing, }, { name: "processing, because at least 1 job is running", - c: stateCascade{ - searchJob: types.JobStateProcessing, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{ + c: storetest.StateCascade{ + SearchJob: types.JobStateProcessing, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{ types.JobStateProcessing, types.JobStateQueued, types.JobStateCompleted, @@ -325,10 +325,10 @@ func TestStore_AggregateStatus(t *testing.T) { }, { name: "processing, although some jobs failed", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{ + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{ types.JobStateProcessing, types.JobStateFailed, }, @@ -337,52 +337,52 @@ func TestStore_AggregateStatus(t *testing.T) { }, { name: "all jobs finished, at least 1 failed", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateFailed}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateFailed}, }, want: types.JobStateFailed, }, { name: "all jobs finished successfully", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateCompleted}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateCompleted}, }, want: types.JobStateCompleted, }, { name: "search job was canceled, but some jobs haven't stopped yet", - c: stateCascade{ - searchJob: types.JobStateCanceled, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{types.JobStateProcessing, types.JobStateFailed}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCanceled, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{types.JobStateProcessing, types.JobStateFailed}, }, want: types.JobStateCanceled, }, { name: "top-level search job finished, but the other jobs haven't started yet", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateQueued}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateQueued}, }, want: types.JobStateProcessing, }, { name: "no job is processing, some are completed, some are queued", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateQueued}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateQueued}, }, want: types.JobStateProcessing, }, { name: "search job is queued, but no other job has been created yet", - c: stateCascade{ - searchJob: types.JobStateQueued, + c: storetest.StateCascade{ + SearchJob: types.JobStateQueued, }, want: types.JobStateQueued, }, @@ -390,11 +390,11 @@ func TestStore_AggregateStatus(t *testing.T) { for i, tt := range tc { t.Run(tt.name, func(t *testing.T) { - userID, err := createUser(bs, fmt.Sprintf("user_%d", i)) + userID, err := storetest.CreateUser(bs, fmt.Sprintf("user_%d", i)) require.NoError(t, err) ctx := actor.WithActor(context.Background(), actor.FromUser(userID)) - jobID := createJobCascade(t, ctx, s, tt.c) + jobID := storetest.CreateJobCascade(t, ctx, s, tt.c) jobs, err := s.ListExhaustiveSearchJobs(ctx, store.ListArgs{}) require.NoError(t, err) @@ -405,78 +405,4 @@ func TestStore_AggregateStatus(t *testing.T) { } } -// createJobCascade creates a cascade of jobs (1 search job -> n repo jobs -> m -// repo rev jobs) with states as defined in stateCascade. -// -// This is a fairly large test helper, because don't want to start the worker -// routines, but instead we want to create a snapshot of the state of the jobs -// at a given point in time. -func createJobCascade( - t *testing.T, - ctx context.Context, - stor *store.Store, - casc stateCascade, -) (searchJobID int64) { - t.Helper() - - searchJob := types.ExhaustiveSearchJob{ - InitiatorID: actor.FromContext(ctx).UID, - Query: "repo:job1", - WorkerJob: types.WorkerJob{State: casc.searchJob}, - } - - repoJobs := make([]types.ExhaustiveSearchRepoJob, len(casc.repoJobs)) - for i, r := range casc.repoJobs { - repoJobs[i] = types.ExhaustiveSearchRepoJob{ - WorkerJob: types.WorkerJob{State: r}, - RepoID: 1, // same repo for all tests - RefSpec: "HEAD", - } - } - - repoRevJobs := make([]types.ExhaustiveSearchRepoRevisionJob, len(casc.repoRevJobs)) - for i, rr := range casc.repoRevJobs { - repoRevJobs[i] = types.ExhaustiveSearchRepoRevisionJob{ - WorkerJob: types.WorkerJob{State: rr}, - Revision: "HEAD", - } - } - - jobID, err := stor.CreateExhaustiveSearchJob(ctx, searchJob) - require.NoError(t, err) - assert.NotZero(t, jobID) - - err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_jobs SET state = %s WHERE id = %s", casc.searchJob, jobID)) - require.NoError(t, err) - - for i, r := range repoJobs { - r.SearchJobID = jobID - repoJobID, err := stor.CreateExhaustiveSearchRepoJob(ctx, r) - require.NoError(t, err) - assert.NotZero(t, repoJobID) - - err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_jobs SET state = %s WHERE id = %s", casc.repoJobs[i], repoJobID)) - require.NoError(t, err) - - for j, rr := range repoRevJobs { - rr.SearchRepoJobID = repoJobID - repoRevJobID, err := stor.CreateExhaustiveSearchRepoRevisionJob(ctx, rr) - require.NoError(t, err) - assert.NotZero(t, repoRevJobID) - require.NoError(t, err) - - err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_revision_jobs SET state = %s WHERE id = %s", casc.repoRevJobs[j], repoRevJobID)) - require.NoError(t, err) - } - } - - return jobID -} - -type stateCascade struct { - searchJob types.JobState - repoJobs []types.JobState - repoRevJobs []types.JobState -} - func intptr(s int) *int { return &s } diff --git a/internal/search/exhaustive/store/exhaustive_search_repo_jobs_test.go b/internal/search/exhaustive/store/exhaustive_search_repo_jobs_test.go index a62b24f1870..67c69b6b4cf 100644 --- a/internal/search/exhaustive/store/exhaustive_search_repo_jobs_test.go +++ b/internal/search/exhaustive/store/exhaustive_search_repo_jobs_test.go @@ -14,6 +14,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/database/dbtest" "github.com/sourcegraph/sourcegraph/internal/observation" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" "github.com/sourcegraph/sourcegraph/lib/errors" ) @@ -28,9 +29,9 @@ func TestStore_CreateExhaustiveSearchRepoJob(t *testing.T) { bs := basestore.NewWithHandle(db.Handle()) - userID, err := createUser(bs, "alice") + userID, err := storetest.CreateUser(bs, "alice") require.NoError(t, err) - repoID, err := createRepo(db, "repo-test") + repoID, err := storetest.CreateRepo(db, "repo-test") require.NoError(t, err) ctx := actor.WithActor(context.Background(), &actor.Actor{ diff --git a/internal/search/exhaustive/store/exhaustive_search_repo_revision_jobs_test.go b/internal/search/exhaustive/store/exhaustive_search_repo_revision_jobs_test.go index 2280a64eebd..cffd66538ff 100644 --- a/internal/search/exhaustive/store/exhaustive_search_repo_revision_jobs_test.go +++ b/internal/search/exhaustive/store/exhaustive_search_repo_revision_jobs_test.go @@ -14,6 +14,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/database/dbtest" "github.com/sourcegraph/sourcegraph/internal/observation" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" "github.com/sourcegraph/sourcegraph/lib/errors" ) @@ -28,9 +29,9 @@ func TestStore_CreateExhaustiveSearchRepoRevisionJob(t *testing.T) { bs := basestore.NewWithHandle(db.Handle()) - userID, err := createUser(bs, "alice") + userID, err := storetest.CreateUser(bs, "alice") require.NoError(t, err) - repoID, err := createRepo(db, "repo-test") + repoID, err := storetest.CreateRepo(db, "repo-test") require.NoError(t, err) ctx := actor.WithActor(context.Background(), &actor.Actor{ diff --git a/internal/search/exhaustive/store/store_test.go b/internal/search/exhaustive/store/store_test.go deleted file mode 100644 index bc8001de1a2..00000000000 --- a/internal/search/exhaustive/store/store_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package store_test - -import ( - "context" - - "github.com/keegancsmith/sqlf" - - "github.com/sourcegraph/sourcegraph/internal/api" - "github.com/sourcegraph/sourcegraph/internal/database" - "github.com/sourcegraph/sourcegraph/internal/database/basestore" - "github.com/sourcegraph/sourcegraph/internal/types" -) - -func createUser(store *basestore.Store, username string) (int32, error) { - admin := username == "admin" - q := sqlf.Sprintf(`INSERT INTO users(username, site_admin) VALUES(%s, %s) RETURNING id`, username, admin) - return basestore.ScanAny[int32](store.QueryRow(context.Background(), q)) -} - -func createRepo(db database.DB, name string) (api.RepoID, error) { - repoStore := db.Repos() - repo := types.Repo{Name: api.RepoName(name)} - err := repoStore.Create(context.Background(), &repo) - return repo.ID, err -} diff --git a/internal/search/exhaustive/store/storetest/BUILD.bazel b/internal/search/exhaustive/store/storetest/BUILD.bazel new file mode 100644 index 00000000000..97fa623dbae --- /dev/null +++ b/internal/search/exhaustive/store/storetest/BUILD.bazel @@ -0,0 +1,20 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "storetest", + srcs = ["store.go"], + importpath = "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest", + visibility = ["//:__subpackages__"], + deps = [ + "//internal/actor", + "//internal/api", + "//internal/database", + "//internal/database/basestore", + "//internal/search/exhaustive/store", + "//internal/search/exhaustive/types", + "//internal/types", + "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/internal/search/exhaustive/store/storetest/store.go b/internal/search/exhaustive/store/storetest/store.go new file mode 100644 index 00000000000..9b0d001740b --- /dev/null +++ b/internal/search/exhaustive/store/storetest/store.go @@ -0,0 +1,106 @@ +package storetest + +import ( + "context" + "fmt" + "testing" + + "github.com/keegancsmith/sqlf" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" + internaltypes "github.com/sourcegraph/sourcegraph/internal/types" +) + +// CreateJobCascade creates a cascade of jobs (1 search job -> n repo jobs -> m +// repo rev jobs) with states as defined in StateCascade. +// +// This is a fairly large test helper, because don't want to start the worker +// routines, but instead we want to create a snapshot of the state of the jobs +// at a given point in time. +func CreateJobCascade( + t *testing.T, + ctx context.Context, + stor *store.Store, + casc StateCascade, +) (searchJobID int64) { + t.Helper() + + searchJob := types.ExhaustiveSearchJob{ + InitiatorID: actor.FromContext(ctx).UID, + Query: "repo:job1", + WorkerJob: types.WorkerJob{State: casc.SearchJob}, + } + + repoJobs := make([]types.ExhaustiveSearchRepoJob, len(casc.RepoJobs)) + for i, r := range casc.RepoJobs { + repoJobs[i] = types.ExhaustiveSearchRepoJob{ + WorkerJob: types.WorkerJob{State: r}, + RepoID: 1, // same repo for all tests + RefSpec: "HEAD", + } + } + + repoRevJobs := make([]types.ExhaustiveSearchRepoRevisionJob, len(casc.RepoRevJobs)) + for i, rr := range casc.RepoRevJobs { + repoRevJobs[i] = types.ExhaustiveSearchRepoRevisionJob{ + WorkerJob: types.WorkerJob{State: rr, FailureMessage: fmt.Sprintf("repoRevJob-%d", i)}, + Revision: "HEAD", + } + } + + jobID, err := stor.CreateExhaustiveSearchJob(ctx, searchJob) + require.NoError(t, err) + assert.NotZero(t, jobID) + + err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_jobs SET state = %s WHERE id = %s", casc.SearchJob, jobID)) + require.NoError(t, err) + + for i, r := range repoJobs { + r.SearchJobID = jobID + repoJobID, err := stor.CreateExhaustiveSearchRepoJob(ctx, r) + require.NoError(t, err) + assert.NotZero(t, repoJobID) + + err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_jobs SET state = %s WHERE id = %s", casc.RepoJobs[i], repoJobID)) + require.NoError(t, err) + + for j, rr := range repoRevJobs { + rr.SearchRepoJobID = repoJobID + repoRevJobID, err := stor.CreateExhaustiveSearchRepoRevisionJob(ctx, rr) + require.NoError(t, err) + assert.NotZero(t, repoRevJobID) + require.NoError(t, err) + + err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_revision_jobs SET state = %s, failure_message = %s WHERE id = %s", casc.RepoRevJobs[j], repoRevJobs[j].FailureMessage, repoRevJobID)) + require.NoError(t, err) + } + } + + return jobID +} + +type StateCascade struct { + SearchJob types.JobState + RepoJobs []types.JobState + RepoRevJobs []types.JobState +} + +func CreateRepo(db database.DB, name string) (api.RepoID, error) { + repoStore := db.Repos() + repo := internaltypes.Repo{Name: api.RepoName(name)} + err := repoStore.Create(context.Background(), &repo) + return repo.ID, err +} + +func CreateUser(store *basestore.Store, username string) (int32, error) { + admin := username == "admin" + q := sqlf.Sprintf(`INSERT INTO users(username, site_admin) VALUES(%s, %s) RETURNING id`, username, admin) + return basestore.ScanAny[int32](store.QueryRow(context.Background(), q)) +} diff --git a/internal/search/exhaustive/types/exhaustive_search_job.go b/internal/search/exhaustive/types/exhaustive_search_job.go index ed2f8434659..d858bdf6f8d 100644 --- a/internal/search/exhaustive/types/exhaustive_search_job.go +++ b/internal/search/exhaustive/types/exhaustive_search_job.go @@ -25,6 +25,10 @@ type ExhaustiveSearchJob struct { // from ListSearchJobs. This state is different from WorkerJob.State, because it // reflects the combined state of all jobs created as part of the search job. AggState JobState + + // Set to true by the janitor job if it has processed the job. This is used to + // avoid aggregating the same job multiple times. + IsAggregated bool } func (j *ExhaustiveSearchJob) RecordID() int { diff --git a/internal/search/exhaustive/types/worker.go b/internal/search/exhaustive/types/worker.go index aa1cefcee1d..b86677b7d62 100644 --- a/internal/search/exhaustive/types/worker.go +++ b/internal/search/exhaustive/types/worker.go @@ -34,3 +34,14 @@ const ( // ToGraphQL returns the GraphQL representation of the worker state. func (s JobState) ToGraphQL() string { return strings.ToUpper(string(s)) } + +func (s JobState) String() string { return strings.ToLower(string(s)) } + +func (s JobState) IsTerminal() bool { + switch s { + case JobStateCompleted, JobStateFailed, JobStateCanceled: + return true + default: + return false + } +} diff --git a/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/down.sql b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/down.sql new file mode 100644 index 00000000000..750639db07d --- /dev/null +++ b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/down.sql @@ -0,0 +1 @@ +ALTER TABLE exhaustive_search_jobs DROP COLUMN IF EXISTS is_aggregated; diff --git a/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/metadata.yaml b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/metadata.yaml new file mode 100644 index 00000000000..2f15f719ea1 --- /dev/null +++ b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/metadata.yaml @@ -0,0 +1,2 @@ +name: exhaustive search jobs add column +parents: [1721814902] diff --git a/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/up.sql b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/up.sql new file mode 100644 index 00000000000..3eab576b72b --- /dev/null +++ b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/up.sql @@ -0,0 +1 @@ +ALTER TABLE exhaustive_search_jobs ADD COLUMN IF NOT EXISTS is_aggregated boolean NOT NULL DEFAULT false; diff --git a/migrations/frontend/squashed.sql b/migrations/frontend/squashed.sql index 5e9e1c9706a..c44de16fe64 100644 --- a/migrations/frontend/squashed.sql +++ b/migrations/frontend/squashed.sql @@ -2453,7 +2453,8 @@ CREATE TABLE exhaustive_search_jobs ( cancel boolean DEFAULT false NOT NULL, created_at timestamp with time zone DEFAULT now() NOT NULL, updated_at timestamp with time zone DEFAULT now() NOT NULL, - queued_at timestamp with time zone DEFAULT now() + queued_at timestamp with time zone DEFAULT now(), + is_aggregated boolean DEFAULT false NOT NULL ); CREATE SEQUENCE exhaustive_search_jobs_id_seq