codeintel-qa: Rewrite precise-code-intel-tester (#25412)

This commit is contained in:
Eric Fritz 2021-10-04 09:46:24 -05:00 committed by GitHub
parent ddfe44702a
commit 95cbf3c85a
43 changed files with 1036 additions and 1335 deletions

View File

@ -23,6 +23,11 @@ steps:
command:
- .buildkite/vagrant-run.sh sourcegraph-qa-test
- label: ':docker::brain: Code Intel QA'
command:
- dev/ci/test/code-intel/test.sh
artifact_paths: ./*.log
- label: ':docker::arrow_double_up: Sourcegraph Upgrade'
retry:
automatic:
@ -41,11 +46,3 @@ steps:
concurrency: 1
concurrency_group: "cluster-test"
timeout_in_minutes: 30
# code-intel-qa is disabled, see https://github.com/sourcegraph/sourcegraph/issues/25387
# - label: ':docker::brain: Code Intel QA'
# command:
# - .buildkite/vagrant-run.sh sourcegraph-code-intel-test
# artifact_paths: ./*.log
# agents:
# queue: 'baremetal'

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
# This script runs the backend integration tests against a running server. It is invoked by
# ./dev/ci/run-integration.sh once an instance is up.
cd "$(dirname "${BASH_SOURCE[0]}")/../.."
set -ex
echo '--- integration test ./dev/gqltest -long'
go test ./dev/gqltest -long
echo '--- sleep 5s to wait for site configuration to be restored from gqltest'
sleep 5
echo '--- integration test ./dev/authtest -long'
go test ./dev/authtest -long -email "gqltest@sourcegraph.com" -username "gqltest-admin"

View File

@ -1,73 +1,10 @@
#!/usr/bin/env bash
cd "$(dirname "${BASH_SOURCE[0]}")"/../..
# This script runs the backend integration tests against a candidate server image.
cd "$(dirname "${BASH_SOURCE[0]}")/../../"
SG_ROOT=$(pwd)
set -ex
if [ -z "$IMAGE" ]; then
echo "Must specify \$IMAGE."
exit 1
fi
URL="http://localhost:7080"
if curl --output /dev/null --silent --head --fail $URL; then
echo "❌ Can't run a new Sourcegraph instance on $URL because another instance is already running."
echo "❌ The last time this happened, there was a runaway integration test run on the same Buildkite agent and the fix was to delete the pod and rebuild."
exit 1
fi
echo "--- Running a daemonized $IMAGE as the test subject..."
CONTAINER="$(docker container run -d -e GOTRACEBACK=all "$IMAGE")"
function cleanup() {
exit_status=$?
if [ $exit_status -ne 0 ]; then
# Expand the output if our run failed.
echo "^^^ +++"
fi
jobs -p -r | xargs kill
echo "--- server logs"
docker logs --timestamps "$CONTAINER"
echo "--- docker cleanup"
docker container rm -f "$CONTAINER"
docker image rm -f "$IMAGE"
if [ $exit_status -ne 0 ]; then
# This command will fail, so our last step will be expanded. We don't want
# to expand "docker cleanup" so we add in a dummy section.
echo "--- gqltest failed"
echo "See go test section for test runner logs."
fi
}
trap cleanup EXIT
docker exec "$CONTAINER" apk add --no-cache socat
# Connect the server container's port 7080 to localhost:7080 so that integration tests
# can hit it. This is similar to port-forwarding via SSH tunneling, but uses `docker exec`
# as the transport.
socat tcp-listen:7080,reuseaddr,fork system:"docker exec -i $CONTAINER socat stdio 'tcp:localhost:7080'" &
echo "--- Waiting for $URL to be up"
set +e
timeout 120s bash -c "until curl --output /dev/null --silent --head --fail $URL; do
echo Waiting 5s for $URL...
sleep 5
done"
# shellcheck disable=SC2181
if [ $? -ne 0 ]; then
echo "^^^ +++"
echo "$URL was not accessible within 120s. Here's the output of docker inspect and docker logs:"
docker inspect "$CONTAINER"
exit 1
fi
set -e
echo "Waiting for $URL... done"
echo '--- go test ./dev/gqltest -long'
go test ./dev/gqltest -long
echo '--- sleep 5s to wait for site configuration to be restored from gqltest'
sleep 5
echo '--- go test ./dev/authtest -long'
go test ./dev/authtest -long -email "gqltest@sourcegraph.com" -username "gqltest-admin"
# Setup single-server instance and run tests
./dev/ci/run-integration.sh "${SG_ROOT}/dev/ci/backend-integration-against-server.sh"

dev/ci/run-integration.sh (new executable file, +71)
View File

@ -0,0 +1,71 @@
#!/usr/bin/env bash
# This script sets up a Sourcegraph instance for integration testing. It expects the path to a
# bash script that runs the actual tests against the running instance; that script receives a
# single argument: the URL at which the instance is accessible.
cd "$(dirname "${BASH_SOURCE[0]}")/../../"
set -ex
if [ -z "$IMAGE" ]; then
echo "Must specify \$IMAGE."
exit 1
fi
URL="http://localhost:7080"
if curl --output /dev/null --silent --head --fail $URL; then
echo "❌ Can't run a new Sourcegraph instance on $URL because another instance is already running."
echo "❌ The last time this happened, there was a runaway integration test run on the same Buildkite agent and the fix was to delete the pod and rebuild."
exit 1
fi
echo "--- Running a daemonized $IMAGE as the test subject..."
CONTAINER="$(docker container run -d -e GOTRACEBACK=all "$IMAGE")"
function cleanup() {
exit_status=$?
if [ $exit_status -ne 0 ]; then
# Expand the output if our run failed.
echo "^^^ +++"
fi
jobs -p -r | xargs kill
echo "--- server logs"
docker logs --timestamps "$CONTAINER"
echo "--- docker cleanup"
docker container rm -f "$CONTAINER"
docker image rm -f "$IMAGE"
if [ $exit_status -ne 0 ]; then
# This command will fail, so our last step will be expanded. We don't want
# to expand "docker cleanup" so we add in a dummy section.
echo "--- integration test failed"
echo "See integration test section for test runner logs."
fi
}
trap cleanup EXIT
docker exec "$CONTAINER" apk add --no-cache socat
# Connect the server container's port 7080 to localhost:7080 so that integration tests
# can hit it. This is similar to port-forwarding via SSH tunneling, but uses `docker exec`
# as the transport.
socat tcp-listen:7080,reuseaddr,fork system:"docker exec -i $CONTAINER socat stdio 'tcp:localhost:7080'" &
echo "--- Waiting for $URL to be up"
set +e
timeout 120s bash -c "until curl --output /dev/null --silent --head --fail $URL; do
echo Waiting 5s for $URL...
sleep 5
done"
# shellcheck disable=SC2181
if [ $? -ne 0 ]; then
echo "^^^ +++"
echo "$URL was not accessible within 120s. Here's the output of docker inspect and docker logs:"
docker inspect "$CONTAINER"
exit 1
fi
set -e
echo "Waiting for $URL... done"
# Run tests against instance
"${1}" "${URL}"

View File

@ -75,7 +75,7 @@ export PERCY_TOKEN=#{ENV['PERCY_TOKEN']}
export CODECOV_TOKEN=#{ENV['CODECOV_TOKEN']}
export CI=#{ENV['CI']}
# Env vars shared by e2e and QA tests - see:
# Env vars shared by e2e and QA tests
export TEST_USER_EMAIL=#{ENV['TEST_USER_EMAIL']}
export TEST_USER_PASSWORD=#{ENV['TEST_USER_PASSWORD']}
export SLOMO=#{ENV['SLOMO']}

View File

@ -0,0 +1,36 @@
#!/usr/bin/env bash
# This script runs the codeintel-qa tests against a running server. It is invoked by
# ./dev/ci/run-integration.sh once an instance is up.
cd "$(dirname "${BASH_SOURCE[0]}")/../../../.."
SG_ROOT=$(pwd)
set -ex
export SOURCEGRAPH_BASE_URL="$1"
echo '--- initializing Sourcegraph instance'
pushd internal/cmd/init-sg || exit 1
go build -o "${SG_ROOT}/init-sg"
popd || exit 1
pushd dev/ci/test/code-intel || exit 1
"${SG_ROOT}/init-sg" initSG
# Disable `-x` to avoid printing secrets
set +x
source /root/.profile
set -x
"${SG_ROOT}/init-sg" addRepos -config repos.json
popd || exit 1
pushd dev/codeintel-qa || exit 1
echo '--- downloading test data from GCS'
./scripts/download.sh
echo '--- integration test ./dev/codeintel-qa/cmd/upload'
go build ./cmd/upload
./upload --timeout=5m
echo '--- integration test ./dev/codeintel-qa/cmd/query'
go build ./cmd/query
./query
popd || exit 1

View File

@ -1,59 +1,16 @@
#!/usr/bin/env bash
# shellcheck disable=SC1091
source /root/.profile
root_dir="$(dirname "${BASH_SOURCE[0]}")/../../../.."
cd "$root_dir"
root_dir=$(pwd)
# This script runs the codeintel-qa test utility against a candidate server image.
cd "$(dirname "${BASH_SOURCE[0]}")/../../../.."
SG_ROOT=$(pwd)
set -ex
dev/ci/test/setup-deps.sh
# Use candidate image built by main pipeline
export IMAGE="us.gcr.io/sourcegraph-dev/server:${CANDIDATE_VERSION}"
# ==========================
# us.gcr.io is a private registry, ensure we can pull
yes | gcloud auth configure-docker
CONTAINER=sourcegraph-server
docker_logs() {
pushd "$root_dir"
LOGFILE=$(docker inspect ${CONTAINER} --format '{{.LogPath}}')
cp "$LOGFILE" $CONTAINER.log
chmod 744 $CONTAINER.log
popd
}
if [[ $VAGRANT_RUN_ENV = "CI" ]]; then
IMAGE=us.gcr.io/sourcegraph-dev/server:$CANDIDATE_VERSION
else
# shellcheck disable=SC2034
IMAGE=sourcegraph/server:insiders
fi
./dev/run-server-image.sh -d --name $CONTAINER
trap docker_logs exit
sleep 15
pushd internal/cmd/init-sg
go build -o /usr/local/bin/init-sg
popd
pushd dev/ci/test/code-intel
init-sg initSG
# Load variables set up by init-server, disabling `-x` to avoid printing variables
set +x
source /root/.profile
set -x
init-sg addRepos -config repos.json
popd
echo "TEST: Checking Sourcegraph instance is accessible"
curl -f http://localhost:7080
curl -f http://localhost:7080/healthz
echo "TEST: Running tests"
pushd internal/cmd/precise-code-intel-tester
go build
./scripts/download.sh
./precise-code-intel-tester upload
sleep 10
./precise-code-intel-tester query
popd
# Setup single-server instance and run tests
./dev/ci/run-integration.sh "${SG_ROOT}/dev/ci/test/code-intel/test-against-server.sh"
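
The rewritten test.sh reduces to the same delegation pattern; a hedged local invocation (assumes gcloud access to the private us.gcr.io registry; the tag is a placeholder):

```
CANDIDATE_VERSION="<candidate-tag>" dev/ci/test/code-intel/test.sh
```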

dev/codeintel-qa/.gitignore (new vendored file, +3)
View File

@ -0,0 +1,3 @@
/upload
/query
testdata/*

View File

@ -1,4 +1,3 @@
# See https://github.com/sourcegraph/codenotify for documentation.
**/* @efritz
**/* @beyang

View File

@ -45,13 +45,13 @@ Alternatively, generate them by running the following command (this takes much l
Upload the indexes to the target instance by running the following command:
```
go build && ./precise-code-intel-tester upload
go build ./cmd/upload && ./upload
```
Then run test queries against the target instance by running the following command:
```
go build && ./precise-code-intel-tester query
go build ./cmd/query && ./query
```
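
Both binaries read the target instance from the environment (see dev/codeintel-qa/internal/instance.go added in this commit), so exports along these lines are assumed; the token value is a placeholder:

```
export SOURCEGRAPH_BASE_URL="http://localhost:7080"  # defaults to http://127.0.0.1:3080
export SOURCEGRAPH_SUDO_TOKEN="<access token with sudo privileges>"
```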
## Refreshing indexes

View File

@ -0,0 +1,47 @@
package main
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/sourcegraph/sourcegraph/dev/codeintel-qa/internal"
)
var m sync.Mutex
var durations = map[string][]float64{}
// queryGraphQL performs a GraphQL request and stores its latency in the global durations
// map. If the verbose flag is set, a line with the request's latency is printed.
func queryGraphQL(ctx context.Context, queryName, query string, variables map[string]interface{}, target interface{}) error {
requestStart := time.Now()
if err := internal.GraphQLClient().GraphQL(internal.SourcegraphAccessToken, query, variables, target); err != nil {
return err
}
duration := time.Since(requestStart)
m.Lock()
durations[queryName] = append(durations[queryName], float64(duration)/float64(time.Millisecond))
m.Unlock()
if verbose {
fmt.Printf("[%5s] %s Completed %s request in %s\n", internal.TimeSince(start), internal.EmojiSuccess, queryName, duration)
}
return nil
}
// percentile returns the pth percentile duration of the given query type, using nearest-rank
// selection over the sorted samples (e.g., with 100 samples, p = 0.95 selects the value at
// sorted index 95).
func percentile(queryName string, p float64) time.Duration {
m.Lock()
defer m.Unlock()
queryDurations := durations[queryName]
sort.Float64s(queryDurations)
index := int(float64(len(queryDurations)) * p)
return time.Duration(queryDurations[index]) * time.Millisecond
}

View File

@ -0,0 +1,113 @@
package main
import (
"context"
"flag"
"fmt"
"os"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/sourcegraph/sourcegraph/dev/codeintel-qa/internal"
)
var (
numConcurrentRequests int
checkQueryResult bool
queryReferencesOfReferences bool
verbose bool
start = time.Now()
)
func init() {
flag.IntVar(&numConcurrentRequests, "num-concurrent-requests", 5, "The maximum number of concurrent requests")
flag.BoolVar(&checkQueryResult, "check-query-result", true, "Whether to confirm query results are correct")
flag.BoolVar(&queryReferencesOfReferences, "query-references-of-references", false, "Whether to perform reference operations on test case references")
flag.BoolVar(&verbose, "verbose", false, "Print every request")
}
func main() {
if err := flag.CommandLine.Parse(os.Args[1:]); err != nil {
fmt.Fprintf(os.Stderr, "error: %s\n", err)
os.Exit(1)
}
if err := mainErr(context.Background()); err != nil {
fmt.Printf("%s error: %s\n", internal.EmojiFailure, err.Error())
os.Exit(1)
}
}
type queryFunc func(ctx context.Context) error
func mainErr(ctx context.Context) error {
if err := internal.InitializeGraphQLClient(); err != nil {
return err
}
var wg sync.WaitGroup
var numRequestsFinished uint64
queries := buildQueries()
errCh := make(chan error)
for i := 0; i < numConcurrentRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for fn := range queries {
if err := fn(ctx); err != nil {
errCh <- err
}
atomic.AddUint64(&numRequestsFinished, 1)
}
}()
}
go func() {
wg.Wait()
close(errCh)
}()
loop:
for {
select {
case err, ok := <-errCh:
if ok {
return err
}
break loop
case <-time.After(time.Second):
if verbose {
continue
}
ps := make([]string, 0, len(durations))
for queryName := range durations {
ps = append(ps, fmt.Sprintf(
"queryName=%s\trequests=%d\tp50=%s\tp95=%s\tp99=%s",
queryName,
len(durations[queryName]),
percentile(queryName, 0.50),
percentile(queryName, 0.95),
percentile(queryName, 0.99),
))
}
sort.Strings(ps)
val := atomic.LoadUint64(&numRequestsFinished)
fmt.Printf("[%5s] %s %d queries completed\n\t%s\n", internal.TimeSince(start), internal.EmojiSuccess, val, strings.Join(ps, "\n\t"))
}
}
fmt.Printf("[%5s] %s All %d queries completed\n", internal.TimeSince(start), internal.EmojiSuccess, numRequestsFinished)
return nil
}

View File

@ -0,0 +1,66 @@
package main
import (
"context"
"github.com/cockroachdb/errors"
"github.com/google/go-cmp/cmp"
)
// buildQueries returns a channel that is fed all of the test functions that should be invoked
// as part of the test. This function depends on the flags provided by the user to alter the
// behavior of the testing functions.
func buildQueries() <-chan queryFunc {
fns := make(chan queryFunc)
go func() {
defer close(fns)
for _, testCase := range testCases {
// Definition returns definition
fns <- makeTestFunc(queryDefinitions, testCase.Definition, []Location{testCase.Definition})
// References return definition
for _, reference := range testCase.References {
fns <- makeTestFunc(queryDefinitions, reference, []Location{testCase.Definition})
}
// Definition returns references
fns <- makeTestFunc(queryReferences, testCase.Definition, testCase.References)
// References return references
if queryReferencesOfReferences {
for _, reference := range testCase.References {
fns <- makeTestFunc(queryReferences, reference, testCase.References)
}
}
}
}()
return fns
}
type testFunc func(ctx context.Context, location Location) ([]Location, error)
// makeTestFunc returns a test function that invokes the given function f with the given
// source, then compares the result against the set of expected locations. This function
// depends on the flags provided by the user to alter the behavior of the testing
// functions.
func makeTestFunc(f testFunc, source Location, expectedLocations []Location) func(ctx context.Context) error {
return func(ctx context.Context) error {
locations, err := f(ctx, source)
if err != nil {
return err
}
if checkQueryResult {
sortLocations(locations)
if diff := cmp.Diff(expectedLocations, locations); diff != "" {
return errors.Errorf("unexpected locations (-want +got):\n%s", diff)
}
}
return nil
}
}

View File

@ -0,0 +1,159 @@
package main
import (
"context"
"sort"
"strings"
)
const definitionsQuery = `
query Definitions($repository: String!, $commit: String!, $path: String!, $line: Int!, $character: Int!) {
repository(name: $repository) {
commit(rev: $commit) {
blob(path: $path) {
lsif {
definitions(line: $line, character: $character) {
` + locationsFragment + `
}
}
}
}
}
}
`
const locationsFragment = `
nodes {
resource {
path
repository {
name
}
commit {
oid
}
}
range {
start {
line
character
}
end {
line
character
}
}
}
pageInfo {
endCursor
}
`
// queryDefinitions returns all of the LSIF definitions for the given location.
func queryDefinitions(ctx context.Context, location Location) (locations []Location, err error) {
variables := map[string]interface{}{
"repository": location.Repo,
"commit": location.Rev,
"path": location.Path,
"line": location.Line,
"character": location.Character,
}
var payload QueryResponse
if err := queryGraphQL(ctx, "CodeIntelTesterDefinitions", definitionsQuery, variables, &payload); err != nil {
return nil, err
}
for _, node := range payload.Data.Repository.Commit.Blob.LSIF.Definitions.Nodes {
locations = append(locations, Location{
Repo: node.Resource.Repository.Name,
Rev: node.Resource.Commit.Oid,
Path: node.Resource.Path,
Line: node.Range.Start.Line,
Character: node.Range.Start.Character,
})
}
return locations, nil
}
const referencesQuery = `
query References($repository: String!, $commit: String!, $path: String!, $line: Int!, $character: Int!, $after: String) {
repository(name: $repository) {
commit(rev: $commit) {
blob(path: $path) {
lsif {
references(line: $line, character: $character, after: $after) {
` + locationsFragment + `
}
}
}
}
}
}
`
// queryReferences returns all of the LSIF references for the given location.
func queryReferences(ctx context.Context, location Location) (locations []Location, err error) {
endCursor := ""
for {
variables := map[string]interface{}{
"repository": location.Repo,
"commit": location.Rev,
"path": location.Path,
"line": location.Line,
"character": location.Character,
}
if endCursor != "" {
variables["after"] = endCursor
}
var payload QueryResponse
if err := queryGraphQL(ctx, "CodeIntelTesterReferences", referencesQuery, variables, &payload); err != nil {
return nil, err
}
for _, node := range payload.Data.Repository.Commit.Blob.LSIF.References.Nodes {
locations = append(locations, Location{
Repo: node.Resource.Repository.Name,
Rev: node.Resource.Commit.Oid,
Path: node.Resource.Path,
Line: node.Range.Start.Line,
Character: node.Range.Start.Character,
})
}
if endCursor = payload.Data.Repository.Commit.Blob.LSIF.References.PageInfo.EndCursor; endCursor == "" {
break
}
}
return locations, nil
}
// sortLocations sorts a slice of Locations by repo, rev, path, line, then character.
func sortLocations(locations []Location) {
sort.Slice(locations, func(i, j int) bool {
return compareLocations(locations[i], locations[j]) < 0
})
}
// compareLocations returns an integer comparing two locations. The result will be 0 if a == b,
// -1 if a < b, and +1 if a > b.
func compareLocations(a, b Location) int {
fieldComparison := []int{
strings.Compare(a.Repo, b.Repo),
strings.Compare(a.Rev, b.Rev),
strings.Compare(a.Path, b.Path),
a.Line - b.Line,
a.Character - b.Character,
}
for _, cmp := range fieldComparison {
if cmp != 0 {
return cmp
}
}
return 0
}

View File

@ -0,0 +1,28 @@
package main
import (
"os"
"regexp"
)
var indexFilenamePattern = regexp.MustCompile(`^(.+)\.\d+\.([0-9A-Fa-f]{40})\.dump$`)
// commitsByRepo returns a map from repository name to a slice of commits for that repository.
// The repositories and commits are read from the filesystem state of the index directory
// supplied by the user. This method assumes that index files have been downloaded or generated
// locally.
func commitsByRepo() (map[string][]string, error) {
infos, err := os.ReadDir(indexDir)
if err != nil {
return nil, err
}
commitsByRepo := map[string][]string{}
for _, info := range infos {
if matches := indexFilenamePattern.FindStringSubmatch(info.Name()); len(matches) > 0 {
commitsByRepo[matches[1]] = append(commitsByRepo[matches[1]], matches[2])
}
}
return commitsByRepo, nil
}
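
To illustrate the filename convention the pattern above expects (`<repo>.<index>.<40-hex-commit>.dump`), a small bash check with a hypothetical filename:

```
f="etcd.0.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.dump"
if [[ "$f" =~ ^(.+)\.[0-9]+\.([0-9A-Fa-f]{40})\.dump$ ]]; then
  echo "repo=${BASH_REMATCH[1]} commit=${BASH_REMATCH[2]}"
fi
```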

View File

@ -0,0 +1,81 @@
package main
import (
"context"
"flag"
"fmt"
"os"
"sort"
"time"
"github.com/sourcegraph/sourcegraph/dev/codeintel-qa/internal"
)
var (
numConcurrentUploads int
indexDir string
verbose bool
pollInterval time.Duration
timeout time.Duration
start = time.Now()
)
func init() {
// Default assumes running from the dev/codeintel-qa directory
flag.StringVar(&indexDir, "index-dir", "./testdata/indexes", "The location of the testdata directory")
flag.IntVar(&numConcurrentUploads, "num-concurrent-uploads", 5, "The maximum number of concurrent uploads")
flag.BoolVar(&verbose, "verbose", false, "Display full state from graphql")
flag.DurationVar(&pollInterval, "poll-interval", time.Second*5, "The time to wait between graphql requests")
flag.DurationVar(&timeout, "timeout", 0, "The time it should take to upload and process all targets")
}
func main() {
if err := flag.CommandLine.Parse(os.Args[1:]); err != nil {
fmt.Fprintf(os.Stderr, "error: %s\n", err)
os.Exit(1)
}
ctx := context.Background()
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
if err := mainErr(ctx); err != nil {
fmt.Printf("%s error: %s\n", internal.EmojiFailure, err.Error())
os.Exit(1)
}
}
func mainErr(ctx context.Context) error {
if err := internal.InitializeGraphQLClient(); err != nil {
return err
}
commitsByRepo, err := commitsByRepo()
if err != nil {
return err
}
repoNames := make([]string, 0, len(commitsByRepo))
for name := range commitsByRepo {
repoNames = append(repoNames, name)
}
sort.Strings(repoNames)
limiter := internal.NewLimiter(numConcurrentUploads)
defer limiter.Close()
uploads, err := uploadAll(ctx, commitsByRepo, limiter)
if err != nil {
return err
}
if err := monitor(ctx, repoNames, uploads); err != nil {
return err
}
return nil
}
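
A sketch of running the upload step by hand from dev/codeintel-qa, using the flags defined above (the token is a placeholder; the src CLI must be on $PATH, and index files must already exist, e.g. via ./scripts/download.sh):

```
export SOURCEGRAPH_BASE_URL="http://localhost:7080"
export SOURCEGRAPH_SUDO_TOKEN="<sudo token>"
go build ./cmd/upload
./upload -num-concurrent-uploads=5 -timeout=5m -verbose
```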

View File

@ -0,0 +1,209 @@
package main
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/sourcegraph/sourcegraph/dev/codeintel-qa/internal"
)
// monitor periodically polls Sourcegraph via the GraphQL API for the status of each
// given repo, as well as the status of each given upload. When there is a change of
// state for a repository, it is printed. The state changes that can occur are:
//
// - An upload fails to process (returns an error)
// - An upload completes processing
// - The last upload for a repository completes processing, but the
// containing repo has a stale commit graph
// - A repository with no pending uploads has a fresh commit graph
func monitor(ctx context.Context, repoNames []string, uploads []uploadMeta) error {
var oldState map[string]repoState
waitMessageDisplayed := make(map[string]struct{}, len(repoNames))
finishedMessageDisplayed := make(map[string]struct{}, len(repoNames))
fmt.Printf("[%5s] %s Waiting for uploads to finish processing\n", internal.TimeSince(start), internal.EmojiLightbulb)
for {
state, err := queryRepoState(ctx, repoNames, uploads)
if err != nil {
return err
}
if verbose {
parts := make([]string, 0, len(repoNames))
for _, repoName := range repoNames {
states := make([]string, 0, len(state[repoName].uploadStates))
for _, uploadState := range state[repoName].uploadStates {
states = append(states, fmt.Sprintf("%s=%-10s", uploadState.upload.commit[:7], uploadState.state))
}
sort.Strings(states)
parts = append(parts, fmt.Sprintf("%s\tstale=%v\t%s", repoName, state[repoName].stale, strings.Join(states, "\t")))
}
fmt.Printf("[%5s] %s\n", internal.TimeSince(start), strings.Join(parts, "\n\t"))
}
numReposCompleted := 0
for repoName, data := range state {
oldData := oldState[repoName]
numUploadsCompleted := 0
for _, uploadState := range data.uploadStates {
if uploadState.state == "ERRORED" {
return fmt.Errorf("failed to process (%s)", uploadState.failure)
}
if uploadState.state == "COMPLETED" {
numUploadsCompleted++
var oldState string
for _, oldUploadState := range oldData.uploadStates {
if oldUploadState.upload.id == uploadState.upload.id {
oldState = oldUploadState.state
}
}
if oldState != "COMPLETED" {
fmt.Printf("[%5s] %s Finished processing index for %s@%s\n", internal.TimeSince(start), internal.EmojiSuccess, repoName, uploadState.upload.commit[:7])
}
} else if uploadState.state != "QUEUED" && uploadState.state != "PROCESSING" {
return fmt.Errorf("unexpected state '%s'", uploadState.state)
}
}
if numUploadsCompleted == len(data.uploadStates) {
if !data.stale {
numReposCompleted++
if _, ok := finishedMessageDisplayed[repoName]; !ok {
finishedMessageDisplayed[repoName] = struct{}{}
fmt.Printf("[%5s] %s Commit graph refreshed for %s\n", internal.TimeSince(start), internal.EmojiSuccess, repoName)
}
} else if _, ok := waitMessageDisplayed[repoName]; !ok {
waitMessageDisplayed[repoName] = struct{}{}
fmt.Printf("[%5s] %s Waiting for commit graph to refresh for %s\n", internal.TimeSince(start), internal.EmojiLightbulb, repoName)
}
}
}
if numReposCompleted == len(repoNames) {
break
}
oldState = state
select {
case <-time.After(pollInterval):
case <-ctx.Done():
return ctx.Err()
}
}
fmt.Printf("[%5s] %s All uploads processed\n", internal.TimeSince(start), internal.EmojiSuccess)
return nil
}
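
// For a sense of the verbose polling output above, one iteration with
// hypothetical repo and commit values renders roughly as:
//
//   [  45s] etcd	stale=true	aaaaaaa=PROCESSING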
type repoState struct {
stale bool
uploadStates []uploadState
}
type uploadState struct {
upload uploadMeta
state string
failure string
}
// queryRepoState makes a GraphQL request for the given repositories and uploads and
// returns a map from repository names to the state of that repository. Each repository
// state has a flag indicating whether or not its commit graph is stale, and an entry
// for each upload belonging to that repository including that upload's state.
func queryRepoState(ctx context.Context, repoNames []string, uploads []uploadMeta) (map[string]repoState, error) {
uploadIDs := make([]string, 0, len(uploads))
for _, upload := range uploads {
uploadIDs = append(uploadIDs, upload.id)
}
sort.Strings(uploadIDs)
var payload struct{ Data map[string]jsonUploadResult }
if err := internal.GraphQLClient().GraphQL(internal.SourcegraphAccessToken, makeRepoStateQuery(repoNames, uploadIDs), nil, &payload); err != nil {
return nil, err
}
state := make(map[string]repoState, len(repoNames))
for name, data := range payload.Data {
if name[0] == 'r' {
index, _ := strconv.Atoi(name[1:])
repoName := repoNames[index]
state[repoName] = repoState{
stale: data.CommitGraph.Stale,
uploadStates: []uploadState{},
}
}
}
for name, data := range payload.Data {
if name[0] == 'u' {
index, _ := strconv.Atoi(name[1:])
upload := uploads[index]
state[upload.repoName] = repoState{
stale: state[upload.repoName].stale,
uploadStates: append(state[upload.repoName].uploadStates, uploadState{
upload: upload,
state: data.State,
failure: data.Failure,
}),
}
}
}
return state, nil
}
// makeRepoStateQuery constructs a GraphQL query for use by queryRepoState.
func makeRepoStateQuery(repoNames, uploadIDs []string) string {
fragments := make([]string, 0, len(repoNames)+len(uploadIDs))
for i, repoName := range repoNames {
fragments = append(fragments, fmt.Sprintf(repositoryQueryFragment, i, internal.MakeTestRepoName(repoName)))
}
for i, id := range uploadIDs {
fragments = append(fragments, fmt.Sprintf(uploadQueryFragment, i, id))
}
return fmt.Sprintf("{%s}", strings.Join(fragments, "\n"))
}
const repositoryQueryFragment = `
r%d: repository(name: "%s") {
codeIntelligenceCommitGraph {
stale
}
}
`
const uploadQueryFragment = `
u%d: node(id: "%s") {
... on LSIFUpload {
state
failure
}
}
`
type jsonUploadResult struct {
State string `json:"state"`
Failure string `json:"failure"`
CommitGraph jsonCommitGraphResult `json:"codeIntelligenceCommitGraph"`
}
type jsonCommitGraphResult struct {
Stale bool `json:"stale"`
}
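
For illustration, with one repository and one upload (the name and ID are hypothetical), makeRepoStateQuery assembles a single batched request from the two fragments above:

```
{
  r0: repository(name: "github.com/sourcegraph-testing/etcd") {
    codeIntelligenceCommitGraph {
      stale
    }
  }
  u0: node(id: "VXBsb2FkOjQy") {
    ... on LSIFUpload {
      state
      failure
    }
  }
}
```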

View File

@ -0,0 +1,122 @@
package main
import (
"context"
"encoding/json"
"fmt"
"os/exec"
"strings"
"sync"
"github.com/cockroachdb/errors"
"github.com/sourcegraph/sourcegraph/dev/codeintel-qa/internal"
)
type uploadMeta struct {
id string
repoName string
commit string
}
// uploadAll uploads the dumps for the commits present in the given commitsByRepo map.
// Uploads are performed concurrently, bounded by the given limiter and the flags
// supplied by the user. This function returns a slice of uploadMeta containing the
// GraphQL identifiers of the uploaded resources.
func uploadAll(ctx context.Context, commitsByRepo map[string][]string, limiter *internal.Limiter) ([]uploadMeta, error) {
n := 0
for _, commits := range commitsByRepo {
n += len(commits)
}
var wg sync.WaitGroup
errCh := make(chan error, n)
uploadCh := make(chan uploadMeta, n)
for repoName, commits := range commitsByRepo {
for i, commit := range commits {
wg.Add(1)
go func(repoName, commit, file string) {
defer wg.Done()
if err := limiter.Acquire(ctx); err != nil {
errCh <- err
return
}
defer limiter.Release()
fmt.Printf("[%5s] %s Uploading index for %s@%s\n", internal.TimeSince(start), internal.EmojiLightbulb, repoName, commit[:7])
graphqlID, err := upload(ctx, internal.MakeTestRepoName(repoName), commit, file)
if err != nil {
errCh <- err
return
}
fmt.Printf("[%5s] %s Finished uploading index for %s@%s\n", internal.TimeSince(start), internal.EmojiSuccess, repoName, commit[:7])
uploadCh <- uploadMeta{
id: graphqlID,
repoName: repoName,
commit: commit,
}
}(repoName, commit, fmt.Sprintf("%s.%d.%s.dump", repoName, i, commit))
}
}
go func() {
wg.Wait()
close(errCh)
close(uploadCh)
}()
for err := range errCh {
return nil, err
}
uploads := make([]uploadMeta, 0, n)
for upload := range uploadCh {
uploads = append(uploads, upload)
}
return uploads, nil
}
// upload invokes `src lsif upload` on the host and returns the graphql identifier of
// the uploaded resource.
func upload(ctx context.Context, repoName, commit, file string) (string, error) {
argMap := map[string]string{
"root": "/",
"repo": repoName,
"commit": commit,
"file": file,
}
args := make([]string, 0, len(argMap))
for k, v := range argMap {
args = append(args, fmt.Sprintf("-%s=%s", k, v))
}
cmd := exec.CommandContext(ctx, "src", append([]string{"lsif", "upload", "-json"}, args...)...)
cmd.Dir = indexDir
cmd.Env = []string{
fmt.Sprintf("SRC_ENDPOINT=%s", internal.SourcegraphEndpoint),
fmt.Sprintf("SRC_ACCESS_TOKEN=%s", internal.SourcegraphAccessToken),
}
output, err := cmd.CombinedOutput()
if err != nil {
return "", errors.Wrap(err, fmt.Sprintf("failed to upload index: %s", output))
}
resp := struct {
UploadURL string `json:"uploadUrl"`
}{}
if err := json.Unmarshal(output, &resp); err != nil {
return "", err
}
parts := strings.Split(resp.UploadURL, "/")
return parts[len(parts)-1], nil
}
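
As a sketch of the contract upload relies on: `src lsif upload -json` prints a JSON object whose uploadUrl ends in the upload's GraphQL ID, which the code extracts as the final path segment (the URL and ID below are hypothetical):

```
{"uploadUrl": "http://localhost:7080/github.com/sourcegraph-testing/etcd/-/settings/code-intelligence/lsif-uploads/VXBsb2FkOjQy"}
```

upload would return "VXBsb2FkOjQy" for this response.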

View File

@ -0,0 +1,7 @@
package internal
const (
EmojiSuccess = "✅"
EmojiFailure = "❌"
EmojiLightbulb = "💡"
)

View File

@ -0,0 +1,8 @@
package internal
import "github.com/sourcegraph/sourcegraph/internal/env"
var (
SourcegraphEndpoint = env.Get("SOURCEGRAPH_BASE_URL", "http://127.0.0.1:3080", "Sourcegraph frontend endpoint")
SourcegraphAccessToken = env.Get("SOURCEGRAPH_SUDO_TOKEN", "", "Sourcegraph access token with sudo privileges")
)

View File

@ -0,0 +1,14 @@
package internal
import "github.com/sourcegraph/sourcegraph/internal/gqltestutil"
var client *gqltestutil.Client
func InitializeGraphQLClient() (err error) {
client, err = gqltestutil.NewClient(SourcegraphEndpoint)
return err
}
func GraphQLClient() *gqltestutil.Client {
return client
}

View File

@ -1,4 +1,4 @@
package util
package internal
import "context"

View File

@ -0,0 +1,17 @@
package internal
import (
"fmt"
"time"
)
// TimeSince returns the time elapsed since the given start time, rounded down to the nearest second.
func TimeSince(start time.Time) time.Duration {
return time.Since(start) / time.Second * time.Second
}
// MakeTestRepoName returns the given repo name as a fully qualified repository name in the
// sourcegraph-testing GitHub organization.
func MakeTestRepoName(repoName string) string {
return fmt.Sprintf("github.com/%s/%s", "sourcegraph-testing", repoName)
}

View File

@ -1,8 +1,8 @@
#!/usr/bin/env bash
set -eu
cd "$(dirname "${BASH_SOURCE[0]}")/../../../.."
SCRIPTDIR=$(realpath './internal/cmd/precise-code-intel-tester/scripts')
cd "$(dirname "${BASH_SOURCE[0]}")/.."
SCRIPTDIR=$(realpath './scripts')
declare -A REVS=(
# This repository has not been changed

View File

@ -1,8 +1,8 @@
#!/usr/bin/env bash
set -eu
cd "$(dirname "${BASH_SOURCE[0]}")/../../../.."
DATADIR=$(realpath './internal/cmd/precise-code-intel-tester/testdata')
cd "$(dirname "${BASH_SOURCE[0]}")/.."
DATADIR=$(realpath './testdata')
REPODIR="${DATADIR}/repos"
NAME="$1"

View File

@ -4,8 +4,8 @@
export CLOUDSDK_PYTHON=/usr/bin/python3
set -eu
cd "$(dirname "${BASH_SOURCE[0]}")/../../../.."
DATADIR=$(realpath './internal/cmd/precise-code-intel-tester/testdata')
cd "$(dirname "${BASH_SOURCE[0]}")/../"
DATADIR=$(realpath './testdata')
INDEXDIR="${DATADIR}/indexes"
# Ensure target dir exists

View File

@ -1,8 +1,8 @@
#!/usr/bin/env bash
set -eux
cd "$(dirname "${BASH_SOURCE[0]}")/../../../.."
DATADIR=$(realpath './internal/cmd/precise-code-intel-tester/testdata')
cd "$(dirname "${BASH_SOURCE[0]}")/.."
DATADIR=$(realpath './testdata')
REPODIR="${DATADIR}/repos"
INDEXDIR="${DATADIR}/indexes"

View File

@ -1,8 +1,8 @@
#!/usr/bin/env bash
set -eu
cd "$(dirname "${BASH_SOURCE[0]}")/../../../.."
DATADIR=$(realpath './internal/cmd/precise-code-intel-tester/testdata')
cd "$(dirname "${BASH_SOURCE[0]}")/.."
DATADIR=$(realpath './testdata')
INDEXDIR="${DATADIR}/indexes"
# Compress and upload all index files

View File

@ -9,7 +9,7 @@ Services:
Code intelligence-specific code:
- [lib/codeintel](https://github.com/sourcegraph/sourcegraph/tree/main/lib/codeintel)
- [internal/cmd/precise-code-intel-tester](https://github.com/sourcegraph/sourcegraph/tree/main/internal/cmd/precise-code-intel-tester)
- [dev/codeintel-qa](https://github.com/sourcegraph/sourcegraph/tree/main/dev/codeintel-qa)
- [enterprise/internal/codeintel](https://github.com/sourcegraph/sourcegraph/tree/main/enterprise/internal/codeintel)
- [enterprise/cmd/worker/internal/codeintel](https://github.com/sourcegraph/sourcegraph/tree/main/enterprise/cmd/worker/internal/codeintel)
- [enterprise/cmd/frontend/internal/codeintel](https://github.com/sourcegraph/sourcegraph/tree/main/enterprise/cmd/frontend/internal/codeintel)

View File

@ -194,7 +194,7 @@ func GeneratePipeline(c Config) (*bk.Pipeline, error) {
// ops.Append(publishExecutor(c.Time, c.Version))
// }
// Propogate changes elsewhere
// Propagate changes elsewhere
if c.RunType.Is(MainBranch) {
ops.Append(
// wait for all steps to pass

go.mod (1 deletion)
View File

@ -39,7 +39,6 @@ require (
github.com/dgraph-io/ristretto v0.0.3
github.com/dineshappavoo/basex v0.0.0-20170425072625-481a6f6dc663
github.com/dnaeon/go-vcr v1.2.0
github.com/efritz/pentimento v0.0.0-20190429011147-ade47d831101
github.com/fatih/color v1.12.0
github.com/fatih/structs v1.1.0
github.com/felixge/fgprof v0.9.1

go.sum (1 deletion)
View File

@ -349,7 +349,6 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/efritz/pentimento v0.0.0-20190429011147-ade47d831101 h1:RylpU+KNJJNEJIk3o8gZ70uPTlutxaYnikKNPko39LA=
github.com/efritz/pentimento v0.0.0-20190429011147-ade47d831101/go.mod h1:5ALWO82UZwfAtNRUtwzsWimcrcuYzyieTyyXOXrP6EQ=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=

View File

@ -1,2 +0,0 @@
testdata/*
precise-code-intel-tester

View File

@ -1,76 +0,0 @@
package main
import (
"flag"
"fmt"
"os"
"strings"
"github.com/sourcegraph/sourcegraph/internal/env"
)
var (
endpoint = env.Get("SOURCEGRAPH_BASE_URL", "http://127.0.0.1:3080", "Sourcegraph frontend endpoint")
token = env.Get("SOURCEGRAPH_SUDO_TOKEN", "", "Access token")
// Flags
indexDir string
numConcurrentUploads int
numConcurrentRequests int
checkQueryResult bool
queryReferencesOfReferences bool
// Entrypoints
commands = map[string]func() error{
"upload": uploadCommand,
"query": queryCommand,
}
)
func main() {
flag.StringVar(&indexDir, "indexDir", "./testdata/indexes", "The location of the testdata directory") // Assumes running from this directory
flag.IntVar(&numConcurrentUploads, "numConcurrentUploads", 5, "The maximum number of concurrent uploads")
flag.IntVar(&numConcurrentRequests, "numConcurrentRequests", 5, "The maximum number of concurrent requests")
flag.BoolVar(&checkQueryResult, "checkQueryResult", true, "Whether to confirm query results are correct")
flag.BoolVar(&queryReferencesOfReferences, "queryReferencesOfReferences", false, "Whether to perform reference operations on test case references")
if len(os.Args) < 2 {
fmt.Fprintf(os.Stderr, "subcommand (one of %s) is required\n", commandNameList())
os.Exit(1)
}
command, ok := commands[os.Args[1]]
if !ok {
fmt.Fprintf(os.Stderr, "subcommand (one of %s) is required\n", commandNameList())
os.Exit(1)
}
if err := flag.CommandLine.Parse(os.Args[2:]); err != nil {
fmt.Fprintf(os.Stderr, "error: %s\n", err)
os.Exit(1)
}
if err := command(); err != nil {
fmt.Fprintf(os.Stderr, "error: %s\n", err)
os.Exit(1)
}
}
// commandNameList returns a comma-separated list of valid command names.
func commandNameList() string {
var commandNames []string
for name := range commands {
commandNames = append(commandNames, name)
}
var parts []string
for i, name := range commandNames {
if i == len(commandNames)-1 {
name = fmt.Sprintf("or %s", name)
}
parts = append(parts, name)
}
return strings.Join(parts, ", ")
}

View File

@ -1,296 +0,0 @@
package main
import (
"context"
"fmt"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/cockroachdb/errors"
"github.com/google/go-cmp/cmp"
"github.com/sourcegraph/sourcegraph/internal/cmd/precise-code-intel-tester/util"
)
// queryCommand runs the "query" command.
func queryCommand() error {
var fns []util.ParallelFn
for _, f := range queryGenerators {
fns = append(fns, f()...)
}
start := time.Now()
ctx, cleanup := util.SignalSensitiveContext()
defer cleanup()
if err := util.RunParallel(ctx, numConcurrentRequests, fns); err != nil {
return err
}
fmt.Printf("All queries completed in %s\n", time.Since(start))
return nil
}
// queryGenerators is the list of functions that create query test functions.
var queryGenerators = []func() []util.ParallelFn{
referencesFromDefinitionsQueries,
definitionsFromReferencesQueries,
referencesFromReferencesQueries,
}
// referencesFromDefinitionsQueries returns a list of test functions that queries the references of all the test cases definitions.
func referencesFromDefinitionsQueries() []util.ParallelFn {
var fns []util.ParallelFn
for _, testCase := range testCases {
fns = append(fns, makeTestQueryFunction("references", testCase.Definition, testCase.References, queryReferences))
}
return fns
}
// definitionsFromReferencesQueries returns a list of test functions that queries the definitions of all the test cases references.
func definitionsFromReferencesQueries() []util.ParallelFn {
var fns []util.ParallelFn
for _, testCase := range testCases {
for _, reference := range testCase.References {
fns = append(fns, makeTestQueryFunction("definitions", reference, []Location{testCase.Definition}, queryDefinitions))
}
}
return fns
}
// referencesFromReferencesQueries returns a list of test functions that queries the references of all the test cases references.
func referencesFromReferencesQueries() []util.ParallelFn {
if !queryReferencesOfReferences {
return nil
}
var fns []util.ParallelFn
for _, testCase := range testCases {
for _, reference := range testCase.References {
fns = append(fns, makeTestQueryFunction("references", reference, testCase.References, queryReferences))
}
}
return fns
}
// makeTestQueryFunction constructs a function for RunParallel that invokes the given query function and
// checks the returned locations against the given expected locations.
func makeTestQueryFunction(name string, location Location, expectedLocations []Location, f QueryFunc) util.ParallelFn {
var numFinished int32
fn := func(ctx context.Context) error {
locations, err := f(ctx, location)
if err != nil {
return err
}
if checkQueryResult {
sortLocations(locations)
if diff := cmp.Diff(expectedLocations, locations); diff != "" {
return errors.Errorf("unexpected locations (-want +got):\n%s", diff)
}
}
atomic.AddInt32(&numFinished, 1)
return nil
}
description := fmt.Sprintf(
"Checking %s of %s@%s %s %d:%d",
name,
strings.TrimPrefix(location.Repo, "github.com/sourcegraph-testing/"),
location.Rev[:6],
location.Path,
location.Line,
location.Character,
)
return util.ParallelFn{
Fn: fn,
Description: func() string { return description },
Total: func() int { return 1 },
Finished: func() int { return int(atomic.LoadInt32(&numFinished)) },
}
}
// QueryFunc performs a GraphQL query (definition or references) given the source location.
type QueryFunc func(context.Context, Location) ([]Location, error)
// queryDefinitions returns all of the LSIF definitions for the given location.
func queryDefinitions(ctx context.Context, location Location) (locations []Location, err error) {
var query = `
query Definitions($repository: String!, $commit: String!, $path: String!, $line: Int!, $character: Int!) {
repository(name: $repository) {
commit(rev: $commit) {
blob(path: $path) {
lsif {
definitions(line: $line, character: $character) {
nodes {
resource {
path
repository {
name
}
commit {
oid
}
}
range {
start {
line
character
}
end {
line
character
}
}
}
pageInfo {
endCursor
}
}
}
}
}
}
}
`
variables := map[string]interface{}{
"repository": location.Repo,
"commit": location.Rev,
"path": location.Path,
"line": location.Line,
"character": location.Character,
}
payload := QueryResponse{}
if err := util.QueryGraphQL(ctx, endpoint, "CodeIntelTesterDefinitions", token, query, variables, &payload); err != nil {
return nil, err
}
lsifPayload := payload.Data.Repository.Commit.Blob.LSIF
for _, node := range lsifPayload.Definitions.Nodes {
locations = append(locations, Location{
Repo: node.Resource.Repository.Name,
Rev: node.Resource.Commit.Oid,
Path: node.Resource.Path,
Line: node.Range.Start.Line,
Character: node.Range.Start.Character,
})
}
return locations, nil
}
// queryReferences returns all of the LSIF references for the given location.
func queryReferences(ctx context.Context, location Location) (locations []Location, err error) {
var query = `
query References($repository: String!, $commit: String!, $path: String!, $line: Int!, $character: Int!, $after: String) {
repository(name: $repository) {
commit(rev: $commit) {
blob(path: $path) {
lsif {
references(line: $line, character: $character, after: $after) {
nodes {
resource {
path
repository {
name
}
commit {
oid
}
}
range {
start {
line
character
}
end {
line
character
}
}
}
pageInfo {
endCursor
}
}
}
}
}
}
}
`
endCursor := ""
for {
variables := map[string]interface{}{
"repository": location.Repo,
"commit": location.Rev,
"path": location.Path,
"line": location.Line,
"character": location.Character,
}
if endCursor != "" {
variables["after"] = endCursor
}
payload := QueryResponse{}
if err := util.QueryGraphQL(ctx, endpoint, "CodeIntelTesterReferences", token, query, variables, &payload); err != nil {
return nil, err
}
lsifPayload := payload.Data.Repository.Commit.Blob.LSIF
for _, node := range lsifPayload.References.Nodes {
locations = append(locations, Location{
Repo: node.Resource.Repository.Name,
Rev: node.Resource.Commit.Oid,
Path: node.Resource.Path,
Line: node.Range.Start.Line,
Character: node.Range.Start.Character,
})
}
if endCursor = lsifPayload.References.PageInfo.EndCursor; endCursor == "" {
break
}
}
return locations, nil
}
// sortLocations sorts a slice of Locations by repo, rev, path, line, then character.
func sortLocations(locations []Location) {
sort.Slice(locations, func(i, j int) bool { return compareLocations(locations[i], locations[j]) < 0 })
}
// Compare returns an integer comparing two locations. The result will be 0 if a == b,
// -1 if a < b, and +1 if a > b.
func compareLocations(a, b Location) int {
fieldComparison := []int{
strings.Compare(a.Repo, b.Repo),
strings.Compare(a.Rev, b.Rev),
strings.Compare(a.Path, b.Path),
a.Line - b.Line,
a.Character - b.Character,
}
for _, cmp := range fieldComparison {
if cmp != 0 {
return cmp
}
}
return 0
}

View File

@ -1,453 +0,0 @@
package main
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/cockroachdb/errors"
"github.com/sourcegraph/sourcegraph/internal/cmd/precise-code-intel-tester/util"
)
// uploadCommand runs the "upload" command.
func uploadCommand() error {
ctx, cleanup := util.SignalSensitiveContext()
defer cleanup()
start := time.Now()
if err := uploadIndexes(ctx); err != nil {
return err
}
fmt.Printf("All uploads completed processing in %s\n", time.Since(start))
return nil
}
// Upload represents a fully uploaded (but possibly unprocessed) LSIF index.
type Upload struct {
Name string
Index int
Rev string
UploadID string
}
// uploadIndexes uploads each file in the index directory and blocks until each upload has
// been successfully processed.
func uploadIndexes(ctx context.Context) error {
revsByRepo, err := readRevsByRepo()
if err != nil {
return err
}
total := countRevs(revsByRepo)
uploaded := make(chan Upload, total)
processedSignals := makeProcessedSignals(revsByRepo)
refreshedSignals := makeRefreshedSignals(revsByRepo)
// Watch API for changes in state, and inform workers when their upload has been processed
go watchStateChanges(ctx, uploaded, processedSignals, refreshedSignals)
limiter := util.NewLimiter(numConcurrentUploads)
defer limiter.Close()
var fns []util.ParallelFn
for name, revs := range revsByRepo {
fns = append(fns, makeTestUploadForRepositoryFunction(name, revs, uploaded, processedSignals, refreshedSignals, limiter))
}
return util.RunParallel(ctx, total, fns)
}
// indexFilenamePattern extracts a repo name and rev from the index filename. We assume that the
// index segment here (the non-captured `.\d+.`) occupies [0,n) without gaps for each repository.
var indexFilenamePattern = regexp.MustCompile(`^(.+)\.\d+\.([0-9A-Fa-f]{40})\.dump$`)
// readRevsByRepo returns a list of revisions by repository names for which there is an index file.
func readRevsByRepo() (map[string][]string, error) {
infos, err := os.ReadDir(indexDir)
if err != nil {
return nil, err
}
revsByRepo := map[string][]string{}
for _, info := range infos {
matches := indexFilenamePattern.FindStringSubmatch(info.Name())
if len(matches) > 0 {
revsByRepo[matches[1]] = append(revsByRepo[matches[1]], matches[2])
}
}
return revsByRepo, nil
}
// countRevs returns the total number of revisions.
func countRevs(revsByRepo map[string][]string) int {
total := 0
for _, revs := range revsByRepo {
total += len(revs)
}
return total
}
// makeProcessedSignals returns a map of error channels for each revision.
func makeProcessedSignals(revsByRepo map[string][]string) map[string]map[string]chan error {
processedSignals := map[string]map[string]chan error{}
for repo, revs := range revsByRepo {
revMap := make(map[string]chan error, len(revs))
for _, rev := range revs {
revMap[rev] = make(chan error, 1)
}
processedSignals[repo] = revMap
}
return processedSignals
}
type refreshState struct {
Stale bool
Err error
}
// makeRefreshedSignals returns a map of refresh-state channels for each repository.
func makeRefreshedSignals(revsByRepo map[string][]string) map[string]chan refreshState {
refreshedSignals := map[string]chan refreshState{}
for repo, revs := range revsByRepo {
// Each channel may receive two values for each revision: a value when
// a new upload has been processed and the repository becomes stale by
// definition, and a value when the repository's commit graph has been
// refreshed (or an error occurs).
refreshedSignals[repo] = make(chan refreshState, len(revs)*2)
}
return refreshedSignals
}
// watchStateChanges maintains a list of uploaded but nonterminal upload records. This function
// polls the API and signals the worker when their upload has been processed. If an upload fails
// to process, the error will be sent to the worker.
func watchStateChanges(
ctx context.Context,
uploaded chan Upload,
processedSignals map[string]map[string]chan error,
refreshedSignals map[string]chan refreshState,
) {
send := func(err error) {
// Send err to everybody and exit
for name, revs := range processedSignals {
for rev, ch := range revs {
if err != nil {
ch <- err
}
close(ch)
delete(processedSignals[name], rev)
}
}
for name, ch := range refreshedSignals {
if err != nil {
ch <- refreshState{Err: err}
}
close(ch)
delete(refreshedSignals, name)
}
}
var uploads []Upload
repositoryMap := map[string]struct{}{}
for {
select {
case upload := <-uploaded:
// Upload complete, add to process watch list
uploads = append(uploads, upload)
case <-time.After(time.Millisecond * 500):
// Check states
case <-ctx.Done():
send(nil)
return
}
var ids []string
for _, upload := range uploads {
ids = append(ids, upload.UploadID)
}
sort.Strings(ids)
var names []string
for name := range repositoryMap {
names = append(names, name)
}
sort.Strings(names)
stateByUpload, staleCommitGraphByRepo, err := uploadStates(ctx, ids, names)
if err != nil {
send(err)
return
}
for name, stale := range staleCommitGraphByRepo {
if !stale {
// Repository is now up to date! Stop listening for updates.
// If another upload is processed for this repository, we will
// perform the same set of actions all over again; see below
// when the upload state is COMPLETED.
refreshedSignals[name] <- refreshState{Stale: false}
delete(repositoryMap, name)
}
}
temp := uploads
uploads = uploads[:0]
for _, upload := range temp {
var err error
switch stateByUpload[upload.UploadID] {
case "ERRORED":
err = ErrProcessingFailed
fallthrough
case "COMPLETED":
// Add repository to list of repositories with a stale
// commit graph and watch until it becomes fresh again.
repositoryMap[upload.Name] = struct{}{}
refreshedSignals[upload.Name] <- refreshState{Stale: true}
// Signal to listeners that this rev has been processed
ch := processedSignals[upload.Name][upload.Rev]
delete(processedSignals[upload.Name], upload.Rev)
ch <- err
close(ch)
default:
uploads = append(uploads, upload)
}
}
}
}
// ErrProcessingFailed occurs when an upload enters the ERRORED state.
var ErrProcessingFailed = errors.New("processing failed")
const uploadQueryFragment = `
u%d: node(id: "%s") {
... on LSIFUpload { state }
}
`
const repositoryQueryFragment = `
r%d: repository(name: "%s") {
codeIntelligenceCommitGraph {
stale
}
}
`
// uploadStates returns a map from upload identifier to its current state.
func uploadStates(ctx context.Context, ids, names []string) (stateByUpload map[string]string, staleCommitGraphByRepo map[string]bool, _ error) {
var fragments []string
for i, id := range ids {
fragments = append(fragments, fmt.Sprintf(uploadQueryFragment, i, id))
}
for i, name := range names {
fullName := fmt.Sprintf("github.com/%s/%s", "sourcegraph-testing", name)
fragments = append(fragments, fmt.Sprintf(repositoryQueryFragment, i, fullName))
}
query := fmt.Sprintf("{%s}", strings.Join(fragments, "\n"))
payload := struct {
Data map[string]struct {
State string `json:"state"`
CommitGraph struct {
Stale bool `json:"stale"`
} `json:"codeIntelligenceCommitGraph"`
} `json:"data"`
}{}
if err := util.QueryGraphQL(ctx, endpoint, "CodeIntelTesterUploadStates", token, query, nil, &payload); err != nil {
return nil, nil, err
}
stateByUpload = map[string]string{}
for i, id := range ids {
stateByUpload[id] = payload.Data[fmt.Sprintf("u%d", i)].State
}
staleCommitGraphByRepo = map[string]bool{}
for i, name := range names {
staleCommitGraphByRepo[name] = payload.Data[fmt.Sprintf("r%d", i)].CommitGraph.Stale
}
return stateByUpload, staleCommitGraphByRepo, nil
}
// makeTestUploadForRepositoryFunction constructs a function for RunParallel that uploads the index files
// for the given repo, then blocks until the upload records enter a terminal state. If any upload fails to
// process, an error is returned.
func makeTestUploadForRepositoryFunction(
name string,
revs []string,
uploaded chan Upload,
processedSignals map[string]map[string]chan error,
refreshedSignals map[string]chan refreshState,
limiter *util.Limiter,
) util.ParallelFn {
var numUploaded uint32
var numProcessed uint32
return util.ParallelFn{
Fn: func(ctx context.Context) error {
var wg sync.WaitGroup
ch := make(chan error, len(revs))
for i, rev := range revs {
id, err := upload(ctx, name, i, rev, limiter)
if err != nil {
return err
}
atomic.AddUint32(&numUploaded, 1)
wg.Add(1)
go func() {
defer wg.Done()
ch <- <-processedSignals[name][rev]
}()
select {
// send id to monitor
case uploaded <- Upload{Name: name, Index: i, Rev: rev, UploadID: id}:
case <-ctx.Done():
return ctx.Err()
}
}
go func() {
wg.Wait()
close(ch)
}()
// wait for all uploads to process
processLoop:
for {
select {
case err, ok := <-ch:
if err != nil {
return err
}
if !ok {
break processLoop
}
atomic.AddUint32(&numProcessed, 1)
case <-ctx.Done():
return ctx.Err()
}
}
// consume all values from the refreshedSignals channel that have already
// been written. If the last one written is nil, then there will be no more
// updates to the commit graph. If the last one written indicates that the
// commit graph is stale, we'll continue to wait on the channel for an
// additional nil value indicating the refresh.
var lastValue refreshState
refreshLoop:
for {
select {
case err, ok := <-refreshedSignals[name]:
if !ok {
return nil
}
lastValue = err
default:
// no more values already in the channel, jump down
break refreshLoop
}
}
for {
if !lastValue.Stale {
return lastValue.Err
}
select {
case err, ok := <-refreshedSignals[name]:
if !ok {
return nil
}
lastValue = err
case <-ctx.Done():
return ctx.Err()
}
}
},
Description: func() string {
if n := atomic.LoadUint32(&numUploaded); n < uint32(len(revs)) {
return fmt.Sprintf("Uploading index %d of %d for %s...", n+1, len(revs), name)
}
if n := atomic.LoadUint32(&numProcessed); n < uint32(len(revs)) {
return fmt.Sprintf("Waiting for %d remaining uploads to process for %s...", len(revs)-int(n), name)
}
return fmt.Sprintf("Waiting for commit graph to update for %s...", name)
},
Total: func() int { return len(revs) },
Finished: func() int { return int(atomic.LoadUint32(&numProcessed)) },
}
}
// uploadIDPattern extracts a GraphQL identifier from the output of the `src lsif upload` command.
var uploadIDPattern = regexp.MustCompile(`/settings/code-intelligence/lsif-uploads/([0-9A-Za-z=]+)`)
// upload invokes the `src lsif upload` command. This requires that src is installed on the
// current user's $PATH and is relatively up to date.
func upload(ctx context.Context, name string, index int, rev string, limiter *util.Limiter) (string, error) {
if err := limiter.Acquire(ctx); err != nil {
return "", err
}
defer limiter.Release()
args := []string{
fmt.Sprintf("-endpoint=%s", endpoint),
"lsif",
"upload",
"-root=/",
fmt.Sprintf("-repo=%s", fmt.Sprintf("github.com/%s/%s", "sourcegraph-testing", name)),
fmt.Sprintf("-commit=%s", rev),
fmt.Sprintf("-file=%s", filepath.Join(fmt.Sprintf("%s.%d.%s.dump", name, index, rev))),
}
cmd := exec.CommandContext(ctx, "src", args...)
cmd.Dir = indexDir
output, err := cmd.CombinedOutput()
if err != nil {
return "", errors.Wrap(err, fmt.Sprintf("error running 'src %s':\n%s\n", strings.Join(args, " "), output))
}
match := uploadIDPattern.FindSubmatch(output)
if len(match) == 0 {
return "", errors.Errorf("failed to extract URL:\n%s", output)
}
return string(match[1]), nil
}

View File

@ -1,37 +0,0 @@
package util
import (
"context"
"os"
"os/signal"
"syscall"
)
// SignalSensitiveContext returns a background context that is canceled after receiving an
// interrupt or terminate signal. A second signal will abort the program. This function returns
// the context and a function that should be deferred by the caller to clean up internal channels.
func SignalSensitiveContext() (context.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
go func() {
i := 0
for range signals {
cancel()
if i > 0 {
os.Exit(1)
}
i++
}
}()
return ctx, func() {
cancel()
signal.Reset(syscall.SIGINT, syscall.SIGTERM)
close(signals)
}
}

View File

@ -1,75 +0,0 @@
package util
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"github.com/cockroachdb/errors"
"github.com/hashicorp/go-multierror"
)
type ErrorPayload struct {
Errors []GraphQLError `json:"errors"`
}
type GraphQLError struct {
Message string `json:"message"`
}
// QueryGraphQL performs GraphQL query on the frontend.
//
// The queryName is the name of the GraphQL query, which uniquely identifies the source of the
// GraphQL query and helps e.g. a site admin know where such a query may be coming from. Importantly,
// unnamed queries (empty string) are considered to be unknown end-user API requests and as such will
// have the entire GraphQL request logged by the frontend, and cannot be uniquely identified in monitoring.
func QueryGraphQL(ctx context.Context, endpoint, queryName string, token, query string, variables map[string]interface{}, target interface{}) error {
body, err := json.Marshal(map[string]interface{}{
"query": query,
"variables": variables,
})
if err != nil {
return err
}
if queryName != "" {
queryName = "?" + queryName
}
req, err := http.NewRequest("POST", fmt.Sprintf("%s/.api/graphql%s", endpoint, queryName), bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Authorization", fmt.Sprintf("token %s", token))
	// Note: We intentionally do not attach ctx to the request (via req.WithContext),
	// as canceled requests cause the frontend to output long error logs, which is
	// very noisy under high concurrency.
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errors.Errorf("unexpected status code: %d", resp.StatusCode)
}
contents, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
var errorPayload ErrorPayload
if err := json.Unmarshal(contents, &errorPayload); err == nil && len(errorPayload.Errors) > 0 {
var combined error
for _, err := range errorPayload.Errors {
combined = multierror.Append(combined, errors.Errorf("%s", err.Message))
}
return combined
}
return json.Unmarshal(contents, &target)
}
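// Example usage: a sketch fetching the instance's product version. The query text
// and response shape follow the public Sourcegraph GraphQL schema; endpoint and
// token are supplied by the caller.
func currentVersion(ctx context.Context, endpoint, token string) (string, error) {
	var payload struct {
		Data struct {
			Site struct {
				ProductVersion string `json:"productVersion"`
			} `json:"site"`
		} `json:"data"`
	}
	query := `query CodeIntelQA_SiteVersion { site { productVersion } }`
	if err := QueryGraphQL(ctx, endpoint, "CodeIntelQA_SiteVersion", token, query, nil, &payload); err != nil {
		return "", err
	}
	return payload.Data.Site.ProductVersion, nil
}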

View File

@ -1,189 +0,0 @@
package util
import (
"context"
"fmt"
"math"
"strings"
"sync"
"time"
"github.com/efritz/pentimento"
)
// MaxDisplayLines is the number of lines that will be displayed before truncation.
const MaxDisplayLines = 50
// MaxDisplayWidth is the number of columns that can be used to draw a progress bar.
const MaxDisplayWidth = 80
// ParallelFn groups an error-returning function with a description that can be displayed
// by RunParallel.
type ParallelFn struct {
Fn func(ctx context.Context) error
Description func() string
Total func() int
Finished func() int
}
// braille is an animated spinner based on the characters used by yarn.
var braille = pentimento.NewAnimatedString([]string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"}, pentimento.DefaultInterval)
// RunParallel runs each function in parallel and returns the first error to occur. The
// number of concurrent invocations is limited by concurrency.
func RunParallel(ctx context.Context, concurrency int, fns []ParallelFn) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
queue := make(chan int, len(fns)) // worker input
errs := make(chan errPair, len(fns)) // worker output
var wg sync.WaitGroup // denotes active writers of errs channel
pendingMap := newPendingMap(len(fns)) // state tracker
// queue all work up front
for i := range fns {
queue <- i
}
close(queue)
// launch workers
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
runFunctions(ctx, fns, pendingMap, queue, errs)
}()
}
// block until completion or error
err := monitor(ctx, fns, pendingMap, errs, concurrency)
if err != nil {
cancel() // stop workers
wg.Wait() // wait for workers to drain
close(errs) // close output channel
}
return err
}
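// Example usage: a sketch running a caller-supplied set of tasks with at most four
// in flight at once. The task bodies and the static progress callbacks here are
// illustrative only; real callers report per-task progress.
func runTasks(ctx context.Context, tasks []func(context.Context) error) error {
	fns := make([]ParallelFn, 0, len(tasks))
	for i, task := range tasks {
		i, task := i, task // capture loop variables for the closures below
		fns = append(fns, ParallelFn{
			Fn:          task,
			Description: func() string { return fmt.Sprintf("Running task #%d...", i) },
			Total:       func() int { return 1 },
			Finished:    func() int { return 0 },
		})
	}
	return RunParallel(ctx, 4, fns)
}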
// errPair bundles an error value with the function index from which it was produced.
type errPair struct {
i int
err error
}
// runFunctions is the worker body. It pulls an index off the work queue, marks that
// index as running, then sends the index and the value returned by invoking the
// function at that index onto the errors channel.
func runFunctions(ctx context.Context, fns []ParallelFn, pendingMap *pendingMap, queue <-chan int, errs chan<- errPair) {
for {
select {
case i, ok := <-queue:
if !ok {
return
}
pendingMap.set(i)
errs <- errPair{i, fns[i].Fn(ctx)}
case <-ctx.Done():
return
}
}
}
// monitor waits for all functions to complete, an error, or the context to be
// canceled. The first error encountered is returned. The current state of the
// pending map is periodically written to the screen. All content written to the
// screen is removed at exit of this function.
func monitor(ctx context.Context, fns []ParallelFn, pendingMap *pendingMap, errs <-chan errPair, concurrency int) error {
return pentimento.PrintProgress(func(p *pentimento.Printer) error {
defer func() {
// Clear last progress update on exit
_ = p.Reset()
}()
for pendingMap.size() != 0 {
select {
case pair := <-errs:
if pair.err != nil && pair.err != context.Canceled {
return pair.err
}
			// Task completed without a reportable error; remove it from the pending map
pendingMap.remove(pair.i)
case <-time.After(time.Millisecond * 250):
// Update screen
case <-ctx.Done():
return ctx.Err()
}
_ = p.WriteContent(formatUpdate(fns, pendingMap, concurrency))
}
return nil
})
}
// formatUpdate constructs a content object with lines describing the in-progress and
// head-of-queue tasks, as well as a progress bar.
func formatUpdate(fns []ParallelFn, pendingMap *pendingMap, concurrency int) *pentimento.Content {
keys := pendingMap.keys()
content := pentimento.NewContent()
for _, i := range keys[:numLines(concurrency, len(keys))] {
if pendingMap.get(i) {
content.AddLine(fmt.Sprintf("%s %s", braille, fns[i].Description()))
} else {
content.AddLine(fmt.Sprintf("%s %s", " ", fns[i].Description()))
}
}
total := 0
finished := 0
for _, fn := range fns {
total += fn.Total()
finished += fn.Finished()
}
content.AddLine("")
content.AddLine(formatProgressBar(total, finished))
return content
}
// numLines determines how many lines to display in formatUpdate.
func numLines(concurrency, numTasks int) int {
return int(math.Min(float64(concurrency*2), math.Min(float64(numTasks), float64(MaxDisplayLines))))
}
// formatProgressBar constructs a progress bar string based on the relationship between the
// total and finished parameters.
func formatProgressBar(total, finished int) string {
	maxWidth := MaxDisplayWidth - 4 - digits(total) - digits(finished)
	width := 0
	if total > 0 {
		// Guard against a zero total, which would otherwise divide by zero.
		width = int(float64(maxWidth) * float64(finished) / float64(total))
	}
var arrow string
if width < maxWidth {
arrow = ">"
}
return fmt.Sprintf(
"[%s%s%s] %d/%d",
strings.Repeat("=", width),
arrow,
strings.Repeat(" ", maxWidth-width-len(arrow)),
finished,
total,
)
}
// digits returns the number of digits of n.
func digits(n int) int {
if n >= 10 {
return 1 + digits(n/10)
}
return 1
}
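// A worked example of the bar math above: for formatProgressBar(10, 5),
// maxWidth = 80 - 4 - digits(10) - digits(5) = 73 and width = int(73 * 5 / 10) = 36,
// so the bar renders as "[", 36 "=" characters, a ">", 36 spaces, and "] 5/10".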

View File

@ -1,56 +0,0 @@
package util
import (
"sort"
"sync"
)
// pendingMap tracks outstanding task indices: false marks a queued task, true a
// running one. Finished tasks are removed from the map entirely.
type pendingMap struct {
	sync.RWMutex
	pending map[int]bool
}
// newPendingMap creates a new pending map with n pending tasks.
func newPendingMap(n int) *pendingMap {
pending := make(map[int]bool, n)
for i := 0; i < n; i++ {
pending[i] = false
}
return &pendingMap{pending: pending}
}
func (m *pendingMap) remove(i int) {
m.Lock()
defer m.Unlock()
delete(m.pending, i)
}
func (m *pendingMap) keys() (keys []int) {
m.RLock()
defer m.RUnlock()
for k := range m.pending {
keys = append(keys, k)
}
sort.Ints(keys)
return keys
}
func (m *pendingMap) set(i int) {
m.Lock()
defer m.Unlock()
m.pending[i] = true
}
func (m *pendingMap) get(i int) bool {
m.RLock()
defer m.RUnlock()
return m.pending[i]
}
func (m *pendingMap) size() int {
m.RLock()
defer m.RUnlock()
return len(m.pending)
}
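// A small sketch of the lifecycle RunParallel drives: indices start queued, flip to
// running via set, and disappear via remove once finished. The values below are
// illustrative only.
func examplePendingLifecycle() (keys []int, running bool, remaining int) {
	m := newPendingMap(3) // indices 0, 1, 2 start queued
	m.set(1)              // index 1 is now marked running
	m.remove(0)           // index 0 finished and is dropped
	return m.keys(), m.get(1), m.size() // returns [1 2], true, 2
}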

View File

@ -60,7 +60,7 @@ func SignIn(baseURL, email, password string) (*Client, error) {
// authenticate initializes an authenticated client with given request body.
func authenticate(baseURL, path string, body interface{}) (*Client, error) {
-	client, err := newClient(baseURL)
+	client, err := NewClient(baseURL)
if err != nil {
return nil, errors.Wrap(err, "new client")
}
@ -101,9 +101,9 @@ type Client struct {
userID string
}
-// newClient instantiates a new client by performing a GET request then obtains the
+// NewClient instantiates a new client by performing a GET request then obtains the
// CSRF token and cookie from its response.
-func newClient(baseURL string) (*Client, error) {
+func NewClient(baseURL string) (*Client, error) {
resp, err := http.Get(baseURL)
if err != nil {
return nil, errors.Wrap(err, "get URL")