mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 20:31:48 +00:00
search jobs: switch from CSV to JSON (#59619)
Relates to #59352.

We switch the format of the results download from CSV to line-separated JSON. Each line corresponds to a JSON object containing chunk matches. The JSON object has the same format as the matches served by the Stream API.

This is a **breaking change** motivated by customer feedback. Search Jobs is still released as EAP, so a breaking change is acceptable.

Pros:
- richer information (matches, content, positions)
- supports all result types
- same format as the Stream API

Cons:
- requires more storage
- not as easy for a human to parse as a CSV

## Test plan
- updated and new unit tests
This commit is contained in:
parent
06ea78ee9d
commit
d29948e654
@ -35,6 +35,7 @@ All notable changes to Sourcegraph are documented in this file.
|
||||
- For `size` and `stars` the supported operators are `<`, `>`, `<=`, `>=`.
|
||||
- For `size` the supported units are `B`, `b`, `kB`, `KB`, `kiB`, `KiB`, `MiB`, `MB`, `GiB`, `GB`. Decimal points are not supported.
|
||||
- Structural Search is now disabled by default. To enable it, set `experimentalFeatures.structuralSearch: "enabled"` in the site configuration. [#57584](https://github.com/sourcegraph/sourcegraph/pull/57584)
|
||||
- Search Jobs switches the format of downloaded results from CSV to JSON. [#59619](https://github.com/sourcegraph/sourcegraph/pull/59619)
|
||||
|
||||
### Fixed
|
||||
|
||||
|
||||
@ -374,7 +374,7 @@ export const ExhaustiveSearchMessage: FC<ExhaustiveSearchMessageProps> = props =
|
||||
)}
|
||||
|
||||
<Text className={classNames(validationError && 'text-muted', styles.exhaustiveSearchText)}>
|
||||
Search jobs exhaustively return all matches of a query. Results can be downloaded via CSV.
|
||||
Search jobs exhaustively return all matches of a query. Results can be downloaded as JSON.
|
||||
</Text>
|
||||
|
||||
{error && <ErrorAlert error={error} className="mt-3" />}
|
||||
|
||||
@ -161,7 +161,7 @@ func NewHandler(
|
||||
m.Path("/src-cli/{rest:.*}").Methods("GET").Handler(trace.Route(newSrcCliVersionHandler(logger)))
|
||||
m.Path("/insights/export/{id}").Methods("GET").Handler(trace.Route(handlers.CodeInsightsDataExportHandler))
|
||||
m.Path("/search/stream").Methods("GET").Handler(trace.Route(frontendsearch.StreamHandler(db)))
|
||||
m.Path("/search/export/{id}.csv").Methods("GET").Handler(trace.Route(handlers.SearchJobsDataExportHandler))
|
||||
m.Path("/search/export/{id}.json").Methods("GET").Handler(trace.Route(handlers.SearchJobsDataExportHandler))
|
||||
m.Path("/search/export/{id}.log").Methods("GET").Handler(trace.Route(handlers.SearchJobsLogsHandler))
|
||||
|
||||
m.Path("/completions/stream").Methods("POST").Handler(trace.Route(handlers.NewChatCompletionsStreamHandler()))
|
||||
|
||||
@ -33,14 +33,14 @@ func ServeSearchJobDownload(logger log.Logger, svc *service.Service) http.Handle
|
||||
return
|
||||
}
|
||||
|
||||
csvWriterTo, err := svc.GetSearchJobCSVWriterTo(r.Context(), int64(jobID))
|
||||
writerTo, err := svc.GetSearchJobResultsWriterTo(r.Context(), int64(jobID))
|
||||
if err != nil {
|
||||
httpError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
filename := filenamePrefix(jobID) + ".csv"
|
||||
writeCSV(logger.With(log.Int("jobID", jobID)), w, filename, csvWriterTo)
|
||||
filename := filenamePrefix(jobID) + ".json"
|
||||
writeJSON(logger.With(log.Int("jobID", jobID)), w, filename, writerTo)
|
||||
}
|
||||
}
|
||||
|
||||
@ -76,6 +76,16 @@ func writeCSV(logger log.Logger, w http.ResponseWriter, filenameNoQuotes string,
|
||||
}
|
||||
}
|
||||
|
||||
func writeJSON(logger log.Logger, w http.ResponseWriter, filenameNoQuotes string, writerTo io.WriterTo) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filenameNoQuotes))
|
||||
w.WriteHeader(200)
|
||||
n, err := writerTo.WriteTo(w)
|
||||
if err != nil {
|
||||
logger.Warn("failed while writing search job response", log.String("filename", filenameNoQuotes), log.Int64("bytesWritten", n), log.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func httpError(w http.ResponseWriter, err error) {
|
||||
switch {
|
||||
case errors.Is(err, auth.ErrMustBeSiteAdminOrSameUser):
|
||||
|
||||
@ -46,11 +46,11 @@ func TestServeSearchJobDownload(t *testing.T) {
|
||||
svc := service.New(observationCtx, s, mockUploadStore, service.NewSearcherFake())
|
||||
|
||||
router := mux.NewRouter()
|
||||
router.HandleFunc("/{id}.csv", ServeSearchJobDownload(logger, svc))
|
||||
router.HandleFunc("/{id}.json", ServeSearchJobDownload(logger, svc))
|
||||
|
||||
// no job
|
||||
{
|
||||
req, err := http.NewRequest(http.MethodGet, "/99.csv", nil)
|
||||
req, err := http.NewRequest(http.MethodGet, "/99.json", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
@ -70,7 +70,7 @@ func TestServeSearchJobDownload(t *testing.T) {
|
||||
_, err = svc.CreateSearchJob(userCtx, "1@rev1")
|
||||
require.NoError(t, err)
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, "/1.csv", nil)
|
||||
req, err := http.NewRequest(http.MethodGet, "/1.json", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
req = req.WithContext(actor.WithActor(context.Background(), &actor.Actor{UID: userID}))
|
||||
@ -87,7 +87,7 @@ func TestServeSearchJobDownload(t *testing.T) {
|
||||
userID, err := createUser(bs, "alice")
|
||||
require.NoError(t, err)
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, "/1.csv", nil)
|
||||
req, err := http.NewRequest(http.MethodGet, "/1.json", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
req = req.WithContext(actor.WithActor(context.Background(), &actor.Actor{UID: userID}))
|
||||
|
||||
@ -77,7 +77,7 @@ func (r *searchJobResolver) FinishedAt(ctx context.Context) *gqlutil.DateTime {
|
||||
|
||||
func (r *searchJobResolver) URL(ctx context.Context) (*string, error) {
|
||||
if r.Job.State == types.JobStateCompleted {
|
||||
exportPath, err := url.JoinPath(conf.Get().ExternalURL, fmt.Sprintf("/.api/search/export/%d.csv", r.Job.ID))
|
||||
exportPath, err := url.JoinPath(conf.Get().ExternalURL, fmt.Sprintf("/.api/search/export/%d.json", r.Job.ID))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -72,13 +72,13 @@ func (h *exhaustiveSearchRepoRevHandler) Handle(ctx context.Context, logger log.
|
||||
return err
|
||||
}
|
||||
|
||||
csvWriter, err := service.NewCSVWriter(ctx, h.uploadStore, fmt.Sprintf("%d-%d", jobID, record.ID))
|
||||
w, err := service.NewJSONWriter(ctx, h.uploadStore, fmt.Sprintf("%d-%d", jobID, record.ID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = q.Search(ctx, repoRev, csvWriter)
|
||||
if closeErr := csvWriter.Close(); closeErr != nil {
|
||||
err = q.Search(ctx, repoRev, w)
|
||||
if closeErr := w.Flush(); closeErr != nil {
|
||||
err = errors.Append(err, closeErr)
|
||||
}
|
||||
|
||||
|
||||
@ -117,11 +117,10 @@ func TestExhaustiveSearch(t *testing.T) {
|
||||
vals = append(vals, v)
|
||||
}
|
||||
sort.Strings(vals)
|
||||
require.Equal([]string{
|
||||
"repository,revision,file_path,match_count,first_match_url\n1,rev1,path/to/file.go,0,/1@rev1/-/blob/path/to/file.go\n",
|
||||
"repository,revision,file_path,match_count,first_match_url\n1,rev2,path/to/file.go,0,/1@rev2/-/blob/path/to/file.go\n",
|
||||
"repository,revision,file_path,match_count,first_match_url\n2,rev3,path/to/file.go,0,/2@rev3/-/blob/path/to/file.go\n",
|
||||
}, vals)
|
||||
require.Equal([]string{`{"type":"path","path":"path/to/file.go","repositoryID":1,"repository":"repo1","commit":"rev1","language":"Go"}
|
||||
`, `{"type":"path","path":"path/to/file.go","repositoryID":1,"repository":"repo1","commit":"rev2","language":"Go"}
|
||||
`, `{"type":"path","path":"path/to/file.go","repositoryID":2,"repository":"repo2","commit":"rev3","language":"Go"}
|
||||
`}, vals)
|
||||
}
|
||||
|
||||
// Minor assertion that the job is regarded as finished.
|
||||
|
||||
@ -4,7 +4,6 @@ load("//dev:go_defs.bzl", "go_test")
|
||||
go_library(
|
||||
name = "service",
|
||||
srcs = [
|
||||
"matchcsv.go",
|
||||
"matchjson.go",
|
||||
"search.go",
|
||||
"searcher.go",
|
||||
@ -31,7 +30,6 @@ go_library(
|
||||
"//internal/search/repos",
|
||||
"//internal/search/result",
|
||||
"//internal/search/streaming",
|
||||
"//internal/search/streaming/http",
|
||||
"//internal/types",
|
||||
"//internal/uploadstore",
|
||||
"//lib/errors",
|
||||
|
||||
@ -1,182 +0,0 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf"
|
||||
"github.com/sourcegraph/sourcegraph/internal/search/result"
|
||||
"github.com/sourcegraph/sourcegraph/internal/uploadstore"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
)
|
||||
|
||||
type MatchCSVWriter struct {
|
||||
w CSVWriter
|
||||
headerTyp string
|
||||
host *url.URL
|
||||
}
|
||||
|
||||
func NewCSVWriter(ctx context.Context, store uploadstore.Store, prefix string) (*MatchCSVWriter, error) {
|
||||
csvWriter := newBlobstoreCSVWriter(ctx, store, prefix)
|
||||
return newMatchCSVWriter(csvWriter)
|
||||
}
|
||||
|
||||
func newMatchCSVWriter(w CSVWriter) (*MatchCSVWriter, error) {
|
||||
externalURL := conf.Get().ExternalURL
|
||||
u, err := url.Parse(externalURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &MatchCSVWriter{w: w, host: u}, nil
|
||||
}
|
||||
|
||||
func (w *MatchCSVWriter) Close() error {
|
||||
return w.w.Close()
|
||||
}
|
||||
|
||||
func (w *MatchCSVWriter) Write(match result.Match) error {
|
||||
// TODO compare to logic used by the webapp to convert
|
||||
// results into csv. See
|
||||
// client/web/src/search/results/export/searchResultsExport.ts
|
||||
|
||||
switch m := match.(type) {
|
||||
case *result.FileMatch:
|
||||
return w.writeFileMatch(m)
|
||||
default:
|
||||
return errors.Errorf("match type %T not yet supported", match)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *MatchCSVWriter) writeFileMatch(fm *result.FileMatch) error {
|
||||
// Differences to "Export CSV" in webapp. We have removed columns since it
|
||||
// is easier to add columns than to remove them.
|
||||
//
|
||||
// Spaces :: We remove spaces from all column names. This is to avoid
|
||||
// needing to quote them. This makes processing of the output more
|
||||
// pleasant in tools like shell pipelines, sqlite's csv mode, etc.
|
||||
//
|
||||
// Match type :: Excluded since we only have one type for now. When we add
|
||||
// other types we may want to include them in different ways.
|
||||
//
|
||||
// Repository export URL :: We don't like it. It is verbose and is just
|
||||
// repo + rev fields. Unsure why someone would want to click on it.
|
||||
//
|
||||
// File URL :: We like this, but since we leave out actual ranges we
|
||||
// instead include an example URL to a match.
|
||||
//
|
||||
// Chunk Matches :: We are unsure who this field is for. It is hard for a
|
||||
// human to read and similarly weird for a machine to parse JSON out of a
|
||||
// CSV file. Instead we have "First match url" for a human to help
|
||||
// validate and "Match count" for calculating aggregate counts.
|
||||
//
|
||||
// First match url :: This is a new field which is a convenient URL for a
|
||||
// human to click on. We only have one URL to prevent blowing up the size
|
||||
// of the CSV. We find this field useful for building confidence.
|
||||
//
|
||||
// Match count :: In general a useful field for humans and machines.
|
||||
//
|
||||
// While we are EAP, feel free to drastically change this based on
|
||||
// feedback. After that adjusting these columns (including order) may
|
||||
// break customer workflows.
|
||||
|
||||
if ok, err := w.writeHeader("content"); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
if err := w.w.WriteHeader(
|
||||
"repository",
|
||||
"revision",
|
||||
"file_path",
|
||||
"match_count",
|
||||
"first_match_url",
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
firstMatchURL := *w.host
|
||||
firstMatchURL.Path = fm.File.URLAtCommit().Path
|
||||
|
||||
if queryParam, ok := firstMatchRawQuery(fm.ChunkMatches); ok {
|
||||
firstMatchURL.RawQuery = queryParam
|
||||
}
|
||||
|
||||
return w.w.WriteRow(
|
||||
// repository
|
||||
string(fm.Repo.Name),
|
||||
|
||||
// revision
|
||||
string(fm.CommitID),
|
||||
|
||||
// file_path
|
||||
fm.Path,
|
||||
|
||||
// match_count
|
||||
strconv.Itoa(fm.ChunkMatches.MatchCount()),
|
||||
|
||||
// first_match_url
|
||||
firstMatchURL.String(),
|
||||
)
|
||||
}
|
||||
|
||||
// firstMatchRawQuery returns the raw query parameter for the location of the
|
||||
// first match. This is what is appended to the sourcegraph URL when clicking
|
||||
// on a search result. eg if the match is on line 11 it is "L11". If it is
|
||||
// multiline to line 13 it will be L11-13.
|
||||
func firstMatchRawQuery(cms result.ChunkMatches) (string, bool) {
|
||||
cm, ok := minChunkMatch(cms)
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
r, ok := minRange(cm.Ranges)
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
|
||||
// TODO validate how we use r.End. It is documented to be [Start, End) but
|
||||
// that would be weird for line numbers.
|
||||
|
||||
// Note: Range.Line is 0-based but our UX is 1-based for line.
|
||||
if r.Start.Line != r.End.Line {
|
||||
return fmt.Sprintf("L%d-%d", r.Start.Line+1, r.End.Line+1), true
|
||||
}
|
||||
return fmt.Sprintf("L%d", r.Start.Line+1), true
|
||||
}
|
||||
|
||||
func minChunkMatch(cms result.ChunkMatches) (result.ChunkMatch, bool) {
|
||||
if len(cms) == 0 {
|
||||
return result.ChunkMatch{}, false
|
||||
}
|
||||
min := cms[0]
|
||||
for _, cm := range cms[1:] {
|
||||
if cm.ContentStart.Line < min.ContentStart.Line {
|
||||
min = cm
|
||||
}
|
||||
}
|
||||
return min, true
|
||||
}
|
||||
|
||||
func minRange(ranges result.Ranges) (result.Range, bool) {
|
||||
if len(ranges) == 0 {
|
||||
return result.Range{}, false
|
||||
}
|
||||
min := ranges[0]
|
||||
for _, r := range ranges[1:] {
|
||||
if r.Start.Offset < min.Start.Offset || (r.Start.Offset == min.Start.Offset && r.End.Offset < min.End.Offset) {
|
||||
min = r
|
||||
}
|
||||
}
|
||||
return min, true
|
||||
}
|
||||
|
||||
func (w *MatchCSVWriter) writeHeader(typ string) (bool, error) {
|
||||
if w.headerTyp == "" {
|
||||
w.headerTyp = typ
|
||||
return true, nil
|
||||
}
|
||||
if w.headerTyp != typ {
|
||||
return false, errors.Errorf("cant write result type %q since we have already written %q", typ, w.headerTyp)
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
@ -3,19 +3,19 @@ package service
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/search"
|
||||
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/uploadstore"
|
||||
"github.com/sourcegraph/sourcegraph/internal/search/result"
|
||||
"github.com/sourcegraph/sourcegraph/internal/search/streaming/http"
|
||||
)
|
||||
|
||||
// NewJSONWriter creates a MatchJSONWriter which appends matches as line-separated JSON
|
||||
// and uploads them to the object store once the internal buffer size has
|
||||
// reached 100 MiB or Flush() is called. The object key combines a prefix with
|
||||
// the shard number, except for the first shard where the shard number is
|
||||
// excluded.
|
||||
// omitted.
|
||||
func NewJSONWriter(ctx context.Context, store uploadstore.Store, prefix string) (*MatchJSONWriter, error) {
|
||||
blobUploader := &blobUploader{
|
||||
ctx: ctx,
|
||||
@ -25,11 +25,11 @@ func NewJSONWriter(ctx context.Context, store uploadstore.Store, prefix string)
|
||||
}
|
||||
|
||||
return &MatchJSONWriter{
|
||||
w: http.NewJSONArrayBuf(1024*1024*100, blobUploader.write)}, nil
|
||||
w: newBufferedWriter(1024*1024*100, blobUploader.write)}, nil
|
||||
}
|
||||
|
||||
type MatchJSONWriter struct {
|
||||
w *http.JSONArrayBuf
|
||||
w *bufferedWriter
|
||||
}
|
||||
|
||||
func (m MatchJSONWriter) Flush() error {
|
||||
@ -66,3 +66,56 @@ func (b *blobUploader) write(p []byte) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// bufferedWriter collects JSON-encoded values in memory, one value per line,
// and passes the collected bytes to write whenever at least flushSize bytes
// are buffered or Flush is called.
type bufferedWriter struct {
	// flushSize is the number of buffered bytes at which Append flushes.
	flushSize int
	// buf holds the encoded values that have not been written out yet.
	buf bytes.Buffer
	// write receives the buffered bytes on every flush. The slice aliases
	// the internal buffer and is only valid for the duration of the call.
	write func([]byte) error
}

// newBufferedWriter returns a bufferedWriter that flushes through write once
// flushSize bytes have been buffered. A flushSize of zero or less makes every
// Append flush immediately.
func newBufferedWriter(flushSize int, write func([]byte) error) *bufferedWriter {
	const maxInitialSize = 1024 * 1024 // 1 MiB

	b := &bufferedWriter{
		flushSize: flushSize,
		write:     write,
	}
	// Grow the buffer up front to reduce the number of small allocations
	// caused by repeatedly growing it. We expect most queries to return a
	// fraction of flushSize per revision. bytes.Buffer.Grow panics on a
	// negative count, so the size is clamped at zero.
	b.buf.Grow(max(0, min(flushSize, maxInitialSize)))
	return b
}

// Append marshals v as JSON followed by a newline and adds it to the buffer.
// If the buffer then holds at least flushSize bytes it is written out. When
// marshalling fails the buffer is left as it was before the call.
func (b *bufferedWriter) Append(v any) error {
	oldLen := b.buf.Len()

	if err := json.NewEncoder(&b.buf).Encode(v); err != nil {
		// Reset the buffer to where it was before failing to marshal.
		b.buf.Truncate(oldLen)
		return err
	}

	if b.buf.Len() >= b.flushSize {
		return b.Flush()
	}
	return nil
}

// Flush writes out the buffered data, if any, and empties the buffer. The
// buffer is emptied even when write fails, so the data of a failed flush is
// not retried.
func (b *bufferedWriter) Flush() error {
	if b.buf.Len() == 0 {
		return nil
	}

	// Hand out the bytes before resetting so the slice passed to write never
	// refers to an already-reset buffer.
	err := b.write(b.buf.Bytes())
	b.buf.Reset()
	return err
}

// Len returns the number of bytes currently buffered.
func (b *bufferedWriter) Len() int {
	return b.buf.Len()
}
|
||||
|
||||
@ -1,8 +1,10 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hexops/autogold/v2"
|
||||
@ -10,6 +12,9 @@ import (
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/search/result"
|
||||
"github.com/sourcegraph/sourcegraph/internal/types"
|
||||
"github.com/sourcegraph/sourcegraph/internal/uploadstore/mocks"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
"github.com/sourcegraph/sourcegraph/lib/iterator"
|
||||
)
|
||||
|
||||
func TestMatchJsonWriter(t *testing.T) {
|
||||
@ -44,7 +49,9 @@ func TestMatchJsonWriter(t *testing.T) {
|
||||
blobBytes, err := io.ReadAll(blob)
|
||||
require.NoError(t, err)
|
||||
|
||||
autogold.Expect(`[{"type":"content","path":"internal/search.go","repositoryID":1,"repository":"repo","hunks":null,"chunkMatches":[{"content":"","contentStart":{"offset":0,"line":0,"column":0},"ranges":[{"start":{"offset":0,"line":18,"column":0},"end":{"offset":0,"line":18,"column":0}}]},{"content":"","contentStart":{"offset":0,"line":0,"column":0},"ranges":[{"start":{"offset":0,"line":27,"column":0},"end":{"offset":0,"line":27,"column":0}}]}],"language":"Go"},{"type":"content","path":"internal/service.go","repositoryID":1,"repository":"repo","hunks":null,"chunkMatches":[{"content":"","contentStart":{"offset":0,"line":0,"column":0},"ranges":[{"start":{"offset":0,"line":3,"column":0},"end":{"offset":0,"line":3,"column":0}}]},{"content":"","contentStart":{"offset":0,"line":0,"column":0},"ranges":[{"start":{"offset":0,"line":7,"column":0},"end":{"offset":0,"line":7,"column":0}}]}],"language":"Go"}]`).Equal(t, string(blobBytes))
|
||||
autogold.Expect(`{"type":"content","path":"internal/search.go","repositoryID":1,"repository":"repo","hunks":null,"chunkMatches":[{"content":"","contentStart":{"offset":0,"line":0,"column":0},"ranges":[{"start":{"offset":0,"line":18,"column":0},"end":{"offset":0,"line":18,"column":0}}]},{"content":"","contentStart":{"offset":0,"line":0,"column":0},"ranges":[{"start":{"offset":0,"line":27,"column":0},"end":{"offset":0,"line":27,"column":0}}]}],"language":"Go"}
|
||||
{"type":"content","path":"internal/service.go","repositoryID":1,"repository":"repo","hunks":null,"chunkMatches":[{"content":"","contentStart":{"offset":0,"line":0,"column":0},"ranges":[{"start":{"offset":0,"line":3,"column":0},"end":{"offset":0,"line":3,"column":0}}]},{"content":"","contentStart":{"offset":0,"line":0,"column":0},"ranges":[{"start":{"offset":0,"line":7,"column":0},"end":{"offset":0,"line":7,"column":0}}]}],"language":"Go"}
|
||||
`).Equal(t, string(blobBytes))
|
||||
}
|
||||
|
||||
func mkFileMatch(repo types.MinimalRepo, path string, lineNumbers ...int) *result.FileMatch {
|
||||
@ -66,3 +73,119 @@ func mkFileMatch(repo types.MinimalRepo, path string, lineNumbers ...int) *resul
|
||||
ChunkMatches: hms,
|
||||
}
|
||||
}
|
||||
|
||||
func TestBufferedWriter(t *testing.T) {
|
||||
mockStore := setupMockStore(t)
|
||||
|
||||
uploader := blobUploader{
|
||||
ctx: context.Background(),
|
||||
store: mockStore,
|
||||
prefix: "blob",
|
||||
shard: 1,
|
||||
}
|
||||
|
||||
w := newBufferedWriter(24, uploader.write)
|
||||
|
||||
testData := func(val string) any {
|
||||
return struct{ Key string }{Key: val}
|
||||
}
|
||||
|
||||
err := w.Append(testData("a")) // {"Key":"a"}\n 12 bytes
|
||||
require.NoError(t, err)
|
||||
err = w.Append(testData("b"))
|
||||
require.NoError(t, err)
|
||||
|
||||
// We expect a new file to be created here because we have reached the max blob size.
|
||||
err = w.Append(testData("c"))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = w.Flush()
|
||||
require.NoError(t, err)
|
||||
|
||||
wantFiles := 2
|
||||
iter, err := mockStore.List(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
haveFiles := 0
|
||||
for iter.Next() {
|
||||
haveFiles++
|
||||
}
|
||||
require.Equal(t, wantFiles, haveFiles)
|
||||
|
||||
tc := []struct {
|
||||
wantKey string
|
||||
wantBlob string
|
||||
}{
|
||||
{
|
||||
wantKey: "blob",
|
||||
wantBlob: "{\"Key\":\"a\"}\n{\"Key\":\"b\"}\n",
|
||||
},
|
||||
{
|
||||
wantKey: "blob-2",
|
||||
wantBlob: "{\"Key\":\"c\"}\n",
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range tc {
|
||||
blob, err := mockStore.Get(context.Background(), c.wantKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
blobBytes, err := io.ReadAll(blob)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, c.wantBlob, string(blobBytes))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNoUploadIfNotData(t *testing.T) {
|
||||
mockStore := setupMockStore(t)
|
||||
|
||||
w, err := NewJSONWriter(context.Background(), mockStore, "dummy_prefix")
|
||||
require.NoError(t, err)
|
||||
|
||||
// No data written, so no upload should happen.
|
||||
err = w.Flush()
|
||||
require.NoError(t, err)
|
||||
iter, err := mockStore.List(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
|
||||
for iter.Next() {
|
||||
t.Fatal("should not have uploaded anything")
|
||||
}
|
||||
}
|
||||
|
||||
func setupMockStore(t *testing.T) *mocks.MockStore {
|
||||
t.Helper()
|
||||
|
||||
bucket := make(map[string][]byte)
|
||||
|
||||
mockStore := mocks.NewMockStore()
|
||||
mockStore.UploadFunc.SetDefaultHook(func(ctx context.Context, key string, r io.Reader) (int64, error) {
|
||||
b, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
bucket[key] = b
|
||||
|
||||
return int64(len(b)), nil
|
||||
})
|
||||
|
||||
mockStore.ListFunc.SetDefaultHook(func(ctx context.Context, prefix string) (*iterator.Iterator[string], error) {
|
||||
var keys []string
|
||||
for k := range bucket {
|
||||
if strings.HasPrefix(k, prefix) {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
}
|
||||
return iterator.From(keys), nil
|
||||
})
|
||||
|
||||
mockStore.GetFunc.SetDefaultHook(func(ctx context.Context, key string) (io.ReadCloser, error) {
|
||||
if b, ok := bucket[key]; ok {
|
||||
return io.NopCloser(bytes.NewReader(b)), nil
|
||||
}
|
||||
return nil, errors.New("key not found")
|
||||
})
|
||||
|
||||
return mockStore
|
||||
}
|
||||
|
||||
@ -1,9 +1,7 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
@ -14,7 +12,6 @@ import (
|
||||
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/types"
|
||||
"github.com/sourcegraph/sourcegraph/internal/search/result"
|
||||
types2 "github.com/sourcegraph/sourcegraph/internal/types"
|
||||
"github.com/sourcegraph/sourcegraph/internal/uploadstore"
|
||||
"github.com/sourcegraph/sourcegraph/lib/errors"
|
||||
"github.com/sourcegraph/sourcegraph/lib/iterator"
|
||||
)
|
||||
@ -96,145 +93,6 @@ type CSVWriter interface {
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// newBlobstoreCSVWriter creates a new BlobstoreCSVWriter which writes a CSV to
|
||||
// the store. BlobstoreCSVWriter takes care of chunking the CSV into blobs of
|
||||
// 100MiB, each with the same header row. Blobs are named {prefix}-{shard}
|
||||
// except for the first blob, which is named {prefix}.
|
||||
//
|
||||
// Data is buffered in memory until the blob reaches the maximum allowed size,
|
||||
// at which point the blob is uploaded to the store.
|
||||
//
|
||||
// The caller is expected to call Close() once and only once after the last call
|
||||
// to WriteRow.
|
||||
func newBlobstoreCSVWriter(ctx context.Context, store uploadstore.Store, prefix string) *BlobstoreCSVWriter {
|
||||
|
||||
c := &BlobstoreCSVWriter{
|
||||
maxBlobSizeBytes: 100 * 1024 * 1024,
|
||||
ctx: ctx,
|
||||
prefix: prefix,
|
||||
store: store,
|
||||
// Start with "1" because we increment it before creating a new file. The second
|
||||
// shard will be called {prefix}-2.
|
||||
shard: 1,
|
||||
}
|
||||
|
||||
c.startNewFile(ctx, prefix)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
type BlobstoreCSVWriter struct {
|
||||
// ctx is the context we use for uploading blobs.
|
||||
ctx context.Context
|
||||
|
||||
maxBlobSizeBytes int64
|
||||
|
||||
prefix string
|
||||
|
||||
w *csv.Writer
|
||||
|
||||
// local buffer for the current blob.
|
||||
buf bytes.Buffer
|
||||
|
||||
store uploadstore.Store
|
||||
|
||||
// header keeps track of the header we write as the first row of a new file.
|
||||
header []string
|
||||
|
||||
// close takes care of flushing w and closing the upload.
|
||||
close func() error
|
||||
|
||||
// n is the total number of bytes we have buffered so far.
|
||||
n int64
|
||||
|
||||
// shard is incremented before we create a new shard.
|
||||
shard int
|
||||
}
|
||||
|
||||
func (c *BlobstoreCSVWriter) WriteHeader(s ...string) error {
|
||||
if c.header == nil {
|
||||
c.header = s
|
||||
}
|
||||
|
||||
// Check that c.header matches s.
|
||||
if len(c.header) != len(s) {
|
||||
return errors.Errorf("header mismatch: %v != %v", c.header, s)
|
||||
}
|
||||
for i := range c.header {
|
||||
if c.header[i] != s[i] {
|
||||
return errors.Errorf("header mismatch: %v != %v", c.header, s)
|
||||
}
|
||||
}
|
||||
|
||||
return c.write(s)
|
||||
}
|
||||
|
||||
func (c *BlobstoreCSVWriter) WriteRow(s ...string) error {
|
||||
// Create new file if we've exceeded the max blob size.
|
||||
if c.n >= c.maxBlobSizeBytes {
|
||||
// Close the current upload.
|
||||
err := c.Close()
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error closing upload")
|
||||
}
|
||||
|
||||
c.shard++
|
||||
c.startNewFile(c.ctx, fmt.Sprintf("%s-%d", c.prefix, c.shard))
|
||||
err = c.WriteHeader(c.header...)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error writing header for new file")
|
||||
}
|
||||
}
|
||||
|
||||
return c.write(s)
|
||||
}
|
||||
|
||||
// startNewFile creates a new blob and sets up the CSV writer to write to it.
|
||||
//
|
||||
// The caller is expected to call c.Close() before calling startNewFile if a
|
||||
// previous file was open.
|
||||
func (c *BlobstoreCSVWriter) startNewFile(ctx context.Context, key string) {
|
||||
c.buf = bytes.Buffer{}
|
||||
csvWriter := csv.NewWriter(&c.buf)
|
||||
|
||||
closeFn := func() error {
|
||||
csvWriter.Flush()
|
||||
// Don't upload empty files.
|
||||
if c.buf.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
_, err := c.store.Upload(ctx, key, &c.buf)
|
||||
return err
|
||||
}
|
||||
|
||||
c.w = csvWriter
|
||||
c.close = closeFn
|
||||
c.n = 0
|
||||
}
|
||||
|
||||
// write wraps Write to keep track of the number of bytes written. This is
|
||||
// mainly for test purposes: The CSV writer is buffered (default 4096 bytes),
|
||||
// and we don't have access to the number of bytes in the buffer. In production,
|
||||
// we could just wrap the io.Pipe writer with a counter, ignore the buffer, and
|
||||
// accept that size of the blobs is off by a few kilobytes.
|
||||
func (c *BlobstoreCSVWriter) write(s []string) error {
|
||||
err := c.w.Write(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, field := range s {
|
||||
c.n += int64(len(field))
|
||||
}
|
||||
c.n += int64(len(s)) // len(s)-1 for the commas, +1 for the newline
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *BlobstoreCSVWriter) Close() error {
|
||||
return c.close()
|
||||
}
|
||||
|
||||
// NewSearcherFake is a convenient working implementation of SearchQuery which
|
||||
// always will write results generated from the repoRevs. It expects a query
|
||||
// string which looks like
|
||||
@ -325,7 +183,7 @@ func (s searcherFake) Search(ctx context.Context, r types.RepositoryRevision, w
|
||||
|
||||
return w.Write(&result.FileMatch{
|
||||
File: result.File{
|
||||
Repo: types2.MinimalRepo{Name: api.RepoName(strconv.Itoa(int(r.Repository)))},
|
||||
Repo: types2.MinimalRepo{ID: r.Repository, Name: "repo" + api.RepoName(strconv.Itoa(int(r.Repository)))},
|
||||
CommitID: api.CommitID(r.Revision),
|
||||
Path: "path/to/file.go",
|
||||
},
|
||||
|
||||
@ -1,21 +1,13 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/actor"
|
||||
"github.com/sourcegraph/sourcegraph/internal/search/client"
|
||||
"github.com/sourcegraph/sourcegraph/internal/uploadstore/mocks"
|
||||
"github.com/sourcegraph/sourcegraph/lib/iterator"
|
||||
)
|
||||
|
||||
func TestWrongUser(t *testing.T) {
|
||||
@ -30,142 +22,3 @@ func TestWrongUser(t *testing.T) {
|
||||
_, err := newSearcher.NewSearch(ctx, userID2, "foo")
|
||||
assert.Error(err)
|
||||
}
|
||||
|
||||
// joinStringer renders every element of xs with its String method and joins
// the results with a single space. An empty input yields the empty string.
func joinStringer[T fmt.Stringer](xs []T) string {
	var sb strings.Builder
	for i, x := range xs {
		if i > 0 {
			sb.WriteByte(' ')
		}
		sb.WriteString(x.String())
	}
	return sb.String()
}
|
||||
|
||||
// csvBuffer is an in-memory CSV sink for tests. Rows are joined with commas
// without any quoting.
type csvBuffer struct {
	// buf collects the header and all rows, one per line.
	buf bytes.Buffer
	// header is the first header passed to WriteHeader.
	header []string
}

// WriteHeader writes header as the first row and remembers it. A repeated
// call with an equal header is a no-op; a different header is an error.
func (c *csvBuffer) WriteHeader(header ...string) error {
	switch {
	case c.header == nil:
		c.header = header
		return c.WriteRow(header...)
	case slices.Equal(c.header, header):
		return nil
	default:
		return errors.New("different header passed to WriteHeader")
	}
}

// WriteRow appends row as one comma-separated line. The row must have as many
// fields as the remembered header.
func (c *csvBuffer) WriteRow(row ...string) error {
	if len(c.header) != len(row) {
		return errors.New("row size does not match header size in WriteRow")
	}
	line := strings.Join(row, ",") + "\n"
	_, err := c.buf.WriteString(line)
	return err
}

// Close does nothing and always returns nil.
func (c *csvBuffer) Close() error {
	return nil
}
|
||||
|
||||
func TestBlobstoreCSVWriter(t *testing.T) {
|
||||
mockStore := setupMockStore(t)
|
||||
|
||||
csvWriter := newBlobstoreCSVWriter(context.Background(), mockStore, "blob")
|
||||
csvWriter.maxBlobSizeBytes = 12
|
||||
|
||||
err := csvWriter.WriteHeader("h", "h", "h") // 3 bytes (letters) + 2 bytes (commas) + 1 byte (newline) = 6 bytes
|
||||
require.NoError(t, err)
|
||||
err = csvWriter.WriteRow("a", "a", "a")
|
||||
require.NoError(t, err)
|
||||
// We expect a new file to be created here because we have reached the max blob size.
|
||||
err = csvWriter.WriteRow("b", "b", "b")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = csvWriter.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
wantFiles := 2
|
||||
iter, err := mockStore.List(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
haveFiles := 0
|
||||
for iter.Next() {
|
||||
haveFiles++
|
||||
}
|
||||
require.Equal(t, wantFiles, haveFiles)
|
||||
|
||||
tc := []struct {
|
||||
wantKey string
|
||||
wantBlob []byte
|
||||
}{
|
||||
{
|
||||
wantKey: "blob",
|
||||
wantBlob: []byte("h,h,h\na,a,a\n"),
|
||||
},
|
||||
{
|
||||
wantKey: "blob-2",
|
||||
wantBlob: []byte("h,h,h\nb,b,b\n"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range tc {
|
||||
blob, err := mockStore.Get(context.Background(), c.wantKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
blobBytes, err := io.ReadAll(blob)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, c.wantBlob, blobBytes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNoUploadIfNotData(t *testing.T) {
|
||||
mockStore := setupMockStore(t)
|
||||
csvWriter := newBlobstoreCSVWriter(context.Background(), mockStore, "blob")
|
||||
|
||||
// No data written, so no upload should happen.
|
||||
err := csvWriter.Close()
|
||||
require.NoError(t, err)
|
||||
iter, err := mockStore.List(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
|
||||
for iter.Next() {
|
||||
t.Fatal("should not have uploaded anything")
|
||||
}
|
||||
}
|
||||
|
||||
func setupMockStore(t *testing.T) *mocks.MockStore {
|
||||
t.Helper()
|
||||
|
||||
bucket := make(map[string][]byte)
|
||||
|
||||
mockStore := mocks.NewMockStore()
|
||||
mockStore.UploadFunc.SetDefaultHook(func(ctx context.Context, key string, r io.Reader) (int64, error) {
|
||||
b, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
bucket[key] = b
|
||||
|
||||
return int64(len(b)), nil
|
||||
})
|
||||
|
||||
mockStore.ListFunc.SetDefaultHook(func(ctx context.Context, prefix string) (*iterator.Iterator[string], error) {
|
||||
var keys []string
|
||||
for k := range bucket {
|
||||
if strings.HasPrefix(k, prefix) {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
}
|
||||
return iterator.From(keys), nil
|
||||
})
|
||||
|
||||
mockStore.GetFunc.SetDefaultHook(func(ctx context.Context, key string) (io.ReadCloser, error) {
|
||||
if b, ok := bucket[key]; ok {
|
||||
return io.NopCloser(bytes.NewReader(b)), nil
|
||||
}
|
||||
return nil, errors.New("key not found")
|
||||
})
|
||||
|
||||
return mockStore
|
||||
}
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@ -38,10 +40,9 @@ func TestBackendFake(t *testing.T) {
|
||||
Query: "1@rev1 1@rev2 2@rev3",
|
||||
WantRefSpecs: "RepositoryRevSpec{1@spec} RepositoryRevSpec{2@spec}",
|
||||
WantRepoRevs: "RepositoryRevision{1@rev1} RepositoryRevision{1@rev2} RepositoryRevision{2@rev3}",
|
||||
WantCSV: autogold.Expect(`repository,revision,file_path,match_count,first_match_url
|
||||
1,rev1,path/to/file.go,0,/1@rev1/-/blob/path/to/file.go
|
||||
1,rev2,path/to/file.go,0,/1@rev2/-/blob/path/to/file.go
|
||||
2,rev3,path/to/file.go,0,/2@rev3/-/blob/path/to/file.go
|
||||
WantResults: autogold.Expect(`{"type":"path","path":"path/to/file.go","repositoryID":1,"repository":"repo1","commit":"rev1","language":"Go"}
|
||||
{"type":"path","path":"path/to/file.go","repositoryID":1,"repository":"repo1","commit":"rev2","language":"Go"}
|
||||
{"type":"path","path":"path/to/file.go","repositoryID":2,"repository":"repo2","commit":"rev3","language":"Go"}
|
||||
`),
|
||||
})
|
||||
}
|
||||
@ -50,7 +51,7 @@ type newSearcherTestCase struct {
|
||||
Query string
|
||||
WantRefSpecs string
|
||||
WantRepoRevs string
|
||||
WantCSV autogold.Value
|
||||
WantResults autogold.Value
|
||||
}
|
||||
|
||||
func TestFromSearchClient(t *testing.T) {
|
||||
@ -94,9 +95,8 @@ func TestFromSearchClient(t *testing.T) {
|
||||
Query: "content",
|
||||
WantRefSpecs: "RepositoryRevSpec{1@HEAD} RepositoryRevSpec{2@HEAD} RepositoryRevSpec{3@HEAD}",
|
||||
WantRepoRevs: "RepositoryRevision{1@HEAD} RepositoryRevision{2@HEAD} RepositoryRevision{3@HEAD}",
|
||||
WantCSV: autogold.Expect(`repository,revision,file_path,match_count,first_match_url
|
||||
foo1,commitfoo0,,1,/foo1@commitfoo0/-/blob/?L2
|
||||
bar2,commitbar0,,1,/bar2@commitbar0/-/blob/?L2
|
||||
WantResults: autogold.Expect(`{"type":"content","path":"","repositoryID":1,"repository":"foo1","commit":"commitfoo0","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
{"type":"content","path":"","repositoryID":2,"repository":"bar2","commit":"commitbar0","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
`),
|
||||
})
|
||||
|
||||
@ -104,9 +104,8 @@ bar2,commitbar0,,1,/bar2@commitbar0/-/blob/?L2
|
||||
Query: "content type:file",
|
||||
WantRefSpecs: "RepositoryRevSpec{1@HEAD} RepositoryRevSpec{2@HEAD} RepositoryRevSpec{3@HEAD}",
|
||||
WantRepoRevs: "RepositoryRevision{1@HEAD} RepositoryRevision{2@HEAD} RepositoryRevision{3@HEAD}",
|
||||
WantCSV: autogold.Expect(`repository,revision,file_path,match_count,first_match_url
|
||||
foo1,commitfoo0,,1,/foo1@commitfoo0/-/blob/?L2
|
||||
bar2,commitbar0,,1,/bar2@commitbar0/-/blob/?L2
|
||||
WantResults: autogold.Expect(`{"type":"content","path":"","repositoryID":1,"repository":"foo1","commit":"commitfoo0","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
{"type":"content","path":"","repositoryID":2,"repository":"bar2","commit":"commitbar0","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
`),
|
||||
})
|
||||
|
||||
@ -114,8 +113,7 @@ bar2,commitbar0,,1,/bar2@commitbar0/-/blob/?L2
|
||||
Query: "repo:foo content",
|
||||
WantRefSpecs: "RepositoryRevSpec{1@HEAD}",
|
||||
WantRepoRevs: "RepositoryRevision{1@HEAD}",
|
||||
WantCSV: autogold.Expect(`repository,revision,file_path,match_count,first_match_url
|
||||
foo1,commitfoo0,,1,/foo1@commitfoo0/-/blob/?L2
|
||||
WantResults: autogold.Expect(`{"type":"content","path":"","repositoryID":1,"repository":"foo1","commit":"commitfoo0","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
`),
|
||||
})
|
||||
|
||||
@ -123,8 +121,7 @@ foo1,commitfoo0,,1,/foo1@commitfoo0/-/blob/?L2
|
||||
Query: "repo:foo rev:dev1 content",
|
||||
WantRefSpecs: "RepositoryRevSpec{1@dev1}",
|
||||
WantRepoRevs: "RepositoryRevision{1@dev1}",
|
||||
WantCSV: autogold.Expect(`repository,revision,file_path,match_count,first_match_url
|
||||
foo1,commitfoo1,,1,/foo1@commitfoo1/-/blob/?L2
|
||||
WantResults: autogold.Expect(`{"type":"content","path":"","repositoryID":1,"repository":"foo1","commit":"commitfoo1","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
`),
|
||||
})
|
||||
|
||||
@ -132,9 +129,8 @@ foo1,commitfoo1,,1,/foo1@commitfoo1/-/blob/?L2
|
||||
Query: "repo:foo rev:*refs/heads/dev* content",
|
||||
WantRefSpecs: "RepositoryRevSpec{1@*refs/heads/dev*}",
|
||||
WantRepoRevs: "RepositoryRevision{1@dev1} RepositoryRevision{1@dev2}",
|
||||
WantCSV: autogold.Expect(`repository,revision,file_path,match_count,first_match_url
|
||||
foo1,commitfoo1,,1,/foo1@commitfoo1/-/blob/?L2
|
||||
foo1,commitfoo2,,1,/foo1@commitfoo2/-/blob/?L2
|
||||
WantResults: autogold.Expect(`{"type":"content","path":"","repositoryID":1,"repository":"foo1","commit":"commitfoo1","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
{"type":"content","path":"","repositoryID":1,"repository":"foo1","commit":"commitfoo2","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
`),
|
||||
})
|
||||
|
||||
@ -142,10 +138,9 @@ foo1,commitfoo2,,1,/foo1@commitfoo2/-/blob/?L2
|
||||
Query: "repo:. rev:*refs/heads/dev* content",
|
||||
WantRefSpecs: "RepositoryRevSpec{1@*refs/heads/dev*} RepositoryRevSpec{2@*refs/heads/dev*} RepositoryRevSpec{3@*refs/heads/dev*}",
|
||||
WantRepoRevs: "RepositoryRevision{1@dev1} RepositoryRevision{1@dev2} RepositoryRevision{2@dev1}",
|
||||
WantCSV: autogold.Expect(`repository,revision,file_path,match_count,first_match_url
|
||||
foo1,commitfoo1,,1,/foo1@commitfoo1/-/blob/?L2
|
||||
foo1,commitfoo2,,1,/foo1@commitfoo2/-/blob/?L2
|
||||
bar2,commitbar1,,1,/bar2@commitbar1/-/blob/?L2
|
||||
WantResults: autogold.Expect(`{"type":"content","path":"","repositoryID":1,"repository":"foo1","commit":"commitfoo1","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
{"type":"content","path":"","repositoryID":1,"repository":"foo1","commit":"commitfoo2","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
{"type":"content","path":"","repositoryID":2,"repository":"bar2","commit":"commitbar1","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
`),
|
||||
})
|
||||
|
||||
@ -153,8 +148,7 @@ bar2,commitbar1,,1,/bar2@commitbar1/-/blob/?L2
|
||||
Query: "repo:foo rev:*refs/heads/dev*:*!refs/heads/dev1 content",
|
||||
WantRefSpecs: "RepositoryRevSpec{1@*refs/heads/dev*:*!refs/heads/dev1}",
|
||||
WantRepoRevs: "RepositoryRevision{1@dev2}",
|
||||
WantCSV: autogold.Expect(`repository,revision,file_path,match_count,first_match_url
|
||||
foo1,commitfoo2,,1,/foo1@commitfoo2/-/blob/?L2
|
||||
WantResults: autogold.Expect(`{"type":"content","path":"","repositoryID":1,"repository":"foo1","commit":"commitfoo2","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
`),
|
||||
})
|
||||
|
||||
@ -171,8 +165,7 @@ foo1,commitfoo2,,1,/foo1@commitfoo2/-/blob/?L2
|
||||
Query: "repo:foo rev:dev1:missing content",
|
||||
WantRefSpecs: "RepositoryRevSpec{1@dev1:missing}",
|
||||
WantRepoRevs: "RepositoryRevision{1@dev1}",
|
||||
WantCSV: autogold.Expect(`repository,revision,file_path,match_count,first_match_url
|
||||
foo1,commitfoo1,,1,/foo1@commitfoo1/-/blob/?L2
|
||||
WantResults: autogold.Expect(`{"type":"content","path":"","repositoryID":1,"repository":"foo1","commit":"commitfoo1","hunks":null,"chunkMatches":[{"content":"line1","contentStart":{"offset":0,"line":1,"column":0},"ranges":[{"start":{"offset":1,"line":1,"column":1},"end":{"offset":3,"line":1,"column":3}}]}]}
|
||||
`),
|
||||
})
|
||||
}
|
||||
@ -363,16 +356,32 @@ func testNewSearcher(t *testing.T, ctx context.Context, newSearcher NewSearcher,
|
||||
}
|
||||
assert.Equal(tc.WantRepoRevs, joinStringer(repoRevs))
|
||||
|
||||
// Test Search
|
||||
var csv csvBuffer
|
||||
matchWriter, err := newMatchCSVWriter(&csv)
|
||||
assert.NoError(err)
|
||||
bw := &bufferedWriter{
|
||||
flushSize: 1024,
|
||||
buf: bytes.Buffer{},
|
||||
}
|
||||
|
||||
bw.write = func(p []byte) error {
|
||||
_, err := bw.buf.Write(p)
|
||||
return err
|
||||
}
|
||||
|
||||
matchWriter := MatchJSONWriter{bw}
|
||||
|
||||
// Test Search
|
||||
for _, repoRev := range repoRevs {
|
||||
err := searcher.Search(ctx, repoRev, matchWriter)
|
||||
assert.NoError(err)
|
||||
}
|
||||
if tc.WantCSV != nil {
|
||||
tc.WantCSV.Equal(t, csv.buf.String())
|
||||
if tc.WantResults != nil {
|
||||
tc.WantResults.Equal(t, bw.buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
func joinStringer[T fmt.Stringer](xs []T) string {
|
||||
var parts []string
|
||||
for _, x := range xs {
|
||||
parts = append(parts, x.String())
|
||||
}
|
||||
return strings.Join(parts, " ")
|
||||
}
|
||||
|
||||
@ -63,8 +63,8 @@ type operations struct {
|
||||
cancelSearchJob *observation.Operation
|
||||
getAggregateRepoRevState *observation.Operation
|
||||
|
||||
getSearchJobCSVWriterTo operationWithWriterTo
|
||||
getSearchJobLogsWriterTo operationWithWriterTo
|
||||
getSearchJobResultsWriterTo operationWithWriterTo
|
||||
getSearchJobLogsWriterTo operationWithWriterTo
|
||||
}
|
||||
|
||||
// operationWithWriterTo encodes our pattern around our CSV WriterTo where we
|
||||
@ -109,9 +109,9 @@ func newOperations(observationCtx *observation.Context) *operations {
|
||||
cancelSearchJob: op("CancelSearchJob"),
|
||||
getAggregateRepoRevState: op("GetAggregateRepoRevState"),
|
||||
|
||||
getSearchJobCSVWriterTo: operationWithWriterTo{
|
||||
get: op("GetSearchJobCSVWriterTo"),
|
||||
writerTo: op("GetSearchJobCSVWriterTo.WriteTo"),
|
||||
getSearchJobResultsWriterTo: operationWithWriterTo{
|
||||
get: op("GetSearchJobResultsWriterTo"),
|
||||
writerTo: op("GetSearchJobResultsWriterTo.WriteTo"),
|
||||
},
|
||||
getSearchJobLogsWriterTo: operationWithWriterTo{
|
||||
get: op("GetSearchJobLogsWriterTo"),
|
||||
@ -317,18 +317,15 @@ func (s *Service) DeleteSearchJob(ctx context.Context, id int64) (err error) {
|
||||
return s.store.DeleteExhaustiveSearchJob(ctx, id)
|
||||
}
|
||||
|
||||
// WriteSearchJobCSV copies all CSVs associated with a search job to the given
|
||||
// writer. It returns the number of bytes written and any error encountered.
|
||||
|
||||
// GetSearchJobCSVWriterTo returns a WriterTo which can be called once to
|
||||
// GetSearchJobResultsWriterTo returns a WriterTo which can be called once to
|
||||
// write all JSON results associated with a search job to the given writer for job id.
|
||||
// Note: ctx is used by WriterTo.
|
||||
//
|
||||
// io.WriterTo is a specialization of an io.Reader. We expect callers of this
|
||||
// function to want to write an http response, so we avoid an io.Pipe and
|
||||
// function to want to write an HTTP response, so we avoid an io.Pipe and
|
||||
// instead pass a more direct use.
|
||||
func (s *Service) GetSearchJobCSVWriterTo(parentCtx context.Context, id int64) (_ io.WriterTo, err error) {
|
||||
ctx, _, endObservation := s.operations.getSearchJobCSVWriterTo.get.With(parentCtx, &err, opAttrs(
|
||||
func (s *Service) GetSearchJobResultsWriterTo(parentCtx context.Context, id int64) (_ io.WriterTo, err error) {
|
||||
ctx, _, endObservation := s.operations.getSearchJobResultsWriterTo.get.With(parentCtx, &err, opAttrs(
|
||||
attribute.Int64("id", id)))
|
||||
defer endObservation(1, observation.Args{})
|
||||
|
||||
@ -343,13 +340,13 @@ func (s *Service) GetSearchJobCSVWriterTo(parentCtx context.Context, id int64) (
|
||||
}
|
||||
|
||||
return writerToFunc(func(w io.Writer) (n int64, err error) {
|
||||
ctx, _, endObservation := s.operations.getSearchJobCSVWriterTo.writerTo.With(parentCtx, &err, opAttrs(
|
||||
ctx, _, endObservation := s.operations.getSearchJobResultsWriterTo.writerTo.With(parentCtx, &err, opAttrs(
|
||||
attribute.Int64("id", id)))
|
||||
defer func() {
|
||||
endObservation(1, opAttrs(attribute.Int64("bytesWritten", n)))
|
||||
}()
|
||||
|
||||
return writeSearchJobCSV(ctx, iter, s.uploadStore, w)
|
||||
return writeSearchJobJSON(ctx, iter, s.uploadStore, w)
|
||||
}), nil
|
||||
}
|
||||
|
||||
@ -386,25 +383,11 @@ func (s *Service) GetAggregateRepoRevState(ctx context.Context, id int64) (_ *ty
|
||||
return &stats, nil
|
||||
}
|
||||
|
||||
// discards output from br up until delim is read. If an error is encountered
|
||||
// it is returned. Note: often the error is io.EOF
|
||||
func discardUntil(br *bufio.Reader, delim byte) error {
|
||||
// This function just wraps ReadSlice which will read until delim. If we
|
||||
// get the error ErrBufferFull we didn't find delim since we need to read
|
||||
// more, so we just try again. For every other error (or nil) we can
|
||||
// return it.
|
||||
for {
|
||||
_, err := br.ReadSlice(delim)
|
||||
if err != bufio.ErrBufferFull {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func writeSearchJobCSV(ctx context.Context, iter *iterator.Iterator[string], uploadStore uploadstore.Store, w io.Writer) (int64, error) {
|
||||
func writeSearchJobJSON(ctx context.Context, iter *iterator.Iterator[string], uploadStore uploadstore.Store, w io.Writer) (int64, error) {
|
||||
// keep a single bufio.Reader so we can reuse its buffer.
|
||||
var br bufio.Reader
|
||||
writeKey := func(key string, skipHeader bool) (int64, error) {
|
||||
|
||||
writeKey := func(key string) (int64, error) {
|
||||
rc, err := uploadStore.Get(ctx, key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@ -413,32 +396,17 @@ func writeSearchJobCSV(ctx context.Context, iter *iterator.Iterator[string], upl
|
||||
|
||||
br.Reset(rc)
|
||||
|
||||
// skip header line
|
||||
if skipHeader {
|
||||
err := discardUntil(&br, '\n')
|
||||
if err == io.EOF {
|
||||
// reached end of file before finding the newline. Write
|
||||
// nothing
|
||||
return 0, nil
|
||||
} else if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
return br.WriteTo(w)
|
||||
}
|
||||
|
||||
// For the first blob we want the header, for the rest we don't
|
||||
skipHeader := false
|
||||
var n int64
|
||||
for iter.Next() {
|
||||
key := iter.Current()
|
||||
m, err := writeKey(key, skipHeader)
|
||||
m, err := writeKey(key)
|
||||
n += m
|
||||
if err != nil {
|
||||
return n, errors.Wrapf(err, "writing csv for key %q", key)
|
||||
return n, errors.Wrapf(err, "writing JSON for key %q", key)
|
||||
}
|
||||
skipHeader = true
|
||||
}
|
||||
|
||||
return n, iter.Err()
|
||||
|
||||
@ -12,13 +12,15 @@ import (
|
||||
"github.com/sourcegraph/sourcegraph/lib/iterator"
|
||||
)
|
||||
|
||||
// Test_copyBlobs tests that we concatenate objects from the object store
|
||||
// properly.
|
||||
func Test_copyBlobs(t *testing.T) {
|
||||
keysIter := iterator.From([]string{"a", "b", "c"})
|
||||
|
||||
blobs := map[string]io.Reader{
|
||||
"a": bytes.NewReader([]byte("h/h/h\na/a/a\n")),
|
||||
"b": bytes.NewReader([]byte("h/h/h\nb/b/b\n")),
|
||||
"c": bytes.NewReader([]byte("h/h/h\nc/c/c\n")),
|
||||
"a": bytes.NewReader([]byte("{\"Key\":\"a\"}\n{\"Key\":\"b\"}\n")),
|
||||
"b": bytes.NewReader([]byte("{\"Key\":\"c\"}\n")),
|
||||
"c": bytes.NewReader([]byte("{\"Key\":\"d\"}\n{\"Key\":\"e\"}\n{\"Key\":\"f\"}\n")),
|
||||
}
|
||||
|
||||
blobstore := mocks.NewMockStore()
|
||||
@ -28,10 +30,10 @@ func Test_copyBlobs(t *testing.T) {
|
||||
|
||||
w := &bytes.Buffer{}
|
||||
|
||||
n, err := writeSearchJobCSV(context.Background(), keysIter, blobstore, w)
|
||||
n, err := writeSearchJobJSON(context.Background(), keysIter, blobstore, w)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(24), n)
|
||||
require.Equal(t, int64(72), n)
|
||||
|
||||
want := "h/h/h\na/a/a\nb/b/b\nc/c/c\n"
|
||||
want := "{\"Key\":\"a\"}\n{\"Key\":\"b\"}\n{\"Key\":\"c\"}\n{\"Key\":\"d\"}\n{\"Key\":\"e\"}\n{\"Key\":\"f\"}\n"
|
||||
require.Equal(t, want, w.String())
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user