migrator: extract non-cli specifics from cli package (#53247)

YOINK

## Test plan

Tested the migrator with a multi-version upgrade (`upgrade` from 4.5.0 to 5.0.5) and a standard upgrade
(`up` from 4.5.0 to 5.0.5).
Commit 591d702d29 (parent 75cf2f5c2d) by Noah S-C, 2023-06-09 18:51:56 +01:00, committed by GitHub
46 changed files with 1091 additions and 941 deletions
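The core of the change, visible throughout the hunks below, is that the `cliutil.Runner` interface and its `NewShim` wrapper are removed: call sites now hold a concrete `*runner.Runner`, constructed through the new `internal/database/migration` package. A minimal sketch of a caller after this change (the helper name and the "example" app name are illustrative, not part of the commit):

```go
package main

import (
	"os"

	"github.com/sourcegraph/log"

	"github.com/sourcegraph/sourcegraph/internal/database/migration"
	"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
	"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
	"github.com/sourcegraph/sourcegraph/internal/observation"
	"github.com/sourcegraph/sourcegraph/lib/output"
)

// newMigrationRunner shows the post-refactor construction path: a concrete
// *runner.Runner comes straight from migration.NewRunnerWithSchemas, with no
// cliutil shim in between.
func newMigrationRunner(logger log.Logger) (*runner.Runner, error) {
	observationCtx := observation.NewContext(logger)
	out := output.NewOutput(os.Stdout, output.OutputOpts{})

	// Argument shape taken from the new call sites in this commit:
	// (observation context, output, app name, schema names, schema definitions).
	return migration.NewRunnerWithSchemas(observationCtx, out, "example", schemas.SchemaNames, schemas.Schemas)
}
```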


@ -255,8 +255,11 @@ go_library(
"//internal/conf/deploy",
"//internal/conf/reposource",
"//internal/database",
"//internal/database/migration",
"//internal/database/migration/cliutil",
"//internal/database/migration/drift",
"//internal/database/migration/multiversion",
"//internal/database/migration/runner",
"//internal/database/migration/schemas",
"//internal/deviceid",
"//internal/encryption",


@ -26,8 +26,11 @@ import (
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/conf/deploy"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/migration"
"github.com/sourcegraph/sourcegraph/internal/database/migration/cliutil"
"github.com/sourcegraph/sourcegraph/internal/database/migration/drift"
"github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/insights"
@ -321,16 +324,16 @@ type upgradeReadinessResolver struct {
initOnce sync.Once
initErr error
runner cliutil.Runner
runner *runner.Runner
version string
schemaNames []string
}
var devSchemaFactory = cliutil.NewExpectedSchemaFactory(
var devSchemaFactory = schemas.NewExpectedSchemaFactory(
"Local file",
[]cliutil.NamedRegexp{{Regexp: lazyregexp.New(`^dev$`)}},
[]schemas.NamedRegexp{{Regexp: lazyregexp.New(`^dev$`)}},
func(filename, _ string) string { return filename },
cliutil.ReadSchemaFromFile,
schemas.ReadSchemaFromFile,
)
var schemaFactories = append(
@ -341,9 +344,9 @@ var schemaFactories = append(
var insidersVersionPattern = lazyregexp.New(`^[\w-]+_\d{4}-\d{2}-\d{2}_\d+\.\d+-(\w+)$`)
func (r *upgradeReadinessResolver) init(ctx context.Context) (_ cliutil.Runner, version string, schemaNames []string, _ error) {
func (r *upgradeReadinessResolver) init(ctx context.Context) (_ *runner.Runner, version string, schemaNames []string, _ error) {
r.initOnce.Do(func() {
r.runner, r.version, r.schemaNames, r.initErr = func() (cliutil.Runner, string, []string, error) {
r.runner, r.version, r.schemaNames, r.initErr = func() (*runner.Runner, string, []string, error) {
schemaNames := []string{schemas.Frontend.Name, schemas.CodeIntel.Name}
schemaList := []*schemas.Schema{schemas.Frontend, schemas.CodeIntel}
if insights.IsCodeInsightsEnabled() {
@ -351,7 +354,7 @@ func (r *upgradeReadinessResolver) init(ctx context.Context) (_ cliutil.Runner,
schemaList = append(schemaList, schemas.CodeInsights)
}
observationCtx := observation.NewContext(r.logger)
runner, err := migratorshared.NewRunnerWithSchemas(observationCtx, r.logger, schemaNames, schemaList)
runner, err := migration.NewRunnerWithSchemas(observationCtx, output.OutputFromLogger(r.logger), "frontend-upgradereadiness", schemaNames, schemaList)
if err != nil {
return nil, "", nil, errors.Wrap(err, "new runner")
}
@ -447,12 +450,12 @@ func (r *upgradeReadinessResolver) SchemaDrift(ctx context.Context) ([]*schemaDr
var buf bytes.Buffer
driftOut := output.NewOutput(&buf, output.OutputOpts{})
expectedSchema, err := cliutil.FetchExpectedSchema(ctx, schemaName, version, driftOut, schemaFactories)
expectedSchema, err := multiversion.FetchExpectedSchema(ctx, schemaName, version, driftOut, schemaFactories)
if err != nil {
return nil, err
}
for _, summary := range drift.CompareSchemaDescriptions(schemaName, version, cliutil.Canonicalize(schema), cliutil.Canonicalize(expectedSchema)) {
for _, summary := range drift.CompareSchemaDescriptions(schemaName, version, multiversion.Canonicalize(schema), multiversion.Canonicalize(expectedSchema)) {
resolvers = append(resolvers, &schemaDriftResolver{
summary: summary,
})


@ -77,7 +77,7 @@ func InitDB(logger sglog.Logger) (*sql.DB, error) {
return nil, errors.Errorf("failed to connect to frontend database: %s", err)
}
if err := upgradestore.New(database.NewDB(logger, sqlDB)).UpdateServiceVersion(context.Background(), "frontend", version.Version()); err != nil {
if err := upgradestore.New(database.NewDB(logger, sqlDB)).UpdateServiceVersion(context.Background(), version.Version()); err != nil {
return nil, err
}
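A side effect picked up here: the `upgradestore` methods no longer take an explicit service-name argument and are implicitly scoped to the frontend service. A small sketch combining the two call shapes visible in this hunk and in the `GetRawServiceVersion` hunk later in the diff (the helper itself is illustrative):

```go
package main

import (
	"context"
	"database/sql"

	"github.com/sourcegraph/log"

	"github.com/sourcegraph/sourcegraph/internal/database"
	"github.com/sourcegraph/sourcegraph/internal/version"
	"github.com/sourcegraph/sourcegraph/internal/version/upgradestore"
)

// recordFrontendVersion is an illustrative helper: both upgradestore calls are now
// implicitly scoped to the frontend service, so the "frontend" string argument is gone.
func recordFrontendVersion(ctx context.Context, logger log.Logger, sqlDB *sql.DB) (previous string, ok bool, err error) {
	db := database.NewDB(logger, sqlDB)

	previous, ok, err = upgradestore.New(db).GetServiceVersion(ctx)
	if err != nil {
		return "", false, err
	}

	return previous, ok, upgradestore.New(db).UpdateServiceVersion(ctx, version.Version())
}
```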


@ -2,31 +2,19 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "shared",
srcs = [
"conf.go",
"main.go",
"registration.go",
],
srcs = ["main.go"],
importpath = "github.com/sourcegraph/sourcegraph/cmd/migrator/shared",
visibility = ["//visibility:public"],
deps = [
"//internal/conf/conftypes",
"//internal/database",
"//internal/database/basestore",
"//internal/database/connections/live",
"//internal/database/migration",
"//internal/database/migration/cliutil",
"//internal/database/migration/runner",
"//internal/database/migration/schemas",
"//internal/database/migration/store",
"//internal/database/postgresdsn",
"//internal/jsonc",
"//internal/observation",
"//internal/oobmigration",
"//internal/oobmigration/migrations",
"//internal/version",
"//lib/errors",
"//lib/output",
"//schema",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_sourcegraph_log//:log",
"@com_github_urfave_cli_v2//:cli",
],


@ -2,19 +2,17 @@ package shared
import (
"context"
"database/sql"
"os"
"strings"
"github.com/urfave/cli/v2"
"github.com/sourcegraph/log"
connections "github.com/sourcegraph/sourcegraph/internal/database/connections/live"
"github.com/sourcegraph/sourcegraph/internal/database/migration"
"github.com/sourcegraph/sourcegraph/internal/database/migration/cliutil"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
"github.com/sourcegraph/sourcegraph/internal/database/postgresdsn"
"github.com/sourcegraph/sourcegraph/internal/observation"
ossmigrations "github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations"
"github.com/sourcegraph/sourcegraph/internal/version"
@ -25,53 +23,27 @@ const appName = "migrator"
var out = output.NewOutput(os.Stdout, output.OutputOpts{})
// NewRunnerWithSchemas returns a new migrator runner with the given schema names and
// definitions.
func NewRunnerWithSchemas(observationCtx *observation.Context, logger log.Logger, schemaNames []string, schemas []*schemas.Schema) (cliutil.Runner, error) {
dsns, err := postgresdsn.DSNsBySchema(schemaNames)
if err != nil {
return nil, err
}
var dsnsStrings []string
for schema, dsn := range dsns {
dsnsStrings = append(dsnsStrings, schema+" => "+dsn)
}
out.WriteLine(output.Linef(output.EmojiInfo, output.StyleGrey, "Connection DSNs used: %s", strings.Join(dsnsStrings, ", ")))
storeFactory := func(db *sql.DB, migrationsTable string) connections.Store {
return connections.NewStoreShim(store.NewWithDB(observationCtx, db, migrationsTable))
}
r, err := connections.RunnerFromDSNsWithSchemas(out, logger, dsns, appName, storeFactory, schemas)
if err != nil {
return nil, err
}
return cliutil.NewShim(r), nil
}
// DefaultSchemaFactories is a list of schema factories to be used in
// non-exceptional cases.
var DefaultSchemaFactories = []cliutil.ExpectedSchemaFactory{
cliutil.LocalExpectedSchemaFactory,
cliutil.GitHubExpectedSchemaFactory,
cliutil.GCSExpectedSchemaFactory,
var DefaultSchemaFactories = []schemas.ExpectedSchemaFactory{
schemas.LocalExpectedSchemaFactory,
schemas.GitHubExpectedSchemaFactory,
schemas.GCSExpectedSchemaFactory,
}
func Start(logger log.Logger, registerEnterpriseMigrators registerMigratorsUsingConfAndStoreFactoryFunc) error {
func Start(logger log.Logger, registerEnterpriseMigrators store.RegisterMigratorsUsingConfAndStoreFactoryFunc) error {
observationCtx := observation.NewContext(logger)
outputFactory := func() *output.Output { return out }
newRunnerWithSchemas := func(schemaNames []string, schemas []*schemas.Schema) (cliutil.Runner, error) {
return NewRunnerWithSchemas(observationCtx, logger, schemaNames, schemas)
newRunnerWithSchemas := func(schemaNames []string, schemas []*schemas.Schema) (*runner.Runner, error) {
return migration.NewRunnerWithSchemas(observationCtx, out, "migrator", schemaNames, schemas)
}
newRunner := func(schemaNames []string) (cliutil.Runner, error) {
newRunner := func(schemaNames []string) (*runner.Runner, error) {
return newRunnerWithSchemas(schemaNames, schemas.Schemas)
}
registerMigrators := composeRegisterMigratorsFuncs(
registerMigrators := store.ComposeRegisterMigratorsFuncs(
ossmigrations.RegisterOSSMigratorsUsingConfAndStoreFactory,
registerEnterpriseMigrators,
)


@ -1,43 +0,0 @@
package shared
import (
"context"
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations"
)
type registerMigratorsUsingConfAndStoreFactoryFunc func(
ctx context.Context,
db database.DB,
runner *oobmigration.Runner,
conf conftypes.UnifiedQuerier,
storeFactory migrations.StoreFactory,
) error
func composeRegisterMigratorsFuncs(fnsFromConfAndStoreFactory ...registerMigratorsUsingConfAndStoreFactoryFunc) func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc {
return func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc {
return func(ctx context.Context, db database.DB, runner *oobmigration.Runner) error {
conf, err := newStaticConf(ctx, db)
if err != nil {
return err
}
fns := make([]oobmigration.RegisterMigratorsFunc, 0, len(fnsFromConfAndStoreFactory))
for _, f := range fnsFromConfAndStoreFactory {
f := f // avoid loop capture
if f == nil {
continue
}
fns = append(fns, func(ctx context.Context, db database.DB, runner *oobmigration.Runner) error {
return f(ctx, db, runner, conf, storeFactory)
})
}
return oobmigration.ComposeRegisterMigratorsFuncs(fns...)(ctx, db, runner)
}
}
}
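The deleted file above does not simply vanish: the function type and the compose helper are referenced elsewhere in this commit as `store.RegisterMigratorsUsingConfAndStoreFactoryFunc` and `store.ComposeRegisterMigratorsFuncs`, i.e. they move into the migration `store` package. A reconstruction from the deleted body, offered as a sketch only; `newStaticConf` (previously in the also-removed `conf.go`) is assumed to have moved alongside them, and its body is not shown in this diff:

```go
package store

import (
	"context"

	"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
	"github.com/sourcegraph/sourcegraph/internal/database"
	"github.com/sourcegraph/sourcegraph/internal/oobmigration"
	"github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations"
)

// RegisterMigratorsUsingConfAndStoreFactoryFunc is the exported counterpart of the
// private type deleted above.
type RegisterMigratorsUsingConfAndStoreFactoryFunc func(
	ctx context.Context,
	db database.DB,
	runner *oobmigration.Runner,
	conf conftypes.UnifiedQuerier,
	storeFactory migrations.StoreFactory,
) error

// ComposeRegisterMigratorsFuncs mirrors the deleted composeRegisterMigratorsFuncs:
// it resolves site configuration once, then fans registration out to each function.
func ComposeRegisterMigratorsFuncs(fnsFromConfAndStoreFactory ...RegisterMigratorsUsingConfAndStoreFactoryFunc) func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc {
	return func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc {
		return func(ctx context.Context, db database.DB, runner *oobmigration.Runner) error {
			conf, err := newStaticConf(ctx, db)
			if err != nil {
				return err
			}

			fns := make([]oobmigration.RegisterMigratorsFunc, 0, len(fnsFromConfAndStoreFactory))
			for _, f := range fnsFromConfAndStoreFactory {
				f := f // avoid loop capture
				if f == nil {
					continue
				}
				fns = append(fns, func(ctx context.Context, db database.DB, runner *oobmigration.Runner) error {
					return f(ctx, db, runner, conf, storeFactory)
				})
			}
			return oobmigration.ComposeRegisterMigratorsFuncs(fns...)(ctx, db, runner)
		}
	}
}

// newStaticConf builds a conftypes.UnifiedQuerier from configuration stored in the
// database. Its implementation is not part of the hunks shown here; this stub only
// records the assumed signature.
func newStaticConf(ctx context.Context, db database.DB) (conftypes.UnifiedQuerier, error) {
	panic("sketch only; implementation not shown in this diff")
}
```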


@ -20,6 +20,7 @@ import (
"github.com/sourcegraph/sourcegraph/dev/sg/root"
connections "github.com/sourcegraph/sourcegraph/internal/database/connections/live"
"github.com/sourcegraph/sourcegraph/internal/database/migration/cliutil"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
"github.com/sourcegraph/sourcegraph/internal/database/postgresdsn"
@ -114,9 +115,9 @@ var (
// at compile-time in sg.
outputFactory = func() *output.Output { return std.Out.Output }
schemaFactories = []cliutil.ExpectedSchemaFactory{
schemaFactories = []schemas.ExpectedSchemaFactory{
localGitExpectedSchemaFactory,
cliutil.GCSExpectedSchemaFactory,
schemas.GCSExpectedSchemaFactory,
}
upCommand = cliutil.Up("sg migration", makeRunner, outputFactory, true)
@ -209,7 +210,7 @@ sg migration squash
}
)
func makeRunner(schemaNames []string) (cliutil.Runner, error) {
func makeRunner(schemaNames []string) (*runner.Runner, error) {
filesystemSchemas, err := getFilesystemSchemas()
if err != nil {
return nil, err
@ -218,7 +219,7 @@ func makeRunner(schemaNames []string) (cliutil.Runner, error) {
return makeRunnerWithSchemas(schemaNames, filesystemSchemas)
}
func makeRunnerWithSchemas(schemaNames []string, schemas []*schemas.Schema) (cliutil.Runner, error) {
func makeRunnerWithSchemas(schemaNames []string, schemas []*schemas.Schema) (*runner.Runner, error) {
// Try to read the `sg` configuration so we can read ENV vars from the
// configuration and use process env as fallback.
var getEnv func(string) string
@ -238,13 +239,13 @@ func makeRunnerWithSchemas(schemaNames []string, schemas []*schemas.Schema) (cli
return nil, err
}
return cliutil.NewShim(r), nil
return r, nil
}
// localGitExpectedSchemaFactory returns the description of the given schema at the given version via the
// (assumed) local git clone. If the version is not resolvable as a git rev-like, or if the file does not
// exist at that revision, then a false-valued flag is returned. All other failures are reported as errors.
var localGitExpectedSchemaFactory = cliutil.NewExpectedSchemaFactory(
var localGitExpectedSchemaFactory = schemas.NewExpectedSchemaFactory(
"git",
nil,
func(filename, version string) string {


@ -16,6 +16,7 @@ go_library(
deps = [
"//enterprise/cmd/frontend/shared",
"//enterprise/cmd/sourcegraph/enterprisecmd",
"//internal/oobmigration",
"//internal/sanitycheck",
"//ui/assets",
"//ui/assets/enterprise",


@ -6,12 +6,18 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/cmd/frontend/shared"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/sourcegraph/enterprisecmd"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/sanitycheck"
"github.com/sourcegraph/sourcegraph/ui/assets"
_ "github.com/sourcegraph/sourcegraph/ui/assets/enterprise" // Select enterprise assets
)
func init() {
// TODO(sqs): TODO(single-binary): could we move this out of init?
oobmigration.ReturnEnterpriseMigrations = true
}
func main() {
sanitycheck.Pass()
if os.Getenv("WEBPACK_DEV_SERVER") == "1" {


@ -6,7 +6,6 @@ go_library(
importpath = "github.com/sourcegraph/sourcegraph/enterprise/cmd/sourcegraph/enterprisecmd",
visibility = ["//visibility:public"],
deps = [
"//internal/oobmigration",
"//internal/service",
"//internal/service/svcmain",
],


@ -4,7 +4,6 @@
package enterprisecmd
import (
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/service"
"github.com/sourcegraph/sourcegraph/internal/service/svcmain"
)
@ -21,8 +20,3 @@ func MainEnterprise(services []service.Service, args []string) {
func SingleServiceMainEnterprise(service service.Service) {
svcmain.SingleServiceMain(service, config)
}
func init() {
// TODO(sqs): TODO(single-binary): could we move this out of init?
oobmigration.ReturnEnterpriseMigrations = true
}

internal/database/migration/BUILD.bazel (generated, new file, +17)

@ -0,0 +1,17 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "migration",
srcs = ["runner.go"],
importpath = "github.com/sourcegraph/sourcegraph/internal/database/migration",
visibility = ["//:__subpackages__"],
deps = [
"//internal/database/connections/live",
"//internal/database/migration/runner",
"//internal/database/migration/schemas",
"//internal/database/migration/store",
"//internal/database/postgresdsn",
"//internal/observation",
"//lib/output",
],
)
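The new library lists a single source file, `runner.go`, whose contents are not part of the hunks shown. Based on the body deleted from `cmd/migrator/shared/main.go` above and on the new call sites (which now pass an `*output.Output` and an app name instead of a logger), the relocated helper plausibly looks as follows; treat it as a reconstruction, and note that sourcing the logger from `observationCtx.Logger` is an assumption:

```go
package migration

import (
	"database/sql"
	"strings"

	connections "github.com/sourcegraph/sourcegraph/internal/database/connections/live"
	"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
	"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
	"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
	"github.com/sourcegraph/sourcegraph/internal/database/postgresdsn"
	"github.com/sourcegraph/sourcegraph/internal/observation"
	"github.com/sourcegraph/sourcegraph/lib/output"
)

// NewRunnerWithSchemas returns a new migration runner for the given schema names and
// definitions. (Reconstructed sketch; the committed runner.go is not shown in this diff.)
func NewRunnerWithSchemas(observationCtx *observation.Context, out *output.Output, appName string, schemaNames []string, schemas []*schemas.Schema) (*runner.Runner, error) {
	dsns, err := postgresdsn.DSNsBySchema(schemaNames)
	if err != nil {
		return nil, err
	}

	var dsnStrings []string
	for schema, dsn := range dsns {
		dsnStrings = append(dsnStrings, schema+" => "+dsn)
	}
	out.WriteLine(output.Linef(output.EmojiInfo, output.StyleGrey, "Connection DSNs used: %s", strings.Join(dsnStrings, ", ")))

	storeFactory := func(db *sql.DB, migrationsTable string) connections.Store {
		return connections.NewStoreShim(store.NewWithDB(observationCtx, db, migrationsTable))
	}
	return connections.RunnerFromDSNsWithSchemas(out, observationCtx.Logger, dsns, appName, storeFactory, schemas)
}
```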


@ -10,8 +10,6 @@ go_library(
"downto.go",
"drift.go",
"drift_autofix.go",
"drift_schema.go",
"drift_util.go",
"help.go",
"iface.go",
"multiversion.go",
@ -27,13 +25,12 @@ go_library(
visibility = ["//:__subpackages__"],
deps = [
"//internal/database",
"//internal/database/basestore",
"//internal/database/migration/definition",
"//internal/database/migration/drift",
"//internal/database/migration/multiversion",
"//internal/database/migration/runner",
"//internal/database/migration/schemas",
"//internal/database/migration/shared",
"//internal/lazyregexp",
"//internal/database/migration/store",
"//internal/observation",
"//internal/oobmigration",
"//internal/oobmigration/migrations",
@ -41,9 +38,7 @@ go_library(
"//internal/version/upgradestore",
"//lib/errors",
"//lib/output",
"@com_github_google_go_cmp//cmp",
"@com_github_sourcegraph_log//:log",
"@com_github_urfave_cli_v2//:cli",
"@org_cuelang_go//pkg/strings",
],
)


@ -6,6 +6,10 @@ import (
"github.com/urfave/cli/v2"
"github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations"
"github.com/sourcegraph/sourcegraph/lib/errors"
@ -14,10 +18,10 @@ import (
func Downgrade(
commandName string,
runnerFactory RunnerFactoryWithSchemas,
runnerFactory runner.RunnerFactoryWithSchemas,
outFactory OutputFactory,
registerMigrators func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc,
expectedSchemaFactories ...ExpectedSchemaFactory,
expectedSchemaFactories ...schemas.ExpectedSchemaFactory,
) *cli.Command {
fromFlag := &cli.StringFlag{
Name: "from",
@ -118,7 +122,7 @@ func Downgrade(
// Find the relevant schema and data migrations to perform (and in what order)
// for the given version range.
plan, err := planMigration(from, to, versionRange, interrupts)
plan, err := multiversion.PlanMigration(from, to, versionRange, interrupts)
if err != nil {
return err
}
@ -128,9 +132,21 @@ func Downgrade(
return err
}
runner, err := runnerFactory(schemas.SchemaNames, schemas.Schemas)
if err != nil {
return errors.Wrap(err, "new runner")
}
// connect to db and get upgrade readiness state
db, err := store.ExtractDatabase(ctx, runner)
if err != nil {
return errors.Wrap(err, "new db handle")
}
// Perform the downgrade on the configured databases.
return runMigration(
return multiversion.RunMigration(
ctx,
db,
runnerFactory,
plan,
privilegedMode,


@ -3,20 +3,18 @@ package cliutil
import (
"context"
"fmt"
"net/url"
"sort"
"cuelang.org/go/pkg/strings"
"github.com/urfave/cli/v2"
"github.com/sourcegraph/sourcegraph/internal/database/migration/drift"
descriptions "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/output"
)
func Drift(commandName string, factory RunnerFactory, outFactory OutputFactory, development bool, expectedSchemaFactories ...ExpectedSchemaFactory) *cli.Command {
func Drift(commandName string, factory RunnerFactory, outFactory OutputFactory, development bool, expectedSchemaFactories ...schemas.ExpectedSchemaFactory) *cli.Command {
defaultVersion := ""
if development {
defaultVersion = "HEAD"
@ -126,33 +124,33 @@ func Drift(commandName string, factory RunnerFactory, outFactory OutputFactory,
}
if file != "" {
expectedSchemaFactories = []ExpectedSchemaFactory{
NewExplicitFileSchemaFactory(file),
expectedSchemaFactories = []schemas.ExpectedSchemaFactory{
schemas.NewExplicitFileSchemaFactory(file),
}
}
expectedSchema, err := FetchExpectedSchema(ctx, schemaName, version, out, expectedSchemaFactories)
expectedSchema, err := multiversion.FetchExpectedSchema(ctx, schemaName, version, out, expectedSchemaFactories)
if err != nil {
return err
}
schemas, err := store.Describe(ctx)
allSchemas, err := store.Describe(ctx)
if err != nil {
return err
}
schema := schemas["public"]
summaries := drift.CompareSchemaDescriptions(schemaName, version, Canonicalize(schema), Canonicalize(expectedSchema))
schema := allSchemas["public"]
summaries := drift.CompareSchemaDescriptions(schemaName, version, multiversion.Canonicalize(schema), multiversion.Canonicalize(expectedSchema))
if autofixFlag.Get(cmd) {
summaries, err = attemptAutofix(ctx, out, store, summaries, func(schema descriptions.SchemaDescription) []drift.Summary {
return drift.CompareSchemaDescriptions(schemaName, version, Canonicalize(schema), Canonicalize(expectedSchema))
summaries, err = attemptAutofix(ctx, out, store, summaries, func(schema schemas.SchemaDescription) []drift.Summary {
return drift.CompareSchemaDescriptions(schemaName, version, multiversion.Canonicalize(schema), multiversion.Canonicalize(expectedSchema))
})
if err != nil {
return err
}
}
return displayDriftSummaries(out, summaries)
return drift.DisplaySchemaSummaries(out, summaries)
})
flags := []cli.Flag{
@ -174,117 +172,3 @@ func Drift(commandName string, factory RunnerFactory, outFactory OutputFactory,
Flags: flags,
}
}
func FetchExpectedSchema(
ctx context.Context,
schemaName string,
version string,
out *output.Output,
expectedSchemaFactories []ExpectedSchemaFactory,
) (descriptions.SchemaDescription, error) {
filename, err := getSchemaJSONFilename(schemaName)
if err != nil {
return descriptions.SchemaDescription{}, err
}
out.WriteLine(output.Line(output.EmojiInfo, output.StyleReset, "Locating schema description"))
for i, factory := range expectedSchemaFactories {
matches := false
patterns := factory.VersionPatterns()
for _, pattern := range patterns {
if pattern.MatchString(version) {
matches = true
break
}
}
if len(patterns) > 0 && !matches {
continue
}
resourcePath := factory.ResourcePath(filename, version)
expectedSchema, err := factory.CreateFromPath(ctx, resourcePath)
if err != nil {
suffix := ""
if i < len(expectedSchemaFactories)-1 {
suffix = " Will attempt a fallback source."
}
out.WriteLine(output.Linef(output.EmojiInfo, output.StyleReset, "Reading schema definition in %s (%s)... Schema not found (%s).%s", factory.Name(), resourcePath, err, suffix))
continue
}
out.WriteLine(output.Linef(output.EmojiSuccess, output.StyleReset, "Schema found in %s (%s).", factory.Name(), resourcePath))
return expectedSchema, nil
}
exampleMap := map[string]struct{}{}
failedPaths := map[string]struct{}{}
for _, factory := range expectedSchemaFactories {
for _, pattern := range factory.VersionPatterns() {
if !pattern.MatchString(version) {
exampleMap[pattern.Example()] = struct{}{}
} else {
failedPaths[factory.ResourcePath(filename, version)] = struct{}{}
}
}
}
versionExamples := make([]string, 0, len(exampleMap))
for pattern := range exampleMap {
versionExamples = append(versionExamples, pattern)
}
sort.Strings(versionExamples)
paths := make([]string, 0, len(exampleMap))
for path := range failedPaths {
if u, err := url.Parse(path); err == nil && (u.Scheme == "http" || u.Scheme == "https") {
paths = append(paths, path)
}
}
sort.Strings(paths)
if len(paths) > 0 {
var additionalHints string
if len(versionExamples) > 0 {
additionalHints = fmt.Sprintf(
"Alternative, provide a different version that matches one of the following patterns: \n - %s\n", strings.Join(versionExamples, "\n - "),
)
}
out.WriteLine(output.Linef(
output.EmojiLightbulb,
output.StyleFailure,
"Schema not found. "+
"Check if the following resources exist. "+
"If they do, then the context in which this migrator is being run may not be permitted to reach the public internet."+
"\n - %s\n%s",
strings.Join(paths, "\n - "),
additionalHints,
))
} else if len(versionExamples) > 0 {
out.WriteLine(output.Linef(
output.EmojiLightbulb,
output.StyleFailure,
"Schema not found. Ensure your supplied version matches one of the following patterns: \n - %s\n", strings.Join(versionExamples, "\n - "),
))
}
return descriptions.SchemaDescription{}, errors.New("failed to locate target schema description")
}
func Canonicalize(schemaDescription descriptions.SchemaDescription) descriptions.SchemaDescription {
descriptions.Canonicalize(schemaDescription)
filtered := schemaDescription.Tables[:0]
for i, table := range schemaDescription.Tables {
if table.Name == "migration_logs" {
continue
}
filtered = append(filtered, schemaDescription.Tables[i])
}
schemaDescription.Tables = filtered
return schemaDescription
}


@ -25,17 +25,4 @@ type Store interface {
// OutputFactory allows providing global output that might not be instantiated at compile time.
type OutputFactory func() *output.Output
type RunnerFactory func(schemaNames []string) (Runner, error)
type RunnerFactoryWithSchemas func(schemaNames []string, schemas []*schemas.Schema) (Runner, error)
type runnerShim struct {
*runner.Runner
}
func NewShim(runner *runner.Runner) Runner {
return &runnerShim{Runner: runner}
}
func (r *runnerShim) Store(ctx context.Context, schemaName string) (Store, error) {
return r.Runner.Store(ctx, schemaName)
}
type RunnerFactory func(schemaNames []string) (*runner.Runner, error)


@ -1,289 +1,28 @@
package cliutil
import (
"bytes"
"context"
"fmt"
"github.com/sourcegraph/sourcegraph/internal/database/migration/definition"
"github.com/sourcegraph/sourcegraph/internal/database/migration/drift"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/migration/shared"
"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations"
"github.com/sourcegraph/sourcegraph/internal/version/upgradestore"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/output"
)
type migrationPlan struct {
// the source and target instance versions
from, to oobmigration.Version
// the stitched schema migration definitions over the entire version range by schema name
stitchedDefinitionsBySchemaName map[string]*definition.Definitions
// the sequence of migration steps over the stitched schema migration definitions; we can't
// simply apply all schema migrations, as an out-of-band migration can only run within a certain
// slice of the schema's definition where that out-of-band migration was defined
steps []migrationStep
}
type migrationStep struct {
// the target version to migrate to
instanceVersion oobmigration.Version
// the leaf migrations of this version by schema name
schemaMigrationLeafIDsBySchemaName map[string][]int
// the set of out-of-band migrations that must complete before schema migrations begin
// for the following minor instance version
outOfBandMigrationIDs []int
}
// planMigration returns a path to migrate through the given version range. Each step corresponds to
// a target instance version to migrate to, and a set of out-of-band migrations that need to complete.
// Done in sequence, these steps form a complete multi-version upgrade or migration plan.
func planMigration(
from, to oobmigration.Version,
versionRange []oobmigration.Version,
interrupts []oobmigration.MigrationInterrupt,
) (migrationPlan, error) {
versionTags := make([]string, 0, len(versionRange))
for _, version := range versionRange {
versionTags = append(versionTags, version.GitTag())
}
// Retrieve relevant stitched migrations for this version range
stitchedMigrationBySchemaName, err := filterStitchedMigrationsForTags(versionTags)
if err != nil {
return migrationPlan{}, err
}
// Extract/rotate stitched migration definitions so we can query them by schema name
stitchedDefinitionsBySchemaName := make(map[string]*definition.Definitions, len(stitchedMigrationBySchemaName))
for schemaName, stitchedMigration := range stitchedMigrationBySchemaName {
stitchedDefinitionsBySchemaName[schemaName] = stitchedMigration.Definitions
}
// Extract/rotate leaf identifiers so we can query them by version/git-tag first
leafIDsBySchemaNameByTag := make(map[string]map[string][]int, len(versionRange))
for schemaName, stitchedMigration := range stitchedMigrationBySchemaName {
for tag, bounds := range stitchedMigration.BoundsByRev {
if _, ok := leafIDsBySchemaNameByTag[tag]; !ok {
leafIDsBySchemaNameByTag[tag] = map[string][]int{}
}
leafIDsBySchemaNameByTag[tag][schemaName] = bounds.LeafIDs
}
}
//
// Interleave out-of-band migration interrupts and schema migrations
steps := make([]migrationStep, 0, len(interrupts)+1)
for _, interrupt := range interrupts {
steps = append(steps, migrationStep{
instanceVersion: interrupt.Version,
schemaMigrationLeafIDsBySchemaName: leafIDsBySchemaNameByTag[interrupt.Version.GitTag()],
outOfBandMigrationIDs: interrupt.MigrationIDs,
})
}
steps = append(steps, migrationStep{
instanceVersion: to,
schemaMigrationLeafIDsBySchemaName: leafIDsBySchemaNameByTag[to.GitTag()],
outOfBandMigrationIDs: nil, // all required out of band migrations have already completed
})
return migrationPlan{
from: from,
to: to,
stitchedDefinitionsBySchemaName: stitchedDefinitionsBySchemaName,
steps: steps,
}, nil
}
// runMigration initializes a schema and out-of-band migration runner and performs the given migration plan.
func runMigration(
ctx context.Context,
runnerFactory RunnerFactoryWithSchemas,
plan migrationPlan,
privilegedMode runner.PrivilegedMode,
privilegedHashes []string,
skipVersionCheck bool,
skipDriftCheck bool,
dryRun bool,
up bool,
animateProgress bool,
registerMigratorsWithStore func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc,
expectedSchemaFactories []ExpectedSchemaFactory,
out *output.Output,
) error {
var runnerSchemas []*schemas.Schema
for _, schemaName := range schemas.SchemaNames {
runnerSchemas = append(runnerSchemas, &schemas.Schema{
Name: schemaName,
MigrationsTableName: schemas.MigrationsTableName(schemaName),
Definitions: plan.stitchedDefinitionsBySchemaName[schemaName],
})
}
r, err := runnerFactory(schemas.SchemaNames, runnerSchemas)
if err != nil {
return err
}
db, err := extractDatabase(ctx, r)
if err != nil {
return err
}
registerMigrators := registerMigratorsWithStore(basestoreExtractor{r})
// Note: Error is correctly checked here; we want to use the return value
// `patch` below but only if we can best-effort fetch it. We want to allow
// the user to skip erroring here if they are explicitly skipping this
// version check.
version, patch, ok, err := GetServiceVersion(ctx, r)
if !skipVersionCheck {
if err != nil {
return err
}
if !ok {
err := errors.Newf("version assertion failed: unknown version != %q", plan.from)
return errors.Newf("%s. Re-invoke with --skip-version-check to ignore this check", err)
}
if oobmigration.CompareVersions(version, plan.from) != oobmigration.VersionOrderEqual {
err := errors.Newf("version assertion failed: %q != %q", version, plan.from)
return errors.Newf("%s. Re-invoke with --skip-version-check to ignore this check", err)
}
}
if !skipDriftCheck {
if err := CheckDrift(ctx, r, plan.from.GitTagWithPatch(patch), out, false, schemas.SchemaNames, expectedSchemaFactories); err != nil {
return err
}
}
for i, step := range plan.steps {
out.WriteLine(output.Linef(
output.EmojiFingerPointRight,
output.StyleReset,
"Migrating to v%s (step %d of %d)",
step.instanceVersion,
i+1,
len(plan.steps),
))
out.WriteLine(output.Line(output.EmojiFingerPointRight, output.StyleReset, "Running schema migrations"))
if !dryRun {
operationType := runner.MigrationOperationTypeTargetedUp
if !up {
operationType = runner.MigrationOperationTypeTargetedDown
}
operations := make([]runner.MigrationOperation, 0, len(step.schemaMigrationLeafIDsBySchemaName))
for schemaName, leafMigrationIDs := range step.schemaMigrationLeafIDsBySchemaName {
operations = append(operations, runner.MigrationOperation{
SchemaName: schemaName,
Type: operationType,
TargetVersions: leafMigrationIDs,
})
}
if err := r.Run(ctx, runner.Options{
Operations: operations,
PrivilegedMode: privilegedMode,
MatchPrivilegedHash: func(hash string) bool {
for _, candidate := range privilegedHashes {
if hash == candidate {
return true
}
}
return false
},
IgnoreSingleDirtyLog: true,
IgnoreSinglePendingLog: true,
}); err != nil {
return err
}
out.WriteLine(output.Line(output.EmojiSuccess, output.StyleSuccess, "Schema migrations complete"))
}
if len(step.outOfBandMigrationIDs) > 0 {
if err := runOutOfBandMigrations(
ctx,
db,
dryRun,
up,
animateProgress,
registerMigrators,
out,
step.outOfBandMigrationIDs,
); err != nil {
return err
}
}
}
if !dryRun {
// After successful migration, set the new instance version. The frontend still checks on
// startup that the previously running instance version was only one minor version away.
// If we run the upgrade without updating that value, the new instance will refuse to
// start without manual modification of the database.
//
// Note that we don't want to get rid of that check entirely from the frontend, as we do
// still want to catch the cases where site-admins "jump forward" several versions while
// using the standard upgrade path (not a multi-version upgrade that handles these cases).
if err := setServiceVersion(ctx, r, plan.to); err != nil {
return err
}
}
return nil
}
// filterStitchedMigrationsForTags returns a copy of the pre-compiled stitchedMap with references
// to tags outside of the given set removed. This allows a migrator instance that knows the migration
// path from X -> Y to also know the path from any partial migration X <= W -> Z <= Y.
func filterStitchedMigrationsForTags(tags []string) (map[string]shared.StitchedMigration, error) {
filteredStitchedMigrationBySchemaName := make(map[string]shared.StitchedMigration, len(schemas.SchemaNames))
for _, schemaName := range schemas.SchemaNames {
boundsByRev := make(map[string]shared.MigrationBounds, len(tags))
for _, tag := range tags {
bounds, ok := shared.StitchedMigationsBySchemaName[schemaName].BoundsByRev[tag]
if !ok {
return nil, errors.Newf("unknown tag %q", tag)
}
boundsByRev[tag] = bounds
}
filteredStitchedMigrationBySchemaName[schemaName] = shared.StitchedMigration{
Definitions: shared.StitchedMigationsBySchemaName[schemaName].Definitions,
BoundsByRev: boundsByRev,
}
}
return filteredStitchedMigrationBySchemaName, nil
}
// GetRawServiceVersion returns the frontend service version information for the given runner as a raw string.
func GetRawServiceVersion(ctx context.Context, r Runner) (_ string, ok bool, _ error) {
db, err := extractDatabase(ctx, r)
func GetRawServiceVersion(ctx context.Context, r *runner.Runner) (_ string, ok bool, _ error) {
db, err := store.ExtractDatabase(ctx, r)
if err != nil {
return "", false, err
}
return upgradestore.New(db).GetServiceVersion(ctx, "frontend")
return upgradestore.New(db).GetServiceVersion(ctx)
}
// GetServiceVersion returns the frontend service version information for the given runner as a parsed version.
// Both of the return values `ok` and `error` should be checked to ensure a valid version is returned.
func GetServiceVersion(ctx context.Context, r Runner) (_ oobmigration.Version, patch int, ok bool, _ error) {
func GetServiceVersion(ctx context.Context, r *runner.Runner) (_ oobmigration.Version, patch int, ok bool, _ error) {
versionStr, ok, err := GetRawServiceVersion(ctx, r)
if err != nil {
return oobmigration.Version{}, 0, false, err
@ -299,108 +38,3 @@ func GetServiceVersion(ctx context.Context, r Runner) (_ oobmigration.Version, p
return version, patch, true, nil
}
func setServiceVersion(ctx context.Context, r Runner, version oobmigration.Version) error {
db, err := extractDatabase(ctx, r)
if err != nil {
return err
}
return upgradestore.New(db).SetServiceVersion(
ctx,
"frontend",
fmt.Sprintf("%d.%d.0", version.Major, version.Minor),
)
}
var ErrDatabaseDriftDetected = errors.New("database drift detected")
// CheckDrift uses the given runner to check whether schema drift exists for any
// non-empty database. It returns ErrDatabaseDriftDetected when schema drift
// exists, and a nil error when not.
//
// - The `verbose` flag indicates whether to collect drift details in the output.
// - The `schemaNames` is the list of schema names to check for drift.
// - The `expectedSchemaFactories` is the means to retrieve the schema
// definitions at the target version.
func CheckDrift(ctx context.Context, r Runner, version string, out *output.Output, verbose bool, schemaNames []string, expectedSchemaFactories []ExpectedSchemaFactory) error {
type schemaWithDrift struct {
name string
drift *bytes.Buffer
}
schemasWithDrift := make([]*schemaWithDrift, 0, len(schemaNames))
for _, schemaName := range schemaNames {
store, err := r.Store(ctx, schemaName)
if err != nil {
return errors.Wrap(err, "get migration store")
}
schemaDescriptions, err := store.Describe(ctx)
if err != nil {
return err
}
schema := schemaDescriptions["public"]
var buf bytes.Buffer
driftOut := output.NewOutput(&buf, output.OutputOpts{})
expectedSchema, err := FetchExpectedSchema(ctx, schemaName, version, driftOut, expectedSchemaFactories)
if err != nil {
return err
}
if err := displayDriftSummaries(driftOut, drift.CompareSchemaDescriptions(schemaName, version, Canonicalize(schema), Canonicalize(expectedSchema))); err != nil {
schemasWithDrift = append(schemasWithDrift,
&schemaWithDrift{
name: schemaName,
drift: &buf,
},
)
}
}
drift := false
for _, schemaWithDrift := range schemasWithDrift {
empty, err := isEmptySchema(ctx, r, schemaWithDrift.name)
if err != nil {
return err
}
if empty {
continue
}
drift = true
out.WriteLine(output.Linef(output.EmojiFailure, output.StyleFailure, "Schema drift detected for %s", schemaWithDrift.name))
if verbose {
out.Write(schemaWithDrift.drift.String())
}
}
if !drift {
return nil
}
out.WriteLine(output.Linef(
output.EmojiLightbulb,
output.StyleItalic,
""+
"Before continuing with this operation, run the migrator's drift command and follow instructions to repair the schema to the expected current state."+
" "+
"See https://docs.sourcegraph.com/admin/how-to/manual_database_migrations#drift for additional instructions."+
"\n",
))
return ErrDatabaseDriftDetected
}
func isEmptySchema(ctx context.Context, r Runner, schemaName string) (bool, error) {
store, err := r.Store(ctx, schemaName)
if err != nil {
return false, err
}
appliedVersions, _, _, err := store.Versions(ctx)
if err != nil {
return false, err
}
return len(appliedVersions) == 0, nil
}


@ -2,22 +2,14 @@ package cliutil
import (
"context"
"database/sql"
"fmt"
"os"
"sort"
"time"
"github.com/urfave/cli/v2"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
oobmigrations "github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/output"
)
@ -48,13 +40,13 @@ func RunOutOfBandMigrations(
if err != nil {
return err
}
db, err := extractDatabase(ctx, r)
db, err := store.ExtractDatabase(ctx, r)
if err != nil {
return err
}
registerMigrators := registerMigratorsWithStore(basestoreExtractor{r})
registerMigrators := registerMigratorsWithStore(store.BasestoreExtractor{r})
if err := runOutOfBandMigrations(
if err := multiversion.RunOutOfBandMigrations(
ctx,
db,
false, // dry-run
@ -82,169 +74,3 @@ func RunOutOfBandMigrations(
},
}
}
func runOutOfBandMigrations(
ctx context.Context,
db database.DB,
dryRun bool,
up bool,
animateProgress bool,
registerMigrations oobmigration.RegisterMigratorsFunc,
out *output.Output,
ids []int,
) (err error) {
if len(ids) != 0 {
out.WriteLine(output.Linef(output.EmojiFingerPointRight, output.StyleReset, "Running out of band migrations %v", ids))
if dryRun {
return nil
}
}
store := oobmigration.NewStoreWithDB(db)
runner := outOfBandMigrationRunnerWithStore(store)
if err := runner.SynchronizeMetadata(ctx); err != nil {
return err
}
if err := registerMigrations(ctx, db, runner); err != nil {
return err
}
if len(ids) == 0 {
migrations, err := store.List(ctx)
if err != nil {
return err
}
for _, migration := range migrations {
ids = append(ids, migration.ID)
}
}
sort.Ints(ids)
if dryRun {
return nil
}
if err := runner.UpdateDirection(ctx, ids, !up); err != nil {
return err
}
go runner.StartPartial(ids)
defer runner.Stop()
defer func() {
if err == nil {
out.WriteLine(output.Line(output.EmojiSuccess, output.StyleSuccess, "Out of band migrations complete"))
} else {
out.WriteLine(output.Linef(output.EmojiFailure, output.StyleFailure, "Out of band migrations failed: %s", err))
}
}()
updateMigrationProgress, cleanup := makeOutOfBandMigrationProgressUpdater(out, ids, animateProgress)
defer cleanup()
ticker := time.NewTicker(time.Second).C
for {
migrations, err := getMigrations(ctx, store, ids)
if err != nil {
return err
}
sort.Slice(migrations, func(i, j int) bool { return migrations[i].ID < migrations[j].ID })
for i, m := range migrations {
updateMigrationProgress(i, m)
}
complete := true
for _, m := range migrations {
if !m.Complete() {
if m.ApplyReverse && m.NonDestructive {
continue
}
complete = false
}
}
if complete {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker:
}
}
}
// makeOutOfBandMigrationProgressUpdater returns two functions: `update` should be called
// when updates to the progress of an out-of-band migration are made and should be reflected
// in the output; and `cleanup` should be called on defer when the progress object should be
// disposed.
func makeOutOfBandMigrationProgressUpdater(out *output.Output, ids []int, animateProgress bool) (
update func(i int, m oobmigration.Migration),
cleanup func(),
) {
if !animateProgress || shouldDisableProgressAnimation() {
update = func(i int, m oobmigration.Migration) {
out.WriteLine(output.Linef("", output.StyleReset, "Migration #%d is %.2f%% complete", m.ID, m.Progress*100))
}
return update, func() {}
}
bars := make([]output.ProgressBar, 0, len(ids))
for _, id := range ids {
bars = append(bars, output.ProgressBar{
Label: fmt.Sprintf("Migration #%d", id),
Max: 1.0,
})
}
progress := out.Progress(bars, nil)
return func(i int, m oobmigration.Migration) { progress.SetValue(i, m.Progress) }, progress.Destroy
}
// shouldDisableProgressAnimation determines if progress bars should be avoided because the log level
// will create output that interferes with a stable canvas. In effect, this adds the -disable-animation
// flag when SRC_LOG_LEVEL is info or debug.
func shouldDisableProgressAnimation() bool {
switch log.Level(os.Getenv(log.EnvLogLevel)) {
case log.LevelDebug:
return true
case log.LevelInfo:
return true
default:
return false
}
}
func getMigrations(ctx context.Context, store *oobmigration.Store, ids []int) ([]oobmigration.Migration, error) {
migrations := make([]oobmigration.Migration, 0, len(ids))
for _, id := range ids {
migration, ok, err := store.GetByID(ctx, id)
if err != nil {
return nil, err
}
if !ok {
return nil, errors.Newf("unknown migration id %d", id)
}
migrations = append(migrations, migration)
}
sort.Slice(migrations, func(i, j int) bool { return migrations[i].ID < migrations[j].ID })
return migrations, nil
}
type basestoreExtractor struct {
runner Runner
}
func (r basestoreExtractor) Store(ctx context.Context, schemaName string) (*basestore.Store, error) {
shareableStore, err := extractDB(ctx, r.runner, schemaName)
if err != nil {
return nil, err
}
return basestore.NewWithHandle(basestore.NewHandleWithDB(log.NoOp(), shareableStore, sql.TxOptions{})), nil
}


@ -7,6 +7,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/version"
"github.com/sourcegraph/sourcegraph/internal/version/upgradestore"
@ -105,7 +106,7 @@ func Up(commandName string, factory RunnerFactory, outFactory OutputFactory, dev
return err
}
db, err := extractDatabase(ctx, r)
db, err := store.ExtractDatabase(ctx, r)
if err != nil {
return err
}


@ -6,7 +6,10 @@ import (
"github.com/urfave/cli/v2"
"github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations"
"github.com/sourcegraph/sourcegraph/internal/version"
@ -17,10 +20,10 @@ import (
func Upgrade(
commandName string,
runnerFactory RunnerFactoryWithSchemas,
runnerFactory runner.RunnerFactoryWithSchemas,
outFactory OutputFactory,
registerMigrators func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc,
expectedSchemaFactories ...ExpectedSchemaFactory,
expectedSchemaFactories ...schemas.ExpectedSchemaFactory,
) *cli.Command {
fromFlag := &cli.StringFlag{
Name: "from",
@ -99,12 +102,11 @@ func Upgrade(
}
// connect to db and get upgrade readiness state
db, err := extractDatabase(ctx, runner)
db, err := store.ExtractDatabase(ctx, runner)
if err != nil {
return errors.Wrap(err, "new db handle")
}
store := upgradestore.New(db)
currentVersion, autoUpgrade, err := store.GetAutoUpgrade(ctx)
currentVersion, autoUpgrade, err := upgradestore.New(db).GetAutoUpgrade(ctx)
if err != nil {
return errors.Wrap(err, "checking auto upgrade")
}
@ -151,7 +153,7 @@ func Upgrade(
// Find the relevant schema and data migrations to perform (and in what order)
// for the given version range.
plan, err := planMigration(from, to, versionRange, interrupts)
plan, err := multiversion.PlanMigration(from, to, versionRange, interrupts)
if err != nil {
return err
}
@ -162,8 +164,9 @@ func Upgrade(
}
// Perform the upgrade on the configured databases.
return runMigration(
return multiversion.RunMigration(
ctx,
db,
runnerFactory,
plan,
privilegedMode,


@ -2,9 +2,7 @@ package cliutil
import (
"context"
"database/sql"
"flag"
"fmt"
"net/http"
"strconv"
"strings"
@ -12,10 +10,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/observation"
@ -46,7 +41,7 @@ func flagHelp(out *output.Output, message string, args ...any) error {
}
// setupRunner initializes and returns the runner associated with the given schema.
func setupRunner(factory RunnerFactory, schemaNames ...string) (Runner, error) {
func setupRunner(factory RunnerFactory, schemaNames ...string) (*runner.Runner, error) {
r, err := factory(schemaNames)
if err != nil {
return nil, err
@ -56,7 +51,7 @@ func setupRunner(factory RunnerFactory, schemaNames ...string) (Runner, error) {
}
// setupStore initializes and returns the store associated with the given schema.
func setupStore(ctx context.Context, factory RunnerFactory, schemaName string) (Store, error) {
func setupStore(ctx context.Context, factory RunnerFactory, schemaName string) (runner.Store, error) {
r, err := setupRunner(factory, schemaName)
if err != nil {
return nil, err
@ -144,33 +139,6 @@ func getPivilegedModeFromFlags(cmd *cli.Context, out *output.Output, unprivilege
return runner.ApplyPrivilegedMigrations, nil
}
func extractDatabase(ctx context.Context, r Runner) (database.DB, error) {
db, err := extractDB(ctx, r, "frontend")
if err != nil {
return nil, err
}
return database.NewDB(log.Scoped("migrator", ""), db), nil
}
func extractDB(ctx context.Context, r Runner, schemaName string) (*sql.DB, error) {
store, err := r.Store(ctx, schemaName)
if err != nil {
return nil, err
}
// NOTE: The migration runner package cannot import basestore without
// creating a cyclic import in db connection packages. Hence, we cannot
// embed basestore.ShareableStore here and must "backdoor" extract the
// database connection.
shareableStore, ok := basestore.Raw(store)
if !ok {
return nil, errors.New("store does not support direct database handle access")
}
return shareableStore, nil
}
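Both helpers removed here resurface as exported members of the migration `store` package, which the hunks above now call as `store.ExtractDatabase` and `store.BasestoreExtractor`. A reconstruction from the deleted bodies, offered as a sketch: the `*runner.Runner` parameter type matches the call sites in this commit, but the committed code may accept a narrower interface, and the exported `ExtractDB` name and the `Runner` field name are assumptions.

```go
package store

import (
	"context"
	"database/sql"

	"github.com/sourcegraph/log"

	"github.com/sourcegraph/sourcegraph/internal/database"
	"github.com/sourcegraph/sourcegraph/internal/database/basestore"
	"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
	"github.com/sourcegraph/sourcegraph/lib/errors"
)

// ExtractDatabase returns a database.DB wrapping the runner's "frontend" connection
// (reconstructed from the deleted extractDatabase helper).
func ExtractDatabase(ctx context.Context, r *runner.Runner) (database.DB, error) {
	db, err := ExtractDB(ctx, r, "frontend")
	if err != nil {
		return nil, err
	}
	return database.NewDB(log.Scoped("migrator", ""), db), nil
}

// ExtractDB "backdoor"-extracts the raw *sql.DB behind the runner's store for the given
// schema, since the runner package cannot import basestore without an import cycle.
func ExtractDB(ctx context.Context, r *runner.Runner, schemaName string) (*sql.DB, error) {
	store, err := r.Store(ctx, schemaName)
	if err != nil {
		return nil, err
	}
	shareableStore, ok := basestore.Raw(store)
	if !ok {
		return nil, errors.New("store does not support direct database handle access")
	}
	return shareableStore, nil
}

// BasestoreExtractor adapts a runner into the store factory shape expected by the
// out-of-band migration registration code (reconstructed from basestoreExtractor).
type BasestoreExtractor struct {
	Runner *runner.Runner
}

func (r BasestoreExtractor) Store(ctx context.Context, schemaName string) (*basestore.Store, error) {
	shareableStore, err := ExtractDB(ctx, r.Runner, schemaName)
	if err != nil {
		return nil, err
	}
	return basestore.NewWithHandle(basestore.NewHandleWithDB(log.NoOp(), shareableStore, sql.TxOptions{})), nil
}
```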
var migratorObservationCtx = &observation.TestContext
func outOfBandMigrationRunner(db database.DB) *oobmigration.Runner {
@ -185,12 +153,12 @@ func outOfBandMigrationRunnerWithStore(store *oobmigration.Store) *oobmigration.
// or GCS, to report whether the migrator may be operating in an airgapped environment.
func isAirgapped(ctx context.Context) (err error) {
// known good version and filename in both GCS and Github
filename, _ := getSchemaJSONFilename("frontend")
filename, _ := schemas.GetSchemaJSONFilename("frontend")
const version = "v3.41.1"
timedCtx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
url := githubExpectedSchemaPath(filename, version)
url := schemas.GithubExpectedSchemaPath(filename, version)
req, _ := http.NewRequestWithContext(timedCtx, http.MethodHead, url, nil)
resp, gherr := http.DefaultClient.Do(req)
if resp != nil {
@ -200,7 +168,7 @@ func isAirgapped(ctx context.Context) (err error) {
timedCtx, cancel = context.WithTimeout(ctx, time.Second*3)
defer cancel()
url = gcsExpectedSchemaPath(filename, version)
url = schemas.GcsExpectedSchemaPath(filename, version)
req, _ = http.NewRequestWithContext(timedCtx, http.MethodHead, url, nil)
resp, gcserr := http.DefaultClient.Do(req)
if resp != nil {
@ -220,20 +188,6 @@ func isAirgapped(ctx context.Context) (err error) {
return err
}
// getSchemaJSONFilename returns the basename of the JSON-serialized schema in the sg/sg repository.
func getSchemaJSONFilename(schemaName string) (string, error) {
switch schemaName {
case "frontend":
return "internal/database/schema.json", nil
case "codeintel":
fallthrough
case "codeinsights":
return fmt.Sprintf("internal/database/schema.%s.json", schemaName), nil
}
return "", errors.Newf("unknown schema name %q", schemaName)
}
func checkForMigratorUpdate(ctx context.Context) (latest string, hasUpdate bool, err error) {
migratorVersion, migratorPatch, ok := oobmigration.NewVersionAndPatchFromString(version.Version())
if !ok || migratorVersion.Dev {


@ -5,6 +5,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/lib/output"
)
@ -39,7 +40,7 @@ func Validate(commandName string, factory RunnerFactory, outFactory OutputFactor
out.WriteLine(output.Emoji(output.EmojiSuccess, "schema okay!"))
if !skipOutOfBandMigrationsFlag.Get(cmd) {
db, err := extractDatabase(ctx, r)
db, err := store.ExtractDatabase(ctx, r)
if err != nil {
return err
}


@ -15,12 +15,15 @@ go_library(
"compare_triggers.go",
"compare_views.go",
"summary.go",
"util.go",
"util_search.go",
],
importpath = "github.com/sourcegraph/sourcegraph/internal/database/migration/drift",
visibility = ["//:__subpackages__"],
deps = [
"//internal/database/migration/schemas",
"//lib/errors",
"//lib/output",
"@com_github_google_go_cmp//cmp",
],
)


@ -1,4 +1,4 @@
package cliutil
package drift
import (
"fmt"
@ -6,15 +6,15 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/sourcegraph/sourcegraph/internal/database/migration/drift"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/output"
)
var errOutOfSync = errors.Newf("database schema is out of sync")
func displayDriftSummaries(rawOut *output.Output, summaries []drift.Summary) (err error) {
func DisplaySchemaSummaries(rawOut *output.Output, summaries []Summary) (err error) {
out := &preambledOutput{rawOut, false}
for _, summary := range summaries {
displaySummary(out, summary)
err = errOutOfSync
@ -26,7 +26,7 @@ func displayDriftSummaries(rawOut *output.Output, summaries []drift.Summary) (er
return err
}
func displaySummary(out *preambledOutput, summary drift.Summary) {
func displaySummary(out *preambledOutput, summary Summary) {
out.WriteLine(output.Line(output.EmojiFailure, output.StyleBold, summary.Problem()))
if a, b, ok := summary.Diff(); ok {


@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "multiversion",
srcs = [
"drift.go",
"plan.go",
"run.go",
"util.go",
],
importpath = "github.com/sourcegraph/sourcegraph/internal/database/migration/multiversion",
visibility = ["//:__subpackages__"],
deps = [
"//internal/database",
"//internal/database/migration/definition",
"//internal/database/migration/drift",
"//internal/database/migration/runner",
"//internal/database/migration/schemas",
"//internal/database/migration/shared",
"//internal/database/migration/store",
"//internal/observation",
"//internal/oobmigration",
"//internal/oobmigration/migrations",
"//internal/version/upgradestore",
"//lib/errors",
"//lib/output",
],
)
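`run.go` and `util.go` are not shown in the hunks here. Judging from the call sites in the `Upgrade` and `Downgrade` commands (which now pass the database handle explicitly) and from the `runMigration` body removed from `cliutil` above, the exported entry point plausibly has the following shape; the parameter list beyond the first five arguments is assumed to be unchanged from the removed helper:

```go
package multiversion

import (
	"context"

	"github.com/sourcegraph/sourcegraph/internal/database"
	"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
	"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
	"github.com/sourcegraph/sourcegraph/internal/oobmigration"
	"github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations"
	"github.com/sourcegraph/sourcegraph/lib/output"
)

// RunMigration is the exported counterpart of the runMigration helper removed from
// cliutil. Signature sketch only; the orchestration itself is the body shown being
// deleted earlier in this commit, now driven off the caller-supplied db handle.
func RunMigration(
	ctx context.Context,
	db database.DB,
	runnerFactory runner.RunnerFactoryWithSchemas,
	plan MigrationPlan,
	privilegedMode runner.PrivilegedMode,
	privilegedHashes []string,
	skipVersionCheck bool,
	skipDriftCheck bool,
	dryRun bool,
	up bool,
	animateProgress bool,
	registerMigratorsWithStore func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc,
	expectedSchemaFactories []schemas.ExpectedSchemaFactory,
	out *output.Output,
) error {
	panic("sketch only; see the runMigration body removed from cliutil above")
}
```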


@ -0,0 +1,221 @@
package multiversion
import (
"bytes"
"context"
"fmt"
"net/url"
"sort"
"strings"
"github.com/sourcegraph/sourcegraph/internal/database/migration/drift"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/output"
)
// CheckDrift uses the given runner to check whether schema drift exists for any
// non-empty database. It returns ErrDatabaseDriftDetected when schema drift
// exists, and a nil error when not.
//
// - The `verbose` flag indicates whether to collect drift details in the output.
// - The `schemaNames` is the list of schema names to check for drift.
// - The `expectedSchemaFactories` is the means to retrieve the schema
// definitions at the target version.
func CheckDrift(ctx context.Context, r *runner.Runner, version string, out *output.Output, verbose bool, schemaNames []string, expectedSchemaFactories []schemas.ExpectedSchemaFactory) error {
type schemaWithDrift struct {
name string
drift *bytes.Buffer
}
schemasWithDrift := make([]*schemaWithDrift, 0, len(schemaNames))
for _, schemaName := range schemaNames {
store, err := r.Store(ctx, schemaName)
if err != nil {
return errors.Wrap(err, "get migration store")
}
schemaDescriptions, err := store.Describe(ctx)
if err != nil {
return err
}
schema := schemaDescriptions["public"]
var driftBuff bytes.Buffer
driftOut := output.NewOutput(&driftBuff, output.OutputOpts{})
expectedSchema, err := FetchExpectedSchema(ctx, schemaName, version, driftOut, expectedSchemaFactories)
if err != nil {
return err
}
if err := drift.DisplaySchemaSummaries(driftOut, drift.CompareSchemaDescriptions(schemaName, version, Canonicalize(schema), Canonicalize(expectedSchema))); err != nil {
schemasWithDrift = append(schemasWithDrift,
&schemaWithDrift{
name: schemaName,
drift: &driftBuff,
},
)
}
}
drift := false
for _, schemaWithDrift := range schemasWithDrift {
empty, err := isEmptySchema(ctx, r, schemaWithDrift.name)
if err != nil {
return err
}
if empty {
continue
}
drift = true
out.WriteLine(output.Linef(output.EmojiFailure, output.StyleFailure, "Schema drift detected for %s", schemaWithDrift.name))
if verbose {
out.Write(schemaWithDrift.drift.String())
}
}
if !drift {
return nil
}
out.WriteLine(output.Linef(
output.EmojiLightbulb,
output.StyleItalic,
""+
"Before continuing with this operation, run the migrator's drift command and follow instructions to repair the schema to the expected current state."+
" "+
"See https://docs.sourcegraph.com/admin/how-to/manual_database_migrations#drift for additional instructions."+
"\n",
))
return ErrDatabaseDriftDetected
}
var ErrDatabaseDriftDetected = errors.New("database drift detected")
func isEmptySchema(ctx context.Context, r *runner.Runner, schemaName string) (bool, error) {
store, err := r.Store(ctx, schemaName)
if err != nil {
return false, err
}
appliedVersions, _, _, err := store.Versions(ctx)
if err != nil {
return false, err
}
return len(appliedVersions) == 0, nil
}
func FetchExpectedSchema(
ctx context.Context,
schemaName string,
version string,
out *output.Output,
expectedSchemaFactories []schemas.ExpectedSchemaFactory,
) (schemas.SchemaDescription, error) {
filename, err := schemas.GetSchemaJSONFilename(schemaName)
if err != nil {
return schemas.SchemaDescription{}, err
}
out.WriteLine(output.Line(output.EmojiInfo, output.StyleReset, "Locating schema description"))
for i, factory := range expectedSchemaFactories {
matches := false
patterns := factory.VersionPatterns()
for _, pattern := range patterns {
if pattern.MatchString(version) {
matches = true
break
}
}
if len(patterns) > 0 && !matches {
continue
}
resourcePath := factory.ResourcePath(filename, version)
expectedSchema, err := factory.CreateFromPath(ctx, resourcePath)
if err != nil {
suffix := ""
if i < len(expectedSchemaFactories)-1 {
suffix = " Will attempt a fallback source."
}
out.WriteLine(output.Linef(output.EmojiInfo, output.StyleReset, "Reading schema definition in %s (%s)... Schema not found (%s).%s", factory.Name(), resourcePath, err, suffix))
continue
}
out.WriteLine(output.Linef(output.EmojiSuccess, output.StyleReset, "Schema found in %s (%s).", factory.Name(), resourcePath))
return expectedSchema, nil
}
exampleMap := map[string]struct{}{}
failedPaths := map[string]struct{}{}
for _, factory := range expectedSchemaFactories {
for _, pattern := range factory.VersionPatterns() {
if !pattern.MatchString(version) {
exampleMap[pattern.Example()] = struct{}{}
} else {
failedPaths[factory.ResourcePath(filename, version)] = struct{}{}
}
}
}
versionExamples := make([]string, 0, len(exampleMap))
for pattern := range exampleMap {
versionExamples = append(versionExamples, pattern)
}
sort.Strings(versionExamples)
paths := make([]string, 0, len(failedPaths))
for path := range failedPaths {
if u, err := url.Parse(path); err == nil && (u.Scheme == "http" || u.Scheme == "https") {
paths = append(paths, path)
}
}
sort.Strings(paths)
if len(paths) > 0 {
var additionalHints string
if len(versionExamples) > 0 {
additionalHints = fmt.Sprintf(
"Alternative, provide a different version that matches one of the following patterns: \n - %s\n", strings.Join(versionExamples, "\n - "),
)
}
out.WriteLine(output.Linef(
output.EmojiLightbulb,
output.StyleFailure,
"Schema not found. "+
"Check if the following resources exist. "+
"If they do, then the context in which this migrator is being run may not be permitted to reach the public internet."+
"\n - %s\n%s",
strings.Join(paths, "\n - "),
additionalHints,
))
} else if len(versionExamples) > 0 {
out.WriteLine(output.Linef(
output.EmojiLightbulb,
output.StyleFailure,
"Schema not found. Ensure your supplied version matches one of the following patterns: \n - %s\n", strings.Join(versionExamples, "\n - "),
))
}
return schemas.SchemaDescription{}, errors.New("failed to locate target schema description")
}
func Canonicalize(schemaDescription schemas.SchemaDescription) schemas.SchemaDescription {
schemas.Canonicalize(schemaDescription)
filtered := schemaDescription.Tables[:0]
for i, table := range schemaDescription.Tables {
if table.Name == "migration_logs" {
continue
}
filtered = append(filtered, schemaDescription.Tables[i])
}
schemaDescription.Tables = filtered
return schemaDescription
}
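
For orientation, a hedged sketch of how a caller outside this package might invoke CheckDrift, assuming a *runner.Runner `r` is already in hand and that the exported GitHub/GCS/local factories from the schemas package are the desired lookup order; the version string and factory list here are illustrative, not part of this change.

// Illustrative caller-side snippet; not part of this change.
factories := []schemas.ExpectedSchemaFactory{
	schemas.GitHubExpectedSchemaFactory,
	schemas.GCSExpectedSchemaFactory,
	schemas.LocalExpectedSchemaFactory,
}
out := output.NewOutput(os.Stdout, output.OutputOpts{})
if err := multiversion.CheckDrift(ctx, r, "v5.0.5", out, true /* verbose */, schemas.SchemaNames, factories); err != nil {
	// ErrDatabaseDriftDetected means drift details were already written to out.
	return err
}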

View File

@ -0,0 +1,114 @@
package multiversion
import (
"github.com/sourcegraph/sourcegraph/internal/database/migration/definition"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/migration/shared"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
type MigrationPlan struct {
// the source and target instance versions
from, to oobmigration.Version
// the stitched schema migration definitions over the entire version range by schema name
stitchedDefinitionsBySchemaName map[string]*definition.Definitions
	// the sequence of migration steps over the stitched schema migration definitions; we can't
	// simply apply all schema migrations, as an out-of-band migration can only run within a certain
	// slice of the schema's definition where that out-of-band migration was defined
steps []MigrationStep
}
type MigrationStep struct {
// the target version to migrate to
instanceVersion oobmigration.Version
// the leaf migrations of this version by schema name
schemaMigrationLeafIDsBySchemaName map[string][]int
// the set of out-of-band migrations that must complete before schema migrations begin
// for the following minor instance version
outOfBandMigrationIDs []int
}
func PlanMigration(from, to oobmigration.Version, versionRange []oobmigration.Version, interrupts []oobmigration.MigrationInterrupt) (MigrationPlan, error) {
versionTags := make([]string, 0, len(versionRange))
for _, version := range versionRange {
versionTags = append(versionTags, version.GitTag())
}
// Retrieve relevant stitched migrations for this version range
stitchedMigrationBySchemaName, err := filterStitchedMigrationsForTags(versionTags)
if err != nil {
return MigrationPlan{}, err
}
	// Extract/rotate stitched migration definitions so we can query them by schema name
stitchedDefinitionsBySchemaName := make(map[string]*definition.Definitions, len(stitchedMigrationBySchemaName))
for schemaName, stitchedMigration := range stitchedMigrationBySchemaName {
stitchedDefinitionsBySchemaName[schemaName] = stitchedMigration.Definitions
}
// Extract/rotate leaf identifiers so we can query them by version/git-tag first
leafIDsBySchemaNameByTag := make(map[string]map[string][]int, len(versionRange))
for schemaName, stitchedMigration := range stitchedMigrationBySchemaName {
for tag, bounds := range stitchedMigration.BoundsByRev {
if _, ok := leafIDsBySchemaNameByTag[tag]; !ok {
leafIDsBySchemaNameByTag[tag] = map[string][]int{}
}
leafIDsBySchemaNameByTag[tag][schemaName] = bounds.LeafIDs
}
}
//
// Interleave out-of-band migration interrupts and schema migrations
steps := make([]MigrationStep, 0, len(interrupts)+1)
for _, interrupt := range interrupts {
steps = append(steps, MigrationStep{
instanceVersion: interrupt.Version,
schemaMigrationLeafIDsBySchemaName: leafIDsBySchemaNameByTag[interrupt.Version.GitTag()],
outOfBandMigrationIDs: interrupt.MigrationIDs,
})
}
steps = append(steps, MigrationStep{
instanceVersion: to,
schemaMigrationLeafIDsBySchemaName: leafIDsBySchemaNameByTag[to.GitTag()],
outOfBandMigrationIDs: nil, // all required out of band migrations have already completed
})
return MigrationPlan{
from: from,
to: to,
stitchedDefinitionsBySchemaName: stitchedDefinitionsBySchemaName,
steps: steps,
}, nil
}
// filterStitchedMigrationsForTags returns a copy of the pre-compiled stitchedMap with references
// to tags outside of the given set removed. This allows a migrator instance that knows the migration
// path from X -> Y to also know the path from any partial migration X <= W -> Z <= Y.
func filterStitchedMigrationsForTags(tags []string) (map[string]shared.StitchedMigration, error) {
filteredStitchedMigrationBySchemaName := make(map[string]shared.StitchedMigration, len(schemas.SchemaNames))
for _, schemaName := range schemas.SchemaNames {
boundsByRev := make(map[string]shared.MigrationBounds, len(tags))
for _, tag := range tags {
bounds, ok := shared.StitchedMigationsBySchemaName[schemaName].BoundsByRev[tag]
if !ok {
return nil, errors.Newf("unknown tag %q", tag)
}
boundsByRev[tag] = bounds
}
filteredStitchedMigrationBySchemaName[schemaName] = shared.StitchedMigration{
Definitions: shared.StitchedMigationsBySchemaName[schemaName].Definitions,
BoundsByRev: boundsByRev,
}
}
return filteredStitchedMigrationBySchemaName, nil
}
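
To make the shape of the plan concrete, a minimal in-package sketch (assumed values, not part of the diff) that plans the 4.5 -> 5.0 hop with no interrupts:

// examplePlan is illustrative only: the version range and interrupts are
// normally derived from oobmigration metadata for the instance.
func examplePlan() (MigrationPlan, error) {
	from := oobmigration.Version{Major: 4, Minor: 5}
	to := oobmigration.Version{Major: 5, Minor: 0}
	versionRange := []oobmigration.Version{from, to}
	var interrupts []oobmigration.MigrationInterrupt
	// The returned plan ends with a step targeting v5.0 that carries the leaf
	// migration IDs for each schema at that tag.
	return PlanMigration(from, to, versionRange, interrupts)
}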

View File

@ -0,0 +1,257 @@
package multiversion
import (
"context"
"fmt"
"sort"
"time"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/output"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/migration/definition"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations"
"github.com/sourcegraph/sourcegraph/internal/version/upgradestore"
)
type Store interface {
WithMigrationLog(ctx context.Context, definition definition.Definition, up bool, f func() error) error
Describe(ctx context.Context) (map[string]schemas.SchemaDescription, error)
Versions(ctx context.Context) (appliedVersions, pendingVersions, failedVersions []int, _ error)
}
func RunMigration(
ctx context.Context,
db database.DB,
runnerFactory runner.RunnerFactoryWithSchemas,
plan MigrationPlan,
privilegedMode runner.PrivilegedMode,
privilegedHashes []string,
skipVersionCheck bool,
skipDriftCheck bool,
dryRun bool,
up bool,
animateProgress bool,
registerMigratorsWithStore func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc,
expectedSchemaFactories []schemas.ExpectedSchemaFactory,
out *output.Output,
) error {
var runnerSchemas []*schemas.Schema
for _, schemaName := range schemas.SchemaNames {
runnerSchemas = append(runnerSchemas, &schemas.Schema{
Name: schemaName,
MigrationsTableName: schemas.MigrationsTableName(schemaName),
Definitions: plan.stitchedDefinitionsBySchemaName[schemaName],
})
}
r, err := runnerFactory(schemas.SchemaNames, runnerSchemas)
if err != nil {
return err
}
registerMigrators := registerMigratorsWithStore(store.BasestoreExtractor{Runner: r})
// Note: Error is correctly checked here; we want to use the return value
// `patch` below but only if we can best-effort fetch it. We want to allow
// the user to skip erroring here if they are explicitly skipping this
// version check.
version, patch, ok, err := GetServiceVersion(ctx, db)
if !skipVersionCheck {
if err != nil {
return err
}
if !ok {
return errors.Newf("version assertion failed: unknown version != %q. Re-invoke with --skip-version-check to ignore this check", plan.from)
}
if oobmigration.CompareVersions(version, plan.from) != oobmigration.VersionOrderEqual {
return errors.Newf("version assertion failed: %q != %q. Re-invoke with --skip-version-check to ignore this check", version, plan.from)
}
}
if !skipDriftCheck {
if err := CheckDrift(ctx, r, plan.from.GitTagWithPatch(patch), out, false, schemas.SchemaNames, expectedSchemaFactories); err != nil {
return err
}
}
for i, step := range plan.steps {
out.WriteLine(output.Linef(
output.EmojiFingerPointRight,
output.StyleReset,
"Migrating to v%s (step %d of %d)",
step.instanceVersion,
i+1,
len(plan.steps),
))
out.WriteLine(output.Line(output.EmojiFingerPointRight, output.StyleReset, "Running schema migrations"))
if !dryRun {
operationType := runner.MigrationOperationTypeTargetedUp
if !up {
operationType = runner.MigrationOperationTypeTargetedDown
}
operations := make([]runner.MigrationOperation, 0, len(step.schemaMigrationLeafIDsBySchemaName))
for schemaName, leafMigrationIDs := range step.schemaMigrationLeafIDsBySchemaName {
operations = append(operations, runner.MigrationOperation{
SchemaName: schemaName,
Type: operationType,
TargetVersions: leafMigrationIDs,
})
}
if err := r.Run(ctx, runner.Options{
Operations: operations,
PrivilegedMode: privilegedMode,
MatchPrivilegedHash: func(hash string) bool {
for _, candidate := range privilegedHashes {
if hash == candidate {
return true
}
}
return false
},
IgnoreSingleDirtyLog: true,
IgnoreSinglePendingLog: true,
}); err != nil {
return err
}
out.WriteLine(output.Line(output.EmojiSuccess, output.StyleSuccess, "Schema migrations complete"))
}
if len(step.outOfBandMigrationIDs) > 0 {
if err := RunOutOfBandMigrations(
ctx,
db,
dryRun,
up,
animateProgress,
registerMigrators,
out,
step.outOfBandMigrationIDs,
); err != nil {
return err
}
}
}
if !dryRun {
// After successful migration, set the new instance version. The frontend still checks on
// startup that the previously running instance version was only one minor version away.
		// If we run the upgrade without updating that value, the new instance will refuse to
// start without manual modification of the database.
//
// Note that we don't want to get rid of that check entirely from the frontend, as we do
// still want to catch the cases where site-admins "jump forward" several versions while
// using the standard upgrade path (not a multi-version upgrade that handles these cases).
if err := upgradestore.New(db).SetServiceVersion(ctx, fmt.Sprintf("%d.%d.0", plan.to.Major, plan.to.Minor)); err != nil {
return err
}
}
return nil
}
func RunOutOfBandMigrations(
ctx context.Context,
db database.DB,
dryRun bool,
up bool,
animateProgress bool,
registerMigrations oobmigration.RegisterMigratorsFunc,
out *output.Output,
ids []int,
) (err error) {
if len(ids) != 0 {
out.WriteLine(output.Linef(output.EmojiFingerPointRight, output.StyleReset, "Running out of band migrations %v", ids))
if dryRun {
return nil
}
}
store := oobmigration.NewStoreWithDB(db)
runner := oobmigration.NewRunnerWithDB(&observation.TestContext, db, time.Second)
if err := runner.SynchronizeMetadata(ctx); err != nil {
return err
}
if err := registerMigrations(ctx, db, runner); err != nil {
return err
}
if len(ids) == 0 {
migrations, err := store.List(ctx)
if err != nil {
return err
}
for _, migration := range migrations {
ids = append(ids, migration.ID)
}
}
sort.Ints(ids)
if dryRun {
return nil
}
if err := runner.UpdateDirection(ctx, ids, !up); err != nil {
return err
}
go runner.StartPartial(ids)
defer runner.Stop()
defer func() {
if err == nil {
out.WriteLine(output.Line(output.EmojiSuccess, output.StyleSuccess, "Out of band migrations complete"))
} else {
out.WriteLine(output.Linef(output.EmojiFailure, output.StyleFailure, "Out of band migrations failed: %s", err))
}
}()
updateMigrationProgress, cleanup := oobmigration.MakeProgressUpdater(out, ids, animateProgress)
defer cleanup()
ticker := time.NewTicker(time.Second).C
for {
migrations, err := store.GetByIDs(ctx, ids)
if err != nil {
return err
}
sort.Slice(migrations, func(i, j int) bool { return migrations[i].ID < migrations[j].ID })
for i, m := range migrations {
updateMigrationProgress(i, m)
}
complete := true
for _, m := range migrations {
if !m.Complete() {
if m.ApplyReverse && m.NonDestructive {
continue
}
complete = false
}
}
if complete {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker:
}
}
}
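
As a usage note, an illustrative call that drives a fixed set of out-of-band migrations upward with animated progress; `db`, `registerMigrators`, `out`, and the migration IDs are assumptions here.

// Illustrative only; not part of this change.
if err := RunOutOfBandMigrations(
	ctx, db,
	false, // dryRun
	true,  // up
	true,  // animateProgress
	registerMigrators,
	out,
	[]int{17, 19}, // hypothetical out-of-band migration IDs
); err != nil {
	return err
}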

View File

@ -0,0 +1,24 @@
package multiversion
import (
"context"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/version/upgradestore"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func GetServiceVersion(ctx context.Context, db database.DB) (oobmigration.Version, int, bool, error) {
versionStr, ok, err := upgradestore.New(db).GetServiceVersion(ctx)
if err != nil || !ok {
return oobmigration.Version{}, 0, ok, err
}
version, patch, ok := oobmigration.NewVersionAndPatchFromString(versionStr)
if !ok {
return oobmigration.Version{}, 0, ok, errors.Newf("cannot parse version: %q - expected [v]X.Y[.Z]", versionStr)
}
return version, patch, true, nil
}
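
A short, illustrative call site showing how the multi-valued return is typically handled; the fresh-install handling is an assumption.

// Illustrative only; not part of this change.
version, patch, ok, err := GetServiceVersion(ctx, db)
if err != nil {
	return err
}
if !ok {
	// No "frontend" row exists yet; the instance version was never recorded.
	return errors.New("instance version has never been recorded")
}
tag := version.GitTagWithPatch(patch) // e.g. "v4.5.0"
_ = tag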

View File

@ -0,0 +1,44 @@
package migration
import (
"database/sql"
"strings"
connections "github.com/sourcegraph/sourcegraph/internal/database/connections/live"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/migration/store"
"github.com/sourcegraph/sourcegraph/internal/database/postgresdsn"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/lib/output"
)
func NewRunnerWithSchemas(
observationCtx *observation.Context,
out *output.Output,
appName string,
schemaNames []string,
schemas []*schemas.Schema,
) (*runner.Runner, error) {
dsns, err := postgresdsn.DSNsBySchema(schemaNames)
if err != nil {
return nil, err
}
var dsnsStrings []string
for schema, dsn := range dsns {
dsnsStrings = append(dsnsStrings, schema+" => "+dsn)
}
out.WriteLine(output.Linef(output.EmojiInfo, output.StyleGrey, "Connection DSNs used: %s", strings.Join(dsnsStrings, ", ")))
storeFactory := func(db *sql.DB, migrationsTable string) connections.Store {
return connections.NewStoreShim(store.NewWithDB(observationCtx, db, migrationsTable))
}
r, err := connections.RunnerFromDSNsWithSchemas(out, observationCtx.Logger, dsns, appName, storeFactory, schemas)
if err != nil {
return nil, err
}
return r, nil
}
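
For reference, a hedged sketch of constructing a runner for the frontend and codeintel schemas; the logger and application name are illustrative values.

// Illustrative only; not part of this change.
observationCtx := observation.NewContext(logger)
out := output.OutputFromLogger(logger)
r, err := NewRunnerWithSchemas(
	observationCtx,
	out,
	"example-app", // application name attached to the connections (assumed value)
	[]string{schemas.Frontend.Name, schemas.CodeIntel.Name},
	[]*schemas.Schema{schemas.Frontend, schemas.CodeIntel},
)
if err != nil {
	return err
}
_ = r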

View File

@ -14,6 +14,8 @@ import (
"github.com/sourcegraph/sourcegraph/lib/errors"
)
type RunnerFactoryWithSchemas func(schemaNames []string, schemas []*schemas.Schema) (*Runner, error)
type Runner struct {
logger log.Logger
storeFactoryCaches map[string]*storeFactoryCache

View File

@ -5,6 +5,7 @@ go_library(
name = "schemas",
srcs = [
"description.go",
"drift_schema.go",
"formatter.go",
"formatter_json.go",
"formatter_psql.go",

View File

@ -1,4 +1,4 @@
package cliutil
package schemas
import (
"context"
@ -9,7 +9,6 @@ import (
"path/filepath"
"strings"
descriptions "github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/lazyregexp"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
@ -22,7 +21,7 @@ type ExpectedSchemaFactory interface {
Name() string
VersionPatterns() []NamedRegexp
ResourcePath(filename, version string) string
CreateFromPath(ctx context.Context, path string) (descriptions.SchemaDescription, error)
CreateFromPath(ctx context.Context, path string) (SchemaDescription, error)
}
type NamedRegexp struct {
@ -43,9 +42,9 @@ var (
)
// GitHubExpectedSchemaFactory reads schema definitions from the sourcegraph/sourcegraph repository via GitHub's API.
var GitHubExpectedSchemaFactory = NewExpectedSchemaFactory("GitHub", allPatterns, githubExpectedSchemaPath, fetchSchema)
var GitHubExpectedSchemaFactory = NewExpectedSchemaFactory("GitHub", allPatterns, GithubExpectedSchemaPath, fetchSchema)
func githubExpectedSchemaPath(filename, version string) string {
func GithubExpectedSchemaPath(filename, version string) string {
return fmt.Sprintf("https://raw.githubusercontent.com/sourcegraph/sourcegraph/%s/%s", version, filename)
}
@ -54,18 +53,18 @@ func githubExpectedSchemaPath(filename, version string) string {
// been backfilled to this bucket by hand.
//
// See the ./drift-schemas directory for more details on how this data was generated.
var GCSExpectedSchemaFactory = NewExpectedSchemaFactory("GCS", []NamedRegexp{tagPattern}, gcsExpectedSchemaPath, fetchSchema)
var GCSExpectedSchemaFactory = NewExpectedSchemaFactory("GCS", []NamedRegexp{tagPattern}, GcsExpectedSchemaPath, fetchSchema)
func gcsExpectedSchemaPath(filename, version string) string {
func GcsExpectedSchemaPath(filename, version string) string {
return fmt.Sprintf("https://storage.googleapis.com/sourcegraph-assets/migrations/drift/%s-%s", version, strings.ReplaceAll(filename, "/", "_"))
}
// LocalExpectedSchemaFactory reads schema definitions from a local directory baked into the migrator image.
var LocalExpectedSchemaFactory = NewExpectedSchemaFactory("Local file", []NamedRegexp{tagPattern}, localSchemaPath, ReadSchemaFromFile)
var LocalExpectedSchemaFactory = NewExpectedSchemaFactory("Local file", []NamedRegexp{tagPattern}, LocalSchemaPath, ReadSchemaFromFile)
const migratorImageDescriptionPrefix = "/schema-descriptions"
func localSchemaPath(filename, version string) string {
func LocalSchemaPath(filename, version string) string {
return filepath.Join(migratorImageDescriptionPrefix, fmt.Sprintf("%s-%s", version, strings.ReplaceAll(filename, "/", "_")))
}
@ -76,32 +75,32 @@ func NewExplicitFileSchemaFactory(filename string) ExpectedSchemaFactory {
}
// fetchSchema makes an HTTP GET request to the given URL and reads the schema description from the response.
func fetchSchema(ctx context.Context, url string) (descriptions.SchemaDescription, error) {
func fetchSchema(ctx context.Context, url string) (SchemaDescription, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return descriptions.SchemaDescription{}, err
return SchemaDescription{}, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return descriptions.SchemaDescription{}, err
return SchemaDescription{}, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return descriptions.SchemaDescription{}, errors.Newf("HTTP %d: %s", resp.StatusCode, url)
return SchemaDescription{}, errors.Newf("HTTP %d: %s", resp.StatusCode, url)
}
var schemaDescription descriptions.SchemaDescription
var schemaDescription SchemaDescription
err = json.NewDecoder(resp.Body).Decode(&schemaDescription)
return schemaDescription, err
}
// ReadSchemaFromFile reads a schema description from the given filename.
func ReadSchemaFromFile(ctx context.Context, filename string) (descriptions.SchemaDescription, error) {
func ReadSchemaFromFile(ctx context.Context, filename string) (SchemaDescription, error) {
f, err := os.Open(filename)
if err != nil {
return descriptions.SchemaDescription{}, err
return SchemaDescription{}, err
}
defer func() {
if closeErr := f.Close(); closeErr != nil {
@ -109,7 +108,7 @@ func ReadSchemaFromFile(ctx context.Context, filename string) (descriptions.Sche
}
}()
var schemaDescription descriptions.SchemaDescription
var schemaDescription SchemaDescription
err = json.NewDecoder(f).Decode(&schemaDescription)
return schemaDescription, err
}
@ -121,14 +120,14 @@ type expectedSchemaFactory struct {
name string
versionPatterns []NamedRegexp
resourcePathFunc func(filename, version string) string
createFromPathFunc func(ctx context.Context, path string) (descriptions.SchemaDescription, error)
createFromPathFunc func(ctx context.Context, path string) (SchemaDescription, error)
}
func NewExpectedSchemaFactory(
name string,
versionPatterns []NamedRegexp,
resourcePathFunc func(filename, version string) string,
createFromPathFunc func(ctx context.Context, path string) (descriptions.SchemaDescription, error),
createFromPathFunc func(ctx context.Context, path string) (SchemaDescription, error),
) ExpectedSchemaFactory {
return &expectedSchemaFactory{
name: name,
@ -150,6 +149,6 @@ func (f expectedSchemaFactory) ResourcePath(filename, version string) string {
return f.resourcePathFunc(filename, version)
}
func (f expectedSchemaFactory) CreateFromPath(ctx context.Context, path string) (descriptions.SchemaDescription, error) {
func (f expectedSchemaFactory) CreateFromPath(ctx context.Context, path string) (SchemaDescription, error) {
return f.createFromPathFunc(ctx, path)
}

View File

@ -86,3 +86,17 @@ func FilterSchemasByName(schemas []*Schema, targetNames []string) []*Schema {
return filtered
}
// GetSchemaJSONFilename returns the basename of the JSON-serialized schema in the sg/sg repository.
func GetSchemaJSONFilename(schemaName string) (string, error) {
switch schemaName {
case "frontend":
return "internal/database/schema.json", nil
case "codeintel":
fallthrough
case "codeinsights":
return fmt.Sprintf("internal/database/schema.%s.json", schemaName), nil
}
return "", errors.Newf("unknown schema name %q", schemaName)
}
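
A quick illustrative use of the helper above:

// Illustrative only; not part of this change.
filename, err := GetSchemaJSONFilename("codeintel")
if err != nil {
	return err
}
// filename == "internal/database/schema.codeintel.json"
_ = filename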

View File

@ -6,24 +6,35 @@ go_library(
srcs = [
"describe.go",
"describe_scan.go",
"extractor.go",
"observability.go",
"registration.go",
"store.go",
],
importpath = "github.com/sourcegraph/sourcegraph/internal/database/migration/store",
visibility = ["//:__subpackages__"],
deps = [
"//internal/conf/conftypes",
"//internal/database",
"//internal/database/basestore",
"//internal/database/dbutil",
"//internal/database/locker",
"//internal/database/migration/definition",
"//internal/database/migration/runner",
"//internal/database/migration/schemas",
"//internal/database/migration/shared",
"//internal/database/postgresdsn",
"//internal/jsonc",
"//internal/metrics",
"//internal/observation",
"//internal/oobmigration",
"//internal/oobmigration/migrations",
"//lib/errors",
"//schema",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_lib_pq//:pq",
"@com_github_sourcegraph_log//:log",
"@io_opentelemetry_go_otel//attribute",
],
)

View File

@ -0,0 +1,53 @@
package store
import (
"context"
"database/sql"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
type BasestoreExtractor struct {
Runner *runner.Runner
}
func (r BasestoreExtractor) Store(ctx context.Context, schemaName string) (*basestore.Store, error) {
shareableStore, err := ExtractDB(ctx, r.Runner, schemaName)
if err != nil {
return nil, err
}
return basestore.NewWithHandle(basestore.NewHandleWithDB(log.NoOp(), shareableStore, sql.TxOptions{})), nil
}
func ExtractDatabase(ctx context.Context, r *runner.Runner) (database.DB, error) {
db, err := ExtractDB(ctx, r, "frontend")
if err != nil {
return nil, err
}
return database.NewDB(log.Scoped("migrator", ""), db), nil
}
func ExtractDB(ctx context.Context, r *runner.Runner, schemaName string) (*sql.DB, error) {
store, err := r.Store(ctx, schemaName)
if err != nil {
return nil, err
}
// NOTE: The migration runner package cannot import basestore without
// creating a cyclic import in db connection packages. Hence, we cannot
// embed basestore.ShareableStore here and must "backdoor" extract the
// database connection.
shareableStore, ok := basestore.Raw(store)
if !ok {
return nil, errors.New("store does not support direct database handle access")
}
return shareableStore, nil
}
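
A hedged sketch of how these extractors are meant to be used by a caller that already holds a *runner.Runner `r`:

// Illustrative only; not part of this change.
db, err := ExtractDatabase(ctx, r)
if err != nil {
	return err
}
extractor := BasestoreExtractor{Runner: r}
frontendStore, err := extractor.Store(ctx, "frontend")
if err != nil {
	return err
}
_, _ = db, frontendStore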

View File

@ -1,4 +1,4 @@
package shared
package store
import (
"context"
@ -11,17 +11,52 @@ import (
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
"github.com/sourcegraph/sourcegraph/internal/database/postgresdsn"
"github.com/sourcegraph/sourcegraph/internal/jsonc"
"github.com/sourcegraph/sourcegraph/internal/oobmigration"
"github.com/sourcegraph/sourcegraph/internal/oobmigration/migrations"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/schema"
)
type RegisterMigratorsUsingConfAndStoreFactoryFunc func(
ctx context.Context,
db database.DB,
runner *oobmigration.Runner,
conf conftypes.UnifiedQuerier,
storeFactory migrations.StoreFactory,
) error
func ComposeRegisterMigratorsFuncs(fnsFromConfAndStoreFactory ...RegisterMigratorsUsingConfAndStoreFactoryFunc) func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc {
return func(storeFactory migrations.StoreFactory) oobmigration.RegisterMigratorsFunc {
return func(ctx context.Context, db database.DB, runner *oobmigration.Runner) error {
conf, err := newStaticConf(ctx, db)
if err != nil {
return err
}
fns := make([]oobmigration.RegisterMigratorsFunc, 0, len(fnsFromConfAndStoreFactory))
for _, f := range fnsFromConfAndStoreFactory {
f := f // avoid loop capture
if f == nil {
continue
}
fns = append(fns, func(ctx context.Context, db database.DB, runner *oobmigration.Runner) error {
return f(ctx, db, runner, conf, storeFactory)
})
}
return oobmigration.ComposeRegisterMigratorsFuncs(fns...)(ctx, db, runner)
}
}
}
type staticConf struct {
serviceConnections conftypes.ServiceConnections
siteConfig schema.SiteConfiguration
}
func newStaticConf(ctx context.Context, db database.DB) (*staticConf, error) {
serviceConnections, err := serviceConnections()
serviceConnections, err := migrationServiceConnections()
if err != nil {
return nil, err
}
@ -40,7 +75,7 @@ func newStaticConf(ctx context.Context, db database.DB) (*staticConf, error) {
func (c staticConf) ServiceConnections() conftypes.ServiceConnections { return c.serviceConnections }
func (c staticConf) SiteConfig() schema.SiteConfiguration { return c.siteConfig }
func serviceConnections() (conftypes.ServiceConnections, error) {
func migrationServiceConnections() (conftypes.ServiceConnections, error) {
dsns, err := postgresdsn.DSNsBySchema(schemas.SchemaNames)
if err != nil {
return conftypes.ServiceConnections{}, err

View File

@ -9,6 +9,7 @@ go_library(
"interrupts.go",
"migrator.go",
"observability.go",
"progress.go",
"registration.go",
"runner.go",
"store.go",
@ -31,6 +32,7 @@ go_library(
"//internal/version",
"//internal/version/upgradestore",
"//lib/errors",
"//lib/output",
"@com_github_derision_test_glock//:glock",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_keegancsmith_sqlf//:sqlf",

View File

@ -30,7 +30,7 @@ func RegisterOSSMigratorsUsingConfAndStoreFactory(
db database.DB,
runner *oobmigration.Runner,
conf conftypes.UnifiedQuerier,
storeFactory StoreFactory,
_ StoreFactory,
) error {
keys, err := keyring.NewRing(ctx, conf.SiteConfig().EncryptionKeys)
if err != nil {

View File

@ -0,0 +1,52 @@
package oobmigration
import (
"fmt"
"os"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/lib/output"
)
// MakeProgressUpdater returns two functions: `update` should be called whenever progress
// is made on an out-of-band migration so that it is reflected in the output, and `cleanup`
// should be called (via defer) when the progress object should be disposed.
func MakeProgressUpdater(out *output.Output, ids []int, animateProgress bool) (
update func(i int, m Migration),
cleanup func(),
) {
if !animateProgress || shouldDisableProgressAnimation() {
update = func(i int, m Migration) {
out.WriteLine(output.Linef("", output.StyleReset, "Migration #%d is %.2f%% complete", m.ID, m.Progress*100))
}
return update, func() {}
}
bars := make([]output.ProgressBar, 0, len(ids))
for _, id := range ids {
bars = append(bars, output.ProgressBar{
Label: fmt.Sprintf("Migration #%d", id),
Max: 1.0,
})
}
progress := out.Progress(bars, nil)
return func(i int, m Migration) { progress.SetValue(i, m.Progress) }, progress.Destroy
}
// shouldDisableProgressAnimation determines if progress bars should be avoided because the log level
// will create output that interferes with a stable canvas. In effect, this adds the -disable-animation
// flag when SRC_LOG_LEVEL is info or debug.
func shouldDisableProgressAnimation() bool {
switch log.Level(os.Getenv(log.EnvLogLevel)) {
case log.LevelDebug:
return true
case log.LevelInfo:
return true
default:
return false
}
}
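
An illustrative caller of the progress updater; the migration IDs and the `migrations` slice are assumptions here.

// Illustrative only; not part of this change.
ids := []int{17, 19}
update, cleanup := MakeProgressUpdater(out, ids, true /* animateProgress */)
defer cleanup()
for i, m := range migrations { // migrations assumed to be []Migration ordered to match ids
	update(i, m)
}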

View File

@ -29,7 +29,7 @@ func ValidateOutOfBandMigrationRunner(ctx context.Context, db database.DB, runne
return nil
}
firstSemverString, ok, err := upgradestore.New(db).GetFirstServiceVersion(ctx, "frontend")
firstSemverString, ok, err := upgradestore.New(db).GetFirstServiceVersion(ctx)
if err != nil {
return errors.Wrap(err, "failed to retrieve first instance version")
}

View File

@ -35,8 +35,8 @@ func NewWith(db basestore.TransactableHandle) *store {
// GetFirstServiceVersion returns the first version registered for the given Sourcegraph service. This
// method will return a false-valued flag if UpdateServiceVersion has never been called for the given
// service.
func (s *store) GetFirstServiceVersion(ctx context.Context, service string) (string, bool, error) {
version, ok, err := basestore.ScanFirstString(s.db.Query(ctx, sqlf.Sprintf(getFirstServiceVersionQuery, service)))
func (s *store) GetFirstServiceVersion(ctx context.Context) (string, bool, error) {
version, ok, err := basestore.ScanFirstString(s.db.Query(ctx, sqlf.Sprintf(getFirstServiceVersionQuery, "frontend")))
return version, ok, filterMissingRelationError(err)
}
@ -47,8 +47,8 @@ SELECT first_version FROM versions WHERE service = %s
// GetServiceVersion returns the previous version registered for the given Sourcegraph service. This
// method will return a false-valued flag if UpdateServiceVersion has never been called for the given
// service.
func (s *store) GetServiceVersion(ctx context.Context, service string) (string, bool, error) {
version, ok, err := basestore.ScanFirstString(s.db.Query(ctx, sqlf.Sprintf(getServiceVersionQuery, service)))
func (s *store) GetServiceVersion(ctx context.Context) (string, bool, error) {
version, ok, err := basestore.ScanFirstString(s.db.Query(ctx, sqlf.Sprintf(getServiceVersionQuery, "frontend")))
return version, ok, filterMissingRelationError(err)
}
@ -65,8 +65,8 @@ func (s *store) ValidateUpgrade(ctx context.Context, service, version string) er
// UpdateServiceVersion updates the latest version for the given Sourcegraph service. This method also enforces
// our documented upgrade policy and will return an error (performing no side-effects) if the upgrade is between
// two unsupported versions. See https://docs.sourcegraph.com/#upgrading-sourcegraph.
func (s *store) UpdateServiceVersion(ctx context.Context, service, version string) error {
return s.updateServiceVersion(ctx, service, version, true)
func (s *store) UpdateServiceVersion(ctx context.Context, version string) error {
return s.updateServiceVersion(ctx, "frontend", version, true)
}
func (s *store) updateServiceVersion(ctx context.Context, service, version string, update bool) error {
@ -114,8 +114,8 @@ WHERE versions.version = %s
// SetServiceVersion updates the latest version for the given Sourcegraph service. This method also enforces
// our documented upgrade policy and will return an error (performing no side-effects) if the upgrade is between
// two unsupported versions. See https://docs.sourcegraph.com/#upgrading-sourcegraph.
func (s *store) SetServiceVersion(ctx context.Context, service, version string) error {
return s.db.Exec(ctx, sqlf.Sprintf(setServiceVersionQuery, version, time.Now().UTC(), service))
func (s *store) SetServiceVersion(ctx context.Context, version string) error {
return s.db.Exec(ctx, sqlf.Sprintf(setServiceVersionQuery, version, time.Now().UTC(), "frontend"))
}
const setServiceVersionQuery = `

View File

@ -22,7 +22,7 @@ func TestGetServiceVersion(t *testing.T) {
store := New(db)
t.Run("fresh db", func(t *testing.T) {
_, ok, err := store.GetServiceVersion(ctx, "service")
_, ok, err := store.GetServiceVersion(ctx)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@ -32,17 +32,17 @@ func TestGetServiceVersion(t *testing.T) {
})
t.Run("after updates", func(t *testing.T) {
if err := store.UpdateServiceVersion(ctx, "service", "1.2.3"); err != nil {
if err := store.UpdateServiceVersion(ctx, "1.2.3"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := store.UpdateServiceVersion(ctx, "service", "1.2.4"); err != nil {
if err := store.UpdateServiceVersion(ctx, "1.2.4"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := store.UpdateServiceVersion(ctx, "service", "1.3.0"); err != nil {
if err := store.UpdateServiceVersion(ctx, "1.3.0"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
version, ok, err := store.GetServiceVersion(ctx, "service")
version, ok, err := store.GetServiceVersion(ctx)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@ -59,7 +59,7 @@ func TestGetServiceVersion(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
_, ok, err := store.GetServiceVersion(ctx, "service")
_, ok, err := store.GetServiceVersion(ctx)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@ -75,15 +75,15 @@ func TestSetServiceVersion(t *testing.T) {
db := database.NewDB(logger, dbtest.NewDB(logger, t))
store := New(db)
if err := store.UpdateServiceVersion(ctx, "service", "1.2.3"); err != nil {
if err := store.UpdateServiceVersion(ctx, "1.2.3"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := store.SetServiceVersion(ctx, "service", "1.2.5"); err != nil {
if err := store.SetServiceVersion(ctx, "1.2.5"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
version, _, err := store.GetServiceVersion(ctx, "service")
version, _, err := store.GetServiceVersion(ctx)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@ -99,7 +99,7 @@ func TestGetFirstServiceVersion(t *testing.T) {
store := New(db)
t.Run("fresh db", func(t *testing.T) {
_, ok, err := store.GetFirstServiceVersion(ctx, "service")
_, ok, err := store.GetFirstServiceVersion(ctx)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@ -109,17 +109,17 @@ func TestGetFirstServiceVersion(t *testing.T) {
})
t.Run("after updates", func(t *testing.T) {
if err := store.UpdateServiceVersion(ctx, "service", "1.2.3"); err != nil {
if err := store.UpdateServiceVersion(ctx, "1.2.3"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := store.UpdateServiceVersion(ctx, "service", "1.2.4"); err != nil {
if err := store.UpdateServiceVersion(ctx, "1.2.4"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := store.UpdateServiceVersion(ctx, "service", "1.3.0"); err != nil {
if err := store.UpdateServiceVersion(ctx, "1.3.0"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
firstVersion, ok, err := store.GetFirstServiceVersion(ctx, "service")
firstVersion, ok, err := store.GetFirstServiceVersion(ctx)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@ -136,7 +136,7 @@ func TestGetFirstServiceVersion(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
_, ok, err := store.GetFirstServiceVersion(ctx, "service")
_, ok, err := store.GetFirstServiceVersion(ctx)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@ -163,12 +163,12 @@ func TestUpdateServiceVersion(t *testing.T) {
{"0.2.0", nil},
{"1.0.0", nil},
{"1.2.0", &UpgradeError{
Service: "service",
Service: "frontend",
Previous: semver.MustParse("1.0.0"),
Latest: semver.MustParse("1.2.0"),
}},
{"2.1.0", &UpgradeError{
Service: "service",
Service: "frontend",
Previous: semver.MustParse("1.0.0"),
Latest: semver.MustParse("2.1.0"),
}},
@ -176,12 +176,12 @@ func TestUpdateServiceVersion(t *testing.T) {
{"non-semantic-version-is-always-valid", nil},
{"1.0.0", nil}, // back to semantic version is allowed
{"2.1.0", &UpgradeError{
Service: "service",
Service: "frontend",
Previous: semver.MustParse("1.0.0"),
Latest: semver.MustParse("2.1.0"),
}}, // upgrade policy violation returns
} {
have := store.UpdateServiceVersion(ctx, "service", tc.version)
have := store.UpdateServiceVersion(ctx, tc.version)
want := tc.err
if !errors.Is(have, want) {
@ -197,7 +197,7 @@ func TestUpdateServiceVersion(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
if err := store.UpdateServiceVersion(ctx, "service", "0.0.1"); err == nil {
if err := store.UpdateServiceVersion(ctx, "0.0.1"); err == nil {
t.Fatalf("expected error, got none")
}
})
@ -214,7 +214,7 @@ func TestValidateUpgrade(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
if err := store.ValidateUpgrade(ctx, "service", "0.0.1"); err != nil {
if err := store.ValidateUpgrade(ctx, "frontend", "0.0.1"); err != nil {
t.Fatalf("unexpected error: %s", err)
}
})

View File

@ -8,6 +8,7 @@ go_library(
"capabilities.go",
"emoji.go",
"line.go",
"logger.go",
"noop_writer.go",
"output.go",
"output_unix.go",
@ -37,6 +38,7 @@ go_library(
"@com_github_mattn_go_runewidth//:go-runewidth",
"@com_github_moby_term//:term",
"@com_github_muesli_termenv//:termenv",
"@com_github_sourcegraph_log//:log",
"@org_golang_x_term//:term",
] + select({
"@io_bazel_rules_go//go/platform:windows": [

View File

@ -1,5 +1,20 @@
package output
var allEmojis = [...]string{
EmojiFailure,
EmojiWarning,
EmojiSuccess,
EmojiInfo,
EmojiLightbulb,
EmojiAsterisk,
EmojiWarningSign,
EmojiFingerPointRight,
EmojiHourglass,
EmojiShrug,
EmojiOk,
EmojiQuestionMark,
}
// Standard emoji for use in output.
const (
EmojiFailure = "❌"

View File

@ -0,0 +1,31 @@
package output
import (
"bytes"
"github.com/sourcegraph/log"
)
type logFacade struct {
logger log.Logger
}
func OutputFromLogger(logger log.Logger) *Output {
return NewOutput(&logFacade{logger}, OutputOpts{})
}
func (l *logFacade) Write(p []byte) (n int, err error) {
for _, emoji := range allEmojis {
if bytes.HasPrefix(p, []byte(emoji)) {
switch emoji {
case EmojiWarningSign:
l.logger.Warn(string(p[len(emoji):]))
case EmojiFailure, EmojiWarning:
l.logger.Error(string(p[len(emoji):]))
default:
l.logger.Info(string(p[len(emoji):]))
}
}
}
return len(p), nil
}
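
A hedged example of routing migrator output through a structured logger using the facade above; the scope name is illustrative.

// Illustrative only; not part of this change.
logger := log.Scoped("migrator", "")
out := output.OutputFromLogger(logger)
// Lines prefixed with a warning emoji are routed to logger.Warn by the facade;
// failures go to logger.Error, everything else to logger.Info.
out.WriteLine(output.Linef(output.EmojiWarningSign, output.StyleReset, "connection DSN fell back to defaults"))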