codeintel: Upload QOL improvements (#29538)

This commit is contained in:
Eric Fritz 2022-01-11 08:01:14 -06:00 committed by GitHub
parent ded569ea18
commit 57cc8bf307
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 123 additions and 52 deletions

View File

@ -26,7 +26,7 @@ var DefaultValidatorByCodeHost = AuthValidatorMap{
"github.com": enforceAuthViaGitHub,
}
var errVerificaitonNotSupported = errors.New("verification not supported for code host - see https://github.com/sourcegraph/sourcegraph/issues/4967")
var errVerificationNotSupported = errors.New("verification not supported for code host - see https://github.com/sourcegraph/sourcegraph/issues/4967")
// authMiddleware wraps the given upload handler with an authorization check. On each initial upload
// request, the target repository is checked against the supplied auth validators. The matching validator
@ -60,7 +60,7 @@ func authMiddleware(next http.Handler, db database.DB, authValidators AuthValida
return validator(ctx, query, repositoryName)
}
return http.StatusUnprocessableEntity, errVerificaitonNotSupported
return http.StatusUnprocessableEntity, errVerificationNotSupported
}()
if err != nil {
if statusCode >= 500 {

View File

@ -31,8 +31,17 @@ func newProgressCallbackReader(r io.Reader, readerLen int64, progress output.Pro
progressCallback := func(totalRead int64) {
if debounceInterval <= time.Since(lastUpdated) {
// Calculate progress through the reader; do not ever complete
// as we wait for the HTTP request finish the remaining small
// percentage.
p := float64(totalRead) / float64(readerLen)
if p >= 1 {
p = 1 - 10e-3
}
lastUpdated = time.Now()
progress.SetValue(barIndex, float64(totalRead)/float64(readerLen))
progress.SetValue(barIndex, p)
}
}

View File

@ -1,6 +1,8 @@
package upload
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
@ -32,13 +34,13 @@ var ErrUnauthorized = errors.New("unauthorized upload")
// If target is a non-nil pointer, it will be assigned the value of the upload identifier present
// in the response body. This function returns an error as well as a boolean flag indicating if the
// function can be retried.
func performUploadRequest(httpClient Client, opts uploadRequestOptions) (bool, error) {
func performUploadRequest(ctx context.Context, httpClient Client, opts uploadRequestOptions) (bool, error) {
req, err := makeUploadRequest(opts)
if err != nil {
return false, err
}
resp, body, err := performRequest(req, httpClient, opts.OutputOptions.Logger)
resp, body, err := performRequest(ctx, req, httpClient, opts.OutputOptions.Logger)
if err != nil {
return false, err
}
@ -73,13 +75,13 @@ func makeUploadRequest(opts uploadRequestOptions) (*http.Request, error) {
// performRequest performs an HTTP request and returns the HTTP response as well as the entire
// body as a byte slice. If a logger is supplied, the request, response, and response body will
// be logged.
func performRequest(req *http.Request, httpClient Client, logger RequestLogger) (*http.Response, []byte, error) {
func performRequest(ctx context.Context, req *http.Request, httpClient Client, logger RequestLogger) (*http.Response, []byte, error) {
started := time.Now()
if logger != nil {
logger.LogRequest(req)
}
resp, err := httpClient.Do(req)
resp, err := httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, nil, err
}
@ -105,8 +107,13 @@ func decodeUploadPayload(resp *http.Response, body []byte, target *int) (bool, e
return false, ErrUnauthorized
}
suffix := ""
if !bytes.HasPrefix(bytes.TrimSpace(body), []byte{'<'}) {
suffix = fmt.Sprintf(" (%s)", bytes.TrimSpace(body))
}
// Do not retry client errors
return resp.StatusCode >= 500, errors.Errorf("unexpected status code: %d", resp.StatusCode)
return resp.StatusCode >= 500, errors.Errorf("unexpected status code: %d%s", resp.StatusCode, suffix)
}
if target == nil {

View File

@ -2,9 +2,9 @@ package upload
import "time"
// RetryableFunc is a function that returns an error as well as a boolean-value flag indicating
// whether or not the error is retryable.
type RetryableFunc = func() (bool, error)
// RetryableFunc is a function that takes the invocation index and returns an error as well as a
// boolean-value flag indicating whether or not the error is considered retryable.
type RetryableFunc = func(attempt int) (bool, error)
// makeRetry returns a function that calls retry with the given max attempt and interval values.
func makeRetry(n int, interval time.Duration) func(f RetryableFunc) error {
@ -18,8 +18,8 @@ func makeRetry(n int, interval time.Duration) func(f RetryableFunc) error {
// retries have been attempted. The returned error will be the last error to occur.
func retry(f RetryableFunc, n int, interval time.Duration) (err error) {
var retry bool
for i := n; i >= 0; i-- {
if retry, err = f(); err == nil || !retry {
for i := 0; i <= n; i++ {
if retry, err = f(i); err == nil || !retry {
break
}

View File

@ -1,11 +1,14 @@
package upload
import (
"context"
"fmt"
"io"
"os"
"sync"
"github.com/cockroachdb/errors"
"github.com/sourcegraph/sourcegraph/lib/output"
)
@ -13,7 +16,7 @@ import (
// instance. If the upload file is large, it may be split into multiple segments and
// uploaded over multiple requests. The identifier of the upload is returned after a
// successful upload.
func UploadIndex(filename string, httpClient Client, opts UploadOptions) (int, error) {
func UploadIndex(ctx context.Context, filename string, httpClient Client, opts UploadOptions) (int, error) {
originalReader, originalSize, err := openFileAndGetSize(filename)
if err != nil {
return 0, err
@ -23,7 +26,7 @@ func UploadIndex(filename string, httpClient Client, opts UploadOptions) (int, e
}()
bars := []output.ProgressBar{{Label: "Compressing", Max: 1.0}}
progress, cleanup := logProgress(
progress, _, cleanup := logProgress(
opts.Output,
bars,
"Index compressed",
@ -61,18 +64,18 @@ func UploadIndex(filename string, httpClient Client, opts UploadOptions) (int, e
}
if compressedSize <= opts.MaxPayloadSizeBytes {
return uploadIndex(httpClient, opts, compressedReader, compressedSize)
return uploadIndex(ctx, httpClient, opts, compressedReader, compressedSize)
}
return uploadMultipartIndex(httpClient, opts, compressedReader, compressedSize)
return uploadMultipartIndex(ctx, httpClient, opts, compressedReader, compressedSize)
}
// uploadIndex uploads the index file described by the given options to a Sourcegraph
// instance via a single HTTP POST request. The identifier of the upload is returned
// after a successful upload.
func uploadIndex(httpClient Client, opts UploadOptions, r io.ReaderAt, readerLen int64) (id int, err error) {
func uploadIndex(ctx context.Context, httpClient Client, opts UploadOptions, r io.ReaderAt, readerLen int64) (id int, err error) {
bars := []output.ProgressBar{{Label: "Upload", Max: 1.0}}
progress, complete := logProgress(
progress, retry, complete := logProgress(
opts.Output,
bars,
"Index uploaded",
@ -87,7 +90,7 @@ func uploadIndex(httpClient Client, opts UploadOptions, r io.ReaderAt, readerLen
UploadOptions: opts,
Target: &id,
}
err = uploadIndexFile(httpClient, opts, readerFactory, readerLen, requestOptions, progress, 0)
err = uploadIndexFile(ctx, httpClient, opts, readerFactory, readerLen, requestOptions, progress, retry, 0, 1)
if progress != nil {
// Mark complete in case we debounced our last updates
@ -98,9 +101,25 @@ func uploadIndex(httpClient Client, opts UploadOptions, r io.ReaderAt, readerLen
}
// uploadIndexFile uploads the contents available via the given reader factory to a
// Sourcegraph instance with the given request options.
func uploadIndexFile(httpClient Client, uploadOptions UploadOptions, readerFactory func() io.Reader, readerLen int64, requestOptions uploadRequestOptions, progress output.Progress, barIndex int) error {
return makeRetry(uploadOptions.MaxRetries, uploadOptions.RetryInterval)(func() (_ bool, err error) {
// Sourcegraph instance with the given request options.i
func uploadIndexFile(ctx context.Context, httpClient Client, uploadOptions UploadOptions, readerFactory func() io.Reader, readerLen int64, requestOptions uploadRequestOptions, progress output.Progress, retry func(message string) output.Progress, barIndex int, numParts int) error {
return makeRetry(uploadOptions.MaxRetries, uploadOptions.RetryInterval)(func(attempt int) (_ bool, err error) {
defer func() {
if err != nil && !errors.Is(err, ctx.Err()) {
progress.SetValue(barIndex, 0)
}
}()
if attempt != 0 {
suffix := ""
if numParts != 1 {
suffix = fmt.Sprintf(" %d of %d", barIndex+1, numParts)
}
progress.SetValue(barIndex, 0)
progress = retry(fmt.Sprintf("Failed to upload index file%s (will retry; attempt #%d)", suffix, attempt))
}
// Create fresh reader on each attempt
reader := readerFactory()
@ -108,32 +127,32 @@ func uploadIndexFile(httpClient Client, uploadOptions UploadOptions, readerFacto
requestOptions.Payload = newProgressCallbackReader(reader, readerLen, progress, barIndex)
// Perform upload
return performUploadRequest(httpClient, requestOptions)
return performUploadRequest(ctx, httpClient, requestOptions)
})
}
// uploadMultipartIndex uploads the index file described by the given options to a
// Sourcegraph instance over multiple HTTP POST requests. The identifier of the upload
// is returned after a successful upload.
func uploadMultipartIndex(httpClient Client, opts UploadOptions, r io.ReaderAt, readerLen int64) (_ int, err error) {
func uploadMultipartIndex(ctx context.Context, httpClient Client, opts UploadOptions, r io.ReaderAt, readerLen int64) (_ int, err error) {
// Create a slice of functions that can re-create our reader for an
// upload part for retries. This allows us to both read concurrently
// from the same reader, but also retry reads from arbitrary offsets.
readerFactories := splitReader(r, readerLen, opts.MaxPayloadSizeBytes)
// Perform initial request that gives us our upload identifier
id, err := uploadMultipartIndexInit(httpClient, opts, len(readerFactories))
id, err := uploadMultipartIndexInit(ctx, httpClient, opts, len(readerFactories))
if err != nil {
return 0, err
}
// Upload each payload of the multipart index
if err := uploadMultipartIndexParts(httpClient, opts, readerFactories, id, readerLen); err != nil {
if err := uploadMultipartIndexParts(ctx, httpClient, opts, readerFactories, id, readerLen); err != nil {
return 0, err
}
// Finalize the upload and mark it as ready for processing
if err := uploadMultipartIndexFinalize(httpClient, opts, id); err != nil {
if err := uploadMultipartIndexFinalize(ctx, httpClient, opts, id); err != nil {
return 0, err
}
@ -144,8 +163,8 @@ func uploadMultipartIndex(httpClient Client, opts UploadOptions, r io.ReaderAt,
// parts via additional HTTP requests. This upload will be in a pending state until all upload
// parts are received and the multipart upload is finalized, or until the record is deleted by
// a background process after an expiry period.
func uploadMultipartIndexInit(httpClient Client, opts UploadOptions, numParts int) (id int, err error) {
complete := logPending(
func uploadMultipartIndexInit(ctx context.Context, httpClient Client, opts UploadOptions, numParts int) (id int, err error) {
retry, complete := logPending(
opts.Output,
"Preparing multipart upload",
"Prepared multipart upload",
@ -153,8 +172,12 @@ func uploadMultipartIndexInit(httpClient Client, opts UploadOptions, numParts in
)
defer func() { complete(err) }()
err = makeRetry(opts.MaxRetries, opts.RetryInterval)(func() (bool, error) {
return performUploadRequest(httpClient, uploadRequestOptions{
err = makeRetry(opts.MaxRetries, opts.RetryInterval)(func(attempt int) (bool, error) {
if attempt != 0 {
retry(fmt.Sprintf("Failed to prepare multipart upload (will retry; attempt #%d)", attempt))
}
return performUploadRequest(ctx, httpClient, uploadRequestOptions{
UploadOptions: opts,
Target: &id,
MultiPart: true,
@ -168,13 +191,13 @@ func uploadMultipartIndexInit(httpClient Client, opts UploadOptions, numParts in
// uploadMultipartIndexParts uploads the contents available via each of the given reader
// factories to a Sourcegraph instance as part of the same multipart upload as indiciated
// by the given identifier.
func uploadMultipartIndexParts(httpClient Client, opts UploadOptions, readerFactories []func() io.Reader, id int, readerLen int64) (err error) {
func uploadMultipartIndexParts(ctx context.Context, httpClient Client, opts UploadOptions, readerFactories []func() io.Reader, id int, readerLen int64) (err error) {
var bars []output.ProgressBar
for i := range readerFactories {
label := fmt.Sprintf("Upload part %d of %d", i+1, len(readerFactories))
bars = append(bars, output.ProgressBar{Label: label, Max: 1.0})
}
progress, complete := logProgress(
progress, retry, complete := logProgress(
opts.Output,
bars,
"Index parts uploaded",
@ -185,6 +208,9 @@ func uploadMultipartIndexParts(httpClient Client, opts UploadOptions, readerFact
var wg sync.WaitGroup
errs := make(chan error, len(readerFactories))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for i, readerFactory := range readerFactories {
wg.Add(1)
@ -204,8 +230,9 @@ func uploadMultipartIndexParts(httpClient Client, opts UploadOptions, readerFact
Index: i,
}
if err := uploadIndexFile(httpClient, opts, readerFactory, partReaderLen, requestOptions, progress, i); err != nil {
if err := uploadIndexFile(ctx, httpClient, opts, readerFactory, partReaderLen, requestOptions, progress, retry, i, len(readerFactories)); err != nil {
errs <- err
cancel()
} else if progress != nil {
// Mark complete in case we debounced our last updates
progress.SetValue(i, 1)
@ -227,8 +254,8 @@ func uploadMultipartIndexParts(httpClient Client, opts UploadOptions, readerFact
// uploadMultipartIndexFinalize performs the request to stitch the uploaded parts together and
// mark it ready as processing in the backend.
func uploadMultipartIndexFinalize(httpClient Client, opts UploadOptions, id int) (err error) {
complete := logPending(
func uploadMultipartIndexFinalize(ctx context.Context, httpClient Client, opts UploadOptions, id int) (err error) {
retry, complete := logPending(
opts.Output,
"Finalizing multipart upload",
"Finalized multipart upload",
@ -236,8 +263,12 @@ func uploadMultipartIndexFinalize(httpClient Client, opts UploadOptions, id int)
)
defer func() { complete(err) }()
return makeRetry(opts.MaxRetries, opts.RetryInterval)(func() (bool, error) {
return performUploadRequest(httpClient, uploadRequestOptions{
return makeRetry(opts.MaxRetries, opts.RetryInterval)(func(attempt int) (bool, error) {
if attempt != 0 {
retry(fmt.Sprintf("Failed to finalize multipart upload (will retry; attempt #%d)", attempt))
}
return performUploadRequest(ctx, httpClient, uploadRequestOptions{
UploadOptions: opts,
UploadID: id,
Done: true,
@ -285,39 +316,60 @@ func openFileAndGetSize(filename string) (*os.File, int64, error) {
return file, fileInfo.Size(), err
}
// logPending creates a pending object from the given output value and returns a complete function
// that should be called once the work attached to this log call has completed. This complete function
// takes an error value that determines whether the success or failure message is displayed. If the
// given output value is nil then a no-op complete function is returned.
func logPending(out *output.Output, pendingMessage, successMessage, failureMessage string) func(error) {
// logPending creates a pending object from the given output value and returns a retry function that
// can be called to print a message then reset the pending display, and a complete function that should
// be called once the work attached to this log call has completed. This complete function takes an error
// value that determines whether the success or failure message is displayed. If the given output value is
// nil then a no-op complete function is returned.
func logPending(out *output.Output, pendingMessage, successMessage, failureMessage string) (func(message string), func(error)) {
if out == nil {
return func(err error) {}
return func(message string) {}, func(err error) {}
}
pending := out.Pending(output.Line("", output.StylePending, pendingMessage))
return func(err error) {
retry := func(message string) {
pending.Destroy()
out.WriteLine(output.Line(output.EmojiFailure, output.StyleReset, message))
pending = out.Pending(output.Line("", output.StylePending, pendingMessage))
}
complete := func(err error) {
if err == nil {
pending.Complete(output.Line(output.EmojiSuccess, output.StyleSuccess, successMessage))
} else {
pending.Complete(output.Line(output.EmojiFailure, output.StyleBold, failureMessage))
}
}
return retry, complete
}
// logProgress creates and returns a progress from the given output value and bars configuration.
// This function also returns a complete function that should be called once the work attached to
// This function also returns a retry function that can be called to print a message then reset the
// progress bar display, and a complete function that should be called once the work attached to
// this log call has completed. This complete function takes an error value that determines whether
// the success or failure message is displayed. If the given output value is nil then a no-op complete
// function is returned.
func logProgress(out *output.Output, bars []output.ProgressBar, successMessage, failureMessage string) (output.Progress, func(error)) {
func logProgress(out *output.Output, bars []output.ProgressBar, successMessage, failureMessage string) (output.Progress, func(message string) output.Progress, func(error)) {
if out == nil {
return nil, func(err error) {}
return nil, func(message string) output.Progress { return nil }, func(err error) {}
}
var mu sync.Mutex
progress := out.Progress(bars, nil)
return progress, func(err error) {
retry := func(message string) output.Progress {
mu.Lock()
defer mu.Unlock()
progress.Destroy()
out.WriteLine(output.Line(output.EmojiFailure, output.StyleReset, message))
progress = out.Progress(bars, nil)
return progress
}
complete := func(err error) {
progress.Destroy()
if err == nil {
@ -326,4 +378,6 @@ func logProgress(out *output.Output, bars []output.ProgressBar, successMessage,
out.WriteLine(output.Line(output.EmojiFailure, output.StyleBold, failureMessage))
}
}
return progress, retry, complete
}

View File

@ -3,6 +3,7 @@ package upload
import (
"bytes"
"compress/gzip"
"context"
"io"
"net/http"
"net/http/httptest"
@ -60,7 +61,7 @@ func TestUploadIndex(t *testing.T) {
_, _ = io.Copy(f, bytes.NewReader(expectedPayload))
_ = f.Close()
id, err := UploadIndex(f.Name(), http.DefaultClient, UploadOptions{
id, err := UploadIndex(context.Background(), f.Name(), http.DefaultClient, UploadOptions{
UploadRecordOptions: UploadRecordOptions{
Repo: "foo/bar",
Commit: "deadbeef",
@ -123,7 +124,7 @@ func TestUploadIndexMultipart(t *testing.T) {
_, _ = io.Copy(f, bytes.NewReader(expectedPayload))
_ = f.Close()
id, err := UploadIndex(f.Name(), http.DefaultClient, UploadOptions{
id, err := UploadIndex(context.Background(), f.Name(), http.DefaultClient, UploadOptions{
UploadRecordOptions: UploadRecordOptions{
Repo: "foo/bar",
Commit: "deadbeef",