Syntactic indexing: add database schema and dbworker (#60055)

Adds a basic schema for syntactic code intel indexes (up and down migration)
    Sets up everything required to use dbworker interface
    Adds a stub indexing worker implementation
    Adds tests to ensure conformance with dbworker interface (run against real postgres)
This commit is contained in:
Anton Sviridov 2024-02-08 18:00:21 +00:00 committed by GitHub
parent 3e9459c3de
commit d5f53378b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 988 additions and 21 deletions

View File

@ -1,16 +1,24 @@
load("//dev:go_defs.bzl", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "shared",
srcs = [
"config.go",
"indexing_worker.go",
"service.go",
"shared.go",
"store.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/syntactic-code-intel-worker/shared",
visibility = ["//visibility:public"],
deps = [
"//internal/codeintel/shared/lsifuploadstore",
"//internal/authz",
"//internal/conf",
"//internal/conf/conftypes",
"//internal/database/basestore",
"//internal/database/connections/live",
"//internal/database/dbutil",
"//internal/debugserver",
"//internal/encryption/keyring",
"//internal/env",
@ -18,7 +26,28 @@ go_library(
"//internal/httpserver",
"//internal/observation",
"//internal/service",
"//internal/workerutil",
"//internal/workerutil/dbworker",
"//internal/workerutil/dbworker/store",
"//lib/errors",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_sourcegraph_log//:log",
],
)
go_test(
name = "shared_test",
srcs = [
"store_helpers_test.go",
"store_test.go",
],
embed = [":shared"],
tags = ["requires-network"],
deps = [
"//internal/database",
"//internal/database/dbtest",
"//internal/observation",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_stretchr_testify//require",
],
)

View File

@ -5,36 +5,39 @@ import (
"strconv"
"time"
"github.com/sourcegraph/sourcegraph/internal/codeintel/shared/lsifuploadstore"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
type IndexingWorkerConfig struct {
env.BaseConfig
PollInterval time.Duration
Concurrency int
MaximumRuntimePerJob time.Duration
CliPath string
}
type Config struct {
env.BaseConfig
WorkerPollInterval time.Duration
WorkerConcurrency int
WorkerBudget int64
MaximumRuntimePerJob time.Duration
SCIPUploadStoreConfig *lsifuploadstore.Config
CliPath string
ListenAddress string
IndexingWorkerConfig *IndexingWorkerConfig
ListenAddress string
}
const DefaultPort = 3188
func (c *Config) Load() {
c.SCIPUploadStoreConfig = &lsifuploadstore.Config{}
c.SCIPUploadStoreConfig.Load()
c.WorkerPollInterval = c.GetInterval("SYNTACTIC_CODE_INTEL_WORKER_POLL_INTERVAL", "1s", "Interval between queries to the repository queue")
c.WorkerConcurrency = c.GetInt("SYNTACTIC_CODE_INTEL_WORKER_CONCURRENCY", "1", "The maximum number of repositories that can be processed concurrently.")
c.WorkerBudget = int64(c.GetInt("SYNTACTIC_CODE_INTEL_WORKER_BUDGET", "0", "The amount of compressed input data (in bytes) a worker can process concurrently. Zero acts as an infinite budget."))
c.MaximumRuntimePerJob = c.GetInterval("SYNTACTIC_CODE_INTEL_WORKER_MAXIMUM_RUNTIME_PER_JOB", "25m", "The maximum time a single repository indexing job can take")
func (c *IndexingWorkerConfig) Load() {
c.PollInterval = c.GetInterval("SYNTACTIC_CODE_INTEL_INDEXING_POLL_INTERVAL", "1s", "Interval between queries to the repository queue")
c.Concurrency = c.GetInt("SYNTACTIC_CODE_INTEL_INDEXING_CONCURRENCY", "1", "The maximum number of repositories that can be processed concurrently.")
c.MaximumRuntimePerJob = c.GetInterval("SYNTACTIC_CODE_INTEL_INDEXING_MAXIMUM_RUNTIME_PER_JOB", "5m", "The maximum time a single repository indexing job can take")
c.CliPath = c.Get("SCIP_SYNTAX_PATH", "scip-syntax", "TODO: fill in description")
}
func (c *Config) Load() {
c.IndexingWorkerConfig = &IndexingWorkerConfig{}
c.IndexingWorkerConfig.Load()
c.ListenAddress = c.GetOptional("SYNTACTIC_CODE_INTEL_WORKER_ADDR", "The address under which the syntactic codeintel worker API listens. Can include a port.")
// Fall back to a reasonable default.
if c.ListenAddress == "" {
@ -50,6 +53,6 @@ func (c *Config) Load() {
func (c *Config) Validate() error {
var errs error
errs = errors.Append(errs, c.BaseConfig.Validate())
errs = errors.Append(errs, c.SCIPUploadStoreConfig.Validate())
errs = errors.Append(errs, c.IndexingWorkerConfig.Validate())
return errs
}

View File

@ -0,0 +1,39 @@
package shared
import (
"context"
"github.com/sourcegraph/log"
"time"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)
func NewIndexingWorker(ctx context.Context, observationCtx *observation.Context, workerStore dbworkerstore.Store[*SyntacticIndexingJob], config IndexingWorkerConfig) *workerutil.Worker[*SyntacticIndexingJob] {
name := "syntactic_code_intel_indexing_worker"
return dbworker.NewWorker[*SyntacticIndexingJob](ctx, workerStore, &indexingHandler{}, workerutil.WorkerOptions{
Name: name,
Interval: config.PollInterval,
HeartbeatInterval: 10 * time.Second,
Metrics: workerutil.NewMetrics(observationCtx, name),
NumHandlers: config.Concurrency,
MaximumRuntimePerJob: config.MaximumRuntimePerJob,
})
}
type indexingHandler struct{}
var _ workerutil.Handler[*SyntacticIndexingJob] = &indexingHandler{}
func (i indexingHandler) Handle(ctx context.Context, logger log.Logger, record *SyntacticIndexingJob) error {
logger.Info("Stub indexing worker handling record",
log.Int("id", record.ID),
log.String("repository name", record.RepositoryName),
log.String("commit", record.Commit))
return nil
}

View File

@ -23,9 +23,18 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
}
logger.Info("Syntactic code intel worker running",
log.String("path to scip-syntax CLI", config.CliPath),
log.String("path to scip-syntax CLI", config.IndexingWorkerConfig.CliPath),
log.String("API address", config.ListenAddress))
db := mustInitializeDB(observationCtx, "syntactic-code-intel-indexer")
workerStore, err := NewStore(observationCtx, db)
if err != nil {
return errors.Wrap(err, "initializing worker store")
}
indexingWorker := NewIndexingWorker(ctx, observationCtx, workerStore, *config.IndexingWorkerConfig)
// Initialize health server
server := httpserver.NewFromAddr(config.ListenAddress, &http.Server{
ReadTimeout: 75 * time.Second,
@ -34,7 +43,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
})
// Go!
goroutine.MonitorBackgroundRoutines(ctx, server)
goroutine.MonitorBackgroundRoutines(ctx, server, indexingWorker)
return nil
}

View File

@ -0,0 +1,159 @@
package shared
import (
"database/sql"
"strconv"
"time"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/authz"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
connections "github.com/sourcegraph/sourcegraph/internal/database/connections/live"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)
type recordState string
const (
Queued recordState = "queued"
Errored recordState = "errored"
Processing recordState = "processing"
Completed recordState = "completed"
)
// Unless marked otherwise, the columns in this
// record have a special meaning assigned to them by
// the queries dbworker performs. You can read more
// about the different fields and what they do here:
// https://sourcegraph.com/docs/dev/background-information/workers#database-backed-stores
type SyntacticIndexingJob struct {
ID int `json:"id"`
State recordState `json:"state"`
QueuedAt time.Time `json:"queuedAt"`
StartedAt *time.Time `json:"startedAt"`
FinishedAt *time.Time `json:"finishedAt"`
ProcessAfter *time.Time `json:"processAfter"`
NumResets int `json:"numResets"`
NumFailures int `json:"numFailures"`
FailureMessage *string `json:"failureMessage"`
ShouldReindex bool `json:"shouldReindex"`
// The fields below are not part of the standard dbworker fields
// Which commit to index
Commit string `json:"commit"`
// Which repository id to index
RepositoryID int `json:"repositoryId"`
// Name of repository being indexed
RepositoryName string `json:"repositoryName"`
// Which user scheduled this job
EnqueuerUserID int32 `json:"enqueuerUserID"`
}
var _ workerutil.Record = SyntacticIndexingJob{}
func (i SyntacticIndexingJob) RecordID() int {
return i.ID
}
func (i SyntacticIndexingJob) RecordUID() string {
return strconv.Itoa(i.ID)
}
func ScanSyntacticIndexRecord(s dbutil.Scanner) (*SyntacticIndexingJob, error) {
var job SyntacticIndexingJob
if err := scanSyntacticIndexRecord(&job, s); err != nil {
return nil, err
}
return &job, nil
}
func scanSyntacticIndexRecord(job *SyntacticIndexingJob, s dbutil.Scanner) error {
// Make sure this is in sync with columnExpressions below...
if err := s.Scan(
&job.ID,
&job.Commit,
&job.QueuedAt,
&job.State,
&job.FailureMessage,
&job.StartedAt,
&job.FinishedAt,
&job.ProcessAfter,
&job.NumResets,
&job.NumFailures,
&job.RepositoryID,
&job.RepositoryName,
&job.ShouldReindex,
&job.EnqueuerUserID,
); err != nil {
return err
}
return nil
}
func NewStore(observationCtx *observation.Context, db *sql.DB) (dbworkerstore.Store[*SyntacticIndexingJob], error) {
// Make sure this is in sync with the columns of the
// syntactic_scip_indexing_jobs_with_repository_name view
var columnExpressions = []*sqlf.Query{
sqlf.Sprintf("u.id"),
sqlf.Sprintf("u.commit"),
sqlf.Sprintf("u.queued_at"),
sqlf.Sprintf("u.state"),
sqlf.Sprintf("u.failure_message"),
sqlf.Sprintf("u.started_at"),
sqlf.Sprintf("u.finished_at"),
sqlf.Sprintf("u.process_after"),
sqlf.Sprintf("u.num_resets"),
sqlf.Sprintf("u.num_failures"),
sqlf.Sprintf("u.repository_id"),
sqlf.Sprintf("u.repository_name"),
sqlf.Sprintf("u.should_reindex"),
sqlf.Sprintf("u.enqueuer_user_id"),
}
storeOptions := dbworkerstore.Options[*SyntacticIndexingJob]{
Name: "syntactic_scip_indexing_jobs_store",
TableName: "syntactic_scip_indexing_jobs",
ViewName: "syntactic_scip_indexing_jobs_with_repository_name u",
// Using enqueuer_user_id prioritises manually scheduled indexing
OrderByExpression: sqlf.Sprintf("(u.enqueuer_user_id > 0) DESC, u.queued_at, u.id"),
ColumnExpressions: columnExpressions,
Scan: dbworkerstore.BuildWorkerScan(ScanSyntacticIndexRecord),
}
handle := basestore.NewHandleWithDB(observationCtx.Logger, db, sql.TxOptions{})
return dbworkerstore.New(observationCtx, handle, storeOptions), nil
}
func mustInitializeDB(observationCtx *observation.Context, name string) *sql.DB {
// This is an internal service, so we rely on the
// frontend to do authz checks for user requests.
// Authz checks are enforced by the DB layer
//
// This call to SetProviders is here so that calls to GetProviders don't block.
// Relevant PR: https://github.com/sourcegraph/sourcegraph/pull/15755
// Relevant issue: https://github.com/sourcegraph/sourcegraph/issues/15962
authz.SetProviders(true, []authz.Provider{})
dsn := conf.GetServiceConnectionValueAndRestartOnChange(func(serviceConnections conftypes.ServiceConnections) string {
return serviceConnections.PostgresDSN
})
sqlDB, err := connections.EnsureNewFrontendDB(observationCtx, dsn, name)
if err != nil {
log.Scoped("init db ("+name+")").Fatal("Failed to connect to frontend database", log.Error(err))
}
return sqlDB
}

View File

@ -0,0 +1,100 @@
package shared
import (
"context"
"fmt"
"strings"
"testing"
"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/stretchr/testify/require"
)
func insertIndexRecords(t testing.TB, db database.DB, records ...SyntacticIndexingJob) {
for _, index := range records {
if index.Commit == "" {
index.Commit = makeCommit(index.ID)
}
if index.State == "" {
index.State = Completed
}
if index.RepositoryID == 0 {
index.RepositoryID = 50
}
// Ensure we have a repo for the inner join in select queries
insertRepo(t, db, index.RepositoryID, index.RepositoryName)
query := sqlf.Sprintf(`
INSERT INTO syntactic_scip_indexing_jobs (
id,
commit,
queued_at,
state,
failure_message,
started_at,
finished_at,
process_after,
num_resets,
num_failures,
repository_id,
should_reindex,
enqueuer_user_id
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
`,
index.ID,
index.Commit,
index.QueuedAt,
index.State,
index.FailureMessage,
index.StartedAt,
index.FinishedAt,
index.ProcessAfter,
index.NumResets,
index.NumFailures,
index.RepositoryID,
index.ShouldReindex,
index.EnqueuerUserID,
)
_, err := db.ExecContext(context.Background(), query.Query(sqlf.PostgresBindVar), query.Args()...)
require.NoError(t, err, "unexpected error while inserting index")
}
}
func insertRepo(t testing.TB, db database.DB, id int, name string) {
if name == "" {
name = fmt.Sprintf("n-%d", id)
}
deletedAt := sqlf.Sprintf("NULL")
if strings.HasPrefix(name, "DELETED-") {
deletedAt = sqlf.Sprintf("%s", "2024-02-08 15:06:50.973329+00")
}
insertRepoQuery := sqlf.Sprintf(
`INSERT INTO repo (id, name, deleted_at, private) VALUES (%s, %s, %s, %s) ON CONFLICT (id) DO NOTHING`,
id,
name,
deletedAt,
false,
)
_, err := db.ExecContext(context.Background(), insertRepoQuery.Query(sqlf.PostgresBindVar), insertRepoQuery.Args()...)
require.NoError(t, err, "unexpected error while upserting repository")
status := "cloned"
if strings.HasPrefix(name, "DELETED-") {
status = "not_cloned"
}
updateGitserverRepoQuery := sqlf.Sprintf(
`UPDATE gitserver_repos SET clone_status = %s WHERE repo_id = %s`,
status,
id,
)
_, err = db.ExecContext(context.Background(), updateGitserverRepoQuery.Query(sqlf.PostgresBindVar), updateGitserverRepoQuery.Args()...)
require.NoError(t, err, "unexpected error while upserting gitserver repository")
}
func makeCommit(i int) string {
return fmt.Sprintf("%040d", i)
}

View File

@ -0,0 +1,103 @@
package shared
import (
"context"
"testing"
"time"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/stretchr/testify/require"
)
func TestIndexingWorkerStore(t *testing.T) {
/*
The purpose of this test is to verify that the DB schema
we're using for the syntactic code intel work matches
the requirements of dbworker interface,
and that we can dequeue records through this interface.
The schema is sensitive to column names and types, and to the fact
that we are using a Postgres view to query repository name alongside
indexing records,
so it's important that we use the real Postgres in this test to prevent
schema/implementation drift.
*/
observationContext := &observation.TestContext
sqlDB := dbtest.NewDB(t)
db := database.NewDB(observationContext.Logger, sqlDB)
store, err := NewStore(observationContext, sqlDB)
require.NoError(t, err, "unexpected error creating dbworker stores")
ctx := context.Background()
initCount, _ := store.QueuedCount(ctx, true)
require.Equal(t, 0, initCount)
insertIndexRecords(t, db,
// Even though this record is the oldest in the queue,
// it is associated with a deleted repository.
// The view that we use for dequeuing should not return this
// record at all, and the first one should still be the record with ID=1
SyntacticIndexingJob{
ID: 500,
Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead3333",
RepositoryID: 4,
RepositoryName: "DELETED-org/repo",
State: Queued,
QueuedAt: time.Now().Add(time.Second * -100),
},
SyntacticIndexingJob{
ID: 1,
Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead1111",
RepositoryID: 1,
RepositoryName: "tangy/tacos",
State: Queued,
QueuedAt: time.Now().Add(time.Second * -5),
},
SyntacticIndexingJob{
ID: 2,
Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead2222",
RepositoryID: 2,
RepositoryName: "salty/empanadas",
State: Queued,
QueuedAt: time.Now().Add(time.Second * -2),
},
SyntacticIndexingJob{
ID: 3,
Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead3333",
RepositoryID: 3,
RepositoryName: "juicy/mangoes",
State: Processing,
QueuedAt: time.Now().Add(time.Second * -1),
},
)
afterCount, _ := store.QueuedCount(ctx, true)
require.Equal(t, 3, afterCount)
record1, hasRecord, err := store.Dequeue(ctx, "worker1", nil)
require.NoError(t, err)
require.True(t, hasRecord)
require.Equal(t, 1, record1.ID)
require.Equal(t, "tangy/tacos", record1.RepositoryName)
require.Equal(t, "deadbeefdeadbeefdeadbeefdeadbeefdead1111", record1.Commit)
record2, hasRecord, err := store.Dequeue(ctx, "worker2", nil)
require.NoError(t, err)
require.True(t, hasRecord)
require.Equal(t, 2, record2.ID)
require.Equal(t, "salty/empanadas", record2.RepositoryName)
require.Equal(t, "deadbeefdeadbeefdeadbeefdeadbeefdead2222", record2.Commit)
_, hasRecord, err = store.Dequeue(ctx, "worker2", nil)
require.NoError(t, err)
require.False(t, hasRecord)
}

View File

@ -26,7 +26,7 @@ trap ctrl_c INT
function build_scip_syntax {
cd docker-images/syntax-highlighter/crates/scip-syntax
cargo build --bin scip-syntax --target-dir target
cp ./target/release/scip-syntax "$TARGET"
cp ./target/debug/scip-syntax "$TARGET"
}
build_scip_syntax

View File

@ -1234,6 +1234,15 @@
"Increment": 1,
"CycleOption": "NO"
},
{
"Name": "syntactic_scip_indexing_jobs_id_seq",
"TypeName": "bigint",
"StartValue": 1,
"MinimumValue": 1,
"MaximumValue": 9223372036854775807,
"Increment": 1,
"CycleOption": "NO"
},
{
"Name": "teams_id_seq",
"TypeName": "integer",
@ -26410,6 +26419,308 @@
],
"Triggers": []
},
{
"Name": "syntactic_scip_indexing_jobs",
"Comment": "Stores metadata about a code intel syntactic index job.",
"Columns": [
{
"Name": "cancel",
"Index": 16,
"TypeName": "boolean",
"IsNullable": false,
"Default": "false",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "commit",
"Index": 2,
"TypeName": "text",
"IsNullable": false,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": "A 40-char revhash. Note that this commit may not be resolvable in the future."
},
{
"Name": "commit_last_checked_at",
"Index": 13,
"TypeName": "timestamp with time zone",
"IsNullable": true,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "enqueuer_user_id",
"Index": 18,
"TypeName": "integer",
"IsNullable": false,
"Default": "0",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": "ID of the user who scheduled this index. Records with a non-NULL user ID are prioritised over the rest"
},
{
"Name": "execution_logs",
"Index": 12,
"TypeName": "json[]",
"IsNullable": true,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": "An array of [log entries](https://sourcegraph.com/github.com/sourcegraph/sourcegraph@3.23/-/blob/internal/workerutil/store.go#L48:6) (encoded as JSON) from the most recent execution."
},
{
"Name": "failure_message",
"Index": 5,
"TypeName": "text",
"IsNullable": true,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "finished_at",
"Index": 7,
"TypeName": "timestamp with time zone",
"IsNullable": true,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "id",
"Index": 1,
"TypeName": "bigint",
"IsNullable": false,
"Default": "nextval('syntactic_scip_indexing_jobs_id_seq'::regclass)",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "last_heartbeat_at",
"Index": 15,
"TypeName": "timestamp with time zone",
"IsNullable": true,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "num_failures",
"Index": 11,
"TypeName": "integer",
"IsNullable": false,
"Default": "0",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "num_resets",
"Index": 10,
"TypeName": "integer",
"IsNullable": false,
"Default": "0",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "process_after",
"Index": 9,
"TypeName": "timestamp with time zone",
"IsNullable": true,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "queued_at",
"Index": 3,
"TypeName": "timestamp with time zone",
"IsNullable": false,
"Default": "now()",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "repository_id",
"Index": 8,
"TypeName": "integer",
"IsNullable": false,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "should_reindex",
"Index": 17,
"TypeName": "boolean",
"IsNullable": false,
"Default": "false",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "started_at",
"Index": 6,
"TypeName": "timestamp with time zone",
"IsNullable": true,
"Default": "",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "state",
"Index": 4,
"TypeName": "text",
"IsNullable": false,
"Default": "'queued'::text",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
},
{
"Name": "worker_hostname",
"Index": 14,
"TypeName": "text",
"IsNullable": false,
"Default": "''::text",
"CharacterMaximumLength": 0,
"IsIdentity": false,
"IdentityGeneration": "",
"IsGenerated": "NEVER",
"GenerationExpression": "",
"Comment": ""
}
],
"Indexes": [
{
"Name": "syntactic_scip_indexing_jobs_pkey",
"IsPrimaryKey": true,
"IsUnique": true,
"IsExclusion": false,
"IsDeferrable": false,
"IndexDefinition": "CREATE UNIQUE INDEX syntactic_scip_indexing_jobs_pkey ON syntactic_scip_indexing_jobs USING btree (id)",
"ConstraintType": "p",
"ConstraintDefinition": "PRIMARY KEY (id)"
},
{
"Name": "syntactic_scip_indexing_jobs_dequeue_order_idx",
"IsPrimaryKey": false,
"IsUnique": false,
"IsExclusion": false,
"IsDeferrable": false,
"IndexDefinition": "CREATE INDEX syntactic_scip_indexing_jobs_dequeue_order_idx ON syntactic_scip_indexing_jobs USING btree ((enqueuer_user_id \u003e 0) DESC, queued_at DESC, id) WHERE state = 'queued'::text OR state = 'errored'::text",
"ConstraintType": "",
"ConstraintDefinition": ""
},
{
"Name": "syntactic_scip_indexing_jobs_queued_at_id",
"IsPrimaryKey": false,
"IsUnique": false,
"IsExclusion": false,
"IsDeferrable": false,
"IndexDefinition": "CREATE INDEX syntactic_scip_indexing_jobs_queued_at_id ON syntactic_scip_indexing_jobs USING btree (queued_at DESC, id)",
"ConstraintType": "",
"ConstraintDefinition": ""
},
{
"Name": "syntactic_scip_indexing_jobs_repository_id_commit",
"IsPrimaryKey": false,
"IsUnique": false,
"IsExclusion": false,
"IsDeferrable": false,
"IndexDefinition": "CREATE INDEX syntactic_scip_indexing_jobs_repository_id_commit ON syntactic_scip_indexing_jobs USING btree (repository_id, commit)",
"ConstraintType": "",
"ConstraintDefinition": ""
},
{
"Name": "syntactic_scip_indexing_jobs_state",
"IsPrimaryKey": false,
"IsUnique": false,
"IsExclusion": false,
"IsDeferrable": false,
"IndexDefinition": "CREATE INDEX syntactic_scip_indexing_jobs_state ON syntactic_scip_indexing_jobs USING btree (state)",
"ConstraintType": "",
"ConstraintDefinition": ""
}
],
"Constraints": [
{
"Name": "syntactic_scip_indexing_jobs_commit_valid_chars",
"ConstraintType": "c",
"RefTableName": "",
"IsDeferrable": false,
"ConstraintDefinition": "CHECK (commit ~ '^[a-f0-9]{40}$'::text)"
}
],
"Triggers": []
},
{
"Name": "team_members",
"Comment": "",
@ -29610,6 +29921,10 @@
"Name": "site_config",
"Definition": " SELECT global_state.site_id,\n global_state.initialized\n FROM global_state;"
},
{
"Name": "syntactic_scip_indexing_jobs_with_repository_name",
"Definition": " SELECT u.id,\n u.commit,\n u.queued_at,\n u.state,\n u.failure_message,\n u.started_at,\n u.finished_at,\n u.repository_id,\n u.process_after,\n u.num_resets,\n u.num_failures,\n u.execution_logs,\n u.should_reindex,\n u.enqueuer_user_id,\n r.name AS repository_name\n FROM (syntactic_scip_indexing_jobs u\n JOIN repo r ON ((r.id = u.repository_id)))\n WHERE (r.deleted_at IS NULL);"
},
{
"Name": "tracking_changeset_specs_and_changesets",
"Definition": " SELECT changeset_specs.id AS changeset_spec_id,\n COALESCE(changesets.id, (0)::bigint) AS changeset_id,\n changeset_specs.repo_id,\n changeset_specs.batch_spec_id,\n repo.name AS repo_name,\n COALESCE((changesets.metadata -\u003e\u003e 'Title'::text), (changesets.metadata -\u003e\u003e 'title'::text)) AS changeset_name,\n changesets.external_state,\n changesets.publication_state,\n changesets.reconciler_state,\n changesets.computed_state\n FROM ((changeset_specs\n LEFT JOIN changesets ON (((changesets.repo_id = changeset_specs.repo_id) AND (changesets.external_id = changeset_specs.external_id))))\n JOIN repo ON ((changeset_specs.repo_id = repo.id)))\n WHERE ((changeset_specs.external_id IS NOT NULL) AND (repo.deleted_at IS NULL));"

View File

@ -4027,6 +4027,47 @@ Foreign-key constraints:
```
# Table "public.syntactic_scip_indexing_jobs"
```
Column | Type | Collation | Nullable | Default
------------------------+--------------------------+-----------+----------+----------------------------------------------------------
id | bigint | | not null | nextval('syntactic_scip_indexing_jobs_id_seq'::regclass)
commit | text | | not null |
queued_at | timestamp with time zone | | not null | now()
state | text | | not null | 'queued'::text
failure_message | text | | |
started_at | timestamp with time zone | | |
finished_at | timestamp with time zone | | |
repository_id | integer | | not null |
process_after | timestamp with time zone | | |
num_resets | integer | | not null | 0
num_failures | integer | | not null | 0
execution_logs | json[] | | |
commit_last_checked_at | timestamp with time zone | | |
worker_hostname | text | | not null | ''::text
last_heartbeat_at | timestamp with time zone | | |
cancel | boolean | | not null | false
should_reindex | boolean | | not null | false
enqueuer_user_id | integer | | not null | 0
Indexes:
"syntactic_scip_indexing_jobs_pkey" PRIMARY KEY, btree (id)
"syntactic_scip_indexing_jobs_dequeue_order_idx" btree ((enqueuer_user_id > 0) DESC, queued_at DESC, id) WHERE state = 'queued'::text OR state = 'errored'::text
"syntactic_scip_indexing_jobs_queued_at_id" btree (queued_at DESC, id)
"syntactic_scip_indexing_jobs_repository_id_commit" btree (repository_id, commit)
"syntactic_scip_indexing_jobs_state" btree (state)
Check constraints:
"syntactic_scip_indexing_jobs_commit_valid_chars" CHECK (commit ~ '^[a-f0-9]{40}$'::text)
```
Stores metadata about a code intel syntactic index job.
**commit**: A 40-char revhash. Note that this commit may not be resolvable in the future.
**enqueuer_user_id**: ID of the user who scheduled this index. Records with a non-NULL user ID are prioritised over the rest
**execution_logs**: An array of [log entries](https://sourcegraph.com/github.com/sourcegraph/sourcegraph@3.23/-/blob/internal/workerutil/store.go#L48:6) (encoded as JSON) from the most recent execution.
# Table "public.team_members"
```
Column | Type | Collation | Nullable | Default
@ -4985,6 +5026,31 @@ Foreign-key constraints:
FROM global_state;
```
# View "public.syntactic_scip_indexing_jobs_with_repository_name"
## View query:
```sql
SELECT u.id,
u.commit,
u.queued_at,
u.state,
u.failure_message,
u.started_at,
u.finished_at,
u.repository_id,
u.process_after,
u.num_resets,
u.num_failures,
u.execution_logs,
u.should_reindex,
u.enqueuer_user_id,
r.name AS repository_name
FROM (syntactic_scip_indexing_jobs u
JOIN repo r ON ((r.id = u.repository_id)))
WHERE (r.deleted_at IS NULL);
```
# View "public.tracking_changeset_specs_and_changesets"
## View query:

View File

@ -0,0 +1,4 @@
-- Undo the changes made in the up migration
DROP VIEW IF EXISTS syntactic_scip_indexing_jobs_with_repository_name;
DROP TABLE IF EXISTS syntactic_scip_indexing_jobs;

View File

@ -0,0 +1,2 @@
name: Add syntactic_scip_indexes table
parents: [1702500918]

View File

@ -0,0 +1,62 @@
CREATE TABLE IF NOT EXISTS syntactic_scip_indexing_jobs (
id bigserial NOT NULL PRIMARY KEY,
commit text NOT NULL,
queued_at timestamp with time zone DEFAULT now() NOT NULL,
state text DEFAULT 'queued'::text NOT NULL,
failure_message text,
started_at timestamp with time zone,
finished_at timestamp with time zone,
repository_id integer NOT NULL,
process_after timestamp with time zone,
num_resets integer DEFAULT 0 NOT NULL,
num_failures integer DEFAULT 0 NOT NULL,
execution_logs json[],
commit_last_checked_at timestamp with time zone,
worker_hostname text DEFAULT ''::text NOT NULL,
last_heartbeat_at timestamp with time zone,
cancel boolean DEFAULT false NOT NULL,
should_reindex boolean DEFAULT false NOT NULL,
enqueuer_user_id integer DEFAULT 0 NOT NULL,
CONSTRAINT syntactic_scip_indexing_jobs_commit_valid_chars CHECK ((commit ~ '^[a-f0-9]{40}$'::text))
);
CREATE INDEX IF NOT EXISTS syntactic_scip_indexing_jobs_dequeue_order_idx
ON syntactic_scip_indexing_jobs
USING btree (((enqueuer_user_id > 0)) DESC, queued_at DESC, id)
WHERE ((state = 'queued'::text) OR (state = 'errored'::text));
CREATE INDEX IF NOT EXISTS syntactic_scip_indexing_jobs_queued_at_id ON syntactic_scip_indexing_jobs USING btree (queued_at DESC, id);
CREATE INDEX IF NOT EXISTS syntactic_scip_indexing_jobs_repository_id_commit ON syntactic_scip_indexing_jobs USING btree (repository_id, commit);
CREATE INDEX IF NOT EXISTS syntactic_scip_indexing_jobs_state ON syntactic_scip_indexing_jobs USING btree (state);
COMMENT ON TABLE syntactic_scip_indexing_jobs IS 'Stores metadata about a code intel syntactic index job.';
COMMENT ON COLUMN syntactic_scip_indexing_jobs.commit IS 'A 40-char revhash. Note that this commit may not be resolvable in the future.';
COMMENT ON COLUMN syntactic_scip_indexing_jobs.execution_logs IS 'An array of [log entries](https://sourcegraph.com/github.com/sourcegraph/sourcegraph@3.23/-/blob/internal/workerutil/store.go#L48:6) (encoded as JSON) from the most recent execution.';
COMMENT ON COLUMN syntactic_scip_indexing_jobs.enqueuer_user_id IS 'ID of the user who scheduled this index. Records with a non-NULL user ID are prioritised over the rest';
CREATE OR REPLACE VIEW syntactic_scip_indexing_jobs_with_repository_name AS
SELECT u.id,
u.commit,
u.queued_at,
u.state,
u.failure_message,
u.started_at,
u.finished_at,
u.repository_id,
u.process_after,
u.num_resets,
u.num_failures,
u.execution_logs,
u.should_reindex,
u.enqueuer_user_id,
r.name AS repository_name
FROM (syntactic_scip_indexing_jobs u
JOIN repo r ON ((r.id = u.repository_id)))
WHERE (r.deleted_at IS NULL);

View File

@ -4646,6 +4646,65 @@ CREATE SEQUENCE survey_responses_id_seq
ALTER SEQUENCE survey_responses_id_seq OWNED BY survey_responses.id;
CREATE TABLE syntactic_scip_indexing_jobs (
id bigint NOT NULL,
commit text NOT NULL,
queued_at timestamp with time zone DEFAULT now() NOT NULL,
state text DEFAULT 'queued'::text NOT NULL,
failure_message text,
started_at timestamp with time zone,
finished_at timestamp with time zone,
repository_id integer NOT NULL,
process_after timestamp with time zone,
num_resets integer DEFAULT 0 NOT NULL,
num_failures integer DEFAULT 0 NOT NULL,
execution_logs json[],
commit_last_checked_at timestamp with time zone,
worker_hostname text DEFAULT ''::text NOT NULL,
last_heartbeat_at timestamp with time zone,
cancel boolean DEFAULT false NOT NULL,
should_reindex boolean DEFAULT false NOT NULL,
enqueuer_user_id integer DEFAULT 0 NOT NULL,
CONSTRAINT syntactic_scip_indexing_jobs_commit_valid_chars CHECK ((commit ~ '^[a-f0-9]{40}$'::text))
);
COMMENT ON TABLE syntactic_scip_indexing_jobs IS 'Stores metadata about a code intel syntactic index job.';
COMMENT ON COLUMN syntactic_scip_indexing_jobs.commit IS 'A 40-char revhash. Note that this commit may not be resolvable in the future.';
COMMENT ON COLUMN syntactic_scip_indexing_jobs.execution_logs IS 'An array of [log entries](https://sourcegraph.com/github.com/sourcegraph/sourcegraph@3.23/-/blob/internal/workerutil/store.go#L48:6) (encoded as JSON) from the most recent execution.';
COMMENT ON COLUMN syntactic_scip_indexing_jobs.enqueuer_user_id IS 'ID of the user who scheduled this index. Records with a non-NULL user ID are prioritised over the rest';
CREATE SEQUENCE syntactic_scip_indexing_jobs_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE syntactic_scip_indexing_jobs_id_seq OWNED BY syntactic_scip_indexing_jobs.id;
CREATE VIEW syntactic_scip_indexing_jobs_with_repository_name AS
SELECT u.id,
u.commit,
u.queued_at,
u.state,
u.failure_message,
u.started_at,
u.finished_at,
u.repository_id,
u.process_after,
u.num_resets,
u.num_failures,
u.execution_logs,
u.should_reindex,
u.enqueuer_user_id,
r.name AS repository_name
FROM (syntactic_scip_indexing_jobs u
JOIN repo r ON ((r.id = u.repository_id)))
WHERE (r.deleted_at IS NULL);
CREATE TABLE team_members (
team_id integer NOT NULL,
user_id integer NOT NULL,
@ -5251,6 +5310,8 @@ ALTER TABLE ONLY settings ALTER COLUMN id SET DEFAULT nextval('settings_id_seq':
ALTER TABLE ONLY survey_responses ALTER COLUMN id SET DEFAULT nextval('survey_responses_id_seq'::regclass);
ALTER TABLE ONLY syntactic_scip_indexing_jobs ALTER COLUMN id SET DEFAULT nextval('syntactic_scip_indexing_jobs_id_seq'::regclass);
ALTER TABLE ONLY teams ALTER COLUMN id SET DEFAULT nextval('teams_id_seq'::regclass);
ALTER TABLE ONLY temporary_settings ALTER COLUMN id SET DEFAULT nextval('temporary_settings_id_seq'::regclass);
@ -5759,6 +5820,9 @@ ALTER TABLE ONLY settings
ALTER TABLE ONLY survey_responses
ADD CONSTRAINT survey_responses_pkey PRIMARY KEY (id);
ALTER TABLE ONLY syntactic_scip_indexing_jobs
ADD CONSTRAINT syntactic_scip_indexing_jobs_pkey PRIMARY KEY (id);
ALTER TABLE ONLY team_members
ADD CONSTRAINT team_members_team_id_user_id_key PRIMARY KEY (team_id, user_id);
@ -6302,6 +6366,14 @@ CREATE UNIQUE INDEX sub_repo_permissions_repo_id_user_id_version_uindex ON sub_r
CREATE INDEX sub_repo_perms_user_id ON sub_repo_permissions USING btree (user_id);
CREATE INDEX syntactic_scip_indexing_jobs_dequeue_order_idx ON syntactic_scip_indexing_jobs USING btree (((enqueuer_user_id > 0)) DESC, queued_at DESC, id) WHERE ((state = 'queued'::text) OR (state = 'errored'::text));
CREATE INDEX syntactic_scip_indexing_jobs_queued_at_id ON syntactic_scip_indexing_jobs USING btree (queued_at DESC, id);
CREATE INDEX syntactic_scip_indexing_jobs_repository_id_commit ON syntactic_scip_indexing_jobs USING btree (repository_id, commit);
CREATE INDEX syntactic_scip_indexing_jobs_state ON syntactic_scip_indexing_jobs USING btree (state);
CREATE UNIQUE INDEX teams_name ON teams USING btree (name);
CREATE UNIQUE INDEX unique_resource_permission ON namespace_permissions USING btree (namespace, resource_id, user_id);

View File

@ -1246,8 +1246,12 @@ commandsets:
- git
commands:
- frontend
- web
- worker
- blobstore
- repo-updater
- gitserver-0
- gitserver-1
- syntactic-code-intel-worker-0
- syntactic-code-intel-worker-1