database: support choice between ordered and unordered maps in keyed collection scanner (reducer) (#47029)

This commit is contained in:
Noah S-C 2023-01-27 17:49:20 +00:00 committed by GitHub
parent 9534aa21ea
commit 119ca1c34a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 740 additions and 35 deletions

View File

@ -4,7 +4,6 @@ import (
"bytes"
"context"
"crypto/sha256"
"database/sql"
"fmt"
"os"
"sort"
@ -608,8 +607,10 @@ func (s *scipWriter) InsertDocument(
return nil
}
const DocumentsBatchSize = 256
const MaxBatchPayloadSum = 1024 * 1024 * 32
const (
DocumentsBatchSize = 256
MaxBatchPayloadSum = 1024 * 1024 * 32
)
func (s *scipWriter) flush(ctx context.Context) (err error) {
documents := s.batch
@ -852,7 +853,7 @@ const deleteLSIFDataQuery = `
DELETE FROM %s WHERE dump_id = %s
`
func makeDocumentScanner(serializer *serializer) func(rows *sql.Rows, queryErr error) (map[string]DocumentData, error) {
func makeDocumentScanner(serializer *serializer) func(rows basestore.Rows, queryErr error) (map[string]DocumentData, error) {
return basestore.NewMapScanner(func(s dbutil.Scanner) (string, DocumentData, error) {
var path string
var data MarshalledDocumentData
@ -869,7 +870,7 @@ func makeDocumentScanner(serializer *serializer) func(rows *sql.Rows, queryErr e
})
}
func scanResultChunksIntoMap(serializer *serializer, f func(idx int, resultChunk ResultChunkData) error) func(rows *sql.Rows, queryErr error) error {
func scanResultChunksIntoMap(serializer *serializer, f func(idx int, resultChunk ResultChunkData) error) func(rows basestore.Rows, queryErr error) error {
return basestore.NewCallbackScanner(func(s dbutil.Scanner) (bool, error) {
var idx int
var rawData []byte

3
go.mod
View File

@ -191,6 +191,8 @@ require github.com/XSAM/otelsql v0.15.0
require (
cloud.google.com/go/compute/metadata v0.2.1 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cloudflare/circl v1.3.0 // indirect
github.com/cockroachdb/apd/v2 v2.0.1 // indirect
github.com/dennwc/varint v1.0.0 // indirect
@ -234,6 +236,7 @@ require (
github.com/sourcegraph/conc v0.1.0
github.com/sourcegraph/mountinfo v0.0.0-20221027185101-272dd8baaf4a
github.com/sourcegraph/sourcegraph/monitoring v0.0.0-20230124144931-b2d81b1accb6
github.com/wk8/go-ordered-map/v2 v2.1.5
github.com/xanzy/go-gitlab v0.76.0
github.com/yuin/goldmark-highlighting/v2 v2.0.0-20220924101305-151362477c87
go.opentelemetry.io/otel/exporters/jaeger v1.11.2

6
go.sum
View File

@ -357,6 +357,8 @@ github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
@ -393,6 +395,8 @@ github.com/bshuster-repo/logrus-logstash-hook v1.0.0/go.mod h1:zsTqEiSzDgAa/8GZR
github.com/bufbuild/buf v1.4.0 h1:GqE3a8CMmcFvWPzuY3Mahf9Kf3S9XgZ/ORpfYFzO+90=
github.com/bufbuild/buf v1.4.0/go.mod h1:mwHG7klTHnX+rM/ym8LXGl7vYpVmnwT96xWoRB4H5QI=
github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8=
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50=
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
@ -2267,6 +2271,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV
github.com/vmware/govmomi v0.20.3/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU=
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/wk8/go-ordered-map/v2 v2.1.5 h1:jLbYIFyWQMUwHLO20cImlCRBoNc5lp0nmE2dvwcxc7k=
github.com/wk8/go-ordered-map/v2 v2.1.5/go.mod h1:9Xvgm2mV2kSq2SAm0Y608tBmu8akTzI7c2bz7/G7ZN4=
github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0/go.mod h1:IXCdmsXIht47RaVFLEdVnh1t+pgYtTAhQGj73kz+2DM=
github.com/xanzy/go-gitlab v0.31.0/go.mod h1:sPLojNBn68fMUWSxIJtdVVIP8uSBYqesTfDUseX11Ug=
github.com/xanzy/go-gitlab v0.32.0/go.mod h1:sPLojNBn68fMUWSxIJtdVVIP8uSBYqesTfDUseX11Ug=

502
internal/database/basestore/mocks_test.go generated Normal file
View File

@ -0,0 +1,502 @@
// Code generated by go-mockgen 1.3.7; DO NOT EDIT.
//
// This file was generated by running `sg generate` (or `go-mockgen`) at the root of
// this repository. To add additional mocks to this or another package, add a new entry
// to the mockgen.yaml file in the root of this repository.
package basestore
import "sync"
// MockRows is a mock implementation of the Rows interface (from the package
// github.com/sourcegraph/sourcegraph/internal/database/basestore) used for
// unit testing.
type MockRows struct {
// CloseFunc is an instance of a mock function object controlling the
// behavior of the method Close.
CloseFunc *RowsCloseFunc
// ErrFunc is an instance of a mock function object controlling the
// behavior of the method Err.
ErrFunc *RowsErrFunc
// NextFunc is an instance of a mock function object controlling the
// behavior of the method Next.
NextFunc *RowsNextFunc
// ScanFunc is an instance of a mock function object controlling the
// behavior of the method Scan.
ScanFunc *RowsScanFunc
}
// NewMockRows creates a new mock of the Rows interface. All methods return
// zero values for all results, unless overwritten.
func NewMockRows() *MockRows {
return &MockRows{
CloseFunc: &RowsCloseFunc{
defaultHook: func() (r0 error) {
return
},
},
ErrFunc: &RowsErrFunc{
defaultHook: func() (r0 error) {
return
},
},
NextFunc: &RowsNextFunc{
defaultHook: func() (r0 bool) {
return
},
},
ScanFunc: &RowsScanFunc{
defaultHook: func(...interface{}) (r0 error) {
return
},
},
}
}
// NewStrictMockRows creates a new mock of the Rows interface. All methods
// panic on invocation, unless overwritten.
func NewStrictMockRows() *MockRows {
return &MockRows{
CloseFunc: &RowsCloseFunc{
defaultHook: func() error {
panic("unexpected invocation of MockRows.Close")
},
},
ErrFunc: &RowsErrFunc{
defaultHook: func() error {
panic("unexpected invocation of MockRows.Err")
},
},
NextFunc: &RowsNextFunc{
defaultHook: func() bool {
panic("unexpected invocation of MockRows.Next")
},
},
ScanFunc: &RowsScanFunc{
defaultHook: func(...interface{}) error {
panic("unexpected invocation of MockRows.Scan")
},
},
}
}
// NewMockRowsFrom creates a new mock of the MockRows interface. All methods
// delegate to the given implementation, unless overwritten.
func NewMockRowsFrom(i Rows) *MockRows {
return &MockRows{
CloseFunc: &RowsCloseFunc{
defaultHook: i.Close,
},
ErrFunc: &RowsErrFunc{
defaultHook: i.Err,
},
NextFunc: &RowsNextFunc{
defaultHook: i.Next,
},
ScanFunc: &RowsScanFunc{
defaultHook: i.Scan,
},
}
}
// RowsCloseFunc describes the behavior when the Close method of the parent
// MockRows instance is invoked.
type RowsCloseFunc struct {
defaultHook func() error
hooks []func() error
history []RowsCloseFuncCall
mutex sync.Mutex
}
// Close delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockRows) Close() error {
r0 := m.CloseFunc.nextHook()()
m.CloseFunc.appendCall(RowsCloseFuncCall{r0})
return r0
}
// SetDefaultHook sets function that is called when the Close method of the
// parent MockRows instance is invoked and the hook queue is empty.
func (f *RowsCloseFunc) SetDefaultHook(hook func() error) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// Close method of the parent MockRows instance invokes the hook at the
// front of the queue and discards it. After the queue is empty, the default
// hook function is invoked for any future action.
func (f *RowsCloseFunc) PushHook(hook func() error) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *RowsCloseFunc) SetDefaultReturn(r0 error) {
f.SetDefaultHook(func() error {
return r0
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *RowsCloseFunc) PushReturn(r0 error) {
f.PushHook(func() error {
return r0
})
}
func (f *RowsCloseFunc) nextHook() func() error {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *RowsCloseFunc) appendCall(r0 RowsCloseFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of RowsCloseFuncCall objects describing the
// invocations of this function.
func (f *RowsCloseFunc) History() []RowsCloseFuncCall {
f.mutex.Lock()
history := make([]RowsCloseFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// RowsCloseFuncCall is an object that describes an invocation of method
// Close on an instance of MockRows.
type RowsCloseFuncCall struct {
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c RowsCloseFuncCall) Args() []interface{} {
return []interface{}{}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c RowsCloseFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}
// RowsErrFunc describes the behavior when the Err method of the parent
// MockRows instance is invoked.
type RowsErrFunc struct {
defaultHook func() error
hooks []func() error
history []RowsErrFuncCall
mutex sync.Mutex
}
// Err delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockRows) Err() error {
r0 := m.ErrFunc.nextHook()()
m.ErrFunc.appendCall(RowsErrFuncCall{r0})
return r0
}
// SetDefaultHook sets function that is called when the Err method of the
// parent MockRows instance is invoked and the hook queue is empty.
func (f *RowsErrFunc) SetDefaultHook(hook func() error) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// Err method of the parent MockRows instance invokes the hook at the front
// of the queue and discards it. After the queue is empty, the default hook
// function is invoked for any future action.
func (f *RowsErrFunc) PushHook(hook func() error) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *RowsErrFunc) SetDefaultReturn(r0 error) {
f.SetDefaultHook(func() error {
return r0
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *RowsErrFunc) PushReturn(r0 error) {
f.PushHook(func() error {
return r0
})
}
func (f *RowsErrFunc) nextHook() func() error {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *RowsErrFunc) appendCall(r0 RowsErrFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of RowsErrFuncCall objects describing the
// invocations of this function.
func (f *RowsErrFunc) History() []RowsErrFuncCall {
f.mutex.Lock()
history := make([]RowsErrFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// RowsErrFuncCall is an object that describes an invocation of method Err
// on an instance of MockRows.
type RowsErrFuncCall struct {
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 error
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c RowsErrFuncCall) Args() []interface{} {
return []interface{}{}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c RowsErrFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}
// RowsNextFunc describes the behavior when the Next method of the parent
// MockRows instance is invoked.
type RowsNextFunc struct {
defaultHook func() bool
hooks []func() bool
history []RowsNextFuncCall
mutex sync.Mutex
}
// Next delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockRows) Next() bool {
r0 := m.NextFunc.nextHook()()
m.NextFunc.appendCall(RowsNextFuncCall{r0})
return r0
}
// SetDefaultHook sets function that is called when the Next method of the
// parent MockRows instance is invoked and the hook queue is empty.
func (f *RowsNextFunc) SetDefaultHook(hook func() bool) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// Next method of the parent MockRows instance invokes the hook at the front
// of the queue and discards it. After the queue is empty, the default hook
// function is invoked for any future action.
func (f *RowsNextFunc) PushHook(hook func() bool) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *RowsNextFunc) SetDefaultReturn(r0 bool) {
f.SetDefaultHook(func() bool {
return r0
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *RowsNextFunc) PushReturn(r0 bool) {
f.PushHook(func() bool {
return r0
})
}
func (f *RowsNextFunc) nextHook() func() bool {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *RowsNextFunc) appendCall(r0 RowsNextFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of RowsNextFuncCall objects describing the
// invocations of this function.
func (f *RowsNextFunc) History() []RowsNextFuncCall {
f.mutex.Lock()
history := make([]RowsNextFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// RowsNextFuncCall is an object that describes an invocation of method Next
// on an instance of MockRows.
type RowsNextFuncCall struct {
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 bool
}
// Args returns an interface slice containing the arguments of this
// invocation.
func (c RowsNextFuncCall) Args() []interface{} {
return []interface{}{}
}
// Results returns an interface slice containing the results of this
// invocation.
func (c RowsNextFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}
// RowsScanFunc describes the behavior when the Scan method of the parent
// MockRows instance is invoked.
type RowsScanFunc struct {
defaultHook func(...interface{}) error
hooks []func(...interface{}) error
history []RowsScanFuncCall
mutex sync.Mutex
}
// Scan delegates to the next hook function in the queue and stores the
// parameter and result values of this invocation.
func (m *MockRows) Scan(v0 ...interface{}) error {
r0 := m.ScanFunc.nextHook()(v0...)
m.ScanFunc.appendCall(RowsScanFuncCall{v0, r0})
return r0
}
// SetDefaultHook sets function that is called when the Scan method of the
// parent MockRows instance is invoked and the hook queue is empty.
func (f *RowsScanFunc) SetDefaultHook(hook func(...interface{}) error) {
f.defaultHook = hook
}
// PushHook adds a function to the end of hook queue. Each invocation of the
// Scan method of the parent MockRows instance invokes the hook at the front
// of the queue and discards it. After the queue is empty, the default hook
// function is invoked for any future action.
func (f *RowsScanFunc) PushHook(hook func(...interface{}) error) {
f.mutex.Lock()
f.hooks = append(f.hooks, hook)
f.mutex.Unlock()
}
// SetDefaultReturn calls SetDefaultHook with a function that returns the
// given values.
func (f *RowsScanFunc) SetDefaultReturn(r0 error) {
f.SetDefaultHook(func(...interface{}) error {
return r0
})
}
// PushReturn calls PushHook with a function that returns the given values.
func (f *RowsScanFunc) PushReturn(r0 error) {
f.PushHook(func(...interface{}) error {
return r0
})
}
func (f *RowsScanFunc) nextHook() func(...interface{}) error {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.hooks) == 0 {
return f.defaultHook
}
hook := f.hooks[0]
f.hooks = f.hooks[1:]
return hook
}
func (f *RowsScanFunc) appendCall(r0 RowsScanFuncCall) {
f.mutex.Lock()
f.history = append(f.history, r0)
f.mutex.Unlock()
}
// History returns a sequence of RowsScanFuncCall objects describing the
// invocations of this function.
func (f *RowsScanFunc) History() []RowsScanFuncCall {
f.mutex.Lock()
history := make([]RowsScanFuncCall, len(f.history))
copy(history, f.history)
f.mutex.Unlock()
return history
}
// RowsScanFuncCall is an object that describes an invocation of method Scan
// on an instance of MockRows.
type RowsScanFuncCall struct {
// Arg0 is a slice containing the values of the variadic arguments
// passed to this method invocation.
Arg0 []interface{}
// Result0 is the value of the 1st result returned from this method
// invocation.
Result0 error
}
// Args returns an interface slice containing the arguments of this
// invocation. The variadic slice argument is flattened in this array such
// that one positional argument and three variadic arguments would result in
// a slice of four, not two.
func (c RowsScanFuncCall) Args() []interface{} {
trailing := []interface{}{}
for _, val := range c.Arg0 {
trailing = append(trailing, val)
}
return append([]interface{}{}, trailing...)
}
// Results returns an interface slice containing the results of this
// invocation.
func (c RowsScanFuncCall) Results() []interface{} {
return []interface{}{c.Result0}
}

View File

@ -1,11 +1,16 @@
package basestore
import (
"database/sql"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
type Rows interface {
Next() bool
Close() error
Err() error
Scan(...interface{}) error
}
// CloseRows closes the given rows object. The resulting error is a multierror
// containing the error parameter along with any errors that occur during scanning
// or closing the rows object. The rows object is assumed to be non-nil.
@ -25,6 +30,6 @@ import (
// ensure that the rows are always properly handled.
//
// things, err := ScanThings(store.Query(ctx, query))
func CloseRows(rows *sql.Rows, err error) error {
func CloseRows(rows Rows, err error) error {
return errors.Append(err, rows.Close(), rows.Err())
}

View File

@ -1,7 +1,8 @@
package basestore
import (
"database/sql"
orderedmap "github.com/wk8/go-ordered-map/v2"
"golang.org/x/exp/maps"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
)
@ -9,8 +10,8 @@ import (
// NewCallbackScanner returns a basestore scanner function that invokes the given
// function on every SQL row object in the given query result set. If the callback
// function returns a false-valued flag, the remaining rows are discarded.
func NewCallbackScanner(f func(dbutil.Scanner) (bool, error)) func(rows *sql.Rows, queryErr error) error {
return func(rows *sql.Rows, queryErr error) (err error) {
func NewCallbackScanner(f func(dbutil.Scanner) (bool, error)) func(rows Rows, queryErr error) error {
return func(rows Rows, queryErr error) (err error) {
if queryErr != nil {
return queryErr
}
@ -32,8 +33,8 @@ func NewCallbackScanner(f func(dbutil.Scanner) (bool, error)) func(rows *sql.Row
// first value of a query result (assuming there is at most one value).
// The given function is invoked with a SQL rows object to scan a single
// value.
func NewFirstScanner[T any](f func(dbutil.Scanner) (T, error)) func(rows *sql.Rows, queryErr error) (T, bool, error) {
return func(rows *sql.Rows, queryErr error) (value T, called bool, _ error) {
func NewFirstScanner[T any](f func(dbutil.Scanner) (T, error)) func(rows Rows, queryErr error) (T, bool, error) {
return func(rows Rows, queryErr error) (value T, called bool, _ error) {
scanner := func(s dbutil.Scanner) (_ bool, err error) {
called = true
value, err = f(s)
@ -48,8 +49,8 @@ func NewFirstScanner[T any](f func(dbutil.Scanner) (T, error)) func(rows *sql.Ro
// NewSliceScanner returns a basestore scanner function that returns all
// the values of a query result. The given function is invoked multiple
// times with a SQL rows object to scan a single value.
func NewSliceScanner[T any](f func(dbutil.Scanner) (T, error)) func(rows *sql.Rows, queryErr error) ([]T, error) {
return func(rows *sql.Rows, queryErr error) (values []T, _ error) {
func NewSliceScanner[T any](f func(dbutil.Scanner) (T, error)) func(rows Rows, queryErr error) ([]T, error) {
return func(rows Rows, queryErr error) (values []T, _ error) {
scanner := func(s dbutil.Scanner) (bool, error) {
value, err := f(s)
if err != nil {
@ -72,8 +73,8 @@ func NewSliceScanner[T any](f func(dbutil.Scanner) (T, error)) func(rows *sql.Ro
// Example query that would avail of this function, where we want only 10 rows but still
// the count of everything that would have been returned, without performing two separate queries:
// SELECT u.id, COUNT(*) OVER() as count FROM users LIMIT 10
func NewSliceWithCountScanner[T any](f func(dbutil.Scanner) (T, int, error)) func(rows *sql.Rows, queryErr error) ([]T, int, error) {
return func(rows *sql.Rows, queryErr error) (values []T, totalCount int, _ error) {
func NewSliceWithCountScanner[T any](f func(dbutil.Scanner) (T, int, error)) func(rows Rows, queryErr error) ([]T, int, error) {
return func(rows Rows, queryErr error) (values []T, totalCount int, _ error) {
scanner := func(s dbutil.Scanner) (bool, error) {
value, count, err := f(s)
if err != nil {
@ -94,44 +95,52 @@ func NewSliceWithCountScanner[T any](f func(dbutil.Scanner) (T, int, error)) fun
// query result organized as a map. The given function is invoked multiple times with a SQL rows
// object to scan a single map value. The given reducer provides a way to customize how multiple
// values are reduced into a collection.
func NewKeyedCollectionScanner[K comparable, V, Vs any](
func NewKeyedCollectionScanner[Map keyedMap[K, Vs], K comparable, V, Vs any](
values Map,
scanPair func(dbutil.Scanner) (K, V, error),
reducer CollectionReducer[V, Vs],
) func(rows *sql.Rows, queryErr error) (map[K]Vs, error) {
return func(rows *sql.Rows, queryErr error) (map[K]Vs, error) {
values := map[K]Vs{}
) func(rows Rows, queryErr error) error {
return func(rows Rows, queryErr error) error {
scanner := func(s dbutil.Scanner) (bool, error) {
key, value, err := scanPair(s)
if err != nil {
return false, err
}
collection, ok := values[key]
collection, ok := values.Get(key)
if !ok {
collection = reducer.Create()
}
values[key] = reducer.Reduce(collection, value)
values.Set(key, reducer.Reduce(collection, value))
return true, nil
}
err := NewCallbackScanner(scanner)(rows, queryErr)
return values, err
return err
}
}
// NewMapScanner returns a basestore scanner function that returns the values of a
// query result organized as a map. The given function is invoked multiple times with
// a SQL rows object to scan a single map value.
func NewMapScanner[K comparable, V any](f func(dbutil.Scanner) (K, V, error)) func(rows *sql.Rows, queryErr error) (map[K]V, error) {
return NewKeyedCollectionScanner[K, V, V](f, SingleValueReducer[V]{})
func NewMapScanner[K comparable, V any](f func(dbutil.Scanner) (K, V, error)) func(rows Rows, queryErr error) (map[K]V, error) {
return func(rows Rows, queryErr error) (map[K]V, error) {
m := &UnorderedMap[K, V]{m: make(map[K]V)}
err := NewKeyedCollectionScanner[*UnorderedMap[K, V], K, V, V](m, f, SingleValueReducer[V]{})(rows, queryErr)
return m.ToMap(), err
}
}
// NewMapSliceScanner returns a basestore scanner function that returns the values
// of a query result organized as a map of slice values. The given function is invoked
// multiple times with a SQL rows object to scan a single map key value.
func NewMapSliceScanner[K comparable, V any](f func(dbutil.Scanner) (K, V, error)) func(rows *sql.Rows, queryErr error) (map[K][]V, error) {
return NewKeyedCollectionScanner[K, V, []V](f, SliceReducer[V]{})
func NewMapSliceScanner[K comparable, V any](f func(dbutil.Scanner) (K, V, error)) func(rows Rows, queryErr error) (map[K][]V, error) {
return func(rows Rows, queryErr error) (map[K][]V, error) {
m := &UnorderedMap[K, []V]{m: make(map[K][]V)}
err := NewKeyedCollectionScanner[*UnorderedMap[K, []V], K, V, []V](m, f, SliceReducer[V]{})(rows, queryErr)
return m.ToMap(), err
}
}
// CollectionReducer configures how scanners created by `NewKeyedCollectionScanner` will
@ -155,3 +164,68 @@ type SingleValueReducer[T any] struct{}
func (r SingleValueReducer[T]) Create() (_ T) { return }
func (r SingleValueReducer[T]) Reduce(collection T, value T) T { return value }
type keyedMap[K comparable, V any] interface {
Get(K) (V, bool)
Set(K, V)
Len() int
Values() []V
ToMap() map[K]V
}
type UnorderedMap[K comparable, V any] struct {
m map[K]V
}
func (m UnorderedMap[K, V]) Get(key K) (V, bool) {
v, ok := m.m[key]
return v, ok
}
func (m UnorderedMap[K, V]) Set(key K, val V) {
m.m[key] = val
}
func (m UnorderedMap[K, V]) Len() int {
return len(m.m)
}
func (m UnorderedMap[K, V]) Values() []V {
return maps.Values(m.m)
}
func (m *UnorderedMap[K, V]) ToMap() map[K]V {
return m.m
}
type OrderedMap[K comparable, V any] struct {
m *orderedmap.OrderedMap[K, V]
}
func (m OrderedMap[K, V]) Get(key K) (V, bool) {
return m.m.Get(key)
}
func (m OrderedMap[K, V]) Set(key K, val V) {
m.m.Set(key, val)
}
func (m OrderedMap[K, V]) Len() int {
return m.m.Len()
}
func (m OrderedMap[K, V]) Values() []V {
values := make([]V, 0, m.m.Len())
for pair := m.m.Oldest(); pair != nil; pair = pair.Next() {
values = append(values, pair.Value)
}
return values
}
func (m *OrderedMap[K, V]) ToMap() map[K]V {
ret := make(map[K]V, m.m.Len())
for pair := m.m.Oldest(); pair != nil; pair = pair.Next() {
ret[pair.Key] = pair.Value
}
return ret
}

View File

@ -0,0 +1,112 @@
package basestore
import (
"testing"
"github.com/google/go-cmp/cmp"
orderedmap "github.com/wk8/go-ordered-map/v2"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
)
type reducee struct {
ID int
Values []int
}
type reduceeRow struct {
ID, Value int
}
type testReducer struct{}
func (d testReducer) Create() reducee {
return reducee{Values: make([]int, 0)}
}
func (d testReducer) Reduce(collection reducee, value reduceeRow) reducee {
collection.ID = value.ID
collection.Values = append(collection.Values, value.Value)
return collection
}
func Test_KeyedCollectionScannerOrdered(t *testing.T) {
data := []reduceeRow{
{
ID: 0,
Value: 0,
},
{
ID: 0,
Value: 1,
},
{
ID: 0,
Value: 2,
},
{
ID: 2,
Value: 0,
},
{
ID: 1,
Value: 1,
},
{
ID: 0,
Value: 3,
},
{
ID: -1,
Value: -1,
},
{
ID: 1,
Value: 0,
},
}
offset := -1
rows := NewMockRows()
rows.NextFunc.SetDefaultHook(func() bool {
offset++
return offset < len(data)
})
rows.ScanFunc.SetDefaultHook(func(i ...interface{}) error {
*(i[0].(*int)) = data[offset].ID
*(i[1].(*int)) = data[offset].Value
return nil
})
m := &OrderedMap[int, reducee]{m: orderedmap.New[int, reducee]()}
NewKeyedCollectionScanner[*OrderedMap[int, reducee], int, reduceeRow, reducee](m, func(s dbutil.Scanner) (int, reduceeRow, error) {
var red reduceeRow
err := s.Scan(&red.ID, &red.Value)
return red.ID, red, err
}, testReducer{})(rows, nil)
if m.Len() != 4 {
t.Errorf("unexpected map size: want=%d got=%d\n%v", 4, m.Len(), m.Values())
}
if diff := cmp.Diff([]reducee{
{
ID: 0,
Values: []int{0, 1, 2, 3},
},
{
ID: 2,
Values: []int{0},
},
{
ID: 1,
Values: []int{1, 0},
},
{
ID: -1,
Values: []int{-1},
},
}, m.Values()); diff != "" {
t.Errorf("unexpected collection output (-want,+got):\n%s", diff)
}
}

View File

@ -1,8 +1,6 @@
package database
import (
"database/sql"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/encryption"
@ -15,7 +13,7 @@ type EncryptionConfig struct {
KeyIDFieldName string
EncryptedFieldNames []string
UpdateAsBytes bool
Scan func(*sql.Rows, error) (map[int]Encrypted, error)
Scan func(basestore.Rows, error) (map[int]Encrypted, error)
Key func() encryption.Key
Limit int
}

View File

@ -195,9 +195,9 @@ func copyPtr[T any](n *T) *T {
// Clone (aka deepcopy) returns a new PaginationArgs object with the same values as "p".
func (p *PaginationArgs) Clone() *PaginationArgs {
return &PaginationArgs{
First: copyPtr[int](p.First),
Last: copyPtr[int](p.Last),
After: copyPtr[string](p.After),
Before: copyPtr[string](p.Before),
First: copyPtr(p.First),
Last: copyPtr(p.Last),
After: copyPtr(p.After),
Before: copyPtr(p.Before),
}
}

View File

@ -178,6 +178,10 @@
path: github.com/sourcegraph/sourcegraph/internal/database/migration/runner
interfaces:
- Store
- filename: internal/database/basestore/mocks_test.go
path: github.com/sourcegraph/sourcegraph/internal/database/basestore
interfaces:
- Rows
- filename: internal/featureflag/mocks_test.go
path: github.com/sourcegraph/sourcegraph/internal/featureflag
interfaces: