chore(search_jobs): add janitor job (#64186)

Fixes SPLF-119

This adds a background job to Search Jobs that periodically scans for
finished jobs, aggregates their status, uploads their logs to the blobstore,
and deletes the per-repo rows. This drastically reduces the size of the
`exhaustive_*` tables and improves the performance of the Search Jobs GQL API.

For example, with this change a finished search job on .com keeps only a
single row in the database, whereas before it could keep several million rows
if we searched every repository.
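
Per finished job, the cleanup boils down to a handful of statements. Below is a
minimal sketch of that per-job pass using plain `database/sql` instead of the
internal store/service wrappers; the real implementation in
`cmd/worker/internal/search/janitor.go` (further down in this diff) runs the
same steps inside `db.WithTransact` and also uploads the logs to the blobstore
before deleting the rows. The sketch assumes the aggregate state has already
been computed, which the real code reads via `svc.GetSearchJob`.

```go
package sketch

import (
	"context"
	"database/sql"
)

// aggregateFinishedJob is a minimal sketch of the janitor's per-job cleanup.
// It assumes the job's aggregate state has already been computed and it omits
// the log upload to the blobstore; table and column names match the migration
// and queries in the diff below.
func aggregateFinishedJob(ctx context.Context, db *sql.DB, jobID int64, aggState string) error {
	switch aggState {
	case "completed", "failed", "canceled": // terminal states, see JobState.IsTerminal below
	default:
		return nil // still running: leave the rows alone and check again on the next run
	}

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() { _ = tx.Rollback() }() // no-op after a successful Commit

	// Persist the aggregate state on the parent row so the GQL API no longer
	// has to compute it from millions of per-repo rows.
	if _, err := tx.ExecContext(ctx,
		`UPDATE exhaustive_search_jobs SET state = $1 WHERE id = $2`, aggState, jobID); err != nil {
		return err
	}
	// Drop the per-repo rows; this is where the tables shrink.
	if _, err := tx.ExecContext(ctx,
		`DELETE FROM exhaustive_search_repo_jobs WHERE search_job_id = $1`, jobID); err != nil {
		return err
	}
	// Mark the job as processed so the janitor skips it on subsequent runs.
	if _, err := tx.ExecContext(ctx,
		`UPDATE exhaustive_search_jobs SET is_aggregated = true WHERE id = %s`[:0]+`UPDATE exhaustive_search_jobs SET is_aggregated = true WHERE id = $1`, jobID); err != nil {
		return err
	}
	return tx.Commit()
}
```

Running all steps in one transaction keeps the parent row and the per-repo rows
consistent if anything fails halfway through.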

Notes:
- The diff seems larger than it actually is; I left a couple of comments to
help the reviewers.

## Test plan:
- new unit tests
- manual testing:

I ran a couple of search jobs locally (with the janitor job interval set
to 1 min) and checked that:
 - logs are uploaded to `blobstore-go/buckets/search-jobs`
 - repo jobs are deleted from `exhaustive_search_repo_jobs` (see the query sketch after this list)
 - logs are served from the blobstore after the janitor has run
 - downloading logs while the job is still running works
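
For reference, the repo-jobs check boils down to a single count query; here is
an illustrative helper (not part of this change) using plain `database/sql`,
with the table name and the log object key taken from the diff below:

```go
package sketch

import (
	"context"
	"database/sql"
)

// repoJobsRemaining counts the per-repo rows left for a search job. Once the
// janitor has processed a finished job this should return 0, and the log
// object in the search-jobs bucket is named "log-<jobID>.csv" (see getLogKey
// in the service changes below).
func repoJobsRemaining(ctx context.Context, db *sql.DB, jobID int64) (int, error) {
	var n int
	err := db.QueryRowContext(ctx,
		`SELECT COUNT(*) FROM exhaustive_search_repo_jobs WHERE search_job_id = $1`,
		jobID,
	).Scan(&n)
	return n, err
}
```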

## Changelog
The new background job drastically reduces the size of the
`exhaustive_*` tables and improves performance of the Search Jobs GQL
API.
Stefan Hengl 2024-08-01 15:29:10 +02:00 committed by GitHub
parent c966d942f7
commit cd38adb4a7
22 changed files with 624 additions and 161 deletions


@ -1,6 +1,7 @@
package httpapi
import (
"context"
"fmt"
"io"
"net/http"
@ -8,7 +9,6 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/auth"
@ -18,7 +18,7 @@ import (
)
// search-jobs_<job-id>_2020-07-01_150405
func filenamePrefix(jobID int) string {
func filenamePrefix(jobID int64) string {
return fmt.Sprintf("search-jobs_%d_%s", jobID, time.Now().Format("2006-01-02_150405"))
}
@ -27,7 +27,7 @@ func ServeSearchJobDownload(logger log.Logger, svc *service.Service) http.Handle
return func(w http.ResponseWriter, r *http.Request) {
jobIDStr := mux.Vars(r)["id"]
jobID, err := strconv.Atoi(jobIDStr)
jobID, err := strconv.ParseInt(jobIDStr, 10, 64)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
@ -40,7 +40,7 @@ func ServeSearchJobDownload(logger log.Logger, svc *service.Service) http.Handle
}
filename := filenamePrefix(jobID) + ".jsonl"
writeJSON(logger.With(log.Int("jobID", jobID)), w, filename, writerTo)
writeJSON(logger.With(log.Int64("jobID", jobID)), w, filename, writerTo)
}
}
@ -48,21 +48,58 @@ func ServeSearchJobLogs(logger log.Logger, svc *service.Service) http.HandlerFun
logger = logger.With(log.String("handler", "ServeSearchJobLogs"))
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
jobIDStr := mux.Vars(r)["id"]
jobID, err := strconv.Atoi(jobIDStr)
jobID, err := strconv.ParseInt(jobIDStr, 10, 64)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
csvWriterTo, err := svc.GetSearchJobLogsWriterTo(r.Context(), int64(jobID))
filename := filenamePrefix(jobID) + ".log.csv"
// Jobs in a terminal state are aggregated. As part of the aggregation, the logs
// are stored in the blobstore. If the job is not in a terminal state, the logs
// are still in the database. It's possible that this call races with the
// aggregation process, but the chances are slim and the user can always retry
// the download.
job, err := svc.GetSearchJob(ctx, jobID)
if err != nil {
httpError(w, err)
return
}
if job.IsAggregated {
serveLogFromBlobstore(ctx, logger, svc, filename, jobID, w)
} else {
serveLogFromDB(ctx, logger, svc, filename, jobID, w)
}
}
}
filename := filenamePrefix(jobID) + ".log.csv"
writeCSV(logger.With(log.Int("jobID", jobID)), w, filename, csvWriterTo)
func serveLogFromDB(ctx context.Context, logger log.Logger, svc *service.Service, filename string, jobID int64, w http.ResponseWriter) {
csvWriterTo, err := svc.GetSearchJobLogsWriterTo(ctx, jobID)
if err != nil {
httpError(w, err)
return
}
writeCSV(logger.With(log.Int64("jobID", jobID)), w, filename, csvWriterTo)
}
func serveLogFromBlobstore(ctx context.Context, logger log.Logger, svc *service.Service, filenameNoQuotes string, jobID int64, w http.ResponseWriter) {
rc, err := svc.GetJobLogs(ctx, jobID)
if err != nil {
httpError(w, err)
return
}
defer rc.Close()
w.Header().Set("Content-Type", "text/csv")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filenameNoQuotes))
w.WriteHeader(200)
n, err := io.Copy(w, rc)
if err != nil {
logger.Warn("failed while writing search job csv response", log.String("filename", filenameNoQuotes), log.Int64("bytesWritten", n), log.Error(err))
}
}


@ -7,6 +7,7 @@ go_library(
"exhaustive_search.go",
"exhaustive_search_repo.go",
"exhaustive_search_repo_revision.go",
"janitor.go",
"job.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/worker/internal/search",
@ -21,6 +22,7 @@ go_library(
"//internal/env",
"//internal/gitserver",
"//internal/goroutine",
"//internal/metrics",
"//internal/object",
"//internal/observation",
"//internal/search",
@ -33,6 +35,8 @@ go_library(
"//internal/workerutil/dbworker",
"//internal/workerutil/dbworker/store",
"//lib/errors",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_sourcegraph_conc//:conc",
"@com_github_sourcegraph_log//:log",
],
)
@ -41,6 +45,7 @@ go_test(
name = "search_test",
srcs = [
"exhaustive_search_test.go",
"janitor_test.go",
"job_test.go",
],
embed = [":search"],
@ -60,10 +65,12 @@ go_test(
"//internal/observation",
"//internal/search/exhaustive/service",
"//internal/search/exhaustive/store",
"//internal/search/exhaustive/store/storetest",
"//internal/search/exhaustive/types",
"//lib/iterator",
"//schema",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)


@ -0,0 +1,168 @@
package search
import (
"context"
"io"
"time"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/conc"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/metrics"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/service"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func newJanitorJob(observationCtx *observation.Context, db database.DB, svc *service.Service) goroutine.BackgroundRoutine {
handler := goroutine.HandlerFunc(func(ctx context.Context) error {
return runJanitor(ctx, db, svc)
})
operation := observationCtx.Operation(observation.Op{
Name: "search.jobs.janitor",
Metrics: metrics.NewREDMetrics(
observationCtx.Registerer,
"search_jobs_janitor",
metrics.WithCountHelp("Total number of search_jobs_janitor executions"),
),
})
return goroutine.NewPeriodicGoroutine(
context.Background(),
handler,
goroutine.WithName("search_jobs_janitor"),
goroutine.WithDescription("refresh analytics cache"),
goroutine.WithInterval(8*time.Hour),
goroutine.WithOperation(operation),
)
}
func runJanitor(ctx context.Context, db database.DB, svc *service.Service) error {
jobs, err := listSearchJobs(ctx, db)
if err != nil {
return err
}
var errs error
for _, job := range jobs {
// Use the initiator as the actor for this operation
ctx = actor.WithActor(ctx, actor.FromUser(job.Initiator))
aggStatus, err := getAggregateStatus(ctx, svc, job.ID)
if err != nil {
return err
}
if aggStatus.IsTerminal() {
err = db.WithTransact(ctx, func(tx database.DB) error {
if err := updateSearchJobStatus(ctx, tx, job.ID, aggStatus); err != nil {
return err
}
if err := uploadLogsToBlobstore(ctx, svc, job.ID); err != nil {
return err
}
if err := deleteRepoJobs(ctx, tx, job.ID); err != nil {
return err
}
if err := setJobAsAggregated(ctx, tx, job.ID); err != nil {
return err
}
return nil
})
if err != nil {
errs = errors.Append(errs, err)
// best effort cleanup
_ = svc.DeleteJobLogs(ctx, job.ID)
continue
}
}
}
return errs
}
func setJobAsAggregated(ctx context.Context, db database.DB, searchJobID int64) error {
q := sqlf.Sprintf("UPDATE exhaustive_search_jobs SET is_aggregated = true WHERE id = %s", searchJobID)
_, err := db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...)
return err
}
type job struct {
ID int64
Initiator int32
}
// listSearchJobs returns a list of search jobs that haven't been aggregated
// yet.
func listSearchJobs(ctx context.Context, db database.DB) ([]job, error) {
q := sqlf.Sprintf("SELECT id, initiator_id FROM exhaustive_search_jobs WHERE is_aggregated = false")
rows, err := db.QueryContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...)
if err != nil {
return nil, err
}
defer rows.Close()
var jobs []job
for rows.Next() {
var j job
if err := rows.Scan(
&j.ID,
&j.Initiator,
); err != nil {
return nil, err
}
jobs = append(jobs, j)
}
return jobs, nil
}
func getAggregateStatus(ctx context.Context, service *service.Service, searchJobID int64) (types.JobState, error) {
job, err := service.GetSearchJob(ctx, searchJobID)
if err != nil {
return "", err
}
return job.AggState, nil
}
func updateSearchJobStatus(ctx context.Context, db database.DB, searchJobID int64, status types.JobState) error {
q := sqlf.Sprintf("UPDATE exhaustive_search_jobs SET state = %s WHERE id = %s", status.String(), searchJobID)
_, err := db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...)
return err
}
func uploadLogsToBlobstore(ctx context.Context, svc *service.Service, searchJobID int64) error {
csvWriterTo, err := svc.GetSearchJobLogsWriterTo(ctx, searchJobID)
if err != nil {
return err
}
r, w := io.Pipe()
var g conc.WaitGroup
defer g.Wait()
g.Go(func() {
_, err := csvWriterTo.WriteTo(w)
w.CloseWithError(err)
})
_, err = svc.UploadJobLogs(ctx, searchJobID, r)
return err
}
func deleteRepoJobs(ctx context.Context, db database.DB, searchJobID int64) error {
q := sqlf.Sprintf("DELETE FROM exhaustive_search_repo_jobs WHERE search_job_id = %s", searchJobID)
_, err := db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...)
return err
}


@ -0,0 +1,150 @@
package search
import (
"context"
"fmt"
"strings"
"testing"
"github.com/keegancsmith/sqlf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/service"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types"
)
func TestJanitor(t *testing.T) {
if testing.Short() {
t.Skip()
}
ctx, db, exhaustiveStore, svc, bucket := setupTest(t)
cases := []struct {
name string
cascade storetest.StateCascade
wantLogUploaded bool
wantJobState types.JobState
wantColIsAggregated bool
}{
{
name: "all jobs completed",
cascade: storetest.StateCascade{
SearchJob: types.JobStateCompleted,
RepoJobs: []types.JobState{
types.JobStateCompleted,
},
RepoRevJobs: []types.JobState{
types.JobStateCompleted,
types.JobStateCompleted,
},
},
wantLogUploaded: true,
wantJobState: types.JobStateCompleted,
wantColIsAggregated: true,
},
{
name: "1 job failed",
cascade: storetest.StateCascade{
SearchJob: types.JobStateCompleted,
RepoJobs: []types.JobState{
types.JobStateCompleted,
},
RepoRevJobs: []types.JobState{
types.JobStateFailed, // failed is terminal
types.JobStateCompleted,
},
},
wantLogUploaded: true,
wantJobState: types.JobStateFailed,
wantColIsAggregated: true,
},
{
name: "Still processing, don't update job state",
cascade: storetest.StateCascade{
SearchJob: types.JobStateCompleted,
RepoJobs: []types.JobState{
types.JobStateCompleted,
},
RepoRevJobs: []types.JobState{
types.JobStateErrored, // errored is not terminal
},
},
wantLogUploaded: false,
wantJobState: types.JobStateCompleted,
wantColIsAggregated: false,
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
searchJobID := storetest.CreateJobCascade(t, ctx, exhaustiveStore, tt.cascade)
// Use context.Background() to test if the janitor sets the user context
// correctly
err := runJanitor(context.Background(), db, svc)
require.NoError(t, err)
j, err := exhaustiveStore.GetExhaustiveSearchJob(ctx, searchJobID)
require.NoError(t, err)
assert.Equal(t, tt.wantJobState, j.State)
logs, ok := bucket[fmt.Sprintf("log-%d.csv", searchJobID)]
require.Equal(t, tt.wantLogUploaded, ok)
if tt.wantLogUploaded {
require.Equal(t, len(strings.Split(logs, "\n")), len(tt.cascade.RepoRevJobs)+2) // 2 = 1 header + 1 final newline
}
// Ensure that the repo jobs have been deleted.
wantCount := len(tt.cascade.RepoJobs)
if tt.wantLogUploaded {
wantCount = 0
}
require.Equal(t, wantCount, countRepoJobs(t, db, searchJobID))
// Ensure that the job is marked as aggregated
require.Equal(t, tt.wantColIsAggregated, j.IsAggregated)
})
}
}
func countRepoJobs(t *testing.T, db database.DB, searchJobID int64) int {
t.Helper()
q := sqlf.Sprintf("SELECT COUNT(*) FROM exhaustive_search_repo_jobs WHERE search_job_id = %s", searchJobID)
var count int
err := db.QueryRowContext(context.Background(), q.Query(sqlf.PostgresBindVar), q.Args()...).Scan(&count)
require.NoError(t, err)
return count
}
func setupTest(t *testing.T) (context.Context, database.DB, *store.Store, *service.Service, map[string]string) {
t.Helper()
observationCtx := observation.TestContextTB(t)
logger := observationCtx.Logger
mockUploadStore, bucket := newMockUploadStore(t)
db := database.NewDB(logger, dbtest.NewDB(t))
stor := store.New(db, observation.TestContextTB(t))
svc := service.New(observationCtx, stor, mockUploadStore, service.NewSearcherFake())
bs := basestore.NewWithHandle(db.Handle())
userID, err := storetest.CreateUser(bs, "user1")
require.NoError(t, err)
_, err = storetest.CreateRepo(db, "repo1")
require.NoError(t, err)
ctx := actor.WithActor(context.Background(), actor.FromUser(userID))
return ctx, db, stor, svc, bucket
}


@ -5,7 +5,7 @@ import (
"sync"
"time"
"github.com/sourcegraph/sourcegraph/cmd/worker/job"
workerjob "github.com/sourcegraph/sourcegraph/cmd/worker/job"
workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/conf"
@ -44,7 +44,7 @@ type searchJob struct {
workers []goroutine.BackgroundRoutine
}
func NewSearchJob() job.Job {
func NewSearchJob() workerjob.Job {
return &searchJob{
config: config{
WorkerInterval: 1 * time.Second,
@ -103,6 +103,8 @@ func (j *searchJob) newSearchJobRoutines(
repoWorkerStore := store.NewRepoSearchJobWorkerStore(observationCtx, db.Handle())
revWorkerStore := store.NewRevSearchJobWorkerStore(observationCtx, db.Handle())
svc := service.New(observationCtx, exhaustiveSearchStore, uploadStore, newSearcher)
j.workerStores = append(j.workerStores,
searchWorkerStore,
repoWorkerStore,
@ -123,6 +125,8 @@ func (j *searchJob) newSearchJobRoutines(
newExhaustiveSearchWorkerResetter(observationCtx, searchWorkerStore),
newExhaustiveSearchRepoWorkerResetter(observationCtx, repoWorkerStore),
newExhaustiveSearchRepoRevisionWorkerResetter(observationCtx, revWorkerStore),
newJanitorJob(observationCtx, db, svc),
}
})


@ -11728,6 +11728,19 @@
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "is_aggregated",
"Index": 18,
"TypeName": "boolean",
"IsNullable": false,
"Default": "false",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "last_heartbeat_at",
"Index": 11,


@ -1587,6 +1587,7 @@ Referenced by:
created_at | timestamp with time zone | | not null | now()
updated_at | timestamp with time zone | | not null | now()
queued_at | timestamp with time zone | | | now()
is_aggregated | boolean | | not null | false
Indexes:
"exhaustive_search_jobs_pkey" PRIMARY KEY, btree (id)
"exhaustive_search_jobs_state" btree (state)


@ -232,6 +232,38 @@ func (s *Service) GetSearchJobLogsWriterTo(parentCtx context.Context, id int64)
}), nil
}
// getLogKey returns the key for the log that is stored in the blobstore.
func getLogKey(searchJobID int64) string {
return fmt.Sprintf("log-%d.csv", searchJobID)
}
func (s *Service) UploadJobLogs(ctx context.Context, id int64, r io.Reader) (int64, error) {
// 🚨 SECURITY: only someone with access to the job may upload the logs
if err := s.store.UserHasAccess(ctx, id); err != nil {
return 0, err
}
return s.uploadStore.Upload(ctx, getLogKey(id), r)
}
func (s *Service) GetJobLogs(ctx context.Context, id int64) (io.ReadCloser, error) {
// 🚨 SECURITY: only someone with access to the job may download the logs
if err := s.store.UserHasAccess(ctx, id); err != nil {
return nil, err
}
return s.uploadStore.Get(ctx, getLogKey(id))
}
func (s *Service) DeleteJobLogs(ctx context.Context, id int64) error {
// 🚨 SECURITY: only someone with access to the job may delete the logs
if err := s.store.UserHasAccess(ctx, id); err != nil {
return err
}
return s.uploadStore.Delete(ctx, getLogKey(id))
}
// JobLogsIterLimit is the number of lines the iterator will read from the
// database per page. Assuming 100 bytes per line, this will be ~1MB of memory
// per 10k repo-rev jobs.
@ -309,6 +341,9 @@ func (s *Service) DeleteSearchJob(ctx context.Context, id int64) (err error) {
return err
}
// The log file is not guaranteed to exist, so we ignore the error here.
_ = s.uploadStore.Delete(ctx, getLogKey(id))
return s.store.DeleteExhaustiveSearchJob(ctx, id)
}


@ -35,7 +35,6 @@ go_test(
"exhaustive_search_jobs_test.go",
"exhaustive_search_repo_jobs_test.go",
"exhaustive_search_repo_revision_jobs_test.go",
"store_test.go",
],
tags = [
TAG_PLATFORM_SEARCH,
@ -45,17 +44,15 @@ go_test(
deps = [
":store",
"//internal/actor",
"//internal/api",
"//internal/auth",
"//internal/database",
"//internal/database/basestore",
"//internal/database/dbtest",
"//internal/observation",
"//internal/search/exhaustive/store/storetest",
"//internal/search/exhaustive/types",
"//internal/types",
"//lib/errors",
"@com_github_google_go_cmp//cmp",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_sourcegraph_log//logtest",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",


@ -60,6 +60,7 @@ var exhaustiveSearchJobColumns = []*sqlf.Query{
sqlf.Sprintf("cancel"),
sqlf.Sprintf("created_at"),
sqlf.Sprintf("updated_at"),
sqlf.Sprintf("is_aggregated"),
}
func (s *Store) CreateExhaustiveSearchJob(ctx context.Context, job types.ExhaustiveSearchJob) (_ int64, err error) {
@ -549,6 +550,7 @@ func defaultScanTargets(job *types.ExhaustiveSearchJob) []any {
&job.Cancel,
&job.CreatedAt,
&job.UpdatedAt,
&job.IsAggregated,
}
}


@ -6,7 +6,6 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/log/logtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -18,6 +17,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
@ -32,11 +32,11 @@ func TestStore_CreateExhaustiveSearchJob(t *testing.T) {
bs := basestore.NewWithHandle(db.Handle())
userID, err := createUser(bs, "alice")
userID, err := storetest.CreateUser(bs, "alice")
require.NoError(t, err)
malloryID, err := createUser(bs, "mallory")
malloryID, err := storetest.CreateUser(bs, "mallory")
require.NoError(t, err)
adminID, err := createUser(bs, "admin")
adminID, err := storetest.CreateUser(bs, "admin")
require.NoError(t, err)
s := store.New(db, observation.TestContextTB(t))
@ -140,10 +140,10 @@ func TestStore_GetAndListSearchJobs(t *testing.T) {
db := database.NewDB(logger, dbtest.NewDB(t))
bs := basestore.NewWithHandle(db.Handle())
userID, err := createUser(bs, "alice")
userID, err := storetest.CreateUser(bs, "alice")
require.NoError(t, err)
adminID, err := createUser(bs, "admin")
adminID, err := storetest.CreateUser(bs, "admin")
require.NoError(t, err)
ctx := actor.WithActor(context.Background(), actor.FromUser(userID))
@ -291,31 +291,31 @@ func TestStore_AggregateStatus(t *testing.T) {
db := database.NewDB(logger, dbtest.NewDB(t))
bs := basestore.NewWithHandle(db.Handle())
_, err := createRepo(db, "repo1")
_, err := storetest.CreateRepo(db, "repo1")
require.NoError(t, err)
s := store.New(db, observation.TestContextTB(t))
tc := []struct {
name string
c stateCascade
c storetest.StateCascade
want types.JobState
}{
{
name: "only repo rev jobs running",
c: stateCascade{
searchJob: types.JobStateCompleted,
repoJobs: []types.JobState{types.JobStateCompleted},
repoRevJobs: []types.JobState{types.JobStateProcessing},
c: storetest.StateCascade{
SearchJob: types.JobStateCompleted,
RepoJobs: []types.JobState{types.JobStateCompleted},
RepoRevJobs: []types.JobState{types.JobStateProcessing},
},
want: types.JobStateProcessing,
},
{
name: "processing, because at least 1 job is running",
c: stateCascade{
searchJob: types.JobStateProcessing,
repoJobs: []types.JobState{types.JobStateCompleted},
repoRevJobs: []types.JobState{
c: storetest.StateCascade{
SearchJob: types.JobStateProcessing,
RepoJobs: []types.JobState{types.JobStateCompleted},
RepoRevJobs: []types.JobState{
types.JobStateProcessing,
types.JobStateQueued,
types.JobStateCompleted,
@ -325,10 +325,10 @@ func TestStore_AggregateStatus(t *testing.T) {
},
{
name: "processing, although some jobs failed",
c: stateCascade{
searchJob: types.JobStateCompleted,
repoJobs: []types.JobState{types.JobStateCompleted},
repoRevJobs: []types.JobState{
c: storetest.StateCascade{
SearchJob: types.JobStateCompleted,
RepoJobs: []types.JobState{types.JobStateCompleted},
RepoRevJobs: []types.JobState{
types.JobStateProcessing,
types.JobStateFailed,
},
@ -337,52 +337,52 @@ func TestStore_AggregateStatus(t *testing.T) {
},
{
name: "all jobs finished, at least 1 failed",
c: stateCascade{
searchJob: types.JobStateCompleted,
repoJobs: []types.JobState{types.JobStateCompleted},
repoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateFailed},
c: storetest.StateCascade{
SearchJob: types.JobStateCompleted,
RepoJobs: []types.JobState{types.JobStateCompleted},
RepoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateFailed},
},
want: types.JobStateFailed,
},
{
name: "all jobs finished successfully",
c: stateCascade{
searchJob: types.JobStateCompleted,
repoJobs: []types.JobState{types.JobStateCompleted},
repoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateCompleted},
c: storetest.StateCascade{
SearchJob: types.JobStateCompleted,
RepoJobs: []types.JobState{types.JobStateCompleted},
RepoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateCompleted},
},
want: types.JobStateCompleted,
},
{
name: "search job was canceled, but some jobs haven't stopped yet",
c: stateCascade{
searchJob: types.JobStateCanceled,
repoJobs: []types.JobState{types.JobStateCompleted},
repoRevJobs: []types.JobState{types.JobStateProcessing, types.JobStateFailed},
c: storetest.StateCascade{
SearchJob: types.JobStateCanceled,
RepoJobs: []types.JobState{types.JobStateCompleted},
RepoRevJobs: []types.JobState{types.JobStateProcessing, types.JobStateFailed},
},
want: types.JobStateCanceled,
},
{
name: "top-level search job finished, but the other jobs haven't started yet",
c: stateCascade{
searchJob: types.JobStateCompleted,
repoJobs: []types.JobState{types.JobStateQueued},
c: storetest.StateCascade{
SearchJob: types.JobStateCompleted,
RepoJobs: []types.JobState{types.JobStateQueued},
},
want: types.JobStateProcessing,
},
{
name: "no job is processing, some are completed, some are queued",
c: stateCascade{
searchJob: types.JobStateCompleted,
repoJobs: []types.JobState{types.JobStateCompleted},
repoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateQueued},
c: storetest.StateCascade{
SearchJob: types.JobStateCompleted,
RepoJobs: []types.JobState{types.JobStateCompleted},
RepoRevJobs: []types.JobState{types.JobStateCompleted, types.JobStateQueued},
},
want: types.JobStateProcessing,
},
{
name: "search job is queued, but no other job has been created yet",
c: stateCascade{
searchJob: types.JobStateQueued,
c: storetest.StateCascade{
SearchJob: types.JobStateQueued,
},
want: types.JobStateQueued,
},
@ -390,11 +390,11 @@ func TestStore_AggregateStatus(t *testing.T) {
for i, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
userID, err := createUser(bs, fmt.Sprintf("user_%d", i))
userID, err := storetest.CreateUser(bs, fmt.Sprintf("user_%d", i))
require.NoError(t, err)
ctx := actor.WithActor(context.Background(), actor.FromUser(userID))
jobID := createJobCascade(t, ctx, s, tt.c)
jobID := storetest.CreateJobCascade(t, ctx, s, tt.c)
jobs, err := s.ListExhaustiveSearchJobs(ctx, store.ListArgs{})
require.NoError(t, err)
@ -405,78 +405,4 @@ func TestStore_AggregateStatus(t *testing.T) {
}
}
// createJobCascade creates a cascade of jobs (1 search job -> n repo jobs -> m
// repo rev jobs) with states as defined in stateCascade.
//
// This is a fairly large test helper, because don't want to start the worker
// routines, but instead we want to create a snapshot of the state of the jobs
// at a given point in time.
func createJobCascade(
t *testing.T,
ctx context.Context,
stor *store.Store,
casc stateCascade,
) (searchJobID int64) {
t.Helper()
searchJob := types.ExhaustiveSearchJob{
InitiatorID: actor.FromContext(ctx).UID,
Query: "repo:job1",
WorkerJob: types.WorkerJob{State: casc.searchJob},
}
repoJobs := make([]types.ExhaustiveSearchRepoJob, len(casc.repoJobs))
for i, r := range casc.repoJobs {
repoJobs[i] = types.ExhaustiveSearchRepoJob{
WorkerJob: types.WorkerJob{State: r},
RepoID: 1, // same repo for all tests
RefSpec: "HEAD",
}
}
repoRevJobs := make([]types.ExhaustiveSearchRepoRevisionJob, len(casc.repoRevJobs))
for i, rr := range casc.repoRevJobs {
repoRevJobs[i] = types.ExhaustiveSearchRepoRevisionJob{
WorkerJob: types.WorkerJob{State: rr},
Revision: "HEAD",
}
}
jobID, err := stor.CreateExhaustiveSearchJob(ctx, searchJob)
require.NoError(t, err)
assert.NotZero(t, jobID)
err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_jobs SET state = %s WHERE id = %s", casc.searchJob, jobID))
require.NoError(t, err)
for i, r := range repoJobs {
r.SearchJobID = jobID
repoJobID, err := stor.CreateExhaustiveSearchRepoJob(ctx, r)
require.NoError(t, err)
assert.NotZero(t, repoJobID)
err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_jobs SET state = %s WHERE id = %s", casc.repoJobs[i], repoJobID))
require.NoError(t, err)
for j, rr := range repoRevJobs {
rr.SearchRepoJobID = repoJobID
repoRevJobID, err := stor.CreateExhaustiveSearchRepoRevisionJob(ctx, rr)
require.NoError(t, err)
assert.NotZero(t, repoRevJobID)
require.NoError(t, err)
err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_revision_jobs SET state = %s WHERE id = %s", casc.repoRevJobs[j], repoRevJobID))
require.NoError(t, err)
}
}
return jobID
}
type stateCascade struct {
searchJob types.JobState
repoJobs []types.JobState
repoRevJobs []types.JobState
}
func intptr(s int) *int { return &s }


@ -14,6 +14,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
@ -28,9 +29,9 @@ func TestStore_CreateExhaustiveSearchRepoJob(t *testing.T) {
bs := basestore.NewWithHandle(db.Handle())
userID, err := createUser(bs, "alice")
userID, err := storetest.CreateUser(bs, "alice")
require.NoError(t, err)
repoID, err := createRepo(db, "repo-test")
repoID, err := storetest.CreateRepo(db, "repo-test")
require.NoError(t, err)
ctx := actor.WithActor(context.Background(), &actor.Actor{

View File

@ -14,6 +14,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
@ -28,9 +29,9 @@ func TestStore_CreateExhaustiveSearchRepoRevisionJob(t *testing.T) {
bs := basestore.NewWithHandle(db.Handle())
userID, err := createUser(bs, "alice")
userID, err := storetest.CreateUser(bs, "alice")
require.NoError(t, err)
repoID, err := createRepo(db, "repo-test")
repoID, err := storetest.CreateRepo(db, "repo-test")
require.NoError(t, err)
ctx := actor.WithActor(context.Background(), &actor.Actor{


@ -1,25 +0,0 @@
package store_test
import (
"context"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/types"
)
func createUser(store *basestore.Store, username string) (int32, error) {
admin := username == "admin"
q := sqlf.Sprintf(`INSERT INTO users(username, site_admin) VALUES(%s, %s) RETURNING id`, username, admin)
return basestore.ScanAny[int32](store.QueryRow(context.Background(), q))
}
func createRepo(db database.DB, name string) (api.RepoID, error) {
repoStore := db.Repos()
repo := types.Repo{Name: api.RepoName(name)}
err := repoStore.Create(context.Background(), &repo)
return repo.ID, err
}


@ -0,0 +1,20 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "storetest",
srcs = ["store.go"],
importpath = "github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store/storetest",
visibility = ["//:__subpackages__"],
deps = [
"//internal/actor",
"//internal/api",
"//internal/database",
"//internal/database/basestore",
"//internal/search/exhaustive/store",
"//internal/search/exhaustive/types",
"//internal/types",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)


@ -0,0 +1,106 @@
package storetest
import (
"context"
"fmt"
"testing"
"github.com/keegancsmith/sqlf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store"
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types"
internaltypes "github.com/sourcegraph/sourcegraph/internal/types"
)
// CreateJobCascade creates a cascade of jobs (1 search job -> n repo jobs -> m
// repo rev jobs) with states as defined in StateCascade.
//
// This is a fairly large test helper, because we don't want to start the worker
// routines, but instead we want to create a snapshot of the state of the jobs
// at a given point in time.
func CreateJobCascade(
t *testing.T,
ctx context.Context,
stor *store.Store,
casc StateCascade,
) (searchJobID int64) {
t.Helper()
searchJob := types.ExhaustiveSearchJob{
InitiatorID: actor.FromContext(ctx).UID,
Query: "repo:job1",
WorkerJob: types.WorkerJob{State: casc.SearchJob},
}
repoJobs := make([]types.ExhaustiveSearchRepoJob, len(casc.RepoJobs))
for i, r := range casc.RepoJobs {
repoJobs[i] = types.ExhaustiveSearchRepoJob{
WorkerJob: types.WorkerJob{State: r},
RepoID: 1, // same repo for all tests
RefSpec: "HEAD",
}
}
repoRevJobs := make([]types.ExhaustiveSearchRepoRevisionJob, len(casc.RepoRevJobs))
for i, rr := range casc.RepoRevJobs {
repoRevJobs[i] = types.ExhaustiveSearchRepoRevisionJob{
WorkerJob: types.WorkerJob{State: rr, FailureMessage: fmt.Sprintf("repoRevJob-%d", i)},
Revision: "HEAD",
}
}
jobID, err := stor.CreateExhaustiveSearchJob(ctx, searchJob)
require.NoError(t, err)
assert.NotZero(t, jobID)
err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_jobs SET state = %s WHERE id = %s", casc.SearchJob, jobID))
require.NoError(t, err)
for i, r := range repoJobs {
r.SearchJobID = jobID
repoJobID, err := stor.CreateExhaustiveSearchRepoJob(ctx, r)
require.NoError(t, err)
assert.NotZero(t, repoJobID)
err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_jobs SET state = %s WHERE id = %s", casc.RepoJobs[i], repoJobID))
require.NoError(t, err)
for j, rr := range repoRevJobs {
rr.SearchRepoJobID = repoJobID
repoRevJobID, err := stor.CreateExhaustiveSearchRepoRevisionJob(ctx, rr)
require.NoError(t, err)
assert.NotZero(t, repoRevJobID)
require.NoError(t, err)
err = stor.Exec(ctx, sqlf.Sprintf("UPDATE exhaustive_search_repo_revision_jobs SET state = %s, failure_message = %s WHERE id = %s", casc.RepoRevJobs[j], repoRevJobs[j].FailureMessage, repoRevJobID))
require.NoError(t, err)
}
}
return jobID
}
type StateCascade struct {
SearchJob types.JobState
RepoJobs []types.JobState
RepoRevJobs []types.JobState
}
func CreateRepo(db database.DB, name string) (api.RepoID, error) {
repoStore := db.Repos()
repo := internaltypes.Repo{Name: api.RepoName(name)}
err := repoStore.Create(context.Background(), &repo)
return repo.ID, err
}
func CreateUser(store *basestore.Store, username string) (int32, error) {
admin := username == "admin"
q := sqlf.Sprintf(`INSERT INTO users(username, site_admin) VALUES(%s, %s) RETURNING id`, username, admin)
return basestore.ScanAny[int32](store.QueryRow(context.Background(), q))
}


@ -25,6 +25,10 @@ type ExhaustiveSearchJob struct {
// from ListSearchJobs. This state is different from WorkerJob.State, because it
// reflects the combined state of all jobs created as part of the search job.
AggState JobState
// Set to true by the janitor job if it has processed the job. This is used to
// avoid aggregating the same job multiple times.
IsAggregated bool
}
func (j *ExhaustiveSearchJob) RecordID() int {


@ -34,3 +34,14 @@ const (
// ToGraphQL returns the GraphQL representation of the worker state.
func (s JobState) ToGraphQL() string { return strings.ToUpper(string(s)) }
func (s JobState) String() string { return strings.ToLower(string(s)) }
func (s JobState) IsTerminal() bool {
switch s {
case JobStateCompleted, JobStateFailed, JobStateCanceled:
return true
default:
return false
}
}


@ -0,0 +1 @@
ALTER TABLE exhaustive_search_jobs DROP COLUMN IF EXISTS is_aggregated;


@ -0,0 +1,2 @@
name: exhaustive search jobs add column
parents: [1721814902]


@ -0,0 +1 @@
ALTER TABLE exhaustive_search_jobs ADD COLUMN IF NOT EXISTS is_aggregated boolean NOT NULL DEFAULT false;


@ -2453,7 +2453,8 @@ CREATE TABLE exhaustive_search_jobs (
cancel boolean DEFAULT false NOT NULL,
created_at timestamp with time zone DEFAULT now() NOT NULL,
updated_at timestamp with time zone DEFAULT now() NOT NULL,
queued_at timestamp with time zone DEFAULT now()
queued_at timestamp with time zone DEFAULT now(),
is_aggregated boolean DEFAULT false NOT NULL
);
CREATE SEQUENCE exhaustive_search_jobs_id_seq