mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 19:21:50 +00:00
migration: Add additional logging (#31429)
This commit is contained in:
parent
f6bb82cab5
commit
f9ef00ff4b
@ -61,7 +61,7 @@ The `migration_logs` table can also be queried directly. The following query giv
|
||||
WITH ranked_migration_logs AS (
|
||||
SELECT
|
||||
migration_logs.*,
|
||||
ROW_NUMBER() OVER (PARTITION BY schema, version ORDER BY finished_at DESC) AS row_number
|
||||
ROW_NUMBER() OVER (PARTITION BY schema, version ORDER BY started_at DESC) AS row_number
|
||||
FROM migration_logs
|
||||
)
|
||||
SELECT *
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/inconshreveable/log15"
|
||||
"github.com/peterbourgon/ff/v3/ffcli"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/database/migration/definition"
|
||||
@ -50,6 +51,7 @@ func AddLog(commandName string, factory RunnerFactory, out *output.Output) *ffcl
|
||||
return err
|
||||
}
|
||||
|
||||
log15.Info("Writing new completed migration log", "schema", *schemaNameFlag, "version", version, "up", *upFlag)
|
||||
return store.WithMigrationLog(ctx, definition.Definition{ID: version}, *upFlag, noop)
|
||||
}
|
||||
|
||||
|
||||
@ -58,7 +58,7 @@ func (e *dirtySchemaError) Error() string {
|
||||
return (instructionalError{
|
||||
class: "dirty database",
|
||||
description: fmt.Sprintf(
|
||||
"schema %q marked the following migrations as failed: %s\n`",
|
||||
"schema %q marked the following migrations as failed: %s\n",
|
||||
e.schemaName,
|
||||
strings.Join(intsToStrings(extractIDs(e.dirtyVersions)), ", "),
|
||||
),
|
||||
|
||||
@ -2,13 +2,11 @@ package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgconn"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/database/migration/definition"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database/migration/storetypes"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
)
|
||||
|
||||
@ -71,19 +69,54 @@ func (r *Runner) runSchema(ctx context.Context, operation MigrationOperation, sc
|
||||
return err
|
||||
}
|
||||
|
||||
// Filter out any unlisted migrations (most likely future upgrades) and group them by status.
|
||||
byState := groupByState(schemaContext.initialSchemaVersion, definitions)
|
||||
|
||||
logger.Info(
|
||||
"Checked current schema state",
|
||||
"schema", schemaContext.schema.Name,
|
||||
"appliedVersions", extractIDs(byState.applied),
|
||||
"pendingVersions", extractIDs(byState.pending),
|
||||
"failedVersions", extractIDs(byState.failed),
|
||||
)
|
||||
|
||||
// Before we commit to performing an upgrade (which takes locks), determine if there is anything to do
|
||||
// and early out if not. We'll no-op if there are no definitions with pending or failed attempts, and
|
||||
// all migrations are applied (when migrating up) or unapplied (when migrating down).
|
||||
if byState := groupByState(schemaContext.initialSchemaVersion, definitions); len(byState.pending)+len(byState.failed) == 0 {
|
||||
|
||||
if len(byState.pending)+len(byState.failed) == 0 {
|
||||
if operation.Type == MigrationOperationTypeTargetedUp && len(byState.applied) == len(definitions) {
|
||||
logger.Info(
|
||||
"Schema is in the expected state",
|
||||
"schema", schemaContext.schema.Name,
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if operation.Type == MigrationOperationTypeTargetedDown && len(byState.applied) == 0 {
|
||||
logger.Info(
|
||||
"Schema is in the expected state",
|
||||
"schema", schemaContext.schema.Name,
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
logger.Warn(
|
||||
"Schema not in expected state",
|
||||
"schema", schemaContext.schema.Name,
|
||||
"appliedVersions", extractIDs(byState.applied),
|
||||
"pendingVersions", extractIDs(byState.pending),
|
||||
"failedVersions", extractIDs(byState.failed),
|
||||
"targetDefinitions", extractIDs(definitions),
|
||||
)
|
||||
logger.Info(
|
||||
"Checking for active migrations",
|
||||
"schema", schemaContext.schema.Name,
|
||||
)
|
||||
|
||||
for {
|
||||
// Attempt to apply as many migrations as possible. We do this iteratively in chunks as we are unable
|
||||
// to hold a consistent advisory lock in the presence of migrations utilizing concurrent index creation.
|
||||
@ -97,6 +130,11 @@ func (r *Runner) runSchema(ctx context.Context, operation MigrationOperation, sc
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info(
|
||||
"Schema is in the expected state",
|
||||
"schema", schemaContext.schema.Name,
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -158,7 +196,7 @@ func (r *Runner) applyMigrations(
|
||||
} else if retry {
|
||||
// There are active index creation operations ongoing; wait a short time before requerying
|
||||
// the state of the migrations so we don't flood the database with constant queries to the
|
||||
// system catalog. We check here instead of in the caller because we dont' want a delay when
|
||||
// system catalog. We check here instead of in the caller because we don't want a delay when
|
||||
// we drop the lock to create an index concurrently (returning `droppedLock = true` below).
|
||||
return true, wait(ctx, indexPollInterval)
|
||||
}
|
||||
@ -197,7 +235,7 @@ func (r *Runner) applyMigration(
|
||||
return nil
|
||||
}
|
||||
|
||||
const indexPollInterval = time.Second
|
||||
const indexPollInterval = time.Second * 5
|
||||
|
||||
// createIndexConcurrently deals with the special case of `CREATE INDEX CONCURRENTLY` migrations. We cannot
|
||||
// hold an advisory lock during concurrent index creation without trivially deadlocking concurrent migrator
|
||||
@ -219,30 +257,17 @@ func (r *Runner) createIndexConcurrently(
|
||||
pollIndexStatusLoop:
|
||||
for {
|
||||
// Query the current status of the target index
|
||||
status, exists, err := schemaContext.store.IndexStatus(ctx, tableName, indexName)
|
||||
indexStatus, exists, err := getAndLogIndexStatus(ctx, schemaContext, tableName, indexName)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "failed to query state of index")
|
||||
}
|
||||
|
||||
logger.Info(
|
||||
"Checked progress of index creation",
|
||||
append(
|
||||
[]interface{}{
|
||||
"tableName", tableName,
|
||||
"indexName", indexName,
|
||||
"exists", exists,
|
||||
"isValid", status.IsValid,
|
||||
},
|
||||
renderIndexStatus(status)...,
|
||||
)...,
|
||||
)
|
||||
|
||||
if exists && status.IsValid {
|
||||
if exists && indexStatus.IsValid {
|
||||
// Index exists and is valid; nothing to do
|
||||
return unlocked, nil
|
||||
}
|
||||
|
||||
if exists && status.Phase == nil {
|
||||
if exists && indexStatus.Phase == nil {
|
||||
// Index is invalid but no creation operation is in-progress. We can try to repair this
|
||||
// state automatically by dropping the index and re-create it as if it never existed.
|
||||
// Assuming that the down migration drops the index created in the up direction, we'll
|
||||
@ -280,14 +305,12 @@ pollIndexStatusLoop:
|
||||
|
||||
// Index is currently being created. Wait a small time and check the index status again. We don't
|
||||
// want to take any action here while the other proceses is working.
|
||||
if exists && status.Phase != nil {
|
||||
select {
|
||||
case <-time.After(indexPollInterval):
|
||||
continue pollIndexStatusLoop
|
||||
|
||||
case <-ctx.Done():
|
||||
return unlocked, ctx.Err()
|
||||
if exists && indexStatus.Phase != nil {
|
||||
if err := wait(ctx, indexPollInterval); err != nil {
|
||||
return true, err
|
||||
}
|
||||
|
||||
continue pollIndexStatusLoop
|
||||
}
|
||||
|
||||
// Create the index. Ignore duplicate table/index already exists errors. This can happen if there
|
||||
@ -323,6 +346,21 @@ pollIndexStatusLoop:
|
||||
)
|
||||
|
||||
createIndex := func() error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
if err := wait(ctx, indexPollInterval); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if _, _, err := getAndLogIndexStatus(ctx, schemaContext, tableName, indexName); err != nil {
|
||||
logger.Error("Failed to retrieve index status", "error", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return errorFilter(schemaContext.store.Up(ctx, definition))
|
||||
}
|
||||
if err := schemaContext.store.WithMigrationLog(ctx, definition, true, createIndex); err != nil {
|
||||
@ -331,7 +369,7 @@ pollIndexStatusLoop:
|
||||
continue
|
||||
}
|
||||
|
||||
return unlocked, nil
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -363,31 +401,3 @@ func filterAppliedDefinitions(
|
||||
|
||||
return filtered
|
||||
}
|
||||
|
||||
// renderIndexStatus returns a slice of interface pairs describing the given index status for use in a
|
||||
// call to logger. If the index is currently being created, the progress of the create operation will be
|
||||
// summarized.
|
||||
func renderIndexStatus(progress storetypes.IndexStatus) (logPairs []interface{}) {
|
||||
if progress.Phase == nil {
|
||||
return []interface{}{
|
||||
"in-progress", false,
|
||||
}
|
||||
}
|
||||
|
||||
index := -1
|
||||
for i, phase := range storetypes.CreateIndexConcurrentlyPhases {
|
||||
if phase == *progress.Phase {
|
||||
index = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return []interface{}{
|
||||
"in-progress", true,
|
||||
"phase", *progress.Phase,
|
||||
"phases", fmt.Sprintf("%d of %d", index, len(storetypes.CreateIndexConcurrentlyPhases)),
|
||||
"lockers", fmt.Sprintf("%d of %d", progress.LockersDone, progress.LockersTotal),
|
||||
"blocks", fmt.Sprintf("%d of %d", progress.BlocksDone, progress.BlocksTotal),
|
||||
"tuples", fmt.Sprintf("%d of %d", progress.TuplesDone, progress.TuplesTotal),
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,11 +2,13 @@ package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/database/migration/definition"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
|
||||
"github.com/sourcegraph/sourcegraph/internal/database/migration/storetypes"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
)
|
||||
|
||||
@ -168,14 +170,6 @@ func (r *Runner) fetchVersion(ctx context.Context, schemaName string, store Stor
|
||||
return schemaVersion{}, err
|
||||
}
|
||||
|
||||
logger.Info(
|
||||
"Checked current version",
|
||||
"schema", schemaName,
|
||||
"appliedVersions", appliedVersions,
|
||||
"pendingVersions", pendingVersions,
|
||||
"failedVersions", failedVersions,
|
||||
)
|
||||
|
||||
return schemaVersion{
|
||||
appliedVersions,
|
||||
pendingVersions,
|
||||
@ -183,28 +177,6 @@ func (r *Runner) fetchVersion(ctx context.Context, schemaName string, store Stor
|
||||
}, nil
|
||||
}
|
||||
|
||||
const lockPollInterval = time.Second
|
||||
|
||||
// pollLock will attempt to acquire a session-level advisory lock while the given context has not
|
||||
// been canceled. The caller must eventually invoke the unlock function on successful acquisition
|
||||
// of the lock.
|
||||
func (r *Runner) pollLock(ctx context.Context, store Store) (unlock func(err error) error, _ error) {
|
||||
for {
|
||||
if acquired, unlock, err := store.TryLock(ctx); err != nil {
|
||||
return nil, err
|
||||
} else if acquired {
|
||||
return unlock, nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(lockPollInterval):
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type lockedVersionCallback func(
|
||||
schemaVersion schemaVersion,
|
||||
byState definitionsByState,
|
||||
@ -227,7 +199,7 @@ func (r *Runner) withLockedSchemaState(
|
||||
// Take an advisory lock to determine if there are any migrator instances currently
|
||||
// running queries unrelated to non-concurrent index creation. This will block until
|
||||
// we are able to gain the lock.
|
||||
unlock, err := r.pollLock(ctx, schemaContext.store)
|
||||
unlock, err := r.pollLock(ctx, schemaContext)
|
||||
if err != nil {
|
||||
return false, err
|
||||
} else {
|
||||
@ -240,14 +212,24 @@ func (r *Runner) withLockedSchemaState(
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Filter out any unlisted migrations (most likely future upgrades) and group them by status.
|
||||
byState := groupByState(schemaVersion, definitions)
|
||||
|
||||
logger.Info(
|
||||
"Checked current schema state",
|
||||
"schema", schemaContext.schema.Name,
|
||||
"appliedVersions", extractIDs(byState.applied),
|
||||
"pendingVersions", extractIDs(byState.pending),
|
||||
"failedVersions", extractIDs(byState.failed),
|
||||
)
|
||||
|
||||
// Detect failed migrations, and determine if we need to wait longer for concurrent migrator
|
||||
// instances to finish their current work.
|
||||
if retry, err := validateSchemaState(ctx, schemaContext, byState); err != nil {
|
||||
return false, err
|
||||
} else if retry {
|
||||
// An index is currently being created. WE return true here to flag to the caller that
|
||||
// An index is currently being created. We return true here to flag to the caller that
|
||||
// we should wait a small time, then be re-invoked. We don't want to take any action
|
||||
// here while the other proceses is working.
|
||||
return true, nil
|
||||
@ -256,3 +238,215 @@ func (r *Runner) withLockedSchemaState(
|
||||
// Invoke the callback with the current schema state
|
||||
return false, f(schemaVersion, byState, unlock)
|
||||
}
|
||||
|
||||
const lockPollInterval = time.Second
|
||||
const lockPollLogRatio = 5
|
||||
|
||||
// pollLock will attempt to acquire a session-level advisory lock while the given context has not
|
||||
// been canceled. The caller must eventually invoke the unlock function on successful acquisition
|
||||
// of the lock.
|
||||
func (r *Runner) pollLock(ctx context.Context, schemaContext schemaContext) (unlock func(err error) error, _ error) {
|
||||
numWaits := 0
|
||||
|
||||
for {
|
||||
if acquired, unlock, err := schemaContext.store.TryLock(ctx); err != nil {
|
||||
return nil, err
|
||||
} else if acquired {
|
||||
logger.Info(
|
||||
"Acquired schema migration lock",
|
||||
"schema", schemaContext.schema.Name,
|
||||
)
|
||||
|
||||
var logOnce sync.Once
|
||||
|
||||
loggedUnlock := func(err error) error {
|
||||
logOnce.Do(func() {
|
||||
logger.Info(
|
||||
"Released schema migration lock",
|
||||
"schema", schemaContext.schema.Name,
|
||||
)
|
||||
})
|
||||
|
||||
return unlock(err)
|
||||
}
|
||||
|
||||
return loggedUnlock, nil
|
||||
}
|
||||
|
||||
if numWaits%lockPollLogRatio == 0 {
|
||||
logger.Info(
|
||||
"Schema migration lock is currently held - will re-attempt to acquire lock",
|
||||
"schema", schemaContext.schema.Name,
|
||||
)
|
||||
}
|
||||
|
||||
if err := wait(ctx, lockPollInterval); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
numWaits++
|
||||
}
|
||||
}
|
||||
|
||||
type definitionsByState struct {
|
||||
applied []definition.Definition
|
||||
pending []definition.Definition
|
||||
failed []definition.Definition
|
||||
}
|
||||
|
||||
// groupByState returns the the given definitions grouped by their status (applied, pending, failed) as
|
||||
// indicated by the current schema.
|
||||
func groupByState(schemaVersion schemaVersion, definitions []definition.Definition) definitionsByState {
|
||||
appliedVersionsMap := intSet(schemaVersion.appliedVersions)
|
||||
failedVersionsMap := intSet(schemaVersion.failedVersions)
|
||||
pendingVersionsMap := intSet(schemaVersion.pendingVersions)
|
||||
|
||||
states := definitionsByState{}
|
||||
for _, definition := range definitions {
|
||||
if _, ok := appliedVersionsMap[definition.ID]; ok {
|
||||
states.applied = append(states.applied, definition)
|
||||
}
|
||||
if _, ok := pendingVersionsMap[definition.ID]; ok {
|
||||
states.pending = append(states.pending, definition)
|
||||
}
|
||||
if _, ok := failedVersionsMap[definition.ID]; ok {
|
||||
states.failed = append(states.failed, definition)
|
||||
}
|
||||
}
|
||||
|
||||
return states
|
||||
}
|
||||
|
||||
// validateSchemaState inspects the given definitions grouped by state and determines if the schema
|
||||
// state should be re-queried (when `retry` is true). This function returns an error if the database
|
||||
// is in a dirty state (contains failed migrations or pending migrations without a backing query).
|
||||
func validateSchemaState(
|
||||
ctx context.Context,
|
||||
schemaContext schemaContext,
|
||||
byState definitionsByState,
|
||||
) (retry bool, _ error) {
|
||||
if len(byState.failed) > 0 {
|
||||
// Explicit failures require administrator intervention
|
||||
return false, newDirtySchemaError(schemaContext.schema.Name, byState.failed)
|
||||
}
|
||||
|
||||
if len(byState.pending) > 0 {
|
||||
// We are currently holding the lock, so any migrations that are "pending" are either
|
||||
// dead and the migrator instance has died before finishing the operation, or they're
|
||||
// active concurrent index creation operations. We'll partition this set into those two
|
||||
// groups and determine what to do.
|
||||
if pendingDefinitions, failedDefinitions, err := partitionPendingMigrations(ctx, schemaContext, byState.pending); err != nil {
|
||||
return false, err
|
||||
} else if len(failedDefinitions) > 0 {
|
||||
// Explicit failures require administrator intervention
|
||||
return false, newDirtySchemaError(schemaContext.schema.Name, failedDefinitions)
|
||||
} else if len(pendingDefinitions) > 0 {
|
||||
for _, definitionWithStatus := range pendingDefinitions {
|
||||
logIndexStatus(
|
||||
schemaContext,
|
||||
definitionWithStatus.definition.IndexMetadata.TableName,
|
||||
definitionWithStatus.definition.IndexMetadata.IndexName,
|
||||
definitionWithStatus.indexStatus,
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
type definitionWithStatus struct {
|
||||
definition definition.Definition
|
||||
indexStatus storetypes.IndexStatus
|
||||
}
|
||||
|
||||
// partitionPendingMigrations partitions the given migrations into two sets: the set of pending
|
||||
// migration definitions, which includes migrations with visible and active create index operation
|
||||
// running in the database, and the set of filed migration definitions, which includes migrations
|
||||
// which are marked as pending but do not appear as active.
|
||||
//
|
||||
// This function assumes that the migration advisory lock is held.
|
||||
func partitionPendingMigrations(
|
||||
ctx context.Context,
|
||||
schemaContext schemaContext,
|
||||
definitions []definition.Definition,
|
||||
) (pendingDefinitions []definitionWithStatus, failedDefinitions []definition.Definition, _ error) {
|
||||
for _, definition := range definitions {
|
||||
if definition.IsCreateIndexConcurrently {
|
||||
tableName := definition.IndexMetadata.TableName
|
||||
indexName := definition.IndexMetadata.IndexName
|
||||
|
||||
if indexStatus, ok, err := schemaContext.store.IndexStatus(ctx, tableName, indexName); err != nil {
|
||||
return nil, nil, err
|
||||
} else if ok && indexStatus.Phase != nil {
|
||||
pendingDefinitions = append(pendingDefinitions, definitionWithStatus{definition, indexStatus})
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
failedDefinitions = append(failedDefinitions, definition)
|
||||
}
|
||||
|
||||
return pendingDefinitions, failedDefinitions, nil
|
||||
}
|
||||
|
||||
// getAndLogIndexStatus calls IndexStatus on the given store and returns the results. The result
|
||||
// is logged to the package-level logger.
|
||||
func getAndLogIndexStatus(ctx context.Context, schemaContext schemaContext, tableName, indexName string) (storetypes.IndexStatus, bool, error) {
|
||||
indexStatus, exists, err := schemaContext.store.IndexStatus(ctx, tableName, indexName)
|
||||
if err != nil {
|
||||
return storetypes.IndexStatus{}, false, errors.Wrap(err, "failed to query state of index")
|
||||
}
|
||||
|
||||
logIndexStatus(schemaContext, tableName, indexName, indexStatus, exists)
|
||||
return indexStatus, exists, nil
|
||||
}
|
||||
|
||||
// logIndexStatus logs the result of IndexStatus to the package-level logger.
|
||||
func logIndexStatus(schemaContext schemaContext, tableName, indexName string, indexStatus storetypes.IndexStatus, exists bool) {
|
||||
logger.Info(
|
||||
"Checked progress of index creation",
|
||||
append(
|
||||
[]interface{}{
|
||||
"schema", schemaContext.schema.Name,
|
||||
"tableName", tableName,
|
||||
"indexName", indexName,
|
||||
"exists", exists,
|
||||
"isValid", indexStatus.IsValid,
|
||||
},
|
||||
renderIndexStatus(indexStatus)...,
|
||||
)...,
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
// renderIndexStatus returns a slice of interface pairs describing the given index status for use in a
|
||||
// call to logger. If the index is currently being created, the progress of the create operation will be
|
||||
// summarized.
|
||||
func renderIndexStatus(progress storetypes.IndexStatus) (logPairs []interface{}) {
|
||||
if progress.Phase == nil {
|
||||
return []interface{}{
|
||||
"in-progress", false,
|
||||
}
|
||||
}
|
||||
|
||||
index := -1
|
||||
for i, phase := range storetypes.CreateIndexConcurrentlyPhases {
|
||||
if phase == *progress.Phase {
|
||||
index = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return []interface{}{
|
||||
"in-progress", true,
|
||||
"phase", *progress.Phase,
|
||||
"phases", fmt.Sprintf("%d of %d", index, len(storetypes.CreateIndexConcurrentlyPhases)),
|
||||
"lockers", fmt.Sprintf("%d of %d", progress.LockersDone, progress.LockersTotal),
|
||||
"blocks", fmt.Sprintf("%d of %d", progress.BlocksDone, progress.BlocksTotal),
|
||||
"tuples", fmt.Sprintf("%d of %d", progress.TuplesDone, progress.TuplesTotal),
|
||||
}
|
||||
}
|
||||
|
||||
@ -8,96 +8,6 @@ import (
|
||||
"github.com/sourcegraph/sourcegraph/internal/database/migration/definition"
|
||||
)
|
||||
|
||||
type definitionsByState struct {
|
||||
applied []definition.Definition
|
||||
pending []definition.Definition
|
||||
failed []definition.Definition
|
||||
}
|
||||
|
||||
// groupByState returns the the given definitions grouped by their status (applied, pending, failed) as
|
||||
// indicated by the current schema.
|
||||
func groupByState(schemaVersion schemaVersion, definitions []definition.Definition) definitionsByState {
|
||||
appliedVersionsMap := intSet(schemaVersion.appliedVersions)
|
||||
failedVersionsMap := intSet(schemaVersion.failedVersions)
|
||||
pendingVersionsMap := intSet(schemaVersion.pendingVersions)
|
||||
|
||||
states := definitionsByState{}
|
||||
for _, definition := range definitions {
|
||||
if _, ok := appliedVersionsMap[definition.ID]; ok {
|
||||
states.applied = append(states.applied, definition)
|
||||
}
|
||||
if _, ok := pendingVersionsMap[definition.ID]; ok {
|
||||
states.pending = append(states.pending, definition)
|
||||
}
|
||||
if _, ok := failedVersionsMap[definition.ID]; ok {
|
||||
states.failed = append(states.failed, definition)
|
||||
}
|
||||
}
|
||||
|
||||
return states
|
||||
}
|
||||
|
||||
// validateSchemaState inspects the given definitions grouped by state and determines if the schema
|
||||
// state should be re-queried (when `retry` is true). This function returns an error if the database
|
||||
// is in a dirty state (contains failed migrations or pending migrations without a backing query).
|
||||
func validateSchemaState(
|
||||
ctx context.Context,
|
||||
schemaContext schemaContext,
|
||||
byState definitionsByState,
|
||||
) (retry bool, _ error) {
|
||||
if len(byState.failed) > 0 {
|
||||
// Explicit failures require administrator intervention
|
||||
return false, newDirtySchemaError(schemaContext.schema.Name, byState.failed)
|
||||
}
|
||||
|
||||
if len(byState.pending) > 0 {
|
||||
// We are currently holding the lock, so any migrations that are "pending" are either
|
||||
// dead and the migrator instance has died before finishing the operation, or they're
|
||||
// active concurrent index creation operations. We'll partition this set into those two
|
||||
// groups and determine what to do.
|
||||
if pendingDefinitions, failedDefinitions, err := partitionPendingMigrations(ctx, schemaContext, byState.pending); err != nil {
|
||||
return false, err
|
||||
} else if len(failedDefinitions) > 0 {
|
||||
// Explicit failures require administrator intervention
|
||||
return false, newDirtySchemaError(schemaContext.schema.Name, failedDefinitions)
|
||||
} else if len(pendingDefinitions) > 0 {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// partitionPendingMigrations partitions the given migrations into two sets: the set of pending
|
||||
// migration definitions, which includes migrations with visible and active create index operation
|
||||
// running in the database, and the set of filed migration definitions, which includes migrations
|
||||
// which are marked as pending but do not appear as active.
|
||||
//
|
||||
// This function assumes that the migration advisory lock is held.
|
||||
func partitionPendingMigrations(
|
||||
ctx context.Context,
|
||||
schemaContext schemaContext,
|
||||
definitions []definition.Definition,
|
||||
) (pendingDefinitions, failedDefinitions []definition.Definition, _ error) {
|
||||
for _, definition := range definitions {
|
||||
if definition.IsCreateIndexConcurrently {
|
||||
tableName := definition.IndexMetadata.TableName
|
||||
indexName := definition.IndexMetadata.IndexName
|
||||
|
||||
if status, ok, err := schemaContext.store.IndexStatus(ctx, tableName, indexName); err != nil {
|
||||
return nil, nil, err
|
||||
} else if ok && status.Phase != nil {
|
||||
pendingDefinitions = append(pendingDefinitions, definition)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
failedDefinitions = append(failedDefinitions, definition)
|
||||
}
|
||||
|
||||
return pendingDefinitions, failedDefinitions, nil
|
||||
}
|
||||
|
||||
func extractIDs(definitions []definition.Definition) []int {
|
||||
ids := make([]int, 0, len(definitions))
|
||||
for _, definition := range definitions {
|
||||
|
||||
@ -16,15 +16,50 @@ func (r *Runner) Validate(ctx context.Context, schemaNames ...string) error {
|
||||
// expected by the given schema context. This method will block if there are relevant migrations
|
||||
// in progress.
|
||||
func (r *Runner) validateSchema(ctx context.Context, schemaContext schemaContext) error {
|
||||
definitions := schemaContext.schema.Definitions.All()
|
||||
// Get the set of migrations that need to be applied.
|
||||
definitions, err := schemaContext.schema.Definitions.Up(
|
||||
schemaContext.initialSchemaVersion.appliedVersions,
|
||||
extractIDs(schemaContext.schema.Definitions.Leaves()),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Filter out any unlisted migrations (most likely future upgrades) and group them by status.
|
||||
byState := groupByState(schemaContext.initialSchemaVersion, definitions)
|
||||
|
||||
logger.Info(
|
||||
"Checked current schema state",
|
||||
"schema", schemaContext.schema.Name,
|
||||
"appliedVersions", extractIDs(byState.applied),
|
||||
"pendingVersions", extractIDs(byState.pending),
|
||||
"failedVersions", extractIDs(byState.failed),
|
||||
)
|
||||
|
||||
// Quickly determine with our initial schema version if we are up to date. If so, we won't need
|
||||
// to take an advisory lock and poll index creation status below.
|
||||
byState := groupByState(schemaContext.initialSchemaVersion, definitions)
|
||||
if len(byState.pending) == 0 && len(byState.failed) == 0 && len(byState.applied) == len(definitions) {
|
||||
logger.Info(
|
||||
"Schema is in the expected state",
|
||||
"schema", schemaContext.schema.Name,
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Warn(
|
||||
"Schema not in expected state",
|
||||
"schema", schemaContext.schema.Name,
|
||||
"appliedVersions", extractIDs(byState.applied),
|
||||
"pendingVersions", extractIDs(byState.pending),
|
||||
"failedVersions", extractIDs(byState.failed),
|
||||
"targetDefinitions", extractIDs(definitions),
|
||||
)
|
||||
logger.Info(
|
||||
"Checking for active migrations",
|
||||
"schema", schemaContext.schema.Name,
|
||||
)
|
||||
|
||||
for {
|
||||
// Attempt to validate the given definitions. We may have to call this several times as
|
||||
// we are unable to hold a consistent advisory lock in the presence of migrations utilizing
|
||||
@ -40,11 +75,17 @@ func (r *Runner) validateSchema(ctx context.Context, schemaContext schemaContext
|
||||
// There are active index creation operations ongoing; wait a short time before requerying
|
||||
// the state of the migrations so we don't flood the database with constant queries to the
|
||||
// system catalog.
|
||||
|
||||
if err := wait(ctx, indexPollInterval); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info(
|
||||
"Schema is in the expected state",
|
||||
"schema", schemaContext.schema.Name,
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -187,7 +187,7 @@ const versionsQuery = `
|
||||
WITH ranked_migration_logs AS (
|
||||
SELECT
|
||||
migration_logs.*,
|
||||
ROW_NUMBER() OVER (PARTITION BY version ORDER BY finished_at DESC) AS row_number
|
||||
ROW_NUMBER() OVER (PARTITION BY version ORDER BY started_at DESC) AS row_number
|
||||
FROM migration_logs
|
||||
WHERE schema = %s
|
||||
)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user