Revert "insights: batch insert series points (#42025)" (#43013)

* Revert "insights: batch insert series points (#42025)"

This reverts commit 6b2b8c6b4b.

* put back sql statement that was removed as dead code
This commit is contained in:
chwarwick 2022-10-14 15:54:56 -04:00 committed by GitHub
parent 8d9fa8dd15
commit c0c0963bf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 238 additions and 98 deletions

View File

@ -7,7 +7,7 @@ If a large number of rows are being inserted into the same table, use of a [batc
The package provides many convenience functions, but basic usage is as follows. An inserter is created with a table name and a list of column names for which values will be supplied. Then, the `Insert` method is called for each row to be inserted. It is expected that the number of values supplied to each call to `Insert` matches the number of columns supplied at construction of the inserter. On each call to `Insert`, if the current batch is full, it will be prepared and sent to the database, leaving an empty batch for future operations. A final call to `Flush` will ensure that any remaining batched rows are sent to the database.
```go
inserter := batch.NewInserter(ctx, db, batch.MaxNumPostgresParameters, "table", "col1", "col2", "col3" /* , ... */)
inserter := batch.NewInserter(ctx, db, "table", "col1", "col2", "col3" /* , ... */)
for /* ... */ {
if err := inserter.Insert(ctx, val1, val2, val3 /* , ... */); err != nil {
@ -54,7 +54,7 @@ Here, we defined the temporary table with the clause `ON COMMIT DROP`, which wil
Next, create and use a batch inserter instance just as described in the previous section, but target the newly created temporary table. Only the columns that are defined on the temporary table need to be supplied when calling the `Insert` method.
```go
inserter := batch.NewInserter(ctx, db, batch.MaxNumPostgresParameters, "temp_table", "col3", "col4")
inserter := batch.NewInserter(ctx, db, "temp_table", "col3", "col4")
for /* ... */ {
if err := inserter.Insert(ctx, val3, val4); err != nil {

View File

@ -110,6 +110,10 @@ func testHistoricalEnqueuer(t *testing.T, p *testParams) *testResults {
}
return 0, nil
})
insightsStore.RecordSeriesPointFunc.SetDefaultHook(func(ctx context.Context, args store.RecordSeriesPointArgs) error {
r.operations = append(r.operations, fmt.Sprintf("recordSeriesPoint(point=%v, repoName=%v)", args.Point.String(), *args.RepoName))
return nil
})
repoStore := NewMockRepoStore()
repos := map[api.RepoName]*types.Repo{}

View File

@ -1598,6 +1598,9 @@ type MockInterface struct {
// CountDataFunc is an instance of a mock function object controlling
// the behavior of the method CountData.
CountDataFunc *InterfaceCountDataFunc
// RecordSeriesPointFunc is an instance of a mock function object
// controlling the behavior of the method RecordSeriesPoint.
RecordSeriesPointFunc *InterfaceRecordSeriesPointFunc
// RecordSeriesPointsFunc is an instance of a mock function object
// controlling the behavior of the method RecordSeriesPoints.
RecordSeriesPointsFunc *InterfaceRecordSeriesPointsFunc
@ -1615,6 +1618,11 @@ func NewMockInterface() *MockInterface {
return
},
},
RecordSeriesPointFunc: &InterfaceRecordSeriesPointFunc{
defaultHook: func(context.Context, RecordSeriesPointArgs) (r0 error) {
return
},
},
RecordSeriesPointsFunc: &InterfaceRecordSeriesPointsFunc{
defaultHook: func(context.Context, []RecordSeriesPointArgs) (r0 error) {
return
@ -1637,6 +1645,11 @@ func NewStrictMockInterface() *MockInterface {
panic("unexpected invocation of MockInterface.CountData")
},
},
RecordSeriesPointFunc: &InterfaceRecordSeriesPointFunc{
defaultHook: func(context.Context, RecordSeriesPointArgs) error {
panic("unexpected invocation of MockInterface.RecordSeriesPoint")
},
},
RecordSeriesPointsFunc: &InterfaceRecordSeriesPointsFunc{
defaultHook: func(context.Context, []RecordSeriesPointArgs) error {
panic("unexpected invocation of MockInterface.RecordSeriesPoints")
@ -1657,6 +1670,9 @@ func NewMockInterfaceFrom(i Interface) *MockInterface {
CountDataFunc: &InterfaceCountDataFunc{
defaultHook: i.CountData,
},
RecordSeriesPointFunc: &InterfaceRecordSeriesPointFunc{
defaultHook: i.RecordSeriesPoint,
},
RecordSeriesPointsFunc: &InterfaceRecordSeriesPointsFunc{
defaultHook: i.RecordSeriesPoints,
},
@ -1773,6 +1789,111 @@ func (c InterfaceCountDataFuncCall) Results() []interface{} {
return []interface{}{c.Result0, c.Result1}
}
// InterfaceRecordSeriesPointFunc describes the behavior when the
// RecordSeriesPoint method of the parent MockInterface instance is invoked.
type InterfaceRecordSeriesPointFunc struct {
// defaultHook is the hook returned by nextHook when the hooks queue is empty.
defaultHook func(context.Context, RecordSeriesPointArgs) error
// hooks is a FIFO queue of one-shot hooks; nextHook removes and returns the
// entry at the front.
hooks []func(context.Context, RecordSeriesPointArgs) error
// history holds one entry per recorded invocation, in the order appendCall
// received them.
history []InterfaceRecordSeriesPointFuncCall
// mutex guards hooks and history.
mutex sync.Mutex
}
// RecordSeriesPoint invokes the hook at the front of the queue (or the default
// hook when the queue is empty), records the arguments and result of the call
// in the invocation history, and returns the hook's result.
func (m *MockInterface) RecordSeriesPoint(v0 context.Context, v1 RecordSeriesPointArgs) error {
	hook := m.RecordSeriesPointFunc.nextHook()
	result := hook(v0, v1)
	m.RecordSeriesPointFunc.appendCall(InterfaceRecordSeriesPointFuncCall{
		Arg0:    v0,
		Arg1:    v1,
		Result0: result,
	})
	return result
}
// SetDefaultHook sets the function that is called when the RecordSeriesPoint
// method of the parent MockInterface instance is invoked and the hook queue
// is empty. The assignment is performed without acquiring f.mutex.
func (f *InterfaceRecordSeriesPointFunc) SetDefaultHook(hook func(context.Context, RecordSeriesPointArgs) error) {
f.defaultHook = hook
}
// PushHook appends a one-shot hook to the back of the hook queue. Every call
// to the RecordSeriesPoint method of the parent MockInterface instance
// consumes the hook at the front of the queue; once the queue has drained,
// the default hook handles all further calls.
func (f *InterfaceRecordSeriesPointFunc) PushHook(hook func(context.Context, RecordSeriesPointArgs) error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.hooks = append(f.hooks, hook)
}
// SetDefaultReturn installs, via SetDefaultHook, a default hook that ignores
// its arguments and always returns the given value.
func (f *InterfaceRecordSeriesPointFunc) SetDefaultReturn(r0 error) {
	constant := func(context.Context, RecordSeriesPointArgs) error { return r0 }
	f.SetDefaultHook(constant)
}
// PushReturn queues, via PushHook, a one-shot hook that ignores its arguments
// and returns the given value.
func (f *InterfaceRecordSeriesPointFunc) PushReturn(r0 error) {
	constant := func(context.Context, RecordSeriesPointArgs) error { return r0 }
	f.PushHook(constant)
}
// nextHook removes and returns the hook at the front of the queue, or returns
// the default hook when the queue is empty.
func (f *InterfaceRecordSeriesPointFunc) nextHook() func(context.Context, RecordSeriesPointArgs) error {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.hooks) > 0 {
		front := f.hooks[0]
		f.hooks = f.hooks[1:]
		return front
	}
	return f.defaultHook
}
// appendCall adds the given invocation record to the end of the history while
// holding the mutex.
func (f *InterfaceRecordSeriesPointFunc) appendCall(r0 InterfaceRecordSeriesPointFuncCall) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.history = append(f.history, r0)
}
// History returns a copy of the recorded InterfaceRecordSeriesPointFuncCall
// objects describing the invocations of this function, in recording order.
func (f *InterfaceRecordSeriesPointFunc) History() []InterfaceRecordSeriesPointFuncCall {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	snapshot := make([]InterfaceRecordSeriesPointFuncCall, len(f.history))
	copy(snapshot, f.history)
	return snapshot
}
// InterfaceRecordSeriesPointFuncCall is an object that describes an
// invocation of method RecordSeriesPoint on an instance of MockInterface.
// MockInterface.RecordSeriesPoint records one value of this type per call,
// and InterfaceRecordSeriesPointFunc.History returns copies of them.
type InterfaceRecordSeriesPointFuncCall struct {
// Arg0 is the value of the 1st argument passed to this method
// invocation.
Arg0 context.Context
// Arg1 is the value of the 2nd argument passed to this method
// invocation.
Arg1 RecordSeriesPointArgs
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 error
}
// Args returns the arguments of this invocation, in call order, as a slice of
// empty-interface values.
func (c InterfaceRecordSeriesPointFuncCall) Args() []interface{} {
	args := make([]interface{}, 2)
	args[0] = c.Arg0
	args[1] = c.Arg1
	return args
}
// Results returns the results of this invocation as a slice of
// empty-interface values.
func (c InterfaceRecordSeriesPointFuncCall) Results() []interface{} {
	results := make([]interface{}, 1)
	results[0] = c.Result0
	return results
}
// InterfaceRecordSeriesPointsFunc describes the behavior when the
// RecordSeriesPoints method of the parent MockInterface instance is
// invoked.

View File

@ -9,13 +9,13 @@ import (
"time"
"github.com/RoaringBitmap/roaring"
"github.com/keegancsmith/sqlf"
edb "github.com/sourcegraph/sourcegraph/enterprise/internal/database"
"github.com/sourcegraph/sourcegraph/enterprise/internal/insights/types"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/batch"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
@ -24,6 +24,7 @@ import (
// for actual API usage.
type Interface interface {
SeriesPoints(ctx context.Context, opts SeriesPointsOpts) ([]SeriesPoint, error)
RecordSeriesPoint(ctx context.Context, v RecordSeriesPointArgs) error
RecordSeriesPoints(ctx context.Context, pts []RecordSeriesPointArgs) error
CountData(ctx context.Context, opts CountDataOpts) (int, error)
}
@ -481,6 +482,66 @@ type RecordSeriesPointArgs struct {
PersistMode PersistMode
}
// RecordSeriesPoint records a single data point for the specified series ID (which is a unique
// ID for the series, not a DB table primary key ID).
//
// v.RepoName and v.RepoID must either both be set or both be nil. v.PersistMode selects the
// destination table via getTableForPersistMode. The optional repo-name upsert and the point
// insert run inside one transaction, which is finished through txStore.Done with the function's
// resulting error.
//
// An error is returned when the arguments are invalid, the persist mode is unsupported, the
// transaction cannot be started, or any of the queries fail.
func (s *Store) RecordSeriesPoint(ctx context.Context, v RecordSeriesPointArgs) (err error) {
	// Validate everything that does not need the database first, so that invalid input never
	// opens a transaction.
	if (v.RepoName == nil) != (v.RepoID == nil) {
		return errors.New("RepoName and RepoID must be mutually specified")
	}
	tableName, err := getTableForPersistMode(v.PersistMode)
	if err != nil {
		return err
	}

	// Start transaction.
	txStore, err := s.Store.Transact(ctx)
	if err != nil {
		return err
	}
	defer func() { err = txStore.Done(err) }()

	// Upsert the repository name into a separate table, so we get a small ID we can reference
	// many times from the series_points table without storing the repo name multiple times.
	var repoNameID *int
	if v.RepoName != nil {
		id, ok, scanErr := basestore.ScanFirstInt(txStore.Query(ctx, sqlf.Sprintf(upsertRepoNameFmtStr, *v.RepoName, *v.RepoName)))
		if scanErr != nil {
			return errors.Wrap(scanErr, "upserting repo name ID")
		}
		if !ok {
			// scanErr is nil here; wrapping a nil error yields nil, which would silently report
			// success without inserting anything, so a fresh error must be created instead.
			return errors.New("repo name ID not found (this should never happen)")
		}
		repoNameID = &id
	}

	q := sqlf.Sprintf(
		recordSeriesPointFmtstr,
		sqlf.Sprintf(tableName),
		v.SeriesID,         // series_id
		v.Point.Time.UTC(), // time
		v.Point.Value,      // value
		v.RepoID,           // repo_id
		repoNameID,         // repo_name_id
		repoNameID,         // original_repo_name_id
		v.Point.Capture,    // capture
	)

	// Insert the actual data point.
	return txStore.Exec(ctx, q)
}
// getTableForPersistMode maps a persist mode to the table that stores its points:
// recordingTable for RecordMode and snapshotsTable for SnapshotMode. Any other
// mode yields an empty table name and an error.
func getTableForPersistMode(mode PersistMode) (string, error) {
	if mode == RecordMode {
		return recordingTable, nil
	}
	if mode == SnapshotMode {
		return snapshotsTable, nil
	}
	return "", errors.Newf("unsupported insights series point persist mode: %v", mode)
}
// RecordSeriesPoints stores multiple data points atomically.
func (s *Store) RecordSeriesPoints(ctx context.Context, pts []RecordSeriesPointArgs) (err error) {
tx, err := s.Transact(ctx)
@ -489,58 +550,12 @@ func (s *Store) RecordSeriesPoints(ctx context.Context, pts []RecordSeriesPointA
}
defer func() { err = tx.Done(err) }()
tableColumns := []string{"series_id", "time", "value", "repo_id", "repo_name_id", "original_repo_name_id", "capture"}
// In our current use cases we should only ever use one of these for one function call, but this could change.
inserters := map[PersistMode]*batch.Inserter{
RecordMode: batch.NewInserter(ctx, tx.Handle(), recordingTable, batch.MaxNumPostgresParameters, tableColumns...),
SnapshotMode: batch.NewInserter(ctx, tx.Handle(), snapshotsTable, batch.MaxNumPostgresParameters, tableColumns...),
}
for _, pt := range pts {
inserter, ok := inserters[pt.PersistMode]
if !ok {
return errors.Newf("unsupported insights series point persist mode: %v", pt.PersistMode)
}
if (pt.RepoName != nil && pt.RepoID == nil) || (pt.RepoID != nil && pt.RepoName == nil) {
return errors.New("RepoName and RepoID must be mutually specified")
}
// Upsert the repository name into a separate table, so we get a small ID we can reference
// many times from the series_points table without storing the repo name multiple times.
var repoNameID *int
if pt.RepoName != nil {
repoNameIDValue, ok, err := basestore.ScanFirstInt(tx.Query(ctx, sqlf.Sprintf(upsertRepoNameFmtStr, *pt.RepoName, *pt.RepoName)))
if err != nil {
return errors.Wrap(err, "upserting repo name ID")
}
if !ok {
return errors.Wrap(err, "repo name ID not found (this should never happen)")
}
repoNameID = &repoNameIDValue
}
if err := inserter.Insert(
ctx,
pt.SeriesID, // series_id
pt.Point.Time.UTC(), // time
pt.Point.Value, // value
pt.RepoID, // repo_id
repoNameID, // repo_name_id
repoNameID, // original_repo_name_id
pt.Point.Capture, // capture
); err != nil {
return errors.Wrap(err, "Insert")
// this is a pretty naive implementation, this can be refactored to reduce db calls
if err := s.RecordSeriesPoint(ctx, pt); err != nil {
return err
}
}
for _, inserter := range inserters {
if err := inserter.Flush(ctx); err != nil {
return errors.Wrap(err, "Flush")
}
}
return nil
}
@ -556,6 +571,18 @@ UNION
SELECT id FROM repo_names WHERE name = %s;
`
// recordSeriesPointFmtstr is the sqlf format string RecordSeriesPoint uses to
// insert one row. The first placeholder receives the target table name; the
// remaining seven receive series_id, time, value, repo_id, repo_name_id,
// original_repo_name_id and capture, in that order.
const recordSeriesPointFmtstr = `
-- source: enterprise/internal/insights/store/store.go:RecordSeriesPoint
INSERT INTO %s (
series_id,
time,
value,
repo_id,
repo_name_id,
original_repo_name_id, capture)
VALUES (%s, %s, %s, %s, %s, %s, %s);
`
func (s *Store) query(ctx context.Context, q *sqlf.Query, sc scanFunc) error {
rows, err := s.Store.Query(ctx, q)
if err != nil {

View File

@ -7,12 +7,13 @@ import (
"testing"
"time"
"github.com/sourcegraph/log/logtest"
"github.com/stretchr/testify/require"
"github.com/sourcegraph/sourcegraph/internal/api"
"k8s.io/apimachinery/pkg/util/rand"
"github.com/sourcegraph/log/logtest"
edb "github.com/sourcegraph/sourcegraph/enterprise/internal/database"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
@ -33,13 +34,12 @@ func initializeData(ctx context.Context, store *Store, repos, times int, withCap
seriesID := rand.String(8)
currentTime := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
var records []RecordSeriesPointArgs
for i := 0; i < times; i++ {
for j := 0; j < repos; j++ {
repoName := fmt.Sprintf("repo-%d", j)
id := api.RepoID(j)
for _, val := range cv {
records = append(records, RecordSeriesPointArgs{
err := store.RecordSeriesPoint(ctx, RecordSeriesPointArgs{
SeriesID: seriesID,
Point: SeriesPoint{
SeriesID: seriesID,
@ -51,14 +51,14 @@ func initializeData(ctx context.Context, store *Store, repos, times int, withCap
RepoID: &id,
PersistMode: RecordMode,
})
if err != nil {
panic(err)
}
}
}
currentTime = currentTime.AddDate(0, 1, 0)
}
if err := store.RecordSeriesPoints(ctx, records); err != nil {
panic(err)
}
return seriesID
}

View File

@ -17,7 +17,6 @@ import (
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
"github.com/sourcegraph/sourcegraph/internal/timeutil"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
func TestSeriesPoints(t *testing.T) {
@ -153,7 +152,7 @@ func TestCountData(t *testing.T) {
optionalRepoID := func(v api.RepoID) *api.RepoID { return &v }
// Record some duplicate data points.
records := []RecordSeriesPointArgs{
for _, record := range []RecordSeriesPointArgs{
{
SeriesID: "one",
Point: SeriesPoint{Time: timeValue("2020-03-01T00:00:00Z"), Value: 1.1},
@ -181,9 +180,10 @@ func TestCountData(t *testing.T) {
Point: SeriesPoint{Time: timeValue("2020-03-03T00:01:00Z"), Value: 3.3},
PersistMode: RecordMode,
},
}
if err := store.RecordSeriesPoints(ctx, records); err != nil {
t.Fatal(err)
} {
if err := store.RecordSeriesPoint(ctx, record); err != nil {
t.Fatal(err)
}
}
// How many data points on 02-29?
@ -230,17 +230,12 @@ func TestRecordSeriesPoints(t *testing.T) {
permStore := NewInsightPermissionStore(postgres)
store := NewWithClock(insightsDB, permStore, clock)
// First test it does not error with no records.
if err := store.RecordSeriesPoints(ctx, []RecordSeriesPointArgs{}); err != nil {
t.Fatal(err)
}
optionalString := func(v string) *string { return &v }
optionalRepoID := func(v api.RepoID) *api.RepoID { return &v }
current := time.Date(2021, time.September, 10, 10, 0, 0, 0, time.UTC)
records := []RecordSeriesPointArgs{
for _, record := range []RecordSeriesPointArgs{
{
SeriesID: "one",
Point: SeriesPoint{Time: current, Value: 1.1},
@ -260,18 +255,19 @@ func TestRecordSeriesPoints(t *testing.T) {
Point: SeriesPoint{Time: current.Add(-time.Hour * 24 * 28), Value: 3.3},
RepoName: optionalString("repo1"),
RepoID: optionalRepoID(3),
PersistMode: SnapshotMode,
PersistMode: RecordMode,
},
{
SeriesID: "one",
Point: SeriesPoint{Time: current.Add(-time.Hour * 24 * 42), Value: 3.3},
RepoName: optionalString("repo1"),
RepoID: optionalRepoID(3),
PersistMode: SnapshotMode,
PersistMode: RecordMode,
},
}
if err := store.RecordSeriesPoints(ctx, records); err != nil {
t.Fatal(err)
} {
if err := store.RecordSeriesPoint(ctx, record); err != nil {
t.Fatal(err)
}
}
want := []SeriesPoint{
@ -338,7 +334,7 @@ func TestRecordSeriesPointsSnapshotOnly(t *testing.T) {
current := time.Date(2021, time.September, 10, 10, 0, 0, 0, time.UTC)
records := []RecordSeriesPointArgs{
for _, record := range []RecordSeriesPointArgs{
{
SeriesID: "one",
Point: SeriesPoint{Time: current, Value: 1.1},
@ -346,9 +342,10 @@ func TestRecordSeriesPointsSnapshotOnly(t *testing.T) {
RepoID: optionalRepoID(3),
PersistMode: SnapshotMode,
},
}
if err := store.RecordSeriesPoints(ctx, records); err != nil {
t.Fatal(err)
} {
if err := store.RecordSeriesPoint(ctx, record); err != nil {
t.Fatal(err)
}
}
// check snapshots table has a row
@ -401,7 +398,7 @@ func TestRecordSeriesPointsRecordingOnly(t *testing.T) {
current := time.Date(2021, time.September, 10, 10, 0, 0, 0, time.UTC)
records := []RecordSeriesPointArgs{
for _, record := range []RecordSeriesPointArgs{
{
SeriesID: "one",
Point: SeriesPoint{Time: current, Value: 1.1},
@ -409,9 +406,10 @@ func TestRecordSeriesPointsRecordingOnly(t *testing.T) {
RepoID: optionalRepoID(3),
PersistMode: RecordMode,
},
}
if err := store.RecordSeriesPoints(ctx, records); err != nil {
t.Fatal(err)
} {
if err := store.RecordSeriesPoint(ctx, record); err != nil {
t.Fatal(err)
}
}
// check snapshots table has a row
@ -465,7 +463,7 @@ func TestDeleteSnapshots(t *testing.T) {
current := time.Date(2021, time.September, 10, 10, 0, 0, 0, time.UTC)
seriesID := "one"
records := []RecordSeriesPointArgs{
for _, record := range []RecordSeriesPointArgs{
{
SeriesID: seriesID,
Point: SeriesPoint{Time: current, Value: 1.1},
@ -480,9 +478,10 @@ func TestDeleteSnapshots(t *testing.T) {
RepoID: optionalRepoID(3),
PersistMode: RecordMode,
},
}
if err := store.RecordSeriesPoints(ctx, records); err != nil {
t.Fatal(err)
} {
if err := store.RecordSeriesPoint(ctx, record); err != nil {
t.Fatal(err)
}
}
// first check that we have one recording and one snapshot
@ -625,14 +624,3 @@ func TestDelete(t *testing.T) {
t.Errorf("expected 1 count for series2 in snapshot table")
}
}
func getTableForPersistMode(mode PersistMode) (string, error) {
switch mode {
case RecordMode:
return recordingTable, nil
case SnapshotMode:
return snapshotsTable, nil
default:
return "", errors.Newf("unsupported insights series point persist mode: %v", mode)
}
}