From 591d702d290c5a998bc08c4e4e66c8914407d542 Mon Sep 17 00:00:00 2001 From: Noah S-C Date: Fri, 9 Jun 2023 18:51:56 +0100 Subject: [PATCH] migrator: extract non-cli specifics from cli package (#53247) YOINK ## Test plan Tested migrator with an mvu (`upgrade` from 4.5.0 to 5.0.5) and standard (`up` from 4.5.0 to 5.0.5) --- cmd/frontend/graphqlbackend/BUILD.bazel | 3 + cmd/frontend/graphqlbackend/site.go | 21 +- cmd/frontend/internal/cli/serve_cmd.go | 2 +- cmd/migrator/shared/BUILD.bazel | 18 +- cmd/migrator/shared/main.go | 50 +-- cmd/migrator/shared/registration.go | 43 -- dev/sg/sg_migration.go | 13 +- enterprise/cmd/frontend/BUILD.bazel | 1 + enterprise/cmd/frontend/main.go | 6 + .../cmd/sourcegraph/enterprisecmd/BUILD.bazel | 1 - .../enterprisecmd/enterprisecmd.go | 6 - internal/database/migration/BUILD.bazel | 17 + .../database/migration/cliutil/BUILD.bazel | 9 +- .../database/migration/cliutil/downgrade.go | 24 +- internal/database/migration/cliutil/drift.go | 140 +------ internal/database/migration/cliutil/iface.go | 15 +- .../migration/cliutil/multiversion.go | 376 +----------------- .../migration/cliutil/run_oobmigrations.go | 184 +-------- internal/database/migration/cliutil/up.go | 3 +- .../database/migration/cliutil/upgrade.go | 17 +- internal/database/migration/cliutil/util.go | 56 +-- .../database/migration/cliutil/validate.go | 3 +- internal/database/migration/drift/BUILD.bazel | 3 + .../{cliutil/drift_util.go => drift/util.go} | 8 +- .../migration/multiversion/BUILD.bazel | 28 ++ .../database/migration/multiversion/drift.go | 221 ++++++++++ .../database/migration/multiversion/plan.go | 114 ++++++ .../database/migration/multiversion/run.go | 257 ++++++++++++ .../database/migration/multiversion/util.go | 24 ++ internal/database/migration/runner.go | 44 ++ internal/database/migration/runner/runner.go | 2 + .../database/migration/schemas/BUILD.bazel | 1 + .../{cliutil => schemas}/drift_schema.go | 39 +- .../database/migration/schemas/schemas.go | 14 + internal/database/migration/store/BUILD.bazel | 11 + .../database/migration/store/extractor.go | 53 +++ .../database/migration/store/registration.go | 41 +- internal/oobmigration/BUILD.bazel | 2 + internal/oobmigration/migrations/register.go | 2 +- internal/oobmigration/progress.go | 52 +++ internal/oobmigration/validate.go | 2 +- internal/version/upgradestore/store.go | 16 +- internal/version/upgradestore/store_test.go | 42 +- lib/output/BUILD.bazel | 2 + lib/output/emoji.go | 15 + lib/output/logger.go | 31 ++ 46 files changed, 1091 insertions(+), 941 deletions(-) delete mode 100644 cmd/migrator/shared/registration.go create mode 100644 internal/database/migration/BUILD.bazel rename internal/database/migration/{cliutil/drift_util.go => drift/util.go} (88%) create mode 100644 internal/database/migration/multiversion/BUILD.bazel create mode 100644 internal/database/migration/multiversion/drift.go create mode 100644 internal/database/migration/multiversion/plan.go create mode 100644 internal/database/migration/multiversion/run.go create mode 100644 internal/database/migration/multiversion/util.go create mode 100644 internal/database/migration/runner.go rename internal/database/migration/{cliutil => schemas}/drift_schema.go (77%) create mode 100644 internal/database/migration/store/extractor.go rename cmd/migrator/shared/conf.go => internal/database/migration/store/registration.go (59%) create mode 100644 internal/oobmigration/progress.go create mode 100644 lib/output/logger.go diff --git a/cmd/frontend/graphqlbackend/BUILD.bazel b/cmd/frontend/graphqlbackend/BUILD.bazel index 168766f0215..51764a7efd7 100644 --- a/cmd/frontend/graphqlbackend/BUILD.bazel +++ b/cmd/frontend/graphqlbackend/BUILD.bazel @@ -255,8 +255,11 @@ go_library( "//internal/conf/deploy", "//internal/conf/reposource", "//internal/database", + "//internal/database/migration", "//internal/database/migration/cliutil", "//internal/database/migration/drift", + "//internal/database/migration/multiversion", + "//internal/database/migration/runner", "//internal/database/migration/schemas", "//internal/deviceid", "//internal/encryption", diff --git a/cmd/frontend/graphqlbackend/site.go b/cmd/frontend/graphqlbackend/site.go index a72ffa75ddb..58979eccd73 100644 --- a/cmd/frontend/graphqlbackend/site.go +++ b/cmd/frontend/graphqlbackend/site.go @@ -26,8 +26,11 @@ import ( "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/conf/deploy" "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/migration" "github.com/sourcegraph/sourcegraph/internal/database/migration/cliutil" "github.com/sourcegraph/sourcegraph/internal/database/migration/drift" + "github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion" + "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" "github.com/sourcegraph/sourcegraph/internal/env" "github.com/sourcegraph/sourcegraph/internal/insights" @@ -321,16 +324,16 @@ type upgradeReadinessResolver struct { initOnce sync.Once initErr error - runner cliutil.Runner + runner *runner.Runner version string schemaNames []string } -var devSchemaFactory = cliutil.NewExpectedSchemaFactory( +var devSchemaFactory = schemas.NewExpectedSchemaFactory( "Local file", - []cliutil.NamedRegexp{{Regexp: lazyregexp.New(`^dev$`)}}, + []schemas.NamedRegexp{{Regexp: lazyregexp.New(`^dev$`)}}, func(filename, _ string) string { return filename }, - cliutil.ReadSchemaFromFile, + schemas.ReadSchemaFromFile, ) var schemaFactories = append( @@ -341,9 +344,9 @@ var schemaFactories = append( var insidersVersionPattern = lazyregexp.New(`^[\w-]+_\d{4}-\d{2}-\d{2}_\d+\.\d+-(\w+)$`) -func (r *upgradeReadinessResolver) init(ctx context.Context) (_ cliutil.Runner, version string, schemaNames []string, _ error) { +func (r *upgradeReadinessResolver) init(ctx context.Context) (_ *runner.Runner, version string, schemaNames []string, _ error) { r.initOnce.Do(func() { - r.runner, r.version, r.schemaNames, r.initErr = func() (cliutil.Runner, string, []string, error) { + r.runner, r.version, r.schemaNames, r.initErr = func() (*runner.Runner, string, []string, error) { schemaNames := []string{schemas.Frontend.Name, schemas.CodeIntel.Name} schemaList := []*schemas.Schema{schemas.Frontend, schemas.CodeIntel} if insights.IsCodeInsightsEnabled() { @@ -351,7 +354,7 @@ func (r *upgradeReadinessResolver) init(ctx context.Context) (_ cliutil.Runner, schemaList = append(schemaList, schemas.CodeInsights) } observationCtx := observation.NewContext(r.logger) - runner, err := migratorshared.NewRunnerWithSchemas(observationCtx, r.logger, schemaNames, schemaList) + runner, err := migration.NewRunnerWithSchemas(observationCtx, output.OutputFromLogger(r.logger), "frontend-upgradereadiness", schemaNames, schemaList) if err != nil { return nil, "", nil, errors.Wrap(err, "new runner") } @@ -447,12 +450,12 @@ func (r *upgradeReadinessResolver) SchemaDrift(ctx context.Context) ([]*schemaDr var buf bytes.Buffer driftOut := output.NewOutput(&buf, output.OutputOpts{}) - expectedSchema, err := cliutil.FetchExpectedSchema(ctx, schemaName, version, driftOut, schemaFactories) + expectedSchema, err := multiversion.FetchExpectedSchema(ctx, schemaName, version, driftOut, schemaFactories) if err != nil { return nil, err } - for _, summary := range drift.CompareSchemaDescriptions(schemaName, version, cliutil.Canonicalize(schema), cliutil.Canonicalize(expectedSchema)) { + for _, summary := range drift.CompareSchemaDescriptions(schemaName, version, multiversion.Canonicalize(schema), multiversion.Canonicalize(expectedSchema)) { resolvers = append(resolvers, &schemaDriftResolver{ summary: summary, }) diff --git a/cmd/frontend/internal/cli/serve_cmd.go b/cmd/frontend/internal/cli/serve_cmd.go index b96208d5825..7f59f5a5654 100644 --- a/cmd/frontend/internal/cli/serve_cmd.go +++ b/cmd/frontend/internal/cli/serve_cmd.go @@ -77,7 +77,7 @@ func InitDB(logger sglog.Logger) (*sql.DB, error) { return nil, errors.Errorf("failed to connect to frontend database: %s", err) } - if err := upgradestore.New(database.NewDB(logger, sqlDB)).UpdateServiceVersion(context.Background(), "frontend", version.Version()); err != nil { + if err := upgradestore.New(database.NewDB(logger, sqlDB)).UpdateServiceVersion(context.Background(), version.Version()); err != nil { return nil, err } diff --git a/cmd/migrator/shared/BUILD.bazel b/cmd/migrator/shared/BUILD.bazel index 52abd7ad93d..c23568a0b42 100644 --- a/cmd/migrator/shared/BUILD.bazel +++ b/cmd/migrator/shared/BUILD.bazel @@ -2,31 +2,19 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "shared", - srcs = [ - "conf.go", - "main.go", - "registration.go", - ], + srcs = ["main.go"], importpath = "github.com/sourcegraph/sourcegraph/cmd/migrator/shared", visibility = ["//visibility:public"], deps = [ - "//internal/conf/conftypes", - "//internal/database", - "//internal/database/basestore", - "//internal/database/connections/live", + "//internal/database/migration", "//internal/database/migration/cliutil", + "//internal/database/migration/runner", "//internal/database/migration/schemas", "//internal/database/migration/store", - "//internal/database/postgresdsn", - "//internal/jsonc", "//internal/observation", - "//internal/oobmigration", "//internal/oobmigration/migrations", "//internal/version", - "//lib/errors", "//lib/output", - "//schema", - "@com_github_keegancsmith_sqlf//:sqlf", "@com_github_sourcegraph_log//:log", "@com_github_urfave_cli_v2//:cli", ], diff --git a/cmd/migrator/shared/main.go b/cmd/migrator/shared/main.go index e0ead02fea7..472073d5d5e 100644 --- a/cmd/migrator/shared/main.go +++ b/cmd/migrator/shared/main.go @@ -2,19 +2,17 @@ package shared import ( "context" - "database/sql" "os" - "strings" "github.com/urfave/cli/v2" "github.com/sourcegraph/log" - connections "github.com/sourcegraph/sourcegraph/internal/database/connections/live" + "github.com/sourcegraph/sourcegraph/internal/database/migration" "github.com/sourcegraph/sourcegraph/internal/database/migration/cliutil" + "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" "github.com/sourcegraph/sourcegraph/internal/database/migration/store" - "github.com/sourcegraph/sourcegraph/internal/database/postgresdsn" "github.com/sourcegraph/sourcegraph/internal/observation" ossmigrations "github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations" "github.com/sourcegraph/sourcegraph/internal/version" @@ -25,53 +23,27 @@ const appName = "migrator" var out = output.NewOutput(os.Stdout, output.OutputOpts{}) -// NewRunnerWithSchemas returns new migrator runner with given scheme names and -// definitions. -func NewRunnerWithSchemas(observationCtx *observation.Context, logger log.Logger, schemaNames []string, schemas []*schemas.Schema) (cliutil.Runner, error) { - dsns, err := postgresdsn.DSNsBySchema(schemaNames) - if err != nil { - return nil, err - } - - var dsnsStrings []string - for schema, dsn := range dsns { - dsnsStrings = append(dsnsStrings, schema+" => "+dsn) - } - - out.WriteLine(output.Linef(output.EmojiInfo, output.StyleGrey, "Connection DSNs used: %s", strings.Join(dsnsStrings, ", "))) - - storeFactory := func(db *sql.DB, migrationsTable string) connections.Store { - return connections.NewStoreShim(store.NewWithDB(observationCtx, db, migrationsTable)) - } - r, err := connections.RunnerFromDSNsWithSchemas(out, logger, dsns, appName, storeFactory, schemas) - if err != nil { - return nil, err - } - - return cliutil.NewShim(r), nil -} - // DefaultSchemaFactories is a list of schema factories to be used in // non-exceptional cases. -var DefaultSchemaFactories = []cliutil.ExpectedSchemaFactory{ - cliutil.LocalExpectedSchemaFactory, - cliutil.GitHubExpectedSchemaFactory, - cliutil.GCSExpectedSchemaFactory, +var DefaultSchemaFactories = []schemas.ExpectedSchemaFactory{ + schemas.LocalExpectedSchemaFactory, + schemas.GitHubExpectedSchemaFactory, + schemas.GCSExpectedSchemaFactory, } -func Start(logger log.Logger, registerEnterpriseMigrators registerMigratorsUsingConfAndStoreFactoryFunc) error { +func Start(logger log.Logger, registerEnterpriseMigrators store.RegisterMigratorsUsingConfAndStoreFactoryFunc) error { observationCtx := observation.NewContext(logger) outputFactory := func() *output.Output { return out } - newRunnerWithSchemas := func(schemaNames []string, schemas []*schemas.Schema) (cliutil.Runner, error) { - return NewRunnerWithSchemas(observationCtx, logger, schemaNames, schemas) + newRunnerWithSchemas := func(schemaNames []string, schemas []*schemas.Schema) (*runner.Runner, error) { + return migration.NewRunnerWithSchemas(observationCtx, out, "migrator", schemaNames, schemas) } - newRunner := func(schemaNames []string) (cliutil.Runner, error) { + newRunner := func(schemaNames []string) (*runner.Runner, error) { return newRunnerWithSchemas(schemaNames, schemas.Schemas) } - registerMigrators := composeRegisterMigratorsFuncs( + registerMigrators := store.ComposeRegisterMigratorsFuncs( ossmigrations.RegisterOSSMigratorsUsingConfAndStoreFactory, registerEnterpriseMigrators, ) diff --git a/cmd/migrator/shared/registration.go b/cmd/migrator/shared/registration.go deleted file mode 100644 index 36105889de3..00000000000 --- a/cmd/migrator/shared/registration.go +++ /dev/null @@ -1,43 +0,0 @@ -package shared - -import ( - "context" - - "github.com/sourcegraph/sourcegraph/internal/conf/conftypes" - "github.com/sourcegraph/sourcegraph/internal/database" - "github.com/sourcegraph/sourcegraph/internal/oobmigration" - "github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations" -) - -type registerMigratorsUsingConfAndStoreFactoryFunc func( - ctx context.Context, - db database.DB, - runner *oobmigration.Runner, - conf conftypes.UnifiedQuerier, - storeFactory migrations.StoreFactory, -) error - -func composeRegisterMigratorsFuncs(fnsFromConfAndStoreFactory ...registerMigratorsUsingConfAndStoreFactoryFunc) func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc { - return func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc { - return func(ctx context.Context, db database.DB, runner *oobmigration.Runner) error { - conf, err := newStaticConf(ctx, db) - if err != nil { - return err - } - - fns := make([]oobmigration.RegisterMigratorsFunc, 0, len(fnsFromConfAndStoreFactory)) - for _, f := range fnsFromConfAndStoreFactory { - f := f // avoid loop capture - if f == nil { - continue - } - - fns = append(fns, func(ctx context.Context, db database.DB, runner *oobmigration.Runner) error { - return f(ctx, db, runner, conf, storeFactory) - }) - } - - return oobmigration.ComposeRegisterMigratorsFuncs(fns...)(ctx, db, runner) - } - } -} diff --git a/dev/sg/sg_migration.go b/dev/sg/sg_migration.go index 6b7fb5e4610..531942760fe 100644 --- a/dev/sg/sg_migration.go +++ b/dev/sg/sg_migration.go @@ -20,6 +20,7 @@ import ( "github.com/sourcegraph/sourcegraph/dev/sg/root" connections "github.com/sourcegraph/sourcegraph/internal/database/connections/live" "github.com/sourcegraph/sourcegraph/internal/database/migration/cliutil" + "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" "github.com/sourcegraph/sourcegraph/internal/database/migration/store" "github.com/sourcegraph/sourcegraph/internal/database/postgresdsn" @@ -114,9 +115,9 @@ var ( // at compile-time in sg. outputFactory = func() *output.Output { return std.Out.Output } - schemaFactories = []cliutil.ExpectedSchemaFactory{ + schemaFactories = []schemas.ExpectedSchemaFactory{ localGitExpectedSchemaFactory, - cliutil.GCSExpectedSchemaFactory, + schemas.GCSExpectedSchemaFactory, } upCommand = cliutil.Up("sg migration", makeRunner, outputFactory, true) @@ -209,7 +210,7 @@ sg migration squash } ) -func makeRunner(schemaNames []string) (cliutil.Runner, error) { +func makeRunner(schemaNames []string) (*runner.Runner, error) { filesystemSchemas, err := getFilesystemSchemas() if err != nil { return nil, err @@ -218,7 +219,7 @@ func makeRunner(schemaNames []string) (cliutil.Runner, error) { return makeRunnerWithSchemas(schemaNames, filesystemSchemas) } -func makeRunnerWithSchemas(schemaNames []string, schemas []*schemas.Schema) (cliutil.Runner, error) { +func makeRunnerWithSchemas(schemaNames []string, schemas []*schemas.Schema) (*runner.Runner, error) { // Try to read the `sg` configuration so we can read ENV vars from the // configuration and use process env as fallback. var getEnv func(string) string @@ -238,13 +239,13 @@ func makeRunnerWithSchemas(schemaNames []string, schemas []*schemas.Schema) (cli return nil, err } - return cliutil.NewShim(r), nil + return r, nil } // localGitExpectedSchemaFactory returns the description of the given schema at the given version via the // (assumed) local git clone. If the version is not resolvable as a git rev-like, or if the file does not // exist at that revision, then a false valued-flag is returned. All other failures are reported as errors. -var localGitExpectedSchemaFactory = cliutil.NewExpectedSchemaFactory( +var localGitExpectedSchemaFactory = schemas.NewExpectedSchemaFactory( "git", nil, func(filename, version string) string { diff --git a/enterprise/cmd/frontend/BUILD.bazel b/enterprise/cmd/frontend/BUILD.bazel index a5c73c5c98f..6c4ebd195a1 100644 --- a/enterprise/cmd/frontend/BUILD.bazel +++ b/enterprise/cmd/frontend/BUILD.bazel @@ -16,6 +16,7 @@ go_library( deps = [ "//enterprise/cmd/frontend/shared", "//enterprise/cmd/sourcegraph/enterprisecmd", + "//internal/oobmigration", "//internal/sanitycheck", "//ui/assets", "//ui/assets/enterprise", diff --git a/enterprise/cmd/frontend/main.go b/enterprise/cmd/frontend/main.go index 45a3fc9a3fe..8b2e674ce75 100644 --- a/enterprise/cmd/frontend/main.go +++ b/enterprise/cmd/frontend/main.go @@ -6,12 +6,18 @@ import ( "github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/shared" "github.com/sourcegraph/sourcegraph/enterprise/cmd/sourcegraph/enterprisecmd" + "github.com/sourcegraph/sourcegraph/internal/oobmigration" "github.com/sourcegraph/sourcegraph/internal/sanitycheck" "github.com/sourcegraph/sourcegraph/ui/assets" _ "github.com/sourcegraph/sourcegraph/ui/assets/enterprise" // Select enterprise assets ) +func init() { + // TODO(sqs): TODO(single-binary): could we move this out of init? + oobmigration.ReturnEnterpriseMigrations = true +} + func main() { sanitycheck.Pass() if os.Getenv("WEBPACK_DEV_SERVER") == "1" { diff --git a/enterprise/cmd/sourcegraph/enterprisecmd/BUILD.bazel b/enterprise/cmd/sourcegraph/enterprisecmd/BUILD.bazel index c801c09fe0a..de030371648 100644 --- a/enterprise/cmd/sourcegraph/enterprisecmd/BUILD.bazel +++ b/enterprise/cmd/sourcegraph/enterprisecmd/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/sourcegraph/sourcegraph/enterprise/cmd/sourcegraph/enterprisecmd", visibility = ["//visibility:public"], deps = [ - "//internal/oobmigration", "//internal/service", "//internal/service/svcmain", ], diff --git a/enterprise/cmd/sourcegraph/enterprisecmd/enterprisecmd.go b/enterprise/cmd/sourcegraph/enterprisecmd/enterprisecmd.go index 3cbe82769fa..f6f07ef4d6a 100644 --- a/enterprise/cmd/sourcegraph/enterprisecmd/enterprisecmd.go +++ b/enterprise/cmd/sourcegraph/enterprisecmd/enterprisecmd.go @@ -4,7 +4,6 @@ package enterprisecmd import ( - "github.com/sourcegraph/sourcegraph/internal/oobmigration" "github.com/sourcegraph/sourcegraph/internal/service" "github.com/sourcegraph/sourcegraph/internal/service/svcmain" ) @@ -21,8 +20,3 @@ func MainEnterprise(services []service.Service, args []string) { func SingleServiceMainEnterprise(service service.Service) { svcmain.SingleServiceMain(service, config) } - -func init() { - // TODO(sqs): TODO(single-binary): could we move this out of init? - oobmigration.ReturnEnterpriseMigrations = true -} diff --git a/internal/database/migration/BUILD.bazel b/internal/database/migration/BUILD.bazel new file mode 100644 index 00000000000..f1f994284e9 --- /dev/null +++ b/internal/database/migration/BUILD.bazel @@ -0,0 +1,17 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "migration", + srcs = ["runner.go"], + importpath = "github.com/sourcegraph/sourcegraph/internal/database/migration", + visibility = ["//:__subpackages__"], + deps = [ + "//internal/database/connections/live", + "//internal/database/migration/runner", + "//internal/database/migration/schemas", + "//internal/database/migration/store", + "//internal/database/postgresdsn", + "//internal/observation", + "//lib/output", + ], +) diff --git a/internal/database/migration/cliutil/BUILD.bazel b/internal/database/migration/cliutil/BUILD.bazel index 13560559e71..9e30d656649 100644 --- a/internal/database/migration/cliutil/BUILD.bazel +++ b/internal/database/migration/cliutil/BUILD.bazel @@ -10,8 +10,6 @@ go_library( "downto.go", "drift.go", "drift_autofix.go", - "drift_schema.go", - "drift_util.go", "help.go", "iface.go", "multiversion.go", @@ -27,13 +25,12 @@ go_library( visibility = ["//:__subpackages__"], deps = [ "//internal/database", - "//internal/database/basestore", "//internal/database/migration/definition", "//internal/database/migration/drift", + "//internal/database/migration/multiversion", "//internal/database/migration/runner", "//internal/database/migration/schemas", - "//internal/database/migration/shared", - "//internal/lazyregexp", + "//internal/database/migration/store", "//internal/observation", "//internal/oobmigration", "//internal/oobmigration/migrations", @@ -41,9 +38,7 @@ go_library( "//internal/version/upgradestore", "//lib/errors", "//lib/output", - "@com_github_google_go_cmp//cmp", "@com_github_sourcegraph_log//:log", "@com_github_urfave_cli_v2//:cli", - "@org_cuelang_go//pkg/strings", ], ) diff --git a/internal/database/migration/cliutil/downgrade.go b/internal/database/migration/cliutil/downgrade.go index 8a52ea6a196..fc6477cd7a5 100644 --- a/internal/database/migration/cliutil/downgrade.go +++ b/internal/database/migration/cliutil/downgrade.go @@ -6,6 +6,10 @@ import ( "github.com/urfave/cli/v2" + "github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion" + "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" + "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" + "github.com/sourcegraph/sourcegraph/internal/database/migration/store" "github.com/sourcegraph/sourcegraph/internal/oobmigration" "github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations" "github.com/sourcegraph/sourcegraph/lib/errors" @@ -14,10 +18,10 @@ import ( func Downgrade( commandName string, - runnerFactory RunnerFactoryWithSchemas, + runnerFactory runner.RunnerFactoryWithSchemas, outFactory OutputFactory, registerMigrators func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc, - expectedSchemaFactories ...ExpectedSchemaFactory, + expectedSchemaFactories ...schemas.ExpectedSchemaFactory, ) *cli.Command { fromFlag := &cli.StringFlag{ Name: "from", @@ -118,7 +122,7 @@ func Downgrade( // Find the relevant schema and data migrations to perform (and in what order) // for the given version range. - plan, err := planMigration(from, to, versionRange, interrupts) + plan, err := multiversion.PlanMigration(from, to, versionRange, interrupts) if err != nil { return err } @@ -128,9 +132,21 @@ func Downgrade( return err } + runner, err := runnerFactory(schemas.SchemaNames, schemas.Schemas) + if err != nil { + return errors.Wrap(err, "new runner") + } + + // connect to db and get upgrade readiness state + db, err := store.ExtractDatabase(ctx, runner) + if err != nil { + return errors.Wrap(err, "new db handle") + } + // Perform the downgrade on the configured databases. - return runMigration( + return multiversion.RunMigration( ctx, + db, runnerFactory, plan, privilegedMode, diff --git a/internal/database/migration/cliutil/drift.go b/internal/database/migration/cliutil/drift.go index 95576c30ac3..c9b7ede7896 100644 --- a/internal/database/migration/cliutil/drift.go +++ b/internal/database/migration/cliutil/drift.go @@ -3,20 +3,18 @@ package cliutil import ( "context" "fmt" - "net/url" - "sort" - "cuelang.org/go/pkg/strings" "github.com/urfave/cli/v2" "github.com/sourcegraph/sourcegraph/internal/database/migration/drift" - descriptions "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" + "github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion" + "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" "github.com/sourcegraph/sourcegraph/internal/oobmigration" "github.com/sourcegraph/sourcegraph/lib/errors" "github.com/sourcegraph/sourcegraph/lib/output" ) -func Drift(commandName string, factory RunnerFactory, outFactory OutputFactory, development bool, expectedSchemaFactories ...ExpectedSchemaFactory) *cli.Command { +func Drift(commandName string, factory RunnerFactory, outFactory OutputFactory, development bool, expectedSchemaFactories ...schemas.ExpectedSchemaFactory) *cli.Command { defaultVersion := "" if development { defaultVersion = "HEAD" @@ -126,33 +124,33 @@ func Drift(commandName string, factory RunnerFactory, outFactory OutputFactory, } if file != "" { - expectedSchemaFactories = []ExpectedSchemaFactory{ - NewExplicitFileSchemaFactory(file), + expectedSchemaFactories = []schemas.ExpectedSchemaFactory{ + schemas.NewExplicitFileSchemaFactory(file), } } - expectedSchema, err := FetchExpectedSchema(ctx, schemaName, version, out, expectedSchemaFactories) + expectedSchema, err := multiversion.FetchExpectedSchema(ctx, schemaName, version, out, expectedSchemaFactories) if err != nil { return err } - schemas, err := store.Describe(ctx) + allSchemas, err := store.Describe(ctx) if err != nil { return err } - schema := schemas["public"] - summaries := drift.CompareSchemaDescriptions(schemaName, version, Canonicalize(schema), Canonicalize(expectedSchema)) + schema := allSchemas["public"] + summaries := drift.CompareSchemaDescriptions(schemaName, version, multiversion.Canonicalize(schema), multiversion.Canonicalize(expectedSchema)) if autofixFlag.Get(cmd) { - summaries, err = attemptAutofix(ctx, out, store, summaries, func(schema descriptions.SchemaDescription) []drift.Summary { - return drift.CompareSchemaDescriptions(schemaName, version, Canonicalize(schema), Canonicalize(expectedSchema)) + summaries, err = attemptAutofix(ctx, out, store, summaries, func(schema schemas.SchemaDescription) []drift.Summary { + return drift.CompareSchemaDescriptions(schemaName, version, multiversion.Canonicalize(schema), multiversion.Canonicalize(expectedSchema)) }) if err != nil { return err } } - return displayDriftSummaries(out, summaries) + return drift.DisplaySchemaSummaries(out, summaries) }) flags := []cli.Flag{ @@ -174,117 +172,3 @@ func Drift(commandName string, factory RunnerFactory, outFactory OutputFactory, Flags: flags, } } - -func FetchExpectedSchema( - ctx context.Context, - schemaName string, - version string, - out *output.Output, - expectedSchemaFactories []ExpectedSchemaFactory, -) (descriptions.SchemaDescription, error) { - filename, err := getSchemaJSONFilename(schemaName) - if err != nil { - return descriptions.SchemaDescription{}, err - } - - out.WriteLine(output.Line(output.EmojiInfo, output.StyleReset, "Locating schema description")) - - for i, factory := range expectedSchemaFactories { - matches := false - patterns := factory.VersionPatterns() - for _, pattern := range patterns { - if pattern.MatchString(version) { - matches = true - break - } - } - if len(patterns) > 0 && !matches { - continue - } - - resourcePath := factory.ResourcePath(filename, version) - expectedSchema, err := factory.CreateFromPath(ctx, resourcePath) - if err != nil { - suffix := "" - if i < len(expectedSchemaFactories)-1 { - suffix = " Will attempt a fallback source." - } - - out.WriteLine(output.Linef(output.EmojiInfo, output.StyleReset, "Reading schema definition in %s (%s)... Schema not found (%s).%s", factory.Name(), resourcePath, err, suffix)) - continue - } - - out.WriteLine(output.Linef(output.EmojiSuccess, output.StyleReset, "Schema found in %s (%s).", factory.Name(), resourcePath)) - return expectedSchema, nil - } - - exampleMap := map[string]struct{}{} - failedPaths := map[string]struct{}{} - for _, factory := range expectedSchemaFactories { - for _, pattern := range factory.VersionPatterns() { - if !pattern.MatchString(version) { - exampleMap[pattern.Example()] = struct{}{} - } else { - failedPaths[factory.ResourcePath(filename, version)] = struct{}{} - } - } - } - - versionExamples := make([]string, 0, len(exampleMap)) - for pattern := range exampleMap { - versionExamples = append(versionExamples, pattern) - } - sort.Strings(versionExamples) - - paths := make([]string, 0, len(exampleMap)) - for path := range failedPaths { - if u, err := url.Parse(path); err == nil && (u.Scheme == "http" || u.Scheme == "https") { - paths = append(paths, path) - } - } - sort.Strings(paths) - - if len(paths) > 0 { - var additionalHints string - if len(versionExamples) > 0 { - additionalHints = fmt.Sprintf( - "Alternative, provide a different version that matches one of the following patterns: \n - %s\n", strings.Join(versionExamples, "\n - "), - ) - } - - out.WriteLine(output.Linef( - output.EmojiLightbulb, - output.StyleFailure, - "Schema not found. "+ - "Check if the following resources exist. "+ - "If they do, then the context in which this migrator is being run may not be permitted to reach the public internet."+ - "\n - %s\n%s", - strings.Join(paths, "\n - "), - additionalHints, - )) - } else if len(versionExamples) > 0 { - out.WriteLine(output.Linef( - output.EmojiLightbulb, - output.StyleFailure, - "Schema not found. Ensure your supplied version matches one of the following patterns: \n - %s\n", strings.Join(versionExamples, "\n - "), - )) - } - - return descriptions.SchemaDescription{}, errors.New("failed to locate target schema description") -} - -func Canonicalize(schemaDescription descriptions.SchemaDescription) descriptions.SchemaDescription { - descriptions.Canonicalize(schemaDescription) - - filtered := schemaDescription.Tables[:0] - for i, table := range schemaDescription.Tables { - if table.Name == "migration_logs" { - continue - } - - filtered = append(filtered, schemaDescription.Tables[i]) - } - schemaDescription.Tables = filtered - - return schemaDescription -} diff --git a/internal/database/migration/cliutil/iface.go b/internal/database/migration/cliutil/iface.go index 95b54c5a122..a525796e82e 100644 --- a/internal/database/migration/cliutil/iface.go +++ b/internal/database/migration/cliutil/iface.go @@ -25,17 +25,4 @@ type Store interface { // OutputFactory allows providing global output that might not be instantiated at compile time. type OutputFactory func() *output.Output -type RunnerFactory func(schemaNames []string) (Runner, error) -type RunnerFactoryWithSchemas func(schemaNames []string, schemas []*schemas.Schema) (Runner, error) - -type runnerShim struct { - *runner.Runner -} - -func NewShim(runner *runner.Runner) Runner { - return &runnerShim{Runner: runner} -} - -func (r *runnerShim) Store(ctx context.Context, schemaName string) (Store, error) { - return r.Runner.Store(ctx, schemaName) -} +type RunnerFactory func(schemaNames []string) (*runner.Runner, error) diff --git a/internal/database/migration/cliutil/multiversion.go b/internal/database/migration/cliutil/multiversion.go index 5400bf5a00b..747a41511f5 100644 --- a/internal/database/migration/cliutil/multiversion.go +++ b/internal/database/migration/cliutil/multiversion.go @@ -1,289 +1,28 @@ package cliutil import ( - "bytes" "context" - "fmt" - "github.com/sourcegraph/sourcegraph/internal/database/migration/definition" - "github.com/sourcegraph/sourcegraph/internal/database/migration/drift" "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" - "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" - "github.com/sourcegraph/sourcegraph/internal/database/migration/shared" + "github.com/sourcegraph/sourcegraph/internal/database/migration/store" "github.com/sourcegraph/sourcegraph/internal/oobmigration" - "github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations" "github.com/sourcegraph/sourcegraph/internal/version/upgradestore" "github.com/sourcegraph/sourcegraph/lib/errors" - "github.com/sourcegraph/sourcegraph/lib/output" ) -type migrationPlan struct { - // the source and target instance versions - from, to oobmigration.Version - - // the stitched schema migration definitions over the entire version range by schema name - stitchedDefinitionsBySchemaName map[string]*definition.Definitions - - // the sequence of migration steps over the stiched schema migration definitions; we can't - // simply apply all schema migrations as out-of-band migration can only run within a certain - // slice of the schema's definition where that out-of-band migration was defined - steps []migrationStep -} - -type migrationStep struct { - // the target version to migrate to - instanceVersion oobmigration.Version - - // the leaf migrations of this version by schema name - schemaMigrationLeafIDsBySchemaName map[string][]int - - // the set of out-of-band migrations that must complete before schema migrations begin - // for the following minor instance version - outOfBandMigrationIDs []int -} - -// planMigration returns a path to migrate through the given version ranges. Each step corresponds to -// a target instance version to migrate to, and a set of out-of-band migraitons that need to complete. -// Done insequence, it forms a complete multi-version upgrade or migration plan. -func planMigration( - from, to oobmigration.Version, - versionRange []oobmigration.Version, - interrupts []oobmigration.MigrationInterrupt, -) (migrationPlan, error) { - versionTags := make([]string, 0, len(versionRange)) - for _, version := range versionRange { - versionTags = append(versionTags, version.GitTag()) - } - - // Retrieve relevant stitched migrations for this version range - stitchedMigrationBySchemaName, err := filterStitchedMigrationsForTags(versionTags) - if err != nil { - return migrationPlan{}, err - } - - // Extract/rotate stitched migration definitions so we can query them by schem name - stitchedDefinitionsBySchemaName := make(map[string]*definition.Definitions, len(stitchedMigrationBySchemaName)) - for schemaName, stitchedMigration := range stitchedMigrationBySchemaName { - stitchedDefinitionsBySchemaName[schemaName] = stitchedMigration.Definitions - } - - // Extract/rotate leaf identifiers so we can query them by version/git-tag first - leafIDsBySchemaNameByTag := make(map[string]map[string][]int, len(versionRange)) - for schemaName, stitchedMigration := range stitchedMigrationBySchemaName { - for tag, bounds := range stitchedMigration.BoundsByRev { - if _, ok := leafIDsBySchemaNameByTag[tag]; !ok { - leafIDsBySchemaNameByTag[tag] = map[string][]int{} - } - - leafIDsBySchemaNameByTag[tag][schemaName] = bounds.LeafIDs - } - } - - // - // Interleave out-of-band migration interrupts and schema migrations - - steps := make([]migrationStep, 0, len(interrupts)+1) - for _, interrupt := range interrupts { - steps = append(steps, migrationStep{ - instanceVersion: interrupt.Version, - schemaMigrationLeafIDsBySchemaName: leafIDsBySchemaNameByTag[interrupt.Version.GitTag()], - outOfBandMigrationIDs: interrupt.MigrationIDs, - }) - } - steps = append(steps, migrationStep{ - instanceVersion: to, - schemaMigrationLeafIDsBySchemaName: leafIDsBySchemaNameByTag[to.GitTag()], - outOfBandMigrationIDs: nil, // all required out of band migrations have already completed - }) - - return migrationPlan{ - from: from, - to: to, - stitchedDefinitionsBySchemaName: stitchedDefinitionsBySchemaName, - steps: steps, - }, nil -} - -// runMigration initializes a schema and out-of-band migration runner and performs the given migration plan. -func runMigration( - ctx context.Context, - runnerFactory RunnerFactoryWithSchemas, - plan migrationPlan, - privilegedMode runner.PrivilegedMode, - privilegedHashes []string, - skipVersionCheck bool, - skipDriftCheck bool, - dryRun bool, - up bool, - animateProgress bool, - registerMigratorsWithStore func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc, - expectedSchemaFactories []ExpectedSchemaFactory, - out *output.Output, -) error { - var runnerSchemas []*schemas.Schema - for _, schemaName := range schemas.SchemaNames { - runnerSchemas = append(runnerSchemas, &schemas.Schema{ - Name: schemaName, - MigrationsTableName: schemas.MigrationsTableName(schemaName), - Definitions: plan.stitchedDefinitionsBySchemaName[schemaName], - }) - } - - r, err := runnerFactory(schemas.SchemaNames, runnerSchemas) - if err != nil { - return err - } - db, err := extractDatabase(ctx, r) - if err != nil { - return err - } - registerMigrators := registerMigratorsWithStore(basestoreExtractor{r}) - - // Note: Error is correctly checked here; we want to use the return value - // `patch` below but only if we can best-effort fetch it. We want to allow - // the user to skip erroring here if they are explicitly skipping this - // version check. - version, patch, ok, err := GetServiceVersion(ctx, r) - if !skipVersionCheck { - if err != nil { - return err - } - if !ok { - err := errors.Newf("version assertion failed: unknown version != %q", plan.from) - return errors.Newf("%s. Re-invoke with --skip-version-check to ignore this check", err) - } - if oobmigration.CompareVersions(version, plan.from) != oobmigration.VersionOrderEqual { - err := errors.Newf("version assertion failed: %q != %q", version, plan.from) - return errors.Newf("%s. Re-invoke with --skip-version-check to ignore this check", err) - } - } - - if !skipDriftCheck { - if err := CheckDrift(ctx, r, plan.from.GitTagWithPatch(patch), out, false, schemas.SchemaNames, expectedSchemaFactories); err != nil { - return err - } - } - - for i, step := range plan.steps { - out.WriteLine(output.Linef( - output.EmojiFingerPointRight, - output.StyleReset, - "Migrating to v%s (step %d of %d)", - step.instanceVersion, - i+1, - len(plan.steps), - )) - - out.WriteLine(output.Line(output.EmojiFingerPointRight, output.StyleReset, "Running schema migrations")) - - if !dryRun { - operationType := runner.MigrationOperationTypeTargetedUp - if !up { - operationType = runner.MigrationOperationTypeTargetedDown - } - - operations := make([]runner.MigrationOperation, 0, len(step.schemaMigrationLeafIDsBySchemaName)) - for schemaName, leafMigrationIDs := range step.schemaMigrationLeafIDsBySchemaName { - operations = append(operations, runner.MigrationOperation{ - SchemaName: schemaName, - Type: operationType, - TargetVersions: leafMigrationIDs, - }) - } - - if err := r.Run(ctx, runner.Options{ - Operations: operations, - PrivilegedMode: privilegedMode, - MatchPrivilegedHash: func(hash string) bool { - for _, candidate := range privilegedHashes { - if hash == candidate { - return true - } - } - - return false - }, - IgnoreSingleDirtyLog: true, - IgnoreSinglePendingLog: true, - }); err != nil { - return err - } - - out.WriteLine(output.Line(output.EmojiSuccess, output.StyleSuccess, "Schema migrations complete")) - } - - if len(step.outOfBandMigrationIDs) > 0 { - if err := runOutOfBandMigrations( - ctx, - db, - dryRun, - up, - animateProgress, - registerMigrators, - out, - step.outOfBandMigrationIDs, - ); err != nil { - return err - } - } - } - - if !dryRun { - // After successful migration, set the new instance version. The frontend still checks on - // startup that the previously running instance version was only one minor version away. - // If we run the upload without updating that value, the new instance will refuse to - // start without manual modification of the database. - // - // Note that we don't want to get rid of that check entirely from the frontend, as we do - // still want to catch the cases where site-admins "jump forward" several versions while - // using the standard upgrade path (not a multi-version upgrade that handles these cases). - - if err := setServiceVersion(ctx, r, plan.to); err != nil { - return err - } - } - - return nil -} - -// filterStitchedMigrationsForTags returns a copy of the pre-compiled stitchedMap with references -// to tags outside of the given set removed. This allows a migrator instance that knows the migration -// path from X -> Y to also know the path from any partial migration X <= W -> Z <= Y. -func filterStitchedMigrationsForTags(tags []string) (map[string]shared.StitchedMigration, error) { - filteredStitchedMigrationBySchemaName := make(map[string]shared.StitchedMigration, len(schemas.SchemaNames)) - for _, schemaName := range schemas.SchemaNames { - boundsByRev := make(map[string]shared.MigrationBounds, len(tags)) - for _, tag := range tags { - bounds, ok := shared.StitchedMigationsBySchemaName[schemaName].BoundsByRev[tag] - if !ok { - return nil, errors.Newf("unknown tag %q", tag) - } - - boundsByRev[tag] = bounds - } - - filteredStitchedMigrationBySchemaName[schemaName] = shared.StitchedMigration{ - Definitions: shared.StitchedMigationsBySchemaName[schemaName].Definitions, - BoundsByRev: boundsByRev, - } - } - - return filteredStitchedMigrationBySchemaName, nil -} - // GetRawServiceVersion returns the frontend service version information for the given runner as a raw string. -func GetRawServiceVersion(ctx context.Context, r Runner) (_ string, ok bool, _ error) { - db, err := extractDatabase(ctx, r) +func GetRawServiceVersion(ctx context.Context, r *runner.Runner) (_ string, ok bool, _ error) { + db, err := store.ExtractDatabase(ctx, r) if err != nil { return "", false, err } - return upgradestore.New(db).GetServiceVersion(ctx, "frontend") + return upgradestore.New(db).GetServiceVersion(ctx) } // GetServiceVersion returns the frontend service version information for the given runner as a parsed version. // Both of the return values `ok` and `error` should be checked to ensure a valid version is returned. -func GetServiceVersion(ctx context.Context, r Runner) (_ oobmigration.Version, patch int, ok bool, _ error) { +func GetServiceVersion(ctx context.Context, r *runner.Runner) (_ oobmigration.Version, patch int, ok bool, _ error) { versionStr, ok, err := GetRawServiceVersion(ctx, r) if err != nil { return oobmigration.Version{}, 0, false, err @@ -299,108 +38,3 @@ func GetServiceVersion(ctx context.Context, r Runner) (_ oobmigration.Version, p return version, patch, true, nil } - -func setServiceVersion(ctx context.Context, r Runner, version oobmigration.Version) error { - db, err := extractDatabase(ctx, r) - if err != nil { - return err - } - - return upgradestore.New(db).SetServiceVersion( - ctx, - "frontend", - fmt.Sprintf("%d.%d.0", version.Major, version.Minor), - ) -} - -var ErrDatabaseDriftDetected = errors.New("database drift detected") - -// CheckDrift uses given runner to check whether schema drift exists for any -// non-empty database. It returns ErrDatabaseDriftDetected when the schema drift -// exists, and nil error when not. -// -// - The `verbose` indicates whether to collect drift details in the output. -// - The `schemaNames` is the list of schema names to check for drift. -// - The `expectedSchemaFactories` is the means to retrieve the schema. -// definitions at the target version. -func CheckDrift(ctx context.Context, r Runner, version string, out *output.Output, verbose bool, schemaNames []string, expectedSchemaFactories []ExpectedSchemaFactory) error { - type schemaWithDrift struct { - name string - drift *bytes.Buffer - } - schemasWithDrift := make([]*schemaWithDrift, 0, len(schemaNames)) - for _, schemaName := range schemaNames { - store, err := r.Store(ctx, schemaName) - if err != nil { - return errors.Wrap(err, "get migration store") - } - schemaDescriptions, err := store.Describe(ctx) - if err != nil { - return err - } - schema := schemaDescriptions["public"] - - var buf bytes.Buffer - driftOut := output.NewOutput(&buf, output.OutputOpts{}) - - expectedSchema, err := FetchExpectedSchema(ctx, schemaName, version, driftOut, expectedSchemaFactories) - if err != nil { - return err - } - - if err := displayDriftSummaries(driftOut, drift.CompareSchemaDescriptions(schemaName, version, Canonicalize(schema), Canonicalize(expectedSchema))); err != nil { - schemasWithDrift = append(schemasWithDrift, - &schemaWithDrift{ - name: schemaName, - drift: &buf, - }, - ) - } - } - - drift := false - for _, schemaWithDrift := range schemasWithDrift { - empty, err := isEmptySchema(ctx, r, schemaWithDrift.name) - if err != nil { - return err - } - if empty { - continue - } - - drift = true - out.WriteLine(output.Linef(output.EmojiFailure, output.StyleFailure, "Schema drift detected for %s", schemaWithDrift.name)) - if verbose { - out.Write(schemaWithDrift.drift.String()) - } - } - if !drift { - return nil - } - - out.WriteLine(output.Linef( - output.EmojiLightbulb, - output.StyleItalic, - ""+ - "Before continuing with this operation, run the migrator's drift command and follow instructions to repair the schema to the expected current state."+ - " "+ - "See https://docs.sourcegraph.com/admin/how-to/manual_database_migrations#drift for additional instructions."+ - "\n", - )) - - return ErrDatabaseDriftDetected -} - -func isEmptySchema(ctx context.Context, r Runner, schemaName string) (bool, error) { - store, err := r.Store(ctx, schemaName) - if err != nil { - return false, err - } - - appliedVersions, _, _, err := store.Versions(ctx) - if err != nil { - return false, err - } - - return len(appliedVersions) == 0, nil -} diff --git a/internal/database/migration/cliutil/run_oobmigrations.go b/internal/database/migration/cliutil/run_oobmigrations.go index a10f2f73b83..2a8abdcb127 100644 --- a/internal/database/migration/cliutil/run_oobmigrations.go +++ b/internal/database/migration/cliutil/run_oobmigrations.go @@ -2,22 +2,14 @@ package cliutil import ( "context" - "database/sql" - "fmt" - "os" - "sort" - "time" "github.com/urfave/cli/v2" - "github.com/sourcegraph/log" - - "github.com/sourcegraph/sourcegraph/internal/database" - "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion" "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" + "github.com/sourcegraph/sourcegraph/internal/database/migration/store" "github.com/sourcegraph/sourcegraph/internal/oobmigration" oobmigrations "github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations" - "github.com/sourcegraph/sourcegraph/lib/errors" "github.com/sourcegraph/sourcegraph/lib/output" ) @@ -48,13 +40,13 @@ func RunOutOfBandMigrations( if err != nil { return err } - db, err := extractDatabase(ctx, r) + db, err := store.ExtractDatabase(ctx, r) if err != nil { return err } - registerMigrators := registerMigratorsWithStore(basestoreExtractor{r}) + registerMigrators := registerMigratorsWithStore(store.BasestoreExtractor{r}) - if err := runOutOfBandMigrations( + if err := multiversion.RunOutOfBandMigrations( ctx, db, false, // dry-run @@ -82,169 +74,3 @@ func RunOutOfBandMigrations( }, } } - -func runOutOfBandMigrations( - ctx context.Context, - db database.DB, - dryRun bool, - up bool, - animateProgress bool, - registerMigrations oobmigration.RegisterMigratorsFunc, - out *output.Output, - ids []int, -) (err error) { - if len(ids) != 0 { - out.WriteLine(output.Linef(output.EmojiFingerPointRight, output.StyleReset, "Running out of band migrations %v", ids)) - if dryRun { - return nil - } - } - - store := oobmigration.NewStoreWithDB(db) - runner := outOfBandMigrationRunnerWithStore(store) - if err := runner.SynchronizeMetadata(ctx); err != nil { - return err - } - if err := registerMigrations(ctx, db, runner); err != nil { - return err - } - - if len(ids) == 0 { - migrations, err := store.List(ctx) - if err != nil { - return err - } - - for _, migration := range migrations { - ids = append(ids, migration.ID) - } - } - sort.Ints(ids) - - if dryRun { - return nil - } - - if err := runner.UpdateDirection(ctx, ids, !up); err != nil { - return err - } - - go runner.StartPartial(ids) - defer runner.Stop() - defer func() { - if err == nil { - out.WriteLine(output.Line(output.EmojiSuccess, output.StyleSuccess, "Out of band migrations complete")) - } else { - out.WriteLine(output.Linef(output.EmojiFailure, output.StyleFailure, "Out of band migrations failed: %s", err)) - } - }() - - updateMigrationProgress, cleanup := makeOutOfBandMigrationProgressUpdater(out, ids, animateProgress) - defer cleanup() - - ticker := time.NewTicker(time.Second).C - for { - migrations, err := getMigrations(ctx, store, ids) - if err != nil { - return err - } - sort.Slice(migrations, func(i, j int) bool { return migrations[i].ID < migrations[j].ID }) - - for i, m := range migrations { - updateMigrationProgress(i, m) - } - - complete := true - for _, m := range migrations { - if !m.Complete() { - if m.ApplyReverse && m.NonDestructive { - continue - } - - complete = false - } - } - if complete { - return nil - } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-ticker: - } - } -} - -// makeOutOfBandMigrationProgressUpdater returns a two functions: `update` should be called -// when the updates to the progress of an out-of-band migration are made and should be reflected -// in the output; and `cleanup` should be called on defer when the progress object should be -// disposed. -func makeOutOfBandMigrationProgressUpdater(out *output.Output, ids []int, animateProgress bool) ( - update func(i int, m oobmigration.Migration), - cleanup func(), -) { - if !animateProgress || shouldDisableProgressAnimation() { - update = func(i int, m oobmigration.Migration) { - out.WriteLine(output.Linef("", output.StyleReset, "Migration #%d is %.2f%% complete", m.ID, m.Progress*100)) - } - return update, func() {} - } - - bars := make([]output.ProgressBar, 0, len(ids)) - for _, id := range ids { - bars = append(bars, output.ProgressBar{ - Label: fmt.Sprintf("Migration #%d", id), - Max: 1.0, - }) - } - - progress := out.Progress(bars, nil) - return func(i int, m oobmigration.Migration) { progress.SetValue(i, m.Progress) }, progress.Destroy -} - -// shouldDisableProgressAnimation determines if progress bars should be avoided because the log level -// will create output that interferes with a stable canvas. In effect, this adds the -disable-animation -// flag when SRC_LOG_LEVEL is info or debug. -func shouldDisableProgressAnimation() bool { - switch log.Level(os.Getenv(log.EnvLogLevel)) { - case log.LevelDebug: - return true - case log.LevelInfo: - return true - - default: - return false - } -} - -func getMigrations(ctx context.Context, store *oobmigration.Store, ids []int) ([]oobmigration.Migration, error) { - migrations := make([]oobmigration.Migration, 0, len(ids)) - for _, id := range ids { - migration, ok, err := store.GetByID(ctx, id) - if err != nil { - return nil, err - } - if !ok { - return nil, errors.Newf("unknown migration id %d", id) - } - - migrations = append(migrations, migration) - } - sort.Slice(migrations, func(i, j int) bool { return migrations[i].ID < migrations[j].ID }) - - return migrations, nil -} - -type basestoreExtractor struct { - runner Runner -} - -func (r basestoreExtractor) Store(ctx context.Context, schemaName string) (*basestore.Store, error) { - shareableStore, err := extractDB(ctx, r.runner, schemaName) - if err != nil { - return nil, err - } - - return basestore.NewWithHandle(basestore.NewHandleWithDB(log.NoOp(), shareableStore, sql.TxOptions{})), nil -} diff --git a/internal/database/migration/cliutil/up.go b/internal/database/migration/cliutil/up.go index 0713f4a2edb..94c41b28448 100644 --- a/internal/database/migration/cliutil/up.go +++ b/internal/database/migration/cliutil/up.go @@ -7,6 +7,7 @@ import ( "github.com/urfave/cli/v2" "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" + "github.com/sourcegraph/sourcegraph/internal/database/migration/store" "github.com/sourcegraph/sourcegraph/internal/oobmigration" "github.com/sourcegraph/sourcegraph/internal/version" "github.com/sourcegraph/sourcegraph/internal/version/upgradestore" @@ -105,7 +106,7 @@ func Up(commandName string, factory RunnerFactory, outFactory OutputFactory, dev return err } - db, err := extractDatabase(ctx, r) + db, err := store.ExtractDatabase(ctx, r) if err != nil { return err } diff --git a/internal/database/migration/cliutil/upgrade.go b/internal/database/migration/cliutil/upgrade.go index 86528f80ceb..14a153a507c 100644 --- a/internal/database/migration/cliutil/upgrade.go +++ b/internal/database/migration/cliutil/upgrade.go @@ -6,7 +6,10 @@ import ( "github.com/urfave/cli/v2" + "github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion" + "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" + "github.com/sourcegraph/sourcegraph/internal/database/migration/store" "github.com/sourcegraph/sourcegraph/internal/oobmigration" "github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations" "github.com/sourcegraph/sourcegraph/internal/version" @@ -17,10 +20,10 @@ import ( func Upgrade( commandName string, - runnerFactory RunnerFactoryWithSchemas, + runnerFactory runner.RunnerFactoryWithSchemas, outFactory OutputFactory, registerMigrators func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc, - expectedSchemaFactories ...ExpectedSchemaFactory, + expectedSchemaFactories ...schemas.ExpectedSchemaFactory, ) *cli.Command { fromFlag := &cli.StringFlag{ Name: "from", @@ -99,12 +102,11 @@ func Upgrade( } // connect to db and get upgrade readiness state - db, err := extractDatabase(ctx, runner) + db, err := store.ExtractDatabase(ctx, runner) if err != nil { return errors.Wrap(err, "new db handle") } - store := upgradestore.New(db) - currentVersion, autoUpgrade, err := store.GetAutoUpgrade(ctx) + currentVersion, autoUpgrade, err := upgradestore.New(db).GetAutoUpgrade(ctx) if err != nil { return errors.Wrap(err, "checking auto upgrade") } @@ -151,7 +153,7 @@ func Upgrade( // Find the relevant schema and data migrations to perform (and in what order) // for the given version range. - plan, err := planMigration(from, to, versionRange, interrupts) + plan, err := multiversion.PlanMigration(from, to, versionRange, interrupts) if err != nil { return err } @@ -162,8 +164,9 @@ func Upgrade( } // Perform the upgrade on the configured databases. - return runMigration( + return multiversion.RunMigration( ctx, + db, runnerFactory, plan, privilegedMode, diff --git a/internal/database/migration/cliutil/util.go b/internal/database/migration/cliutil/util.go index ac1ea3d592d..9c7082d93d5 100644 --- a/internal/database/migration/cliutil/util.go +++ b/internal/database/migration/cliutil/util.go @@ -2,9 +2,7 @@ package cliutil import ( "context" - "database/sql" "flag" - "fmt" "net/http" "strconv" "strings" @@ -12,10 +10,7 @@ import ( "github.com/urfave/cli/v2" - "github.com/sourcegraph/log" - "github.com/sourcegraph/sourcegraph/internal/database" - "github.com/sourcegraph/sourcegraph/internal/database/basestore" "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" "github.com/sourcegraph/sourcegraph/internal/observation" @@ -46,7 +41,7 @@ func flagHelp(out *output.Output, message string, args ...any) error { } // setupRunner initializes and returns the runner associated witht the given schema. -func setupRunner(factory RunnerFactory, schemaNames ...string) (Runner, error) { +func setupRunner(factory RunnerFactory, schemaNames ...string) (*runner.Runner, error) { r, err := factory(schemaNames) if err != nil { return nil, err @@ -56,7 +51,7 @@ func setupRunner(factory RunnerFactory, schemaNames ...string) (Runner, error) { } // setupStore initializes and returns the store associated witht the given schema. -func setupStore(ctx context.Context, factory RunnerFactory, schemaName string) (Store, error) { +func setupStore(ctx context.Context, factory RunnerFactory, schemaName string) (runner.Store, error) { r, err := setupRunner(factory, schemaName) if err != nil { return nil, err @@ -144,33 +139,6 @@ func getPivilegedModeFromFlags(cmd *cli.Context, out *output.Output, unprivilege return runner.ApplyPrivilegedMigrations, nil } -func extractDatabase(ctx context.Context, r Runner) (database.DB, error) { - db, err := extractDB(ctx, r, "frontend") - if err != nil { - return nil, err - } - - return database.NewDB(log.Scoped("migrator", ""), db), nil -} - -func extractDB(ctx context.Context, r Runner, schemaName string) (*sql.DB, error) { - store, err := r.Store(ctx, schemaName) - if err != nil { - return nil, err - } - - // NOTE: The migration runner package cannot import basestore without - // creating a cyclic import in db connection packages. Hence, we cannot - // embed basestore.ShareableStore here and must "backdoor" extract the - // database connection. - shareableStore, ok := basestore.Raw(store) - if !ok { - return nil, errors.New("store does not support direct database handle access") - } - - return shareableStore, nil -} - var migratorObservationCtx = &observation.TestContext func outOfBandMigrationRunner(db database.DB) *oobmigration.Runner { @@ -185,12 +153,12 @@ func outOfBandMigrationRunnerWithStore(store *oobmigration.Store) *oobmigration. // or GCS, to report whether the migrator may be operating in an airgapped environment. func isAirgapped(ctx context.Context) (err error) { // known good version and filename in both GCS and Github - filename, _ := getSchemaJSONFilename("frontend") + filename, _ := schemas.GetSchemaJSONFilename("frontend") const version = "v3.41.1" timedCtx, cancel := context.WithTimeout(ctx, time.Second*3) defer cancel() - url := githubExpectedSchemaPath(filename, version) + url := schemas.GithubExpectedSchemaPath(filename, version) req, _ := http.NewRequestWithContext(timedCtx, http.MethodHead, url, nil) resp, gherr := http.DefaultClient.Do(req) if resp != nil { @@ -200,7 +168,7 @@ func isAirgapped(ctx context.Context) (err error) { timedCtx, cancel = context.WithTimeout(ctx, time.Second*3) defer cancel() - url = gcsExpectedSchemaPath(filename, version) + url = schemas.GcsExpectedSchemaPath(filename, version) req, _ = http.NewRequestWithContext(timedCtx, http.MethodHead, url, nil) resp, gcserr := http.DefaultClient.Do(req) if resp != nil { @@ -220,20 +188,6 @@ func isAirgapped(ctx context.Context) (err error) { return err } -// getSchemaJSONFilename returns the basename of the JSON-serialized schema in the sg/sg repository. -func getSchemaJSONFilename(schemaName string) (string, error) { - switch schemaName { - case "frontend": - return "internal/database/schema.json", nil - case "codeintel": - fallthrough - case "codeinsights": - return fmt.Sprintf("internal/database/schema.%s.json", schemaName), nil - } - - return "", errors.Newf("unknown schema name %q", schemaName) -} - func checkForMigratorUpdate(ctx context.Context) (latest string, hasUpdate bool, err error) { migratorVersion, migratorPatch, ok := oobmigration.NewVersionAndPatchFromString(version.Version()) if !ok || migratorVersion.Dev { diff --git a/internal/database/migration/cliutil/validate.go b/internal/database/migration/cliutil/validate.go index 3647d7ca9d7..b1900e596ec 100644 --- a/internal/database/migration/cliutil/validate.go +++ b/internal/database/migration/cliutil/validate.go @@ -5,6 +5,7 @@ import ( "github.com/urfave/cli/v2" + "github.com/sourcegraph/sourcegraph/internal/database/migration/store" "github.com/sourcegraph/sourcegraph/internal/oobmigration" "github.com/sourcegraph/sourcegraph/lib/output" ) @@ -39,7 +40,7 @@ func Validate(commandName string, factory RunnerFactory, outFactory OutputFactor out.WriteLine(output.Emoji(output.EmojiSuccess, "schema okay!")) if !skipOutOfBandMigrationsFlag.Get(cmd) { - db, err := extractDatabase(ctx, r) + db, err := store.ExtractDatabase(ctx, r) if err != nil { return err } diff --git a/internal/database/migration/drift/BUILD.bazel b/internal/database/migration/drift/BUILD.bazel index ff5a561bd20..efdce760d5b 100644 --- a/internal/database/migration/drift/BUILD.bazel +++ b/internal/database/migration/drift/BUILD.bazel @@ -15,12 +15,15 @@ go_library( "compare_triggers.go", "compare_views.go", "summary.go", + "util.go", "util_search.go", ], importpath = "github.com/sourcegraph/sourcegraph/internal/database/migration/drift", visibility = ["//:__subpackages__"], deps = [ "//internal/database/migration/schemas", + "//lib/errors", + "//lib/output", "@com_github_google_go_cmp//cmp", ], ) diff --git a/internal/database/migration/cliutil/drift_util.go b/internal/database/migration/drift/util.go similarity index 88% rename from internal/database/migration/cliutil/drift_util.go rename to internal/database/migration/drift/util.go index 3ea78f94aac..086bb3a5f3c 100644 --- a/internal/database/migration/cliutil/drift_util.go +++ b/internal/database/migration/drift/util.go @@ -1,4 +1,4 @@ -package cliutil +package drift import ( "fmt" @@ -6,15 +6,15 @@ import ( "github.com/google/go-cmp/cmp" - "github.com/sourcegraph/sourcegraph/internal/database/migration/drift" "github.com/sourcegraph/sourcegraph/lib/errors" "github.com/sourcegraph/sourcegraph/lib/output" ) var errOutOfSync = errors.Newf("database schema is out of sync") -func displayDriftSummaries(rawOut *output.Output, summaries []drift.Summary) (err error) { +func DisplaySchemaSummaries(rawOut *output.Output, summaries []Summary) (err error) { out := &preambledOutput{rawOut, false} + for _, summary := range summaries { displaySummary(out, summary) err = errOutOfSync @@ -26,7 +26,7 @@ func displayDriftSummaries(rawOut *output.Output, summaries []drift.Summary) (er return err } -func displaySummary(out *preambledOutput, summary drift.Summary) { +func displaySummary(out *preambledOutput, summary Summary) { out.WriteLine(output.Line(output.EmojiFailure, output.StyleBold, summary.Problem())) if a, b, ok := summary.Diff(); ok { diff --git a/internal/database/migration/multiversion/BUILD.bazel b/internal/database/migration/multiversion/BUILD.bazel new file mode 100644 index 00000000000..6a732b6ecaf --- /dev/null +++ b/internal/database/migration/multiversion/BUILD.bazel @@ -0,0 +1,28 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "multiversion", + srcs = [ + "drift.go", + "plan.go", + "run.go", + "util.go", + ], + importpath = "github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion", + visibility = ["//:__subpackages__"], + deps = [ + "//internal/database", + "//internal/database/migration/definition", + "//internal/database/migration/drift", + "//internal/database/migration/runner", + "//internal/database/migration/schemas", + "//internal/database/migration/shared", + "//internal/database/migration/store", + "//internal/observation", + "//internal/oobmigration", + "//internal/oobmigration/migrations", + "//internal/version/upgradestore", + "//lib/errors", + "//lib/output", + ], +) diff --git a/internal/database/migration/multiversion/drift.go b/internal/database/migration/multiversion/drift.go new file mode 100644 index 00000000000..7101400e840 --- /dev/null +++ b/internal/database/migration/multiversion/drift.go @@ -0,0 +1,221 @@ +package multiversion + +import ( + "bytes" + "context" + "fmt" + "net/url" + "sort" + "strings" + + "github.com/sourcegraph/sourcegraph/internal/database/migration/drift" + "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" + "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" + "github.com/sourcegraph/sourcegraph/lib/errors" + "github.com/sourcegraph/sourcegraph/lib/output" +) + +// CheckDrift uses given runner to check whether schema drift exists for any +// non-empty database. It returns ErrDatabaseDriftDetected when the schema drift +// exists, and nil error when not. +// +// - The `verbose` indicates whether to collect drift details in the output. +// - The `schemaNames` is the list of schema names to check for drift. +// - The `expectedSchemaFactories` is the means to retrieve the schema. +// definitions at the target version. +func CheckDrift(ctx context.Context, r *runner.Runner, version string, out *output.Output, verbose bool, schemaNames []string, expectedSchemaFactories []schemas.ExpectedSchemaFactory) error { + type schemaWithDrift struct { + name string + drift *bytes.Buffer + } + schemasWithDrift := make([]*schemaWithDrift, 0, len(schemaNames)) + for _, schemaName := range schemaNames { + store, err := r.Store(ctx, schemaName) + if err != nil { + return errors.Wrap(err, "get migration store") + } + schemaDescriptions, err := store.Describe(ctx) + if err != nil { + return err + } + schema := schemaDescriptions["public"] + + var driftBuff bytes.Buffer + driftOut := output.NewOutput(&driftBuff, output.OutputOpts{}) + + expectedSchema, err := FetchExpectedSchema(ctx, schemaName, version, driftOut, expectedSchemaFactories) + if err != nil { + return err + } + if err := drift.DisplaySchemaSummaries(driftOut, drift.CompareSchemaDescriptions(schemaName, version, Canonicalize(schema), Canonicalize(expectedSchema))); err != nil { + schemasWithDrift = append(schemasWithDrift, + &schemaWithDrift{ + name: schemaName, + drift: &driftBuff, + }, + ) + } + } + + drift := false + for _, schemaWithDrift := range schemasWithDrift { + empty, err := isEmptySchema(ctx, r, schemaWithDrift.name) + if err != nil { + return err + } + if empty { + continue + } + + drift = true + out.WriteLine(output.Linef(output.EmojiFailure, output.StyleFailure, "Schema drift detected for %s", schemaWithDrift.name)) + if verbose { + out.Write(schemaWithDrift.drift.String()) + } + } + if !drift { + return nil + } + + out.WriteLine(output.Linef( + output.EmojiLightbulb, + output.StyleItalic, + ""+ + "Before continuing with this operation, run the migrator's drift command and follow instructions to repair the schema to the expected current state."+ + " "+ + "See https://docs.sourcegraph.com/admin/how-to/manual_database_migrations#drift for additional instructions."+ + "\n", + )) + + return ErrDatabaseDriftDetected +} + +var ErrDatabaseDriftDetected = errors.New("database drift detected") + +func isEmptySchema(ctx context.Context, r *runner.Runner, schemaName string) (bool, error) { + store, err := r.Store(ctx, schemaName) + if err != nil { + return false, err + } + + appliedVersions, _, _, err := store.Versions(ctx) + if err != nil { + return false, err + } + + return len(appliedVersions) == 0, nil +} + +func FetchExpectedSchema( + ctx context.Context, + schemaName string, + version string, + out *output.Output, + expectedSchemaFactories []schemas.ExpectedSchemaFactory, +) (schemas.SchemaDescription, error) { + filename, err := schemas.GetSchemaJSONFilename(schemaName) + if err != nil { + return schemas.SchemaDescription{}, err + } + + out.WriteLine(output.Line(output.EmojiInfo, output.StyleReset, "Locating schema description")) + + for i, factory := range expectedSchemaFactories { + matches := false + patterns := factory.VersionPatterns() + for _, pattern := range patterns { + if pattern.MatchString(version) { + matches = true + break + } + } + if len(patterns) > 0 && !matches { + continue + } + + resourcePath := factory.ResourcePath(filename, version) + expectedSchema, err := factory.CreateFromPath(ctx, resourcePath) + if err != nil { + suffix := "" + if i < len(expectedSchemaFactories)-1 { + suffix = " Will attempt a fallback source." + } + + out.WriteLine(output.Linef(output.EmojiInfo, output.StyleReset, "Reading schema definition in %s (%s)... Schema not found (%s).%s", factory.Name(), resourcePath, err, suffix)) + continue + } + + out.WriteLine(output.Linef(output.EmojiSuccess, output.StyleReset, "Schema found in %s (%s).", factory.Name(), resourcePath)) + return expectedSchema, nil + } + + exampleMap := map[string]struct{}{} + failedPaths := map[string]struct{}{} + for _, factory := range expectedSchemaFactories { + for _, pattern := range factory.VersionPatterns() { + if !pattern.MatchString(version) { + exampleMap[pattern.Example()] = struct{}{} + } else { + failedPaths[factory.ResourcePath(filename, version)] = struct{}{} + } + } + } + + versionExamples := make([]string, 0, len(exampleMap)) + for pattern := range exampleMap { + versionExamples = append(versionExamples, pattern) + } + sort.Strings(versionExamples) + + paths := make([]string, 0, len(exampleMap)) + for path := range failedPaths { + if u, err := url.Parse(path); err == nil && (u.Scheme == "http" || u.Scheme == "https") { + paths = append(paths, path) + } + } + sort.Strings(paths) + + if len(paths) > 0 { + var additionalHints string + if len(versionExamples) > 0 { + additionalHints = fmt.Sprintf( + "Alternative, provide a different version that matches one of the following patterns: \n - %s\n", strings.Join(versionExamples, "\n - "), + ) + } + + out.WriteLine(output.Linef( + output.EmojiLightbulb, + output.StyleFailure, + "Schema not found. "+ + "Check if the following resources exist. "+ + "If they do, then the context in which this migrator is being run may not be permitted to reach the public internet."+ + "\n - %s\n%s", + strings.Join(paths, "\n - "), + additionalHints, + )) + } else if len(versionExamples) > 0 { + out.WriteLine(output.Linef( + output.EmojiLightbulb, + output.StyleFailure, + "Schema not found. Ensure your supplied version matches one of the following patterns: \n - %s\n", strings.Join(versionExamples, "\n - "), + )) + } + + return schemas.SchemaDescription{}, errors.New("failed to locate target schema description") +} + +func Canonicalize(schemaDescription schemas.SchemaDescription) schemas.SchemaDescription { + schemas.Canonicalize(schemaDescription) + + filtered := schemaDescription.Tables[:0] + for i, table := range schemaDescription.Tables { + if table.Name == "migration_logs" { + continue + } + + filtered = append(filtered, schemaDescription.Tables[i]) + } + schemaDescription.Tables = filtered + + return schemaDescription +} diff --git a/internal/database/migration/multiversion/plan.go b/internal/database/migration/multiversion/plan.go new file mode 100644 index 00000000000..78dececc86d --- /dev/null +++ b/internal/database/migration/multiversion/plan.go @@ -0,0 +1,114 @@ +package multiversion + +import ( + "github.com/sourcegraph/sourcegraph/internal/database/migration/definition" + "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" + "github.com/sourcegraph/sourcegraph/internal/database/migration/shared" + "github.com/sourcegraph/sourcegraph/internal/oobmigration" + "github.com/sourcegraph/sourcegraph/lib/errors" +) + +type MigrationPlan struct { + // the source and target instance versions + from, to oobmigration.Version + + // the stitched schema migration definitions over the entire version range by schema name + stitchedDefinitionsBySchemaName map[string]*definition.Definitions + + // the sequence of migration steps over the stiched schema migration definitions; we can't + // simply apply all schema migrations as out-of-band migration can only run within a certain + // slice of the schema's definition where that out-of-band migration was defined + steps []MigrationStep +} + +type MigrationStep struct { + // the target version to migrate to + instanceVersion oobmigration.Version + + // the leaf migrations of this version by schema name + schemaMigrationLeafIDsBySchemaName map[string][]int + + // the set of out-of-band migrations that must complete before schema migrations begin + // for the following minor instance version + outOfBandMigrationIDs []int +} + +func PlanMigration(from, to oobmigration.Version, versionRange []oobmigration.Version, interrupts []oobmigration.MigrationInterrupt) (MigrationPlan, error) { + versionTags := make([]string, 0, len(versionRange)) + for _, version := range versionRange { + versionTags = append(versionTags, version.GitTag()) + } + + // Retrieve relevant stitched migrations for this version range + stitchedMigrationBySchemaName, err := filterStitchedMigrationsForTags(versionTags) + if err != nil { + return MigrationPlan{}, err + } + + // Extract/rotate stitched migration definitions so we can query them by schem name + stitchedDefinitionsBySchemaName := make(map[string]*definition.Definitions, len(stitchedMigrationBySchemaName)) + for schemaName, stitchedMigration := range stitchedMigrationBySchemaName { + stitchedDefinitionsBySchemaName[schemaName] = stitchedMigration.Definitions + } + + // Extract/rotate leaf identifiers so we can query them by version/git-tag first + leafIDsBySchemaNameByTag := make(map[string]map[string][]int, len(versionRange)) + for schemaName, stitchedMigration := range stitchedMigrationBySchemaName { + for tag, bounds := range stitchedMigration.BoundsByRev { + if _, ok := leafIDsBySchemaNameByTag[tag]; !ok { + leafIDsBySchemaNameByTag[tag] = map[string][]int{} + } + + leafIDsBySchemaNameByTag[tag][schemaName] = bounds.LeafIDs + } + } + + // + // Interleave out-of-band migration interrupts and schema migrations + + steps := make([]MigrationStep, 0, len(interrupts)+1) + for _, interrupt := range interrupts { + steps = append(steps, MigrationStep{ + instanceVersion: interrupt.Version, + schemaMigrationLeafIDsBySchemaName: leafIDsBySchemaNameByTag[interrupt.Version.GitTag()], + outOfBandMigrationIDs: interrupt.MigrationIDs, + }) + } + steps = append(steps, MigrationStep{ + instanceVersion: to, + schemaMigrationLeafIDsBySchemaName: leafIDsBySchemaNameByTag[to.GitTag()], + outOfBandMigrationIDs: nil, // all required out of band migrations have already completed + }) + + return MigrationPlan{ + from: from, + to: to, + stitchedDefinitionsBySchemaName: stitchedDefinitionsBySchemaName, + steps: steps, + }, nil +} + +// filterStitchedMigrationsForTags returns a copy of the pre-compiled stitchedMap with references +// to tags outside of the given set removed. This allows a migrator instance that knows the migration +// path from X -> Y to also know the path from any partial migration X <= W -> Z <= Y. +func filterStitchedMigrationsForTags(tags []string) (map[string]shared.StitchedMigration, error) { + filteredStitchedMigrationBySchemaName := make(map[string]shared.StitchedMigration, len(schemas.SchemaNames)) + for _, schemaName := range schemas.SchemaNames { + boundsByRev := make(map[string]shared.MigrationBounds, len(tags)) + for _, tag := range tags { + bounds, ok := shared.StitchedMigationsBySchemaName[schemaName].BoundsByRev[tag] + if !ok { + return nil, errors.Newf("unknown tag %q", tag) + } + + boundsByRev[tag] = bounds + } + + filteredStitchedMigrationBySchemaName[schemaName] = shared.StitchedMigration{ + Definitions: shared.StitchedMigationsBySchemaName[schemaName].Definitions, + BoundsByRev: boundsByRev, + } + } + + return filteredStitchedMigrationBySchemaName, nil +} diff --git a/internal/database/migration/multiversion/run.go b/internal/database/migration/multiversion/run.go new file mode 100644 index 00000000000..abb3b9f0ce6 --- /dev/null +++ b/internal/database/migration/multiversion/run.go @@ -0,0 +1,257 @@ +package multiversion + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/sourcegraph/sourcegraph/lib/errors" + "github.com/sourcegraph/sourcegraph/lib/output" + + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/migration/definition" + "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" + "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" + "github.com/sourcegraph/sourcegraph/internal/database/migration/store" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/oobmigration" + "github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations" + "github.com/sourcegraph/sourcegraph/internal/version/upgradestore" +) + +type Store interface { + WithMigrationLog(ctx context.Context, definition definition.Definition, up bool, f func() error) error + Describe(ctx context.Context) (map[string]schemas.SchemaDescription, error) + Versions(ctx context.Context) (appliedVersions, pendingVersions, failedVersions []int, _ error) +} + +func RunMigration( + ctx context.Context, + db database.DB, + runnerFactory runner.RunnerFactoryWithSchemas, + plan MigrationPlan, + privilegedMode runner.PrivilegedMode, + privilegedHashes []string, + skipVersionCheck bool, + skipDriftCheck bool, + dryRun bool, + up bool, + animateProgress bool, + registerMigratorsWithStore func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc, + expectedSchemaFactories []schemas.ExpectedSchemaFactory, + out *output.Output, +) error { + var runnerSchemas []*schemas.Schema + for _, schemaName := range schemas.SchemaNames { + runnerSchemas = append(runnerSchemas, &schemas.Schema{ + Name: schemaName, + MigrationsTableName: schemas.MigrationsTableName(schemaName), + Definitions: plan.stitchedDefinitionsBySchemaName[schemaName], + }) + } + + r, err := runnerFactory(schemas.SchemaNames, runnerSchemas) + if err != nil { + return err + } + + registerMigrators := registerMigratorsWithStore(store.BasestoreExtractor{Runner: r}) + + // Note: Error is correctly checked here; we want to use the return value + // `patch` below but only if we can best-effort fetch it. We want to allow + // the user to skip erroring here if they are explicitly skipping this + // version check. + version, patch, ok, err := GetServiceVersion(ctx, db) + if !skipVersionCheck { + if err != nil { + return err + } + if !ok { + return errors.Newf("version assertion failed: unknown version != %q. Re-invoke with --skip-version-check to ignore this check", plan.from) + } + if oobmigration.CompareVersions(version, plan.from) != oobmigration.VersionOrderEqual { + return errors.Newf("version assertion failed: %q != %q. Re-invoke with --skip-version-check to ignore this check", version, plan.from) + } + } + + if !skipDriftCheck { + if err := CheckDrift(ctx, r, plan.from.GitTagWithPatch(patch), out, false, schemas.SchemaNames, expectedSchemaFactories); err != nil { + return err + } + } + + for i, step := range plan.steps { + out.WriteLine(output.Linef( + output.EmojiFingerPointRight, + output.StyleReset, + "Migrating to v%s (step %d of %d)", + step.instanceVersion, + i+1, + len(plan.steps), + )) + + out.WriteLine(output.Line(output.EmojiFingerPointRight, output.StyleReset, "Running schema migrations")) + + if !dryRun { + operationType := runner.MigrationOperationTypeTargetedUp + if !up { + operationType = runner.MigrationOperationTypeTargetedDown + } + + operations := make([]runner.MigrationOperation, 0, len(step.schemaMigrationLeafIDsBySchemaName)) + for schemaName, leafMigrationIDs := range step.schemaMigrationLeafIDsBySchemaName { + operations = append(operations, runner.MigrationOperation{ + SchemaName: schemaName, + Type: operationType, + TargetVersions: leafMigrationIDs, + }) + } + + if err := r.Run(ctx, runner.Options{ + Operations: operations, + PrivilegedMode: privilegedMode, + MatchPrivilegedHash: func(hash string) bool { + for _, candidate := range privilegedHashes { + if hash == candidate { + return true + } + } + + return false + }, + IgnoreSingleDirtyLog: true, + IgnoreSinglePendingLog: true, + }); err != nil { + return err + } + + out.WriteLine(output.Line(output.EmojiSuccess, output.StyleSuccess, "Schema migrations complete")) + } + + if len(step.outOfBandMigrationIDs) > 0 { + if err := RunOutOfBandMigrations( + ctx, + db, + dryRun, + up, + animateProgress, + registerMigrators, + out, + step.outOfBandMigrationIDs, + ); err != nil { + return err + } + } + } + + if !dryRun { + // After successful migration, set the new instance version. The frontend still checks on + // startup that the previously running instance version was only one minor version away. + // If we run the upload without updating that value, the new instance will refuse to + // start without manual modification of the database. + // + // Note that we don't want to get rid of that check entirely from the frontend, as we do + // still want to catch the cases where site-admins "jump forward" several versions while + // using the standard upgrade path (not a multi-version upgrade that handles these cases). + + if err := upgradestore.New(db).SetServiceVersion(ctx, fmt.Sprintf("%d.%d.0", plan.to.Major, plan.to.Minor)); err != nil { + return err + } + } + + return nil +} + +func RunOutOfBandMigrations( + ctx context.Context, + db database.DB, + dryRun bool, + up bool, + animateProgress bool, + registerMigrations oobmigration.RegisterMigratorsFunc, + out *output.Output, + ids []int, +) (err error) { + if len(ids) != 0 { + out.WriteLine(output.Linef(output.EmojiFingerPointRight, output.StyleReset, "Running out of band migrations %v", ids)) + if dryRun { + return nil + } + } + + store := oobmigration.NewStoreWithDB(db) + runner := oobmigration.NewRunnerWithDB(&observation.TestContext, db, time.Second) + if err := runner.SynchronizeMetadata(ctx); err != nil { + return err + } + if err := registerMigrations(ctx, db, runner); err != nil { + return err + } + + if len(ids) == 0 { + migrations, err := store.List(ctx) + if err != nil { + return err + } + + for _, migration := range migrations { + ids = append(ids, migration.ID) + } + } + sort.Ints(ids) + + if dryRun { + return nil + } + + if err := runner.UpdateDirection(ctx, ids, !up); err != nil { + return err + } + + go runner.StartPartial(ids) + defer runner.Stop() + defer func() { + if err == nil { + out.WriteLine(output.Line(output.EmojiSuccess, output.StyleSuccess, "Out of band migrations complete")) + } else { + out.WriteLine(output.Linef(output.EmojiFailure, output.StyleFailure, "Out of band migrations failed: %s", err)) + } + }() + + updateMigrationProgress, cleanup := oobmigration.MakeProgressUpdater(out, ids, animateProgress) + defer cleanup() + + ticker := time.NewTicker(time.Second).C + for { + migrations, err := store.GetByIDs(ctx, ids) + if err != nil { + return err + } + sort.Slice(migrations, func(i, j int) bool { return migrations[i].ID < migrations[j].ID }) + + for i, m := range migrations { + updateMigrationProgress(i, m) + } + + complete := true + for _, m := range migrations { + if !m.Complete() { + if m.ApplyReverse && m.NonDestructive { + continue + } + + complete = false + } + } + if complete { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker: + } + } +} diff --git a/internal/database/migration/multiversion/util.go b/internal/database/migration/multiversion/util.go new file mode 100644 index 00000000000..4af7a952581 --- /dev/null +++ b/internal/database/migration/multiversion/util.go @@ -0,0 +1,24 @@ +package multiversion + +import ( + "context" + + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/oobmigration" + "github.com/sourcegraph/sourcegraph/internal/version/upgradestore" + "github.com/sourcegraph/sourcegraph/lib/errors" +) + +func GetServiceVersion(ctx context.Context, db database.DB) (oobmigration.Version, int, bool, error) { + versionStr, ok, err := upgradestore.New(db).GetServiceVersion(ctx) + if err != nil || !ok { + return oobmigration.Version{}, 0, ok, err + } + + version, patch, ok := oobmigration.NewVersionAndPatchFromString(versionStr) + if !ok { + return oobmigration.Version{}, 0, ok, errors.Newf("cannot parse version: %q - expected [v]X.Y[.Z]", versionStr) + } + + return version, patch, true, nil +} diff --git a/internal/database/migration/runner.go b/internal/database/migration/runner.go new file mode 100644 index 00000000000..9d203a856eb --- /dev/null +++ b/internal/database/migration/runner.go @@ -0,0 +1,44 @@ +package migration + +import ( + "database/sql" + "strings" + + connections "github.com/sourcegraph/sourcegraph/internal/database/connections/live" + "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" + "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" + "github.com/sourcegraph/sourcegraph/internal/database/migration/store" + "github.com/sourcegraph/sourcegraph/internal/database/postgresdsn" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/lib/output" +) + +func NewRunnerWithSchemas( + observationCtx *observation.Context, + out *output.Output, + appName string, + schemaNames []string, + schemas []*schemas.Schema, +) (*runner.Runner, error) { + dsns, err := postgresdsn.DSNsBySchema(schemaNames) + if err != nil { + return nil, err + } + + var dsnsStrings []string + for schema, dsn := range dsns { + dsnsStrings = append(dsnsStrings, schema+" => "+dsn) + } + + out.WriteLine(output.Linef(output.EmojiInfo, output.StyleGrey, "Connection DSNs used: %s", strings.Join(dsnsStrings, ", "))) + + storeFactory := func(db *sql.DB, migrationsTable string) connections.Store { + return connections.NewStoreShim(store.NewWithDB(observationCtx, db, migrationsTable)) + } + r, err := connections.RunnerFromDSNsWithSchemas(out, observationCtx.Logger, dsns, appName, storeFactory, schemas) + if err != nil { + return nil, err + } + + return r, nil +} diff --git a/internal/database/migration/runner/runner.go b/internal/database/migration/runner/runner.go index 63752e42098..fe9f8f35d4a 100644 --- a/internal/database/migration/runner/runner.go +++ b/internal/database/migration/runner/runner.go @@ -14,6 +14,8 @@ import ( "github.com/sourcegraph/sourcegraph/lib/errors" ) +type RunnerFactoryWithSchemas func(schemaNames []string, schemas []*schemas.Schema) (*Runner, error) + type Runner struct { logger log.Logger storeFactoryCaches map[string]*storeFactoryCache diff --git a/internal/database/migration/schemas/BUILD.bazel b/internal/database/migration/schemas/BUILD.bazel index 4c4390e0d7d..18f67884fad 100644 --- a/internal/database/migration/schemas/BUILD.bazel +++ b/internal/database/migration/schemas/BUILD.bazel @@ -5,6 +5,7 @@ go_library( name = "schemas", srcs = [ "description.go", + "drift_schema.go", "formatter.go", "formatter_json.go", "formatter_psql.go", diff --git a/internal/database/migration/cliutil/drift_schema.go b/internal/database/migration/schemas/drift_schema.go similarity index 77% rename from internal/database/migration/cliutil/drift_schema.go rename to internal/database/migration/schemas/drift_schema.go index 850b92d7d76..5fbc966a3e5 100644 --- a/internal/database/migration/cliutil/drift_schema.go +++ b/internal/database/migration/schemas/drift_schema.go @@ -1,4 +1,4 @@ -package cliutil +package schemas import ( "context" @@ -9,7 +9,6 @@ import ( "path/filepath" "strings" - descriptions "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" "github.com/sourcegraph/sourcegraph/internal/lazyregexp" "github.com/sourcegraph/sourcegraph/lib/errors" ) @@ -22,7 +21,7 @@ type ExpectedSchemaFactory interface { Name() string VersionPatterns() []NamedRegexp ResourcePath(filename, version string) string - CreateFromPath(ctx context.Context, path string) (descriptions.SchemaDescription, error) + CreateFromPath(ctx context.Context, path string) (SchemaDescription, error) } type NamedRegexp struct { @@ -43,9 +42,9 @@ var ( ) // GitHubExpectedSchemaFactory reads schema definitions from the sourcegraph/sourcegraph repository via GitHub's API. -var GitHubExpectedSchemaFactory = NewExpectedSchemaFactory("GitHub", allPatterns, githubExpectedSchemaPath, fetchSchema) +var GitHubExpectedSchemaFactory = NewExpectedSchemaFactory("GitHub", allPatterns, GithubExpectedSchemaPath, fetchSchema) -func githubExpectedSchemaPath(filename, version string) string { +func GithubExpectedSchemaPath(filename, version string) string { return fmt.Sprintf("https://raw.githubusercontent.com/sourcegraph/sourcegraph/%s/%s", version, filename) } @@ -54,18 +53,18 @@ func githubExpectedSchemaPath(filename, version string) string { // been backfilled to this bucket by hand. // // See the ./drift-schemas directory for more details on how this data was generated. -var GCSExpectedSchemaFactory = NewExpectedSchemaFactory("GCS", []NamedRegexp{tagPattern}, gcsExpectedSchemaPath, fetchSchema) +var GCSExpectedSchemaFactory = NewExpectedSchemaFactory("GCS", []NamedRegexp{tagPattern}, GcsExpectedSchemaPath, fetchSchema) -func gcsExpectedSchemaPath(filename, version string) string { +func GcsExpectedSchemaPath(filename, version string) string { return fmt.Sprintf("https://storage.googleapis.com/sourcegraph-assets/migrations/drift/%s-%s", version, strings.ReplaceAll(filename, "/", "_")) } // LocalExpectedSchemaFactory reads schema definitions from a local directory baked into the migrator image. -var LocalExpectedSchemaFactory = NewExpectedSchemaFactory("Local file", []NamedRegexp{tagPattern}, localSchemaPath, ReadSchemaFromFile) +var LocalExpectedSchemaFactory = NewExpectedSchemaFactory("Local file", []NamedRegexp{tagPattern}, LocalSchemaPath, ReadSchemaFromFile) const migratorImageDescriptionPrefix = "/schema-descriptions" -func localSchemaPath(filename, version string) string { +func LocalSchemaPath(filename, version string) string { return filepath.Join(migratorImageDescriptionPrefix, fmt.Sprintf("%s-%s", version, strings.ReplaceAll(filename, "/", "_"))) } @@ -76,32 +75,32 @@ func NewExplicitFileSchemaFactory(filename string) ExpectedSchemaFactory { } // fetchSchema makes an HTTP GET request to the given URL and reads the schema description from the response. -func fetchSchema(ctx context.Context, url string) (descriptions.SchemaDescription, error) { +func fetchSchema(ctx context.Context, url string) (SchemaDescription, error) { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { - return descriptions.SchemaDescription{}, err + return SchemaDescription{}, err } resp, err := http.DefaultClient.Do(req) if err != nil { - return descriptions.SchemaDescription{}, err + return SchemaDescription{}, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return descriptions.SchemaDescription{}, errors.Newf("HTTP %d: %s", resp.StatusCode, url) + return SchemaDescription{}, errors.Newf("HTTP %d: %s", resp.StatusCode, url) } - var schemaDescription descriptions.SchemaDescription + var schemaDescription SchemaDescription err = json.NewDecoder(resp.Body).Decode(&schemaDescription) return schemaDescription, err } // ReadSchemaFromFile reads a schema description from the given filename. -func ReadSchemaFromFile(ctx context.Context, filename string) (descriptions.SchemaDescription, error) { +func ReadSchemaFromFile(ctx context.Context, filename string) (SchemaDescription, error) { f, err := os.Open(filename) if err != nil { - return descriptions.SchemaDescription{}, err + return SchemaDescription{}, err } defer func() { if closeErr := f.Close(); closeErr != nil { @@ -109,7 +108,7 @@ func ReadSchemaFromFile(ctx context.Context, filename string) (descriptions.Sche } }() - var schemaDescription descriptions.SchemaDescription + var schemaDescription SchemaDescription err = json.NewDecoder(f).Decode(&schemaDescription) return schemaDescription, err } @@ -121,14 +120,14 @@ type expectedSchemaFactory struct { name string versionPatterns []NamedRegexp resourcePathFunc func(filename, version string) string - createFromPathFunc func(ctx context.Context, path string) (descriptions.SchemaDescription, error) + createFromPathFunc func(ctx context.Context, path string) (SchemaDescription, error) } func NewExpectedSchemaFactory( name string, versionPatterns []NamedRegexp, resourcePathFunc func(filename, version string) string, - createFromPathFunc func(ctx context.Context, path string) (descriptions.SchemaDescription, error), + createFromPathFunc func(ctx context.Context, path string) (SchemaDescription, error), ) ExpectedSchemaFactory { return &expectedSchemaFactory{ name: name, @@ -150,6 +149,6 @@ func (f expectedSchemaFactory) ResourcePath(filename, version string) string { return f.resourcePathFunc(filename, version) } -func (f expectedSchemaFactory) CreateFromPath(ctx context.Context, path string) (descriptions.SchemaDescription, error) { +func (f expectedSchemaFactory) CreateFromPath(ctx context.Context, path string) (SchemaDescription, error) { return f.createFromPathFunc(ctx, path) } diff --git a/internal/database/migration/schemas/schemas.go b/internal/database/migration/schemas/schemas.go index b4bef80514b..149a48f5da3 100644 --- a/internal/database/migration/schemas/schemas.go +++ b/internal/database/migration/schemas/schemas.go @@ -86,3 +86,17 @@ func FilterSchemasByName(schemas []*Schema, targetNames []string) []*Schema { return filtered } + +// getSchemaJSONFilename returns the basename of the JSON-serialized schema in the sg/sg repository. +func GetSchemaJSONFilename(schemaName string) (string, error) { + switch schemaName { + case "frontend": + return "internal/database/schema.json", nil + case "codeintel": + fallthrough + case "codeinsights": + return fmt.Sprintf("internal/database/schema.%s.json", schemaName), nil + } + + return "", errors.Newf("unknown schema name %q", schemaName) +} diff --git a/internal/database/migration/store/BUILD.bazel b/internal/database/migration/store/BUILD.bazel index 4cbe194be3b..fbfb08495cb 100644 --- a/internal/database/migration/store/BUILD.bazel +++ b/internal/database/migration/store/BUILD.bazel @@ -6,24 +6,35 @@ go_library( srcs = [ "describe.go", "describe_scan.go", + "extractor.go", "observability.go", + "registration.go", "store.go", ], importpath = "github.com/sourcegraph/sourcegraph/internal/database/migration/store", visibility = ["//:__subpackages__"], deps = [ + "//internal/conf/conftypes", + "//internal/database", "//internal/database/basestore", "//internal/database/dbutil", "//internal/database/locker", "//internal/database/migration/definition", + "//internal/database/migration/runner", "//internal/database/migration/schemas", "//internal/database/migration/shared", + "//internal/database/postgresdsn", + "//internal/jsonc", "//internal/metrics", "//internal/observation", + "//internal/oobmigration", + "//internal/oobmigration/migrations", "//lib/errors", + "//schema", "@com_github_jackc_pgconn//:pgconn", "@com_github_keegancsmith_sqlf//:sqlf", "@com_github_lib_pq//:pq", + "@com_github_sourcegraph_log//:log", "@io_opentelemetry_go_otel//attribute", ], ) diff --git a/internal/database/migration/store/extractor.go b/internal/database/migration/store/extractor.go new file mode 100644 index 00000000000..a93d703bac2 --- /dev/null +++ b/internal/database/migration/store/extractor.go @@ -0,0 +1,53 @@ +package store + +import ( + "context" + "database/sql" + + "github.com/sourcegraph/log" + + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/database/migration/runner" + "github.com/sourcegraph/sourcegraph/lib/errors" +) + +type BasestoreExtractor struct { + Runner *runner.Runner +} + +func (r BasestoreExtractor) Store(ctx context.Context, schemaName string) (*basestore.Store, error) { + shareableStore, err := ExtractDB(ctx, r.Runner, schemaName) + if err != nil { + return nil, err + } + + return basestore.NewWithHandle(basestore.NewHandleWithDB(log.NoOp(), shareableStore, sql.TxOptions{})), nil +} + +func ExtractDatabase(ctx context.Context, r *runner.Runner) (database.DB, error) { + db, err := ExtractDB(ctx, r, "frontend") + if err != nil { + return nil, err + } + + return database.NewDB(log.Scoped("migrator", ""), db), nil +} + +func ExtractDB(ctx context.Context, r *runner.Runner, schemaName string) (*sql.DB, error) { + store, err := r.Store(ctx, schemaName) + if err != nil { + return nil, err + } + + // NOTE: The migration runner package cannot import basestore without + // creating a cyclic import in db connection packages. Hence, we cannot + // embed basestore.ShareableStore here and must "backdoor" extract the + // database connection. + shareableStore, ok := basestore.Raw(store) + if !ok { + return nil, errors.New("store does not support direct database handle access") + } + + return shareableStore, nil +} diff --git a/cmd/migrator/shared/conf.go b/internal/database/migration/store/registration.go similarity index 59% rename from cmd/migrator/shared/conf.go rename to internal/database/migration/store/registration.go index affe3497f70..e5b9973f1b3 100644 --- a/cmd/migrator/shared/conf.go +++ b/internal/database/migration/store/registration.go @@ -1,4 +1,4 @@ -package shared +package store import ( "context" @@ -11,17 +11,52 @@ import ( "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas" "github.com/sourcegraph/sourcegraph/internal/database/postgresdsn" "github.com/sourcegraph/sourcegraph/internal/jsonc" + "github.com/sourcegraph/sourcegraph/internal/oobmigration" + "github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations" "github.com/sourcegraph/sourcegraph/lib/errors" "github.com/sourcegraph/sourcegraph/schema" ) +type RegisterMigratorsUsingConfAndStoreFactoryFunc func( + ctx context.Context, + db database.DB, + runner *oobmigration.Runner, + conf conftypes.UnifiedQuerier, + storeFactory migrations.StoreFactory, +) error + +func ComposeRegisterMigratorsFuncs(fnsFromConfAndStoreFactory ...RegisterMigratorsUsingConfAndStoreFactoryFunc) func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc { + return func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc { + return func(ctx context.Context, db database.DB, runner *oobmigration.Runner) error { + conf, err := newStaticConf(ctx, db) + if err != nil { + return err + } + + fns := make([]oobmigration.RegisterMigratorsFunc, 0, len(fnsFromConfAndStoreFactory)) + for _, f := range fnsFromConfAndStoreFactory { + f := f // avoid loop capture + if f == nil { + continue + } + + fns = append(fns, func(ctx context.Context, db database.DB, runner *oobmigration.Runner) error { + return f(ctx, db, runner, conf, storeFactory) + }) + } + + return oobmigration.ComposeRegisterMigratorsFuncs(fns...)(ctx, db, runner) + } + } +} + type staticConf struct { serviceConnections conftypes.ServiceConnections siteConfig schema.SiteConfiguration } func newStaticConf(ctx context.Context, db database.DB) (*staticConf, error) { - serviceConnections, err := serviceConnections() + serviceConnections, err := migrationServiceConnections() if err != nil { return nil, err } @@ -40,7 +75,7 @@ func newStaticConf(ctx context.Context, db database.DB) (*staticConf, error) { func (c staticConf) ServiceConnections() conftypes.ServiceConnections { return c.serviceConnections } func (c staticConf) SiteConfig() schema.SiteConfiguration { return c.siteConfig } -func serviceConnections() (conftypes.ServiceConnections, error) { +func migrationServiceConnections() (conftypes.ServiceConnections, error) { dsns, err := postgresdsn.DSNsBySchema(schemas.SchemaNames) if err != nil { return conftypes.ServiceConnections{}, err diff --git a/internal/oobmigration/BUILD.bazel b/internal/oobmigration/BUILD.bazel index 1119a4e32b6..d74678cbdac 100644 --- a/internal/oobmigration/BUILD.bazel +++ b/internal/oobmigration/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "interrupts.go", "migrator.go", "observability.go", + "progress.go", "registration.go", "runner.go", "store.go", @@ -31,6 +32,7 @@ go_library( "//internal/version", "//internal/version/upgradestore", "//lib/errors", + "//lib/output", "@com_github_derision_test_glock//:glock", "@com_github_jackc_pgconn//:pgconn", "@com_github_keegancsmith_sqlf//:sqlf", diff --git a/internal/oobmigration/migrations/register.go b/internal/oobmigration/migrations/register.go index fda7c0ab39d..b8f8ae291fc 100644 --- a/internal/oobmigration/migrations/register.go +++ b/internal/oobmigration/migrations/register.go @@ -30,7 +30,7 @@ func RegisterOSSMigratorsUsingConfAndStoreFactory( db database.DB, runner *oobmigration.Runner, conf conftypes.UnifiedQuerier, - storeFactory StoreFactory, + _ StoreFactory, ) error { keys, err := keyring.NewRing(ctx, conf.SiteConfig().EncryptionKeys) if err != nil { diff --git a/internal/oobmigration/progress.go b/internal/oobmigration/progress.go new file mode 100644 index 00000000000..eae2de9e2d1 --- /dev/null +++ b/internal/oobmigration/progress.go @@ -0,0 +1,52 @@ +package oobmigration + +import ( + "fmt" + "os" + + "github.com/sourcegraph/log" + + "github.com/sourcegraph/sourcegraph/lib/output" +) + +// makeOutOfBandMigrationProgressUpdater returns a two functions: `update` should be called +// when the updates to the progress of an out-of-band migration are made and should be reflected +// in the output; and `cleanup` should be called on defer when the progress object should be +// disposed. +func MakeProgressUpdater(out *output.Output, ids []int, animateProgress bool) ( + update func(i int, m Migration), + cleanup func(), +) { + if !animateProgress || shouldDisableProgressAnimation() { + update = func(i int, m Migration) { + out.WriteLine(output.Linef("", output.StyleReset, "Migration #%d is %.2f%% complete", m.ID, m.Progress*100)) + } + return update, func() {} + } + + bars := make([]output.ProgressBar, 0, len(ids)) + for _, id := range ids { + bars = append(bars, output.ProgressBar{ + Label: fmt.Sprintf("Migration #%d", id), + Max: 1.0, + }) + } + + progress := out.Progress(bars, nil) + return func(i int, m Migration) { progress.SetValue(i, m.Progress) }, progress.Destroy +} + +// shouldDisableProgressAnimation determines if progress bars should be avoided because the log level +// will create output that interferes with a stable canvas. In effect, this adds the -disable-animation +// flag when SRC_LOG_LEVEL is info or debug. +func shouldDisableProgressAnimation() bool { + switch log.Level(os.Getenv(log.EnvLogLevel)) { + case log.LevelDebug: + return true + case log.LevelInfo: + return true + + default: + return false + } +} diff --git a/internal/oobmigration/validate.go b/internal/oobmigration/validate.go index 36e7732bce1..2f8065cffab 100644 --- a/internal/oobmigration/validate.go +++ b/internal/oobmigration/validate.go @@ -29,7 +29,7 @@ func ValidateOutOfBandMigrationRunner(ctx context.Context, db database.DB, runne return nil } - firstSemverString, ok, err := upgradestore.New(db).GetFirstServiceVersion(ctx, "frontend") + firstSemverString, ok, err := upgradestore.New(db).GetFirstServiceVersion(ctx) if err != nil { return errors.Wrap(err, "failed to retrieve first instance version") } diff --git a/internal/version/upgradestore/store.go b/internal/version/upgradestore/store.go index 2540730ad4f..52a0c0933c6 100644 --- a/internal/version/upgradestore/store.go +++ b/internal/version/upgradestore/store.go @@ -35,8 +35,8 @@ func NewWith(db basestore.TransactableHandle) *store { // GetFirstServiceVersion returns the first version registered for the given Sourcegraph service. This // method will return a false-valued flag if UpdateServiceVersion has never been called for the given // service. -func (s *store) GetFirstServiceVersion(ctx context.Context, service string) (string, bool, error) { - version, ok, err := basestore.ScanFirstString(s.db.Query(ctx, sqlf.Sprintf(getFirstServiceVersionQuery, service))) +func (s *store) GetFirstServiceVersion(ctx context.Context) (string, bool, error) { + version, ok, err := basestore.ScanFirstString(s.db.Query(ctx, sqlf.Sprintf(getFirstServiceVersionQuery, "frontend"))) return version, ok, filterMissingRelationError(err) } @@ -47,8 +47,8 @@ SELECT first_version FROM versions WHERE service = %s // GetServiceVersion returns the previous version registered for the given Sourcegraph service. This // method will return a false-valued flag if UpdateServiceVersion has never been called for the given // service. -func (s *store) GetServiceVersion(ctx context.Context, service string) (string, bool, error) { - version, ok, err := basestore.ScanFirstString(s.db.Query(ctx, sqlf.Sprintf(getServiceVersionQuery, service))) +func (s *store) GetServiceVersion(ctx context.Context) (string, bool, error) { + version, ok, err := basestore.ScanFirstString(s.db.Query(ctx, sqlf.Sprintf(getServiceVersionQuery, "frontend"))) return version, ok, filterMissingRelationError(err) } @@ -65,8 +65,8 @@ func (s *store) ValidateUpgrade(ctx context.Context, service, version string) er // UpdateServiceVersion updates the latest version for the given Sourcegraph service. This method also enforces // our documented upgrade policy and will return an error (performing no side-effects) if the upgrade is between // two unsupported versions. See https://docs.sourcegraph.com/#upgrading-sourcegraph. -func (s *store) UpdateServiceVersion(ctx context.Context, service, version string) error { - return s.updateServiceVersion(ctx, service, version, true) +func (s *store) UpdateServiceVersion(ctx context.Context, version string) error { + return s.updateServiceVersion(ctx, "frontend", version, true) } func (s *store) updateServiceVersion(ctx context.Context, service, version string, update bool) error { @@ -114,8 +114,8 @@ WHERE versions.version = %s // SetServiceVersion updates the latest version for the given Sourcegraph service. This method also enforces // our documented upgrade policy and will return an error (performing no side-effects) if the upgrade is between // two unsupported versions. See https://docs.sourcegraph.com/#upgrading-sourcegraph. -func (s *store) SetServiceVersion(ctx context.Context, service, version string) error { - return s.db.Exec(ctx, sqlf.Sprintf(setServiceVersionQuery, version, time.Now().UTC(), service)) +func (s *store) SetServiceVersion(ctx context.Context, version string) error { + return s.db.Exec(ctx, sqlf.Sprintf(setServiceVersionQuery, version, time.Now().UTC(), "frontend")) } const setServiceVersionQuery = ` diff --git a/internal/version/upgradestore/store_test.go b/internal/version/upgradestore/store_test.go index bfac7c767b8..9563c0bdad9 100644 --- a/internal/version/upgradestore/store_test.go +++ b/internal/version/upgradestore/store_test.go @@ -22,7 +22,7 @@ func TestGetServiceVersion(t *testing.T) { store := New(db) t.Run("fresh db", func(t *testing.T) { - _, ok, err := store.GetServiceVersion(ctx, "service") + _, ok, err := store.GetServiceVersion(ctx) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -32,17 +32,17 @@ func TestGetServiceVersion(t *testing.T) { }) t.Run("after updates", func(t *testing.T) { - if err := store.UpdateServiceVersion(ctx, "service", "1.2.3"); err != nil { + if err := store.UpdateServiceVersion(ctx, "1.2.3"); err != nil { t.Fatalf("unexpected error: %s", err) } - if err := store.UpdateServiceVersion(ctx, "service", "1.2.4"); err != nil { + if err := store.UpdateServiceVersion(ctx, "1.2.4"); err != nil { t.Fatalf("unexpected error: %s", err) } - if err := store.UpdateServiceVersion(ctx, "service", "1.3.0"); err != nil { + if err := store.UpdateServiceVersion(ctx, "1.3.0"); err != nil { t.Fatalf("unexpected error: %s", err) } - version, ok, err := store.GetServiceVersion(ctx, "service") + version, ok, err := store.GetServiceVersion(ctx) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -59,7 +59,7 @@ func TestGetServiceVersion(t *testing.T) { t.Fatalf("unexpected error: %s", err) } - _, ok, err := store.GetServiceVersion(ctx, "service") + _, ok, err := store.GetServiceVersion(ctx) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -75,15 +75,15 @@ func TestSetServiceVersion(t *testing.T) { db := database.NewDB(logger, dbtest.NewDB(logger, t)) store := New(db) - if err := store.UpdateServiceVersion(ctx, "service", "1.2.3"); err != nil { + if err := store.UpdateServiceVersion(ctx, "1.2.3"); err != nil { t.Fatalf("unexpected error: %s", err) } - if err := store.SetServiceVersion(ctx, "service", "1.2.5"); err != nil { + if err := store.SetServiceVersion(ctx, "1.2.5"); err != nil { t.Fatalf("unexpected error: %s", err) } - version, _, err := store.GetServiceVersion(ctx, "service") + version, _, err := store.GetServiceVersion(ctx) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -99,7 +99,7 @@ func TestGetFirstServiceVersion(t *testing.T) { store := New(db) t.Run("fresh db", func(t *testing.T) { - _, ok, err := store.GetFirstServiceVersion(ctx, "service") + _, ok, err := store.GetFirstServiceVersion(ctx) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -109,17 +109,17 @@ func TestGetFirstServiceVersion(t *testing.T) { }) t.Run("after updates", func(t *testing.T) { - if err := store.UpdateServiceVersion(ctx, "service", "1.2.3"); err != nil { + if err := store.UpdateServiceVersion(ctx, "1.2.3"); err != nil { t.Fatalf("unexpected error: %s", err) } - if err := store.UpdateServiceVersion(ctx, "service", "1.2.4"); err != nil { + if err := store.UpdateServiceVersion(ctx, "1.2.4"); err != nil { t.Fatalf("unexpected error: %s", err) } - if err := store.UpdateServiceVersion(ctx, "service", "1.3.0"); err != nil { + if err := store.UpdateServiceVersion(ctx, "1.3.0"); err != nil { t.Fatalf("unexpected error: %s", err) } - firstVersion, ok, err := store.GetFirstServiceVersion(ctx, "service") + firstVersion, ok, err := store.GetFirstServiceVersion(ctx) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -136,7 +136,7 @@ func TestGetFirstServiceVersion(t *testing.T) { t.Fatalf("unexpected error: %s", err) } - _, ok, err := store.GetFirstServiceVersion(ctx, "service") + _, ok, err := store.GetFirstServiceVersion(ctx) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -163,12 +163,12 @@ func TestUpdateServiceVersion(t *testing.T) { {"0.2.0", nil}, {"1.0.0", nil}, {"1.2.0", &UpgradeError{ - Service: "service", + Service: "frontend", Previous: semver.MustParse("1.0.0"), Latest: semver.MustParse("1.2.0"), }}, {"2.1.0", &UpgradeError{ - Service: "service", + Service: "frontend", Previous: semver.MustParse("1.0.0"), Latest: semver.MustParse("2.1.0"), }}, @@ -176,12 +176,12 @@ func TestUpdateServiceVersion(t *testing.T) { {"non-semantic-version-is-always-valid", nil}, {"1.0.0", nil}, // back to semantic version is allowed {"2.1.0", &UpgradeError{ - Service: "service", + Service: "frontend", Previous: semver.MustParse("1.0.0"), Latest: semver.MustParse("2.1.0"), }}, // upgrade policy violation returns } { - have := store.UpdateServiceVersion(ctx, "service", tc.version) + have := store.UpdateServiceVersion(ctx, tc.version) want := tc.err if !errors.Is(have, want) { @@ -197,7 +197,7 @@ func TestUpdateServiceVersion(t *testing.T) { t.Fatalf("unexpected error: %s", err) } - if err := store.UpdateServiceVersion(ctx, "service", "0.0.1"); err == nil { + if err := store.UpdateServiceVersion(ctx, "0.0.1"); err == nil { t.Fatalf("expected error, got none") } }) @@ -214,7 +214,7 @@ func TestValidateUpgrade(t *testing.T) { t.Fatalf("unexpected error: %s", err) } - if err := store.ValidateUpgrade(ctx, "service", "0.0.1"); err != nil { + if err := store.ValidateUpgrade(ctx, "frontend", "0.0.1"); err != nil { t.Fatalf("unexpected error: %s", err) } }) diff --git a/lib/output/BUILD.bazel b/lib/output/BUILD.bazel index 7f7960385b9..7c61a510425 100644 --- a/lib/output/BUILD.bazel +++ b/lib/output/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "capabilities.go", "emoji.go", "line.go", + "logger.go", "noop_writer.go", "output.go", "output_unix.go", @@ -37,6 +38,7 @@ go_library( "@com_github_mattn_go_runewidth//:go-runewidth", "@com_github_moby_term//:term", "@com_github_muesli_termenv//:termenv", + "@com_github_sourcegraph_log//:log", "@org_golang_x_term//:term", ] + select({ "@io_bazel_rules_go//go/platform:windows": [ diff --git a/lib/output/emoji.go b/lib/output/emoji.go index 468d05808a7..469e9eccb8f 100644 --- a/lib/output/emoji.go +++ b/lib/output/emoji.go @@ -1,5 +1,20 @@ package output +var allEmojis = [...]string{ + EmojiFailure, + EmojiWarning, + EmojiSuccess, + EmojiInfo, + EmojiLightbulb, + EmojiAsterisk, + EmojiWarningSign, + EmojiFingerPointRight, + EmojiHourglass, + EmojiShrug, + EmojiOk, + EmojiQuestionMark, +} + // Standard emoji for use in output. const ( EmojiFailure = "❌" diff --git a/lib/output/logger.go b/lib/output/logger.go new file mode 100644 index 00000000000..76f4cf1111e --- /dev/null +++ b/lib/output/logger.go @@ -0,0 +1,31 @@ +package output + +import ( + "bytes" + + "github.com/sourcegraph/log" +) + +type logFacade struct { + logger log.Logger +} + +func OutputFromLogger(logger log.Logger) *Output { + return NewOutput(&logFacade{logger}, OutputOpts{}) +} + +func (l *logFacade) Write(p []byte) (n int, err error) { + for _, emoji := range allEmojis { + if bytes.HasPrefix(p, []byte(emoji)) { + switch emoji { + case EmojiWarningSign: + l.logger.Warn(string(p[len(emoji):])) + case EmojiFailure, EmojiWarning: + l.logger.Error(string(p[len(emoji):])) + default: + l.logger.Info(string(p[len(emoji):])) + } + } + } + return len(p), nil +}