mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 14:11:44 +00:00
We dived into our go postgres driver and when executing a migration it is executed as a "simple query". Postgres in this case automatically wraps the collection of statements in a transaction, unless it contains transaction statements. So our last attempt at removing the transaction failed. In this attempt we use COMMIT AND CHAIN after each table alter. What this does is commit the current transaction and then starts it up again. From the perspective of the go driver, it is as if there was only one transaction. We then switch the migration to using a transaction to ensure the go drivers clean up the postgres connection in case of failure. IE if a query manually starts a transaction and does not clean up, the connection will be marked as broken for the next person who gets the connection from the pool. By wrapping in go's transaction code the connection will be properly cleaned up. Test Plan: All continuous environments have already succeeded or failed on this migration number. So we will manually run this again against them with the migrator code to ensure the same code paths. If they succeed we will keep code as is, otherwise we will rollback. Additionally we did lots of adhoc testing to understand the characteristics of go and transaction handling. Co-authored-by: Erik Seliger <erikseliger@me.com>
193 lines
7.1 KiB
Diff
193 lines
7.1 KiB
Diff
diff --git a/internal/database/connections/live/BUILD.bazel b/internal/database/connections/live/BUILD.bazel
|
|
index cd397a24b1a..e2b0ba10c9d 100644
|
|
--- a/internal/database/connections/live/BUILD.bazel
|
|
+++ b/internal/database/connections/live/BUILD.bazel
|
|
@@ -35,6 +35,7 @@ go_test(
|
|
],
|
|
deps = [
|
|
"//internal/database/dbtest",
|
|
+ "//internal/database/migration/definition",
|
|
"//internal/database/migration/drift",
|
|
"//internal/database/migration/runner",
|
|
"//internal/database/migration/schemas",
|
|
diff --git a/internal/database/connections/live/migration_test.go b/internal/database/connections/live/migration_test.go
|
|
index 18938f332c6..8880ec58259 100644
|
|
--- a/internal/database/connections/live/migration_test.go
|
|
+++ b/internal/database/connections/live/migration_test.go
|
|
@@ -2,6 +2,7 @@ package connections
|
|
|
|
import (
|
|
"context"
|
|
+ "database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
@@ -12,6 +13,7 @@ import (
|
|
"github.com/sourcegraph/log/logtest"
|
|
|
|
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
|
|
+ "github.com/sourcegraph/sourcegraph/internal/database/migration/definition"
|
|
"github.com/sourcegraph/sourcegraph/internal/database/migration/drift"
|
|
"github.com/sourcegraph/sourcegraph/internal/database/migration/runner"
|
|
"github.com/sourcegraph/sourcegraph/internal/database/migration/schemas"
|
|
@@ -129,7 +131,7 @@ func testMigrationIdempotency(t *testing.T, name string, schema *schemas.Schema)
|
|
|
|
t.Run("idempotent up", func(t *testing.T) {
|
|
for _, definition := range all {
|
|
- if _, err := db.Exec(definition.UpQuery.Query(sqlf.PostgresBindVar)); err != nil {
|
|
+ if err := applyMigration(db, definition, true); err != nil {
|
|
t.Fatalf("failed to perform upgrade of migration %d: %s", definition.ID, err)
|
|
}
|
|
|
|
@@ -139,7 +141,7 @@ func testMigrationIdempotency(t *testing.T, name string, schema *schemas.Schema)
|
|
continue
|
|
}
|
|
|
|
- if _, err := db.Exec(definition.UpQuery.Query(sqlf.PostgresBindVar)); err != nil {
|
|
+ if err := applyMigration(db, definition, true); err != nil {
|
|
t.Fatalf("migration %d is not idempotent%s: %s", definition.ID, formatHint(err), err)
|
|
}
|
|
}
|
|
@@ -149,7 +151,7 @@ func testMigrationIdempotency(t *testing.T, name string, schema *schemas.Schema)
|
|
for i := len(all) - 1; i >= 0; i-- {
|
|
definition := all[i]
|
|
|
|
- if _, err := db.Exec(definition.DownQuery.Query(sqlf.PostgresBindVar)); err != nil {
|
|
+ if err := applyMigration(db, definition, false); err != nil {
|
|
t.Fatalf("failed to perform downgrade of migration %d: %s", definition.ID, err)
|
|
}
|
|
|
|
@@ -159,7 +161,7 @@ func testMigrationIdempotency(t *testing.T, name string, schema *schemas.Schema)
|
|
continue
|
|
}
|
|
|
|
- if _, err := db.Exec(definition.DownQuery.Query(sqlf.PostgresBindVar)); err != nil {
|
|
+ if err := applyMigration(db, definition, false); err != nil {
|
|
t.Fatalf("migration %d is not idempotent%s: %s", definition.ID, formatHint(err), err)
|
|
}
|
|
}
|
|
@@ -183,7 +185,7 @@ func testDownMigrationsDoNotCreateDrift(t *testing.T, name string, schema *schem
|
|
expectedDescription := expectedDescriptions["public"]
|
|
|
|
// Run query up
|
|
- if _, err := db.Exec(definition.UpQuery.Query(sqlf.PostgresBindVar)); err != nil {
|
|
+ if err := applyMigration(db, definition, true); err != nil {
|
|
t.Fatalf("failed to perform upgrade of migration %d: %s", definition.ID, err)
|
|
}
|
|
|
|
@@ -194,7 +196,7 @@ func testDownMigrationsDoNotCreateDrift(t *testing.T, name string, schema *schem
|
|
}
|
|
|
|
// Run query down (should restore previous state)
|
|
- if _, err := db.Exec(definition.DownQuery.Query(sqlf.PostgresBindVar)); err != nil {
|
|
+ if err := applyMigration(db, definition, false); err != nil {
|
|
t.Fatalf("failed to perform downgrade of migration %d: %s", definition.ID, err)
|
|
}
|
|
|
|
@@ -232,7 +234,7 @@ func testDownMigrationsDoNotCreateDrift(t *testing.T, name string, schema *schem
|
|
}
|
|
|
|
// Re-run query up to prepare for next round
|
|
- if _, err := db.Exec(definition.UpQuery.Query(sqlf.PostgresBindVar)); err != nil {
|
|
+ if err := applyMigration(db, definition, true); err != nil {
|
|
t.Fatalf("failed to re-perform upgrade of migration %d: %s", definition.ID, err)
|
|
}
|
|
}
|
|
@@ -264,3 +266,38 @@ func formatHint(err error) string {
|
|
|
|
return ""
|
|
}
|
|
+
|
|
+// applyMigration applies migrations for testing. In real-world, they run inside of a
|
|
+// transaction, sp we mimic that in this helper as well.
|
|
+func applyMigration(db *sql.DB, definition definition.Definition, up bool) (err error) {
|
|
+ type execer interface {
|
|
+ Exec(query string, args ...any) (sql.Result, error)
|
|
+ }
|
|
+ var queryRunner execer = db
|
|
+
|
|
+ if !definition.IsCreateIndexConcurrently {
|
|
+ tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
|
|
+ if err != nil {
|
|
+ return err
|
|
+ }
|
|
+ queryRunner = tx
|
|
+ defer func() {
|
|
+ if err != nil {
|
|
+ err = errors.Append(err, tx.Rollback())
|
|
+ }
|
|
+ err = tx.Commit()
|
|
+ }()
|
|
+ }
|
|
+
|
|
+ if up {
|
|
+ if _, err := queryRunner.Exec(definition.UpQuery.Query(sqlf.PostgresBindVar)); err != nil {
|
|
+ return errors.Wrapf(err, "failed to apply migration %d:\n```\n%s\n```\n", definition.ID, definition.UpQuery.Query(sqlf.PostgresBindVar))
|
|
+ }
|
|
+ } else {
|
|
+ if _, err := queryRunner.Exec(definition.DownQuery.Query(sqlf.PostgresBindVar)); err != nil {
|
|
+ return errors.Wrapf(err, "failed to apply migration %d:\n```\n%s\n```\n", definition.ID, definition.DownQuery.Query(sqlf.PostgresBindVar))
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return nil
|
|
+}
|
|
diff --git a/internal/database/connections/test/store.go b/internal/database/connections/test/store.go
|
|
index bd4439c96a1..df755f98920 100644
|
|
--- a/internal/database/connections/test/store.go
|
|
+++ b/internal/database/connections/test/store.go
|
|
@@ -17,6 +17,7 @@ import (
|
|
// not passed to any underlying persistence layer.
|
|
type memoryStore struct {
|
|
db *sql.DB
|
|
+ tx *sql.Tx
|
|
appliedVersions []int
|
|
pendingVersions []int
|
|
failedVersions []int
|
|
@@ -28,11 +29,25 @@ func newMemoryStore(db *sql.DB) runner.Store {
|
|
}
|
|
}
|
|
|
|
-func (s *memoryStore) Transact(ctx context.Context) (runner.Store, error) {
|
|
- return s, nil
|
|
+func (s *memoryStore) Transact(ctx context.Context) (_ runner.Store, err error) {
|
|
+ if s.tx != nil {
|
|
+ return nil, errors.New("cannot start transaction when another transaction is in progress, call Done before")
|
|
+ }
|
|
+ s.tx, err = s.db.BeginTx(ctx, &sql.TxOptions{})
|
|
+ return s, err
|
|
}
|
|
|
|
func (s *memoryStore) Done(err error) error {
|
|
+ defer func() {
|
|
+ s.tx = nil
|
|
+ }()
|
|
+
|
|
+ if s.tx != nil {
|
|
+ if err != nil {
|
|
+ return errors.Append(err, s.tx.Rollback())
|
|
+ }
|
|
+ return s.tx.Commit()
|
|
+ }
|
|
return err
|
|
}
|
|
|
|
@@ -68,8 +83,16 @@ func (s *memoryStore) IndexStatus(_ context.Context, _, _ string) (shared.IndexS
|
|
return shared.IndexStatus{}, false, nil
|
|
}
|
|
|
|
+type execContexter interface {
|
|
+ ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
|
|
+}
|
|
+
|
|
func (s *memoryStore) exec(ctx context.Context, migration definition.Definition, query *sqlf.Query) error {
|
|
- _, err := s.db.ExecContext(ctx, query.Query(sqlf.PostgresBindVar), query.Args()...)
|
|
+ var db execContexter = s.db
|
|
+ if s.tx != nil {
|
|
+ db = s.tx
|
|
+ }
|
|
+ _, err := db.ExecContext(ctx, query.Query(sqlf.PostgresBindVar), query.Args()...)
|
|
if err != nil {
|
|
s.failedVersions = append(s.failedVersions, migration.ID)
|
|
return err
|