diff --git a/cmd/frontend/internal/search/httpapi/export.go b/cmd/frontend/internal/search/httpapi/export.go
index 582993f2341..a3cc243653b 100644
--- a/cmd/frontend/internal/search/httpapi/export.go
+++ b/cmd/frontend/internal/search/httpapi/export.go
@@ -1,6 +1,7 @@
 package httpapi
 
 import (
+ "context"
 "fmt"
 "io"
 "net/http"
@@ -8,7 +9,6 @@ import (
 "time"
 
 "github.com/gorilla/mux"
-
 "github.com/sourcegraph/log"
 
 "github.com/sourcegraph/sourcegraph/internal/auth"
@@ -18,7 +18,7 @@ import (
 )
 
 // search-jobs__2020-07-01_150405
-func filenamePrefix(jobID int) string {
+func filenamePrefix(jobID int64) string {
 return fmt.Sprintf("search-jobs_%d_%s", jobID, time.Now().Format("2006-01-02_150405"))
 }
 
@@ -27,7 +27,7 @@ func ServeSearchJobDownload(logger log.Logger, svc *service.Service) http.Handle
 return func(w http.ResponseWriter, r *http.Request) {
 jobIDStr := mux.Vars(r)["id"]
- jobID, err := strconv.Atoi(jobIDStr)
+ jobID, err := strconv.ParseInt(jobIDStr, 10, 64)
 if err != nil {
 http.Error(w, err.Error(), http.StatusBadRequest)
 return
 }
@@ -40,7 +40,7 @@ func ServeSearchJobDownload(logger log.Logger, svc *service.Service) http.Handle
 }
 
 filename := filenamePrefix(jobID) + ".jsonl"
- writeJSON(logger.With(log.Int("jobID", jobID)), w, filename, writerTo)
+ writeJSON(logger.With(log.Int64("jobID", jobID)), w, filename, writerTo)
 }
 }
 
@@ -48,21 +48,58 @@ func ServeSearchJobLogs(logger log.Logger, svc *service.Service) http.HandlerFun
 logger = logger.With(log.String("handler", "ServeSearchJobLogs"))
 
 return func(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+
 jobIDStr := mux.Vars(r)["id"]
- jobID, err := strconv.Atoi(jobIDStr)
+ jobID, err := strconv.ParseInt(jobIDStr, 10, 64)
 if err != nil {
 http.Error(w, err.Error(), http.StatusBadRequest)
 return
 }
 
- csvWriterTo, err := svc.GetSearchJobLogsWriterTo(r.Context(), int64(jobID))
+ filename := filenamePrefix(jobID) + ".log.csv"
+
+ // Jobs in a terminal state are aggregated. As part of the aggregation, the logs
+ // are stored in the blobstore. If the job is not in a terminal state, the logs
+ // are still in the database. It's possible that this call races with the
+ // aggregation process, but the chance is slim and the user can always retry
+ // the download.
+ job, err := svc.GetSearchJob(ctx, jobID) if err != nil { httpError(w, err) return } + if job.IsAggregated { + serveLogFromBlobstore(ctx, logger, svc, filename, jobID, w) + } else { + serveLogFromDB(ctx, logger, svc, filename, jobID, w) + } + } +} - filename := filenamePrefix(jobID) + ".log.csv" - writeCSV(logger.With(log.Int("jobID", jobID)), w, filename, csvWriterTo) +func serveLogFromDB(ctx context.Context, logger log.Logger, svc *service.Service, filename string, jobID int64, w http.ResponseWriter) { + csvWriterTo, err := svc.GetSearchJobLogsWriterTo(ctx, jobID) + if err != nil { + httpError(w, err) + return + } + + writeCSV(logger.With(log.Int64("jobID", jobID)), w, filename, csvWriterTo) +} + +func serveLogFromBlobstore(ctx context.Context, logger log.Logger, svc *service.Service, filenameNoQuotes string, jobID int64, w http.ResponseWriter) { + rc, err := svc.GetJobLogs(ctx, jobID) + if err != nil { + httpError(w, err) + return + } + defer rc.Close() + w.Header().Set("Content-Type", "text/csv") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filenameNoQuotes)) + w.WriteHeader(200) + n, err := io.Copy(w, rc) + if err != nil { + logger.Warn("failed while writing search job csv response", log.String("filename", filenameNoQuotes), log.Int64("bytesWritten", n), log.Error(err)) } } diff --git a/cmd/worker/internal/search/BUILD.bazel b/cmd/worker/internal/search/BUILD.bazel index 6c3bb44fce3..de4b38ccc2b 100644 --- a/cmd/worker/internal/search/BUILD.bazel +++ b/cmd/worker/internal/search/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "exhaustive_search.go", "exhaustive_search_repo.go", "exhaustive_search_repo_revision.go", + "janitor.go", "job.go", ], importpath = "github.com/sourcegraph/sourcegraph/cmd/worker/internal/search", @@ -21,6 +22,7 @@ go_library( "//internal/env", "//internal/gitserver", "//internal/goroutine", + "//internal/metrics", "//internal/object", "//internal/observation", "//internal/search", @@ -33,6 +35,8 @@ go_library( "//internal/workerutil/dbworker", "//internal/workerutil/dbworker/store", "//lib/errors", + "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_sourcegraph_conc//:conc", "@com_github_sourcegraph_log//:log", ], ) @@ -41,6 +45,7 @@ go_test( name = "search_test", srcs = [ "exhaustive_search_test.go", + "janitor_test.go", "job_test.go", ], embed = [":search"], @@ -60,10 +65,12 @@ go_test( "//internal/observation", "//internal/search/exhaustive/service", "//internal/search/exhaustive/store", + "//internal/search/exhaustive/store/storetest", "//internal/search/exhaustive/types", "//lib/iterator", "//schema", "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", ], ) diff --git a/cmd/worker/internal/search/janitor.go b/cmd/worker/internal/search/janitor.go new file mode 100644 index 00000000000..ec9845fbea1 --- /dev/null +++ b/cmd/worker/internal/search/janitor.go @@ -0,0 +1,168 @@ +package search + +import ( + "context" + "io" + "time" + + "github.com/keegancsmith/sqlf" + "github.com/sourcegraph/conc" + + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/goroutine" + "github.com/sourcegraph/sourcegraph/internal/metrics" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/service" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" + 
"github.com/sourcegraph/sourcegraph/lib/errors" +) + +func newJanitorJob(observationCtx *observation.Context, db database.DB, svc *service.Service) goroutine.BackgroundRoutine { + handler := goroutine.HandlerFunc(func(ctx context.Context) error { + return runJanitor(ctx, db, svc) + }) + + operation := observationCtx.Operation(observation.Op{ + Name: "search.jobs.janitor", + Metrics: metrics.NewREDMetrics( + observationCtx.Registerer, + "search_jobs_janitor", + metrics.WithCountHelp("Total number of search_jobs_janitor executions"), + ), + }) + + return goroutine.NewPeriodicGoroutine( + context.Background(), + handler, + goroutine.WithName("search_jobs_janitor"), + goroutine.WithDescription("refresh analytics cache"), + goroutine.WithInterval(8*time.Hour), + goroutine.WithOperation(operation), + ) +} + +func runJanitor(ctx context.Context, db database.DB, svc *service.Service) error { + jobs, err := listSearchJobs(ctx, db) + if err != nil { + return err + } + + var errs error + for _, job := range jobs { + // Use the initiator as the actor for this operation + ctx = actor.WithActor(ctx, actor.FromUser(job.Initiator)) + + aggStatus, err := getAggregateStatus(ctx, svc, job.ID) + if err != nil { + return err + } + if aggStatus.IsTerminal() { + err = db.WithTransact(ctx, func(tx database.DB) error { + if err := updateSearchJobStatus(ctx, tx, job.ID, aggStatus); err != nil { + return err + } + + if err := uploadLogsToBlobstore(ctx, svc, job.ID); err != nil { + return err + } + + if err := deleteRepoJobs(ctx, tx, job.ID); err != nil { + return err + } + + if err := setJobAsAggregated(ctx, tx, job.ID); err != nil { + return err + } + + return nil + }) + if err != nil { + errs = errors.Append(errs, err) + // best effort cleanup + _ = svc.DeleteJobLogs(ctx, job.ID) + continue + } + } + } + + return errs +} + +func setJobAsAggregated(ctx context.Context, db database.DB, searchJobID int64) error { + q := sqlf.Sprintf("UPDATE exhaustive_search_jobs SET is_aggregated = true WHERE id = %s", searchJobID) + _, err := db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...) + return err +} + +type job struct { + ID int64 + Initiator int32 +} + +// listSearchJobs returns a list of search jobs that haven't been aggregated +// yet. +func listSearchJobs(ctx context.Context, db database.DB) ([]job, error) { + q := sqlf.Sprintf("SELECT id, initiator_id FROM exhaustive_search_jobs WHERE is_aggregated = false") + rows, err := db.QueryContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...) + if err != nil { + return nil, err + } + defer rows.Close() + + var jobs []job + for rows.Next() { + var j job + if err := rows.Scan( + &j.ID, + &j.Initiator, + ); err != nil { + return nil, err + } + jobs = append(jobs, j) + } + + return jobs, nil +} + +func getAggregateStatus(ctx context.Context, service *service.Service, searchJobID int64) (types.JobState, error) { + job, err := service.GetSearchJob(ctx, searchJobID) + if err != nil { + return "", err + } + + return job.AggState, nil +} + +func updateSearchJobStatus(ctx context.Context, db database.DB, searchJobID int64, status types.JobState) error { + q := sqlf.Sprintf("UPDATE exhaustive_search_jobs SET state = %s WHERE id = %s", status.String(), searchJobID) + _, err := db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...) 
+ return err +} + +func uploadLogsToBlobstore(ctx context.Context, svc *service.Service, searchJobID int64) error { + csvWriterTo, err := svc.GetSearchJobLogsWriterTo(ctx, searchJobID) + if err != nil { + return err + } + + r, w := io.Pipe() + + var g conc.WaitGroup + defer g.Wait() + + g.Go(func() { + _, err := csvWriterTo.WriteTo(w) + w.CloseWithError(err) + }) + + _, err = svc.UploadJobLogs(ctx, searchJobID, r) + + return err +} + +func deleteRepoJobs(ctx context.Context, db database.DB, searchJobID int64) error { + q := sqlf.Sprintf("DELETE FROM exhaustive_search_repo_jobs WHERE search_job_id = %s", searchJobID) + _, err := db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...) + return err +} diff --git a/cmd/worker/internal/search/janitor_test.go b/cmd/worker/internal/search/janitor_test.go new file mode 100644 index 00000000000..da7ea0b1e80 --- /dev/null +++ b/cmd/worker/internal/search/janitor_test.go @@ -0,0 +1,150 @@ +package search + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/keegancsmith/sqlf" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/database/dbtest" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/service" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" +) + +func TestJanitor(t *testing.T) { + if testing.Short() { + t.Skip() + } + + ctx, db, exhaustiveStore, svc, bucket := setupTest(t) + + cases := []struct { + name string + cascade storetest.StateCascade + wantLogUploaded bool + wantJobState types.JobState + wantColIsAggregated bool + }{ + { + name: "all jobs completed", + cascade: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{ + types.JobStateCompleted, + }, + RepoRevJobs: []types.JobState{ + types.JobStateCompleted, + types.JobStateCompleted, + }, + }, + wantLogUploaded: true, + wantJobState: types.JobStateCompleted, + wantColIsAggregated: true, + }, + { + name: "1 job failed", + cascade: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{ + types.JobStateCompleted, + }, + RepoRevJobs: []types.JobState{ + types.JobStateFailed, // failed is terminal + types.JobStateCompleted, + }, + }, + wantLogUploaded: true, + wantJobState: types.JobStateFailed, + wantColIsAggregated: true, + }, + { + name: "Still processing, don't update job state", + cascade: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{ + types.JobStateCompleted, + }, + RepoRevJobs: []types.JobState{ + types.JobStateErrored, // errored is not terminal + }, + }, + wantLogUploaded: false, + wantJobState: types.JobStateCompleted, + wantColIsAggregated: false, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + searchJobID := storetest.CreateJobCascade(t, ctx, exhaustiveStore, tt.cascade) + + // Use context.Background() to test if the janitor sets the user context + // correctly + err := runJanitor(context.Background(), db, svc) + require.NoError(t, err) + + j, err := exhaustiveStore.GetExhaustiveSearchJob(ctx, searchJobID) 
+ require.NoError(t, err) + assert.Equal(t, tt.wantJobState, j.State) + + logs, ok := bucket[fmt.Sprintf("log-%d.csv", searchJobID)] + require.Equal(t, tt.wantLogUploaded, ok) + if tt.wantLogUploaded { + require.Equal(t, len(strings.Split(logs, "\n")), len(tt.cascade.RepoRevJobs)+2) // 2 = 1 header + 1 final newline + } + + // Ensure that the repo jobs have been deleted. + wantCount := len(tt.cascade.RepoJobs) + if tt.wantLogUploaded { + wantCount = 0 + } + require.Equal(t, wantCount, countRepoJobs(t, db, searchJobID)) + + // Ensure that the job is marked as aggregated + require.Equal(t, tt.wantColIsAggregated, j.IsAggregated) + }) + } +} + +func countRepoJobs(t *testing.T, db database.DB, searchJobID int64) int { + t.Helper() + + q := sqlf.Sprintf("SELECT COUNT(*) FROM exhaustive_search_repo_jobs WHERE search_job_id = %s", searchJobID) + var count int + err := db.QueryRowContext(context.Background(), q.Query(sqlf.PostgresBindVar), q.Args()...).Scan(&count) + require.NoError(t, err) + return count +} + +func setupTest(t *testing.T) (context.Context, database.DB, *store.Store, *service.Service, map[string]string) { + t.Helper() + + observationCtx := observation.TestContextTB(t) + logger := observationCtx.Logger + + mockUploadStore, bucket := newMockUploadStore(t) + db := database.NewDB(logger, dbtest.NewDB(t)) + stor := store.New(db, observation.TestContextTB(t)) + svc := service.New(observationCtx, stor, mockUploadStore, service.NewSearcherFake()) + + bs := basestore.NewWithHandle(db.Handle()) + userID, err := storetest.CreateUser(bs, "user1") + require.NoError(t, err) + + _, err = storetest.CreateRepo(db, "repo1") + require.NoError(t, err) + + ctx := actor.WithActor(context.Background(), actor.FromUser(userID)) + + return ctx, db, stor, svc, bucket +} diff --git a/cmd/worker/internal/search/job.go b/cmd/worker/internal/search/job.go index 4efc8d174a5..0c638420981 100644 --- a/cmd/worker/internal/search/job.go +++ b/cmd/worker/internal/search/job.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/sourcegraph/sourcegraph/cmd/worker/job" + workerjob "github.com/sourcegraph/sourcegraph/cmd/worker/job" workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db" "github.com/sourcegraph/sourcegraph/internal/actor" "github.com/sourcegraph/sourcegraph/internal/conf" @@ -44,7 +44,7 @@ type searchJob struct { workers []goroutine.BackgroundRoutine } -func NewSearchJob() job.Job { +func NewSearchJob() workerjob.Job { return &searchJob{ config: config{ WorkerInterval: 1 * time.Second, @@ -103,6 +103,8 @@ func (j *searchJob) newSearchJobRoutines( repoWorkerStore := store.NewRepoSearchJobWorkerStore(observationCtx, db.Handle()) revWorkerStore := store.NewRevSearchJobWorkerStore(observationCtx, db.Handle()) + svc := service.New(observationCtx, exhaustiveSearchStore, uploadStore, newSearcher) + j.workerStores = append(j.workerStores, searchWorkerStore, repoWorkerStore, @@ -123,6 +125,8 @@ func (j *searchJob) newSearchJobRoutines( newExhaustiveSearchWorkerResetter(observationCtx, searchWorkerStore), newExhaustiveSearchRepoWorkerResetter(observationCtx, repoWorkerStore), newExhaustiveSearchRepoRevisionWorkerResetter(observationCtx, revWorkerStore), + + newJanitorJob(observationCtx, db, svc), } }) diff --git a/internal/database/schema.json b/internal/database/schema.json index 34db3090ee5..f7370e2c912 100644 --- a/internal/database/schema.json +++ b/internal/database/schema.json @@ -11728,6 +11728,19 @@ "GenerationExpression": "", "Comment": "" }, + { + "Name": "is_aggregated", + "Index": 18, 
+ "TypeName": "boolean", + "IsNullable": false, + "Default": "false", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, { "Name": "last_heartbeat_at", "Index": 11, diff --git a/internal/database/schema.md b/internal/database/schema.md index eb382e0ce8b..d98cdcc89c8 100644 --- a/internal/database/schema.md +++ b/internal/database/schema.md @@ -1587,6 +1587,7 @@ Referenced by: created_at | timestamp with time zone | | not null | now() updated_at | timestamp with time zone | | not null | now() queued_at | timestamp with time zone | | | now() + is_aggregated | boolean | | not null | false Indexes: "exhaustive_search_jobs_pkey" PRIMARY KEY, btree (id) "exhaustive_search_jobs_state" btree (state) diff --git a/internal/search/exhaustive/service/service.go b/internal/search/exhaustive/service/service.go index 9b544b10000..5bc35ae6916 100644 --- a/internal/search/exhaustive/service/service.go +++ b/internal/search/exhaustive/service/service.go @@ -232,6 +232,38 @@ func (s *Service) GetSearchJobLogsWriterTo(parentCtx context.Context, id int64) }), nil } +// getLogKey returns the key for the log that is stored in the blobstore. +func getLogKey(searchJobID int64) string { + return fmt.Sprintf("log-%d.csv", searchJobID) +} + +func (s *Service) UploadJobLogs(ctx context.Context, id int64, r io.Reader) (int64, error) { + // 🚨 SECURITY: only someone with access to the job may upload the logs + if err := s.store.UserHasAccess(ctx, id); err != nil { + return 0, err + } + + return s.uploadStore.Upload(ctx, getLogKey(id), r) +} + +func (s *Service) GetJobLogs(ctx context.Context, id int64) (io.ReadCloser, error) { + // 🚨 SECURITY: only someone with access to the job may download the logs + if err := s.store.UserHasAccess(ctx, id); err != nil { + return nil, err + } + + return s.uploadStore.Get(ctx, getLogKey(id)) +} + +func (s *Service) DeleteJobLogs(ctx context.Context, id int64) error { + // 🚨 SECURITY: only someone with access to the job may delete the logs + if err := s.store.UserHasAccess(ctx, id); err != nil { + return err + } + + return s.uploadStore.Delete(ctx, getLogKey(id)) +} + // JobLogsIterLimit is the number of lines the iterator will read from the // database per page. Assuming 100 bytes per line, this will be ~1MB of memory // per 10k repo-rev jobs. @@ -309,6 +341,9 @@ func (s *Service) DeleteSearchJob(ctx context.Context, id int64) (err error) { return err } + // The log file is not guaranteed to exist, so we ignore the error here. 
+ _ = s.uploadStore.Delete(ctx, getLogKey(id)) + return s.store.DeleteExhaustiveSearchJob(ctx, id) } diff --git a/internal/search/exhaustive/store/BUILD.bazel b/internal/search/exhaustive/store/BUILD.bazel index 7e6e5eeb169..dc4448f9fb3 100644 --- a/internal/search/exhaustive/store/BUILD.bazel +++ b/internal/search/exhaustive/store/BUILD.bazel @@ -35,7 +35,6 @@ go_test( "exhaustive_search_jobs_test.go", "exhaustive_search_repo_jobs_test.go", "exhaustive_search_repo_revision_jobs_test.go", - "store_test.go", ], tags = [ TAG_PLATFORM_SEARCH, @@ -45,17 +44,15 @@ go_test( deps = [ ":store", "//internal/actor", - "//internal/api", "//internal/auth", "//internal/database", "//internal/database/basestore", "//internal/database/dbtest", "//internal/observation", + "//internal/search/exhaustive/store/storetest", "//internal/search/exhaustive/types", - "//internal/types", "//lib/errors", "@com_github_google_go_cmp//cmp", - "@com_github_keegancsmith_sqlf//:sqlf", "@com_github_sourcegraph_log//logtest", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/internal/search/exhaustive/store/exhaustive_search_jobs.go b/internal/search/exhaustive/store/exhaustive_search_jobs.go index 6aadc487b34..7740ec1305a 100644 --- a/internal/search/exhaustive/store/exhaustive_search_jobs.go +++ b/internal/search/exhaustive/store/exhaustive_search_jobs.go @@ -60,6 +60,7 @@ var exhaustiveSearchJobColumns = []*sqlf.Query{ sqlf.Sprintf("cancel"), sqlf.Sprintf("created_at"), sqlf.Sprintf("updated_at"), + sqlf.Sprintf("is_aggregated"), } func (s *Store) CreateExhaustiveSearchJob(ctx context.Context, job types.ExhaustiveSearchJob) (_ int64, err error) { @@ -549,6 +550,7 @@ func defaultScanTargets(job *types.ExhaustiveSearchJob) []any { &job.Cancel, &job.CreatedAt, &job.UpdatedAt, + &job.IsAggregated, } } diff --git a/internal/search/exhaustive/store/exhaustive_search_jobs_test.go b/internal/search/exhaustive/store/exhaustive_search_jobs_test.go index 6d28928466c..a45ea945f9d 100644 --- a/internal/search/exhaustive/store/exhaustive_search_jobs_test.go +++ b/internal/search/exhaustive/store/exhaustive_search_jobs_test.go @@ -6,7 +6,6 @@ import ( "testing" "github.com/google/go-cmp/cmp" - "github.com/keegancsmith/sqlf" "github.com/sourcegraph/log/logtest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,6 +17,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/database/dbtest" "github.com/sourcegraph/sourcegraph/internal/observation" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" "github.com/sourcegraph/sourcegraph/lib/errors" ) @@ -32,11 +32,11 @@ func TestStore_CreateExhaustiveSearchJob(t *testing.T) { bs := basestore.NewWithHandle(db.Handle()) - userID, err := createUser(bs, "alice") + userID, err := storetest.CreateUser(bs, "alice") require.NoError(t, err) - malloryID, err := createUser(bs, "mallory") + malloryID, err := storetest.CreateUser(bs, "mallory") require.NoError(t, err) - adminID, err := createUser(bs, "admin") + adminID, err := storetest.CreateUser(bs, "admin") require.NoError(t, err) s := store.New(db, observation.TestContextTB(t)) @@ -140,10 +140,10 @@ func TestStore_GetAndListSearchJobs(t *testing.T) { db := database.NewDB(logger, dbtest.NewDB(t)) bs := basestore.NewWithHandle(db.Handle()) - userID, err := createUser(bs, "alice") + userID, err := 
storetest.CreateUser(bs, "alice") require.NoError(t, err) - adminID, err := createUser(bs, "admin") + adminID, err := storetest.CreateUser(bs, "admin") require.NoError(t, err) ctx := actor.WithActor(context.Background(), actor.FromUser(userID)) @@ -291,31 +291,31 @@ func TestStore_AggregateStatus(t *testing.T) { db := database.NewDB(logger, dbtest.NewDB(t)) bs := basestore.NewWithHandle(db.Handle()) - _, err := createRepo(db, "repo1") + _, err := storetest.CreateRepo(db, "repo1") require.NoError(t, err) s := store.New(db, observation.TestContextTB(t)) tc := []struct { name string - c stateCascade + c storetest.StateCascade want types.JobState }{ { name: "only repo rev jobs running", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{types.JobStateProcessing}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{types.JobStateProcessing}, }, want: types.JobStateProcessing, }, { name: "processing, because at least 1 job is running", - c: stateCascade{ - searchJob: types.JobStateProcessing, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{ + c: storetest.StateCascade{ + SearchJob: types.JobStateProcessing, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{ types.JobStateProcessing, types.JobStateQueued, types.JobStateCompleted, @@ -325,10 +325,10 @@ func TestStore_AggregateStatus(t *testing.T) { }, { name: "processing, although some jobs failed", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{ + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{ types.JobStateProcessing, types.JobStateFailed, }, @@ -337,52 +337,52 @@ func TestStore_AggregateStatus(t *testing.T) { }, { name: "all jobs finished, at least 1 failed", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateFailed}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateFailed}, }, want: types.JobStateFailed, }, { name: "all jobs finished successfully", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateCompleted}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateCompleted}, }, want: types.JobStateCompleted, }, { name: "search job was canceled, but some jobs haven't stopped yet", - c: stateCascade{ - searchJob: types.JobStateCanceled, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{types.JobStateProcessing, types.JobStateFailed}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCanceled, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{types.JobStateProcessing, types.JobStateFailed}, }, want: types.JobStateCanceled, }, { name: "top-level search job finished, 
but the other jobs haven't started yet", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateQueued}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateQueued}, }, want: types.JobStateProcessing, }, { name: "no job is processing, some are completed, some are queued", - c: stateCascade{ - searchJob: types.JobStateCompleted, - repoJobs: []types.JobState{types.JobStateCompleted}, - repoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateQueued}, + c: storetest.StateCascade{ + SearchJob: types.JobStateCompleted, + RepoJobs: []types.JobState{types.JobStateCompleted}, + RepoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateQueued}, }, want: types.JobStateProcessing, }, { name: "search job is queued, but no other job has been created yet", - c: stateCascade{ - searchJob: types.JobStateQueued, + c: storetest.StateCascade{ + SearchJob: types.JobStateQueued, }, want: types.JobStateQueued, }, @@ -390,11 +390,11 @@ func TestStore_AggregateStatus(t *testing.T) { for i, tt := range tc { t.Run(tt.name, func(t *testing.T) { - userID, err := createUser(bs, fmt.Sprintf("user_%d", i)) + userID, err := storetest.CreateUser(bs, fmt.Sprintf("user_%d", i)) require.NoError(t, err) ctx := actor.WithActor(context.Background(), actor.FromUser(userID)) - jobID := createJobCascade(t, ctx, s, tt.c) + jobID := storetest.CreateJobCascade(t, ctx, s, tt.c) jobs, err := s.ListExhaustiveSearchJobs(ctx, store.ListArgs{}) require.NoError(t, err) @@ -405,78 +405,4 @@ func TestStore_AggregateStatus(t *testing.T) { } } -// createJobCascade creates a cascade of jobs (1 search job -> n repo jobs -> m -// repo rev jobs) with states as defined in stateCascade. -// -// This is a fairly large test helper, because don't want to start the worker -// routines, but instead we want to create a snapshot of the state of the jobs -// at a given point in time. 
-func createJobCascade( - t *testing.T, - ctx context.Context, - stor *store.Store, - casc stateCascade, -) (searchJobID int64) { - t.Helper() - - searchJob := types.ExhaustiveSearchJob{ - InitiatorID: actor.FromContext(ctx).UID, - Query: "repo:job1", - WorkerJob: types.WorkerJob{State: casc.searchJob}, - } - - repoJobs := make([]types.ExhaustiveSearchRepoJob, len(casc.repoJobs)) - for i, r := range casc.repoJobs { - repoJobs[i] = types.ExhaustiveSearchRepoJob{ - WorkerJob: types.WorkerJob{State: r}, - RepoID: 1, // same repo for all tests - RefSpec: "HEAD", - } - } - - repoRevJobs := make([]types.ExhaustiveSearchRepoRevisionJob, len(casc.repoRevJobs)) - for i, rr := range casc.repoRevJobs { - repoRevJobs[i] = types.ExhaustiveSearchRepoRevisionJob{ - WorkerJob: types.WorkerJob{State: rr}, - Revision: "HEAD", - } - } - - jobID, err := stor.CreateExhaustiveSearchJob(ctx, searchJob) - require.NoError(t, err) - assert.NotZero(t, jobID) - - err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_jobs SET state = %s WHERE id = %s", casc.searchJob, jobID)) - require.NoError(t, err) - - for i, r := range repoJobs { - r.SearchJobID = jobID - repoJobID, err := stor.CreateExhaustiveSearchRepoJob(ctx, r) - require.NoError(t, err) - assert.NotZero(t, repoJobID) - - err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_jobs SET state = %s WHERE id = %s", casc.repoJobs[i], repoJobID)) - require.NoError(t, err) - - for j, rr := range repoRevJobs { - rr.SearchRepoJobID = repoJobID - repoRevJobID, err := stor.CreateExhaustiveSearchRepoRevisionJob(ctx, rr) - require.NoError(t, err) - assert.NotZero(t, repoRevJobID) - require.NoError(t, err) - - err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_revision_jobs SET state = %s WHERE id = %s", casc.repoRevJobs[j], repoRevJobID)) - require.NoError(t, err) - } - } - - return jobID -} - -type stateCascade struct { - searchJob types.JobState - repoJobs []types.JobState - repoRevJobs []types.JobState -} - func intptr(s int) *int { return &s } diff --git a/internal/search/exhaustive/store/exhaustive_search_repo_jobs_test.go b/internal/search/exhaustive/store/exhaustive_search_repo_jobs_test.go index a62b24f1870..67c69b6b4cf 100644 --- a/internal/search/exhaustive/store/exhaustive_search_repo_jobs_test.go +++ b/internal/search/exhaustive/store/exhaustive_search_repo_jobs_test.go @@ -14,6 +14,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/database/dbtest" "github.com/sourcegraph/sourcegraph/internal/observation" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" "github.com/sourcegraph/sourcegraph/lib/errors" ) @@ -28,9 +29,9 @@ func TestStore_CreateExhaustiveSearchRepoJob(t *testing.T) { bs := basestore.NewWithHandle(db.Handle()) - userID, err := createUser(bs, "alice") + userID, err := storetest.CreateUser(bs, "alice") require.NoError(t, err) - repoID, err := createRepo(db, "repo-test") + repoID, err := storetest.CreateRepo(db, "repo-test") require.NoError(t, err) ctx := actor.WithActor(context.Background(), &actor.Actor{ diff --git a/internal/search/exhaustive/store/exhaustive_search_repo_revision_jobs_test.go b/internal/search/exhaustive/store/exhaustive_search_repo_revision_jobs_test.go index 2280a64eebd..cffd66538ff 100644 --- a/internal/search/exhaustive/store/exhaustive_search_repo_revision_jobs_test.go +++ 
b/internal/search/exhaustive/store/exhaustive_search_repo_revision_jobs_test.go @@ -14,6 +14,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/database/dbtest" "github.com/sourcegraph/sourcegraph/internal/observation" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest" "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" "github.com/sourcegraph/sourcegraph/lib/errors" ) @@ -28,9 +29,9 @@ func TestStore_CreateExhaustiveSearchRepoRevisionJob(t *testing.T) { bs := basestore.NewWithHandle(db.Handle()) - userID, err := createUser(bs, "alice") + userID, err := storetest.CreateUser(bs, "alice") require.NoError(t, err) - repoID, err := createRepo(db, "repo-test") + repoID, err := storetest.CreateRepo(db, "repo-test") require.NoError(t, err) ctx := actor.WithActor(context.Background(), &actor.Actor{ diff --git a/internal/search/exhaustive/store/store_test.go b/internal/search/exhaustive/store/store_test.go deleted file mode 100644 index bc8001de1a2..00000000000 --- a/internal/search/exhaustive/store/store_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package store_test - -import ( - "context" - - "github.com/keegancsmith/sqlf" - - "github.com/sourcegraph/sourcegraph/internal/api" - "github.com/sourcegraph/sourcegraph/internal/database" - "github.com/sourcegraph/sourcegraph/internal/database/basestore" - "github.com/sourcegraph/sourcegraph/internal/types" -) - -func createUser(store *basestore.Store, username string) (int32, error) { - admin := username == "admin" - q := sqlf.Sprintf(`INSERT INTO users(username, site_admin) VALUES(%s, %s) RETURNING id`, username, admin) - return basestore.ScanAny[int32](store.QueryRow(context.Background(), q)) -} - -func createRepo(db database.DB, name string) (api.RepoID, error) { - repoStore := db.Repos() - repo := types.Repo{Name: api.RepoName(name)} - err := repoStore.Create(context.Background(), &repo) - return repo.ID, err -} diff --git a/internal/search/exhaustive/store/storetest/BUILD.bazel b/internal/search/exhaustive/store/storetest/BUILD.bazel new file mode 100644 index 00000000000..97fa623dbae --- /dev/null +++ b/internal/search/exhaustive/store/storetest/BUILD.bazel @@ -0,0 +1,20 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "storetest", + srcs = ["store.go"], + importpath = "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest", + visibility = ["//:__subpackages__"], + deps = [ + "//internal/actor", + "//internal/api", + "//internal/database", + "//internal/database/basestore", + "//internal/search/exhaustive/store", + "//internal/search/exhaustive/types", + "//internal/types", + "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/internal/search/exhaustive/store/storetest/store.go b/internal/search/exhaustive/store/storetest/store.go new file mode 100644 index 00000000000..9b0d001740b --- /dev/null +++ b/internal/search/exhaustive/store/storetest/store.go @@ -0,0 +1,106 @@ +package storetest + +import ( + "context" + "fmt" + "testing" + + "github.com/keegancsmith/sqlf" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + 
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store" + "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types" + internaltypes "github.com/sourcegraph/sourcegraph/internal/types" +) + +// CreateJobCascade creates a cascade of jobs (1 search job -> n repo jobs -> m +// repo rev jobs) with states as defined in StateCascade. +// +// This is a fairly large test helper, because don't want to start the worker +// routines, but instead we want to create a snapshot of the state of the jobs +// at a given point in time. +func CreateJobCascade( + t *testing.T, + ctx context.Context, + stor *store.Store, + casc StateCascade, +) (searchJobID int64) { + t.Helper() + + searchJob := types.ExhaustiveSearchJob{ + InitiatorID: actor.FromContext(ctx).UID, + Query: "repo:job1", + WorkerJob: types.WorkerJob{State: casc.SearchJob}, + } + + repoJobs := make([]types.ExhaustiveSearchRepoJob, len(casc.RepoJobs)) + for i, r := range casc.RepoJobs { + repoJobs[i] = types.ExhaustiveSearchRepoJob{ + WorkerJob: types.WorkerJob{State: r}, + RepoID: 1, // same repo for all tests + RefSpec: "HEAD", + } + } + + repoRevJobs := make([]types.ExhaustiveSearchRepoRevisionJob, len(casc.RepoRevJobs)) + for i, rr := range casc.RepoRevJobs { + repoRevJobs[i] = types.ExhaustiveSearchRepoRevisionJob{ + WorkerJob: types.WorkerJob{State: rr, FailureMessage: fmt.Sprintf("repoRevJob-%d", i)}, + Revision: "HEAD", + } + } + + jobID, err := stor.CreateExhaustiveSearchJob(ctx, searchJob) + require.NoError(t, err) + assert.NotZero(t, jobID) + + err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_jobs SET state = %s WHERE id = %s", casc.SearchJob, jobID)) + require.NoError(t, err) + + for i, r := range repoJobs { + r.SearchJobID = jobID + repoJobID, err := stor.CreateExhaustiveSearchRepoJob(ctx, r) + require.NoError(t, err) + assert.NotZero(t, repoJobID) + + err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_jobs SET state = %s WHERE id = %s", casc.RepoJobs[i], repoJobID)) + require.NoError(t, err) + + for j, rr := range repoRevJobs { + rr.SearchRepoJobID = repoJobID + repoRevJobID, err := stor.CreateExhaustiveSearchRepoRevisionJob(ctx, rr) + require.NoError(t, err) + assert.NotZero(t, repoRevJobID) + require.NoError(t, err) + + err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_revision_jobs SET state = %s, failure_message = %s WHERE id = %s", casc.RepoRevJobs[j], repoRevJobs[j].FailureMessage, repoRevJobID)) + require.NoError(t, err) + } + } + + return jobID +} + +type StateCascade struct { + SearchJob types.JobState + RepoJobs []types.JobState + RepoRevJobs []types.JobState +} + +func CreateRepo(db database.DB, name string) (api.RepoID, error) { + repoStore := db.Repos() + repo := internaltypes.Repo{Name: api.RepoName(name)} + err := repoStore.Create(context.Background(), &repo) + return repo.ID, err +} + +func CreateUser(store *basestore.Store, username string) (int32, error) { + admin := username == "admin" + q := sqlf.Sprintf(`INSERT INTO users(username, site_admin) VALUES(%s, %s) RETURNING id`, username, admin) + return basestore.ScanAny[int32](store.QueryRow(context.Background(), q)) +} diff --git a/internal/search/exhaustive/types/exhaustive_search_job.go b/internal/search/exhaustive/types/exhaustive_search_job.go index ed2f8434659..d858bdf6f8d 100644 --- a/internal/search/exhaustive/types/exhaustive_search_job.go +++ b/internal/search/exhaustive/types/exhaustive_search_job.go @@ -25,6 +25,10 @@ type ExhaustiveSearchJob struct { // from ListSearchJobs. 
This state is different from WorkerJob.State, because it // reflects the combined state of all jobs created as part of the search job. AggState JobState + + // Set to true by the janitor job if it has processed the job. This is used to + // avoid aggregating the same job multiple times. + IsAggregated bool } func (j *ExhaustiveSearchJob) RecordID() int { diff --git a/internal/search/exhaustive/types/worker.go b/internal/search/exhaustive/types/worker.go index aa1cefcee1d..b86677b7d62 100644 --- a/internal/search/exhaustive/types/worker.go +++ b/internal/search/exhaustive/types/worker.go @@ -34,3 +34,14 @@ const ( // ToGraphQL returns the GraphQL representation of the worker state. func (s JobState) ToGraphQL() string { return strings.ToUpper(string(s)) } + +func (s JobState) String() string { return strings.ToLower(string(s)) } + +func (s JobState) IsTerminal() bool { + switch s { + case JobStateCompleted, JobStateFailed, JobStateCanceled: + return true + default: + return false + } +} diff --git a/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/down.sql b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/down.sql new file mode 100644 index 00000000000..750639db07d --- /dev/null +++ b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/down.sql @@ -0,0 +1 @@ +ALTER TABLE exhaustive_search_jobs DROP COLUMN IF EXISTS is_aggregated; diff --git a/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/metadata.yaml b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/metadata.yaml new file mode 100644 index 00000000000..2f15f719ea1 --- /dev/null +++ b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/metadata.yaml @@ -0,0 +1,2 @@ +name: exhaustive search jobs add column +parents: [1721814902] diff --git a/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/up.sql b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/up.sql new file mode 100644 index 00000000000..3eab576b72b --- /dev/null +++ b/migrations/frontend/1722348497_exhaustive_search_jobs_add_column/up.sql @@ -0,0 +1 @@ +ALTER TABLE exhaustive_search_jobs ADD COLUMN IF NOT EXISTS is_aggregated boolean NOT NULL DEFAULT false; diff --git a/migrations/frontend/squashed.sql b/migrations/frontend/squashed.sql index 5e9e1c9706a..c44de16fe64 100644 --- a/migrations/frontend/squashed.sql +++ b/migrations/frontend/squashed.sql @@ -2453,7 +2453,8 @@ CREATE TABLE exhaustive_search_jobs ( cancel boolean DEFAULT false NOT NULL, created_at timestamp with time zone DEFAULT now() NOT NULL, updated_at timestamp with time zone DEFAULT now() NOT NULL, - queued_at timestamp with time zone DEFAULT now() + queued_at timestamp with time zone DEFAULT now(), + is_aggregated boolean DEFAULT false NOT NULL ); CREATE SEQUENCE exhaustive_search_jobs_id_seq
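Reviewer note on the io.Pipe pattern in uploadLogsToBlobstore: the CSV WriterTo produces the log on one goroutine while the blobstore upload drains the pipe on the other, so a job's full log is never buffered in memory. The sketch below is a minimal standalone illustration of that pattern, not code from this change; uploadAll is a hypothetical stand-in for uploadStore.Upload. It also shows the one wrinkle worth double-checking: if the consumer can return before draining the pipe, the read end should be closed, otherwise the producer's WriteTo blocks on w.Write and the Wait never returns.

package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/sourcegraph/conc"
)

// uploadAll is a hypothetical stand-in for an upload-store client: it drains
// r until EOF (or error) and reports how many bytes it consumed.
func uploadAll(r io.Reader) (int64, error) {
	var sink strings.Builder
	return io.Copy(&sink, r)
}

func main() {
	// Any io.WriterTo works as the producer; in the janitor it is the CSV
	// writer returned by GetSearchJobLogsWriterTo.
	var csvWriterTo io.WriterTo = strings.NewReader("state,failure_message\ncompleted,\n")

	r, w := io.Pipe()

	var g conc.WaitGroup
	defer g.Wait() // defers run LIFO, so this waits after r is closed
	// If the consumer returns before draining the pipe, closing the read end
	// turns the producer's blocked w.Write into io.ErrClosedPipe instead of
	// a deadlock in g.Wait.
	defer r.Close()

	g.Go(func() {
		// CloseWithError(nil) closes the pipe with io.EOF, which ends the
		// consumer's read loop cleanly; a non-nil error is surfaced to the
		// reader instead.
		_, err := csvWriterTo.WriteTo(w)
		w.CloseWithError(err)
	})

	n, err := uploadAll(r)
	fmt.Printf("uploaded %d bytes, err = %v\n", n, err)
}

Assuming Upload always drains the reader until CloseWithError is called, the janitor's version is equivalent on the happy path; the extra defer r.Close() in the sketch only matters on the early-error path.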