mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 19:21:50 +00:00
codeintel: Split worker file into worker and processor (#11085)
This commit is contained in:
parent
f4411d8aaa
commit
32f87ec190
252
cmd/precise-code-intel-worker/internal/worker/processor.go
Normal file
252
cmd/precise-code-intel-worker/internal/worker/processor.go
Normal file
@ -0,0 +1,252 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/inconshreveable/log15"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sourcegraph/sourcegraph/cmd/precise-code-intel-worker/internal/correlation"
|
||||
"github.com/sourcegraph/sourcegraph/cmd/precise-code-intel-worker/internal/existence"
|
||||
bundles "github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/client"
|
||||
jsonserializer "github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/serializer/json"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/types"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/writer"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/db"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/gitserver"
|
||||
)
|
||||
|
||||
// Processor converts raw uploads into dumps.
type Processor interface {
	// Process converts the raw upload into a dump within the given transaction context.
	Process(ctx context.Context, tx db.DB, upload db.Upload) error
}

// processor is the default Processor implementation. It pulls raw upload
// data from (and sends converted bundles back to) the bundle manager, and
// reads commit data from gitserver.
type processor struct {
	bundleManagerClient bundles.BundleManagerClient
	gitserverClient     gitserver.Client
}
|
||||
|
||||
// process converts a raw upload into a dump within the given transaction context.
|
||||
func (p *processor) Process(ctx context.Context, tx db.DB, upload db.Upload) (err error) {
|
||||
// Create scratch directory that we can clean on completion/failure
|
||||
name, err := ioutil.TempDir("", "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if cleanupErr := os.RemoveAll(name); cleanupErr != nil {
|
||||
log15.Warn("Failed to remove temporary directory", "path", name, "err", cleanupErr)
|
||||
}
|
||||
}()
|
||||
|
||||
// Pull raw uploaded data from bundle manager
|
||||
filename, err := p.bundleManagerClient.GetUpload(ctx, upload.ID, name)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "bundleManager.GetUpload")
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
// Remove upload file on error instead of waiting for it to expire
|
||||
if deleteErr := p.bundleManagerClient.DeleteUpload(ctx, upload.ID); deleteErr != nil {
|
||||
log15.Warn("Failed to delete upload file", "err", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Create target file for converted database
|
||||
uuid, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newFilename := filepath.Join(name, uuid.String())
|
||||
|
||||
// Read raw upload and write converted database to newFilename. This process also correlates
|
||||
// and returns the data we need to insert into Postgres to support cross-dump/repo queries.
|
||||
packages, packageReferences, err := convert(
|
||||
ctx,
|
||||
filename,
|
||||
newFilename,
|
||||
upload.ID,
|
||||
upload.Root,
|
||||
func(dirnames []string) (map[string][]string, error) {
|
||||
directoryChildren, err := p.gitserverClient.DirectoryChildren(ctx, tx, upload.RepositoryID, upload.Commit, dirnames)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "gitserverClient.DirectoryChildren")
|
||||
}
|
||||
return directoryChildren, nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// At this point we haven't touched the database. We're going to start a nested transaction
|
||||
// with Postgres savepoints. In the event that something after this point fails, we want to
|
||||
// update the upload record with an error message but do not want to alter any other data in
|
||||
// the database. Rolling back to this savepoint will allow us to discard any other changes
|
||||
// but still commit the transaction as a whole.
|
||||
savepointID, err := tx.Savepoint(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "db.Savepoint")
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if rollbackErr := tx.RollbackToSavepoint(ctx, savepointID); rollbackErr != nil {
|
||||
err = multierror.Append(err, rollbackErr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Update package and package reference data to support cross-repo queries.
|
||||
if err := tx.UpdatePackages(ctx, packages); err != nil {
|
||||
return errors.Wrap(err, "db.UpdatePackages")
|
||||
}
|
||||
if err := tx.UpdatePackageReferences(ctx, packageReferences); err != nil {
|
||||
return errors.Wrap(err, "db.UpdatePackageReferences")
|
||||
}
|
||||
|
||||
// Before we mark the upload as complete, we need to delete any existing completed uploads
|
||||
// that have the same repository_id, commit, root, and indexer values. Otherwise the transaction
|
||||
// will fail as these values form a unique constraint.
|
||||
if err := tx.DeleteOverlappingDumps(ctx, upload.RepositoryID, upload.Commit, upload.Root, upload.Indexer); err != nil {
|
||||
return errors.Wrap(err, "db.DeleteOverlappingDumps")
|
||||
}
|
||||
|
||||
// Almost-success: we need to mark this upload as complete at this point as the next step changes
|
||||
// the visibility of the dumps for this repository. This requires that the new dump be available in
|
||||
// the lsif_dumps view, which requires a change of state. In the event of a future failure we can
|
||||
// still roll back to the save point and mark the upload as errored.
|
||||
if err := tx.MarkComplete(ctx, upload.ID); err != nil {
|
||||
return errors.Wrap(err, "db.MarkComplete")
|
||||
}
|
||||
|
||||
// Discover commits around the current tip commit and the commit of this upload. Upsert these
|
||||
// commits into the lsif_commits table, then update the visibility of all dumps for this repository.
|
||||
if err := p.updateCommitsAndVisibility(ctx, tx, upload.RepositoryID, upload.Commit); err != nil {
|
||||
return errors.Wrap(err, "updateCommitsAndVisibility")
|
||||
}
|
||||
|
||||
// Send converted database file to bundle manager
|
||||
if err := p.bundleManagerClient.SendDB(ctx, upload.ID, newFilename); err != nil {
|
||||
return errors.Wrap(err, "bundleManager.SendDB")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateCommits updates the lsif_commits table with the current data known to gitserver, then updates the
|
||||
// visibility of all dumps for the given repository.
|
||||
func (p *processor) updateCommitsAndVisibility(ctx context.Context, db db.DB, repositoryID int, commit string) error {
|
||||
tipCommit, err := p.gitserverClient.Head(ctx, db, repositoryID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "gitserver.Head")
|
||||
}
|
||||
newCommits, err := p.gitserverClient.CommitsNear(ctx, db, repositoryID, tipCommit)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "gitserver.CommitsNear")
|
||||
}
|
||||
|
||||
if tipCommit != commit {
|
||||
// If the tip is ahead of this commit, we also want to discover all of the commits between this
|
||||
// commit and the tip so that we can accurately determine what is visible from the tip. If we
|
||||
// do not do this before the updateDumpsVisibleFromTip call below, no dumps will be reachable
|
||||
// from the tip and all dumps will be invisible.
|
||||
additionalCommits, err := p.gitserverClient.CommitsNear(ctx, db, repositoryID, commit)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "gitserver.CommitsNear")
|
||||
}
|
||||
|
||||
for k, vs := range additionalCommits {
|
||||
newCommits[k] = append(newCommits[k], vs...)
|
||||
}
|
||||
}
|
||||
|
||||
if err := db.UpdateCommits(ctx, repositoryID, newCommits); err != nil {
|
||||
return errors.Wrap(err, "db.UpdateCommits")
|
||||
}
|
||||
|
||||
if err := db.UpdateDumpsVisibleFromTip(ctx, repositoryID, tipCommit); err != nil {
|
||||
return errors.Wrap(err, "db.UpdateDumpsVisibleFromTip")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// convert correlates the raw input data and commits the correlated data to disk.
|
||||
func convert(
|
||||
ctx context.Context,
|
||||
filename string,
|
||||
newFilename string,
|
||||
dumpID int,
|
||||
root string,
|
||||
getChildren existence.GetChildrenFunc,
|
||||
) (_ []types.Package, _ []types.PackageReference, err error) {
|
||||
groupedBundleData, err := correlation.Correlate(filename, dumpID, root, getChildren)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "correlation.Correlate")
|
||||
}
|
||||
|
||||
if err := write(ctx, newFilename, groupedBundleData); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return groupedBundleData.Packages, groupedBundleData.PackageReferences, nil
|
||||
}
|
||||
|
||||
// write commits the correlated data to disk.
|
||||
func write(ctx context.Context, filename string, groupedBundleData *correlation.GroupedBundleData) error {
|
||||
writer, err := writer.NewSQLiteWriter(filename, jsonserializer.New())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := writer.Close(); closeErr != nil {
|
||||
err = multierror.Append(err, closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
writers := []func() error{
|
||||
func() error {
|
||||
return errors.Wrap(writer.WriteMeta(ctx, groupedBundleData.LSIFVersion, groupedBundleData.NumResultChunks), "writer.WriteMeta")
|
||||
},
|
||||
func() error {
|
||||
return errors.Wrap(writer.WriteDocuments(ctx, groupedBundleData.Documents), "writer.WriteDocuments")
|
||||
},
|
||||
func() error {
|
||||
return errors.Wrap(writer.WriteResultChunks(ctx, groupedBundleData.ResultChunks), "writer.WriteResultChunks")
|
||||
},
|
||||
func() error {
|
||||
return errors.Wrap(writer.WriteDefinitions(ctx, groupedBundleData.Definitions), "writer.WriteDefinitions")
|
||||
},
|
||||
func() error {
|
||||
return errors.Wrap(writer.WriteReferences(ctx, groupedBundleData.References), "writer.WriteReferences")
|
||||
},
|
||||
}
|
||||
|
||||
errs := make(chan error, len(writers))
|
||||
defer close(errs)
|
||||
|
||||
for _, w := range writers {
|
||||
go func(w func() error) { errs <- w() }(w)
|
||||
}
|
||||
|
||||
var writeErr error
|
||||
for i := 0; i < len(writers); i++ {
|
||||
if err := <-errs; err != nil {
|
||||
writeErr = multierror.Append(writeErr, err)
|
||||
}
|
||||
}
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
|
||||
if err := writer.Flush(ctx); err != nil {
|
||||
return errors.Wrap(err, "writer.Flush")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
219
cmd/precise-code-intel-worker/internal/worker/processor_test.go
Normal file
219
cmd/precise-code-intel-worker/internal/worker/processor_test.go
Normal file
@ -0,0 +1,219 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/bloomfilter"
|
||||
bundlemocks "github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/mocks"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/types"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/db"
|
||||
dbmocks "github.com/sourcegraph/sourcegraph/internal/codeintel/db/mocks"
|
||||
gitservermocks "github.com/sourcegraph/sourcegraph/internal/codeintel/gitserver/mocks"
|
||||
"github.com/sourcegraph/sourcegraph/internal/sqliteutil"
|
||||
)
|
||||
|
||||
// init prepares the SQLite driver used by the conversion step under test:
// it points the loader at the locally-built libsqlite3 and registers the
// PCRE-enabled sqlite3 driver (presumably required by the SQLite bundle
// writer — confirm against the sqliteutil package).
func init() {
	sqliteutil.SetLocalLibpath()
	sqliteutil.MustRegisterSqlite3WithPcre()
}
|
||||
|
||||
func TestProcess(t *testing.T) {
|
||||
upload := db.Upload{
|
||||
ID: 42,
|
||||
Root: "root/",
|
||||
Commit: makeCommit(1),
|
||||
RepositoryID: 50,
|
||||
Indexer: "lsif-go",
|
||||
}
|
||||
|
||||
mockDB := dbmocks.NewMockDB()
|
||||
bundleManagerClient := bundlemocks.NewMockBundleManagerClient()
|
||||
gitserverClient := gitservermocks.NewMockClient()
|
||||
|
||||
// Give correlation package a valid input dump
|
||||
bundleManagerClient.GetUploadFunc.SetDefaultHook(copyTestDump)
|
||||
|
||||
// Whitelist all files in dump
|
||||
gitserverClient.DirectoryChildrenFunc.SetDefaultReturn(map[string][]string{
|
||||
"": {"foo.go", "bar.go"},
|
||||
}, nil)
|
||||
|
||||
// Set a different tip commit
|
||||
gitserverClient.HeadFunc.SetDefaultReturn(makeCommit(30), nil)
|
||||
|
||||
// Return some ancestors for each commit args
|
||||
gitserverClient.CommitsNearFunc.SetDefaultHook(func(ctx context.Context, db db.DB, repositoryID int, commit string) (map[string][]string, error) {
|
||||
offset, err := strconv.ParseInt(commit, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
commits := map[string][]string{}
|
||||
for i := 0; i < 10; i++ {
|
||||
commits[makeCommit(int(offset)+i)] = []string{makeCommit(int(offset) + i + 1)}
|
||||
}
|
||||
|
||||
return commits, nil
|
||||
})
|
||||
|
||||
processor := &processor{
|
||||
bundleManagerClient: bundleManagerClient,
|
||||
gitserverClient: gitserverClient,
|
||||
}
|
||||
|
||||
err := processor.Process(context.Background(), mockDB, upload)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error processing upload: %s", err)
|
||||
}
|
||||
|
||||
expectedPackages := []types.Package{
|
||||
{DumpID: 42,
|
||||
Scheme: "scheme B",
|
||||
Name: "pkg B",
|
||||
Version: "v1.2.3",
|
||||
},
|
||||
}
|
||||
if len(mockDB.UpdatePackagesFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of UpdatePackages calls. want=%d have=%d", 1, len(mockDB.UpdatePackagesFunc.History()))
|
||||
} else if diff := cmp.Diff(expectedPackages, mockDB.UpdatePackagesFunc.History()[0].Arg1); diff != "" {
|
||||
t.Errorf("unexpected UpdatePackagesFuncargs (-want +got):\n%s", diff)
|
||||
}
|
||||
|
||||
filter, err := bloomfilter.CreateFilter([]string{"ident A"})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating filter: %s", err)
|
||||
}
|
||||
expectedPackageReferences := []types.PackageReference{
|
||||
{DumpID: 42,
|
||||
Scheme: "scheme A",
|
||||
Name: "pkg A",
|
||||
Version: "v0.1.0",
|
||||
Filter: filter,
|
||||
},
|
||||
}
|
||||
if len(mockDB.UpdatePackageReferencesFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of UpdatePackageReferences calls. want=%d have=%d", 1, len(mockDB.UpdatePackageReferencesFunc.History()))
|
||||
} else if diff := cmp.Diff(expectedPackageReferences, mockDB.UpdatePackageReferencesFunc.History()[0].Arg1); diff != "" {
|
||||
t.Errorf("unexpected UpdatePackageReferencesFunc args (-want +got):\n%s", diff)
|
||||
}
|
||||
|
||||
if len(mockDB.DeleteOverlappingDumpsFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of DeleteOverlappingDumps calls. want=%d have=%d", 1, len(mockDB.DeleteOverlappingDumpsFunc.History()))
|
||||
} else if mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg1 != 50 {
|
||||
t.Errorf("unexpected value for repository id. want=%d have=%d", 50, mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg1)
|
||||
} else if mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg2 != makeCommit(1) {
|
||||
t.Errorf("unexpected value for commit. want=%s have=%s", makeCommit(1), mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg2)
|
||||
} else if mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg3 != "root/" {
|
||||
t.Errorf("unexpected value for root. want=%s have=%s", "root/", mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg3)
|
||||
} else if mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg4 != "lsif-go" {
|
||||
t.Errorf("unexpected value for indexer. want=%s have=%s", "lsif-go", mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg4)
|
||||
}
|
||||
|
||||
offsets := []int{1, 30}
|
||||
expectedCommits := map[string][]string{}
|
||||
for i := 0; i < 10; i++ {
|
||||
for _, offset := range offsets {
|
||||
expectedCommits[makeCommit(offset+i)] = []string{makeCommit(offset + i + 1)}
|
||||
}
|
||||
}
|
||||
if len(mockDB.UpdateCommitsFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of update UpdateCommits calls. want=%d have=%d", 1, len(mockDB.UpdateCommitsFunc.History()))
|
||||
} else if diff := cmp.Diff(expectedCommits, mockDB.UpdateCommitsFunc.History()[0].Arg2); diff != "" {
|
||||
t.Errorf("unexpected update UpdateCommitsFunc args (-want +got):\n%s", diff)
|
||||
}
|
||||
|
||||
if len(mockDB.UpdateDumpsVisibleFromTipFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of UpdateDumpsVisibleFromTip calls. want=%d have=%d", 1, len(mockDB.UpdateDumpsVisibleFromTipFunc.History()))
|
||||
} else if mockDB.UpdateDumpsVisibleFromTipFunc.History()[0].Arg1 != 50 {
|
||||
t.Errorf("unexpected value for repository id. want=%d have=%d", 50, mockDB.UpdateDumpsVisibleFromTipFunc.History()[0].Arg1)
|
||||
} else if mockDB.UpdateDumpsVisibleFromTipFunc.History()[0].Arg2 != makeCommit(30) {
|
||||
t.Errorf("unexpected value for tip commit. want=%s have=%s", makeCommit(30), mockDB.UpdateDumpsVisibleFromTipFunc.History()[0].Arg2)
|
||||
}
|
||||
|
||||
if len(bundleManagerClient.SendDBFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of SendDB calls. want=%d have=%d", 1, len(bundleManagerClient.SendDBFunc.History()))
|
||||
} else if bundleManagerClient.SendDBFunc.History()[0].Arg1 != 42 {
|
||||
t.Errorf("unexpected SendDBFunc args. want=%d have=%d", 42, bundleManagerClient.SendDBFunc.History()[0].Arg1)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessError(t *testing.T) {
|
||||
upload := db.Upload{
|
||||
ID: 42,
|
||||
Root: "root/",
|
||||
Commit: makeCommit(1),
|
||||
RepositoryID: 50,
|
||||
Indexer: "lsif-go",
|
||||
}
|
||||
|
||||
mockDB := dbmocks.NewMockDB()
|
||||
bundleManagerClient := bundlemocks.NewMockBundleManagerClient()
|
||||
gitserverClient := gitservermocks.NewMockClient()
|
||||
|
||||
// Give correlation package a valid input dump
|
||||
bundleManagerClient.GetUploadFunc.SetDefaultHook(copyTestDump)
|
||||
|
||||
// Set a different tip commit
|
||||
gitserverClient.HeadFunc.SetDefaultReturn("", fmt.Errorf("uh-oh!"))
|
||||
|
||||
processor := &processor{
|
||||
bundleManagerClient: bundleManagerClient,
|
||||
gitserverClient: gitserverClient,
|
||||
}
|
||||
|
||||
err := processor.Process(context.Background(), mockDB, upload)
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected nil error processing upload")
|
||||
} else if !strings.Contains(err.Error(), "uh-oh!") {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
if len(mockDB.RollbackToSavepointFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of RollbackToLastSavepoint calls. want=%d have=%d", 1, len(mockDB.RollbackToSavepointFunc.History()))
|
||||
}
|
||||
|
||||
if len(bundleManagerClient.DeleteUploadFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of DeleteUpload calls. want=%d have=%d", 1, len(mockDB.RollbackToSavepointFunc.History()))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//
|
||||
//
|
||||
|
||||
// makeCommit builds a deterministic fake commit hash from an integer: the
// decimal value zero-padded to the 40 characters of a git SHA-1.
func makeCommit(i int) string {
	const shaWidth40 = "%040d"
	return fmt.Sprintf(shaWidth40, i)
}
|
||||
|
||||
// copyTestDump satisfies the bundle manager GetUpload mock hook: it gzips the
// checked-in test dump into dir and returns the resulting file path.
//
// The gzip writer is now closed explicitly with its error checked — a bare
// deferred Close ignored flush failures, which could silently hand back a
// truncated gzip file.
func copyTestDump(ctx context.Context, uploadID int, dir string) (string, error) {
	src, err := os.Open("../../testdata/dump1.lsif")
	if err != nil {
		return "", err
	}
	defer src.Close()

	filename := filepath.Join(dir, "dump.lsif")
	dst, err := os.Create(filename)
	if err != nil {
		return "", err
	}
	defer dst.Close()

	gzipWriter := gzip.NewWriter(dst)
	if _, err := io.Copy(gzipWriter, src); err != nil {
		return "", err
	}
	// Close flushes any buffered compressed data; its error must be checked.
	if err := gzipWriter.Close(); err != nil {
		return "", err
	}

	return filename, nil
}
|
||||
@ -2,22 +2,12 @@ package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/inconshreveable/log15"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sourcegraph/sourcegraph/cmd/precise-code-intel-worker/internal/correlation"
|
||||
"github.com/sourcegraph/sourcegraph/cmd/precise-code-intel-worker/internal/existence"
|
||||
bundles "github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/client"
|
||||
jsonserializer "github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/serializer/json"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/types"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/writer"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/db"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/gitserver"
|
||||
)
|
||||
@ -109,234 +99,3 @@ func (w *Worker) dequeueAndProcess(ctx context.Context) (_ bool, err error) {
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Processor converts raw uploads into dumps.
|
||||
type Processor interface {
|
||||
Process(ctx context.Context, tx db.DB, upload db.Upload) error
|
||||
}
|
||||
|
||||
type processor struct {
|
||||
bundleManagerClient bundles.BundleManagerClient
|
||||
gitserverClient gitserver.Client
|
||||
}
|
||||
|
||||
// process converts a raw upload into a dump within the given transaction context.
|
||||
func (p *processor) Process(ctx context.Context, tx db.DB, upload db.Upload) (err error) {
|
||||
// Create scratch directory that we can clean on completion/failure
|
||||
name, err := ioutil.TempDir("", "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if cleanupErr := os.RemoveAll(name); cleanupErr != nil {
|
||||
log15.Warn("Failed to remove temporary directory", "path", name, "err", cleanupErr)
|
||||
}
|
||||
}()
|
||||
|
||||
// Pull raw uploaded data from bundle manager
|
||||
filename, err := p.bundleManagerClient.GetUpload(ctx, upload.ID, name)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "bundleManager.GetUpload")
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
// Remove upload file on error instead of waiting for it to expire
|
||||
if deleteErr := p.bundleManagerClient.DeleteUpload(ctx, upload.ID); deleteErr != nil {
|
||||
log15.Warn("Failed to delete upload file", "err", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Create target file for converted database
|
||||
uuid, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newFilename := filepath.Join(name, uuid.String())
|
||||
|
||||
// Read raw upload and write converted database to newFilename. This process also correlates
|
||||
// and returns the data we need to insert into Postgres to support cross-dump/repo queries.
|
||||
packages, packageReferences, err := convert(
|
||||
ctx,
|
||||
filename,
|
||||
newFilename,
|
||||
upload.ID,
|
||||
upload.Root,
|
||||
func(dirnames []string) (map[string][]string, error) {
|
||||
directoryChildren, err := p.gitserverClient.DirectoryChildren(ctx, tx, upload.RepositoryID, upload.Commit, dirnames)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "gitserverClient.DirectoryChildren")
|
||||
}
|
||||
return directoryChildren, nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// At this point we haven't touched the database. We're going to start a nested transaction
|
||||
// with Postgres savepoints. In the event that something after this point fails, we want to
|
||||
// update the upload record with an error message but do not want to alter any other data in
|
||||
// the database. Rolling back to this savepoint will allow us to discard any other changes
|
||||
// but still commit the transaction as a whole.
|
||||
savepointID, err := tx.Savepoint(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "db.Savepoint")
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if rollbackErr := tx.RollbackToSavepoint(ctx, savepointID); rollbackErr != nil {
|
||||
err = multierror.Append(err, rollbackErr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Update package and package reference data to support cross-repo queries.
|
||||
if err := tx.UpdatePackages(ctx, packages); err != nil {
|
||||
return errors.Wrap(err, "db.UpdatePackages")
|
||||
}
|
||||
if err := tx.UpdatePackageReferences(ctx, packageReferences); err != nil {
|
||||
return errors.Wrap(err, "db.UpdatePackageReferences")
|
||||
}
|
||||
|
||||
// Before we mark the upload as complete, we need to delete any existing completed uploads
|
||||
// that have the same repository_id, commit, root, and indexer values. Otherwise the transaction
|
||||
// will fail as these values form a unique constraint.
|
||||
if err := tx.DeleteOverlappingDumps(ctx, upload.RepositoryID, upload.Commit, upload.Root, upload.Indexer); err != nil {
|
||||
return errors.Wrap(err, "db.DeleteOverlappingDumps")
|
||||
}
|
||||
|
||||
// Almost-success: we need to mark this upload as complete at this point as the next step changes
|
||||
// the visibility of the dumps for this repository. This requires that the new dump be available in
|
||||
// the lsif_dumps view, which requires a change of state. In the event of a future failure we can
|
||||
// still roll back to the save point and mark the upload as errored.
|
||||
if err := tx.MarkComplete(ctx, upload.ID); err != nil {
|
||||
return errors.Wrap(err, "db.MarkComplete")
|
||||
}
|
||||
|
||||
// Discover commits around the current tip commit and the commit of this upload. Upsert these
|
||||
// commits into the lsif_commits table, then update the visibility of all dumps for this repository.
|
||||
if err := p.updateCommitsAndVisibility(ctx, tx, upload.RepositoryID, upload.Commit); err != nil {
|
||||
return errors.Wrap(err, "updateCommitsAndVisibility")
|
||||
}
|
||||
|
||||
// Send converted database file to bundle manager
|
||||
if err := p.bundleManagerClient.SendDB(ctx, upload.ID, newFilename); err != nil {
|
||||
return errors.Wrap(err, "bundleManager.SendDB")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateCommits updates the lsif_commits table with the current data known to gitserver, then updates the
|
||||
// visibility of all dumps for the given repository.
|
||||
func (p *processor) updateCommitsAndVisibility(ctx context.Context, db db.DB, repositoryID int, commit string) error {
|
||||
tipCommit, err := p.gitserverClient.Head(ctx, db, repositoryID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "gitserver.Head")
|
||||
}
|
||||
newCommits, err := p.gitserverClient.CommitsNear(ctx, db, repositoryID, tipCommit)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "gitserver.CommitsNear")
|
||||
}
|
||||
|
||||
if tipCommit != commit {
|
||||
// If the tip is ahead of this commit, we also want to discover all of the commits between this
|
||||
// commit and the tip so that we can accurately determine what is visible from the tip. If we
|
||||
// do not do this before the updateDumpsVisibleFromTip call below, no dumps will be reachable
|
||||
// from the tip and all dumps will be invisible.
|
||||
additionalCommits, err := p.gitserverClient.CommitsNear(ctx, db, repositoryID, commit)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "gitserver.CommitsNear")
|
||||
}
|
||||
|
||||
for k, vs := range additionalCommits {
|
||||
newCommits[k] = append(newCommits[k], vs...)
|
||||
}
|
||||
}
|
||||
|
||||
if err := db.UpdateCommits(ctx, repositoryID, newCommits); err != nil {
|
||||
return errors.Wrap(err, "db.UpdateCommits")
|
||||
}
|
||||
|
||||
if err := db.UpdateDumpsVisibleFromTip(ctx, repositoryID, tipCommit); err != nil {
|
||||
return errors.Wrap(err, "db.UpdateDumpsVisibleFromTip")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// convert correlates the raw input data and commits the correlated data to disk.
|
||||
func convert(
|
||||
ctx context.Context,
|
||||
filename string,
|
||||
newFilename string,
|
||||
dumpID int,
|
||||
root string,
|
||||
getChildren existence.GetChildrenFunc,
|
||||
) (_ []types.Package, _ []types.PackageReference, err error) {
|
||||
groupedBundleData, err := correlation.Correlate(filename, dumpID, root, getChildren)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "correlation.Correlate")
|
||||
}
|
||||
|
||||
if err := write(ctx, newFilename, groupedBundleData); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return groupedBundleData.Packages, groupedBundleData.PackageReferences, nil
|
||||
}
|
||||
|
||||
// write commits the correlated data to disk.
|
||||
func write(ctx context.Context, filename string, groupedBundleData *correlation.GroupedBundleData) error {
|
||||
writer, err := writer.NewSQLiteWriter(filename, jsonserializer.New())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := writer.Close(); closeErr != nil {
|
||||
err = multierror.Append(err, closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
writers := []func() error{
|
||||
func() error {
|
||||
return errors.Wrap(writer.WriteMeta(ctx, groupedBundleData.LSIFVersion, groupedBundleData.NumResultChunks), "writer.WriteMeta")
|
||||
},
|
||||
func() error {
|
||||
return errors.Wrap(writer.WriteDocuments(ctx, groupedBundleData.Documents), "writer.WriteDocuments")
|
||||
},
|
||||
func() error {
|
||||
return errors.Wrap(writer.WriteResultChunks(ctx, groupedBundleData.ResultChunks), "writer.WriteResultChunks")
|
||||
},
|
||||
func() error {
|
||||
return errors.Wrap(writer.WriteDefinitions(ctx, groupedBundleData.Definitions), "writer.WriteDefinitions")
|
||||
},
|
||||
func() error {
|
||||
return errors.Wrap(writer.WriteReferences(ctx, groupedBundleData.References), "writer.WriteReferences")
|
||||
},
|
||||
}
|
||||
|
||||
errs := make(chan error, len(writers))
|
||||
defer close(errs)
|
||||
|
||||
for _, w := range writers {
|
||||
go func(w func() error) { errs <- w() }(w)
|
||||
}
|
||||
|
||||
var writeErr error
|
||||
for i := 0; i < len(writers); i++ {
|
||||
if err := <-errs; err != nil {
|
||||
writeErr = multierror.Append(writeErr, err)
|
||||
}
|
||||
}
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
|
||||
if err := writer.Flush(ctx); err != nil {
|
||||
return errors.Wrap(err, "writer.Flush")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1,34 +1,19 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/inconshreveable/log15"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/bloomfilter"
|
||||
bundlemocks "github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/mocks"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/bundles/types"
|
||||
"github.com/sourcegraph/sourcegraph/internal/codeintel/db"
|
||||
dbmocks "github.com/sourcegraph/sourcegraph/internal/codeintel/db/mocks"
|
||||
gitservermocks "github.com/sourcegraph/sourcegraph/internal/codeintel/gitserver/mocks"
|
||||
"github.com/sourcegraph/sourcegraph/internal/metrics"
|
||||
"github.com/sourcegraph/sourcegraph/internal/sqliteutil"
|
||||
)
|
||||
|
||||
func init() {
|
||||
sqliteutil.SetLocalLibpath()
|
||||
sqliteutil.MustRegisterSqlite3WithPcre()
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
flag.Parse()
|
||||
if !testing.Verbose() {
|
||||
@ -191,195 +176,3 @@ func TestDequeueAndProcessDoneFailure(t *testing.T) {
|
||||
t.Errorf("unexpected error to Done. want=%q have=%q", "db failure", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcess(t *testing.T) {
|
||||
upload := db.Upload{
|
||||
ID: 42,
|
||||
Root: "root/",
|
||||
Commit: makeCommit(1),
|
||||
RepositoryID: 50,
|
||||
Indexer: "lsif-go",
|
||||
}
|
||||
|
||||
mockDB := dbmocks.NewMockDB()
|
||||
bundleManagerClient := bundlemocks.NewMockBundleManagerClient()
|
||||
gitserverClient := gitservermocks.NewMockClient()
|
||||
|
||||
// Give correlation package a valid input dump
|
||||
bundleManagerClient.GetUploadFunc.SetDefaultHook(copyTestDump)
|
||||
|
||||
// Whitelist all files in dump
|
||||
gitserverClient.DirectoryChildrenFunc.SetDefaultReturn(map[string][]string{
|
||||
"": {"foo.go", "bar.go"},
|
||||
}, nil)
|
||||
|
||||
// Set a different tip commit
|
||||
gitserverClient.HeadFunc.SetDefaultReturn(makeCommit(30), nil)
|
||||
|
||||
// Return some ancestors for each commit args
|
||||
gitserverClient.CommitsNearFunc.SetDefaultHook(func(ctx context.Context, db db.DB, repositoryID int, commit string) (map[string][]string, error) {
|
||||
offset, err := strconv.ParseInt(commit, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
commits := map[string][]string{}
|
||||
for i := 0; i < 10; i++ {
|
||||
commits[makeCommit(int(offset)+i)] = []string{makeCommit(int(offset) + i + 1)}
|
||||
}
|
||||
|
||||
return commits, nil
|
||||
})
|
||||
|
||||
processor := &processor{
|
||||
bundleManagerClient: bundleManagerClient,
|
||||
gitserverClient: gitserverClient,
|
||||
}
|
||||
|
||||
err := processor.Process(context.Background(), mockDB, upload)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error processing upload: %s", err)
|
||||
}
|
||||
|
||||
expectedPackages := []types.Package{
|
||||
{DumpID: 42,
|
||||
Scheme: "scheme B",
|
||||
Name: "pkg B",
|
||||
Version: "v1.2.3",
|
||||
},
|
||||
}
|
||||
if len(mockDB.UpdatePackagesFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of UpdatePackages calls. want=%d have=%d", 1, len(mockDB.UpdatePackagesFunc.History()))
|
||||
} else if diff := cmp.Diff(expectedPackages, mockDB.UpdatePackagesFunc.History()[0].Arg1); diff != "" {
|
||||
t.Errorf("unexpected UpdatePackagesFuncargs (-want +got):\n%s", diff)
|
||||
}
|
||||
|
||||
filter, err := bloomfilter.CreateFilter([]string{"ident A"})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating filter: %s", err)
|
||||
}
|
||||
expectedPackageReferences := []types.PackageReference{
|
||||
{DumpID: 42,
|
||||
Scheme: "scheme A",
|
||||
Name: "pkg A",
|
||||
Version: "v0.1.0",
|
||||
Filter: filter,
|
||||
},
|
||||
}
|
||||
if len(mockDB.UpdatePackageReferencesFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of UpdatePackageReferences calls. want=%d have=%d", 1, len(mockDB.UpdatePackageReferencesFunc.History()))
|
||||
} else if diff := cmp.Diff(expectedPackageReferences, mockDB.UpdatePackageReferencesFunc.History()[0].Arg1); diff != "" {
|
||||
t.Errorf("unexpected UpdatePackageReferencesFunc args (-want +got):\n%s", diff)
|
||||
}
|
||||
|
||||
if len(mockDB.DeleteOverlappingDumpsFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of DeleteOverlappingDumps calls. want=%d have=%d", 1, len(mockDB.DeleteOverlappingDumpsFunc.History()))
|
||||
} else if mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg1 != 50 {
|
||||
t.Errorf("unexpected value for repository id. want=%d have=%d", 50, mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg1)
|
||||
} else if mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg2 != makeCommit(1) {
|
||||
t.Errorf("unexpected value for commit. want=%s have=%s", makeCommit(1), mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg2)
|
||||
} else if mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg3 != "root/" {
|
||||
t.Errorf("unexpected value for root. want=%s have=%s", "root/", mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg3)
|
||||
} else if mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg4 != "lsif-go" {
|
||||
t.Errorf("unexpected value for indexer. want=%s have=%s", "lsif-go", mockDB.DeleteOverlappingDumpsFunc.History()[0].Arg4)
|
||||
}
|
||||
|
||||
offsets := []int{1, 30}
|
||||
expectedCommits := map[string][]string{}
|
||||
for i := 0; i < 10; i++ {
|
||||
for _, offset := range offsets {
|
||||
expectedCommits[makeCommit(offset+i)] = []string{makeCommit(offset + i + 1)}
|
||||
}
|
||||
}
|
||||
if len(mockDB.UpdateCommitsFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of update UpdateCommits calls. want=%d have=%d", 1, len(mockDB.UpdateCommitsFunc.History()))
|
||||
} else if diff := cmp.Diff(expectedCommits, mockDB.UpdateCommitsFunc.History()[0].Arg2); diff != "" {
|
||||
t.Errorf("unexpected update UpdateCommitsFunc args (-want +got):\n%s", diff)
|
||||
}
|
||||
|
||||
if len(mockDB.UpdateDumpsVisibleFromTipFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of UpdateDumpsVisibleFromTip calls. want=%d have=%d", 1, len(mockDB.UpdateDumpsVisibleFromTipFunc.History()))
|
||||
} else if mockDB.UpdateDumpsVisibleFromTipFunc.History()[0].Arg1 != 50 {
|
||||
t.Errorf("unexpected value for repository id. want=%d have=%d", 50, mockDB.UpdateDumpsVisibleFromTipFunc.History()[0].Arg1)
|
||||
} else if mockDB.UpdateDumpsVisibleFromTipFunc.History()[0].Arg2 != makeCommit(30) {
|
||||
t.Errorf("unexpected value for tip commit. want=%s have=%s", makeCommit(30), mockDB.UpdateDumpsVisibleFromTipFunc.History()[0].Arg2)
|
||||
}
|
||||
|
||||
if len(bundleManagerClient.SendDBFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of SendDB calls. want=%d have=%d", 1, len(bundleManagerClient.SendDBFunc.History()))
|
||||
} else if bundleManagerClient.SendDBFunc.History()[0].Arg1 != 42 {
|
||||
t.Errorf("unexpected SendDBFunc args. want=%d have=%d", 42, bundleManagerClient.SendDBFunc.History()[0].Arg1)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessError(t *testing.T) {
|
||||
upload := db.Upload{
|
||||
ID: 42,
|
||||
Root: "root/",
|
||||
Commit: makeCommit(1),
|
||||
RepositoryID: 50,
|
||||
Indexer: "lsif-go",
|
||||
}
|
||||
|
||||
mockDB := dbmocks.NewMockDB()
|
||||
bundleManagerClient := bundlemocks.NewMockBundleManagerClient()
|
||||
gitserverClient := gitservermocks.NewMockClient()
|
||||
|
||||
// Give correlation package a valid input dump
|
||||
bundleManagerClient.GetUploadFunc.SetDefaultHook(copyTestDump)
|
||||
|
||||
// Set a different tip commit
|
||||
gitserverClient.HeadFunc.SetDefaultReturn("", fmt.Errorf("uh-oh!"))
|
||||
|
||||
processor := &processor{
|
||||
bundleManagerClient: bundleManagerClient,
|
||||
gitserverClient: gitserverClient,
|
||||
}
|
||||
|
||||
err := processor.Process(context.Background(), mockDB, upload)
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected nil error processing upload")
|
||||
} else if !strings.Contains(err.Error(), "uh-oh!") {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
if len(mockDB.RollbackToSavepointFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of RollbackToLastSavepoint calls. want=%d have=%d", 1, len(mockDB.RollbackToSavepointFunc.History()))
|
||||
}
|
||||
|
||||
if len(bundleManagerClient.DeleteUploadFunc.History()) != 1 {
|
||||
t.Errorf("unexpected number of DeleteUpload calls. want=%d have=%d", 1, len(mockDB.RollbackToSavepointFunc.History()))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//
|
||||
//
|
||||
|
||||
// makeCommit renders an integer as a synthetic 40-character git commit
// hash: the decimal value zero-padded out to the full hash width.
func makeCommit(i int) string {
	const commitHashWidth = 40
	return fmt.Sprintf("%0*d", commitHashWidth, i)
}
|
||||
|
||||
// copyTestDump stands in for BundleManagerClient.GetUpload in tests: it
// gzip-compresses the checked-in LSIF fixture into <dir>/dump.lsif and
// returns the path of the compressed copy. Every uploadID resolves to the
// same fixture; ctx and uploadID are accepted only to satisfy the hook
// signature.
func copyTestDump(ctx context.Context, uploadID int, dir string) (string, error) {
	fixture, err := os.Open("../../testdata/dump1.lsif")
	if err != nil {
		return "", err
	}
	defer fixture.Close()

	target := filepath.Join(dir, "dump.lsif")
	out, err := os.Create(target)
	if err != nil {
		return "", err
	}
	defer out.Close()

	// Deferred closes run LIFO, so the gzip stream is flushed and closed
	// before the underlying file.
	compressor := gzip.NewWriter(out)
	defer compressor.Close()

	if _, err := io.Copy(compressor, fixture); err != nil {
		return "", err
	}

	return target, nil
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user