mirror of
https://github.com/sourcegraph/sourcegraph.git
synced 2026-02-06 14:51:44 +00:00
Use uncached doer for downloading packages (#48977)
We've encountered that all our external requests are cached in redis, which means (1) that we need to read the entire body into memory so we can serialize it for redis, and (2) that we store potentially gigantic binary files in redis, which we never need to fetch again. This introduces an uncached version of the external doer, adds comments on when to use which, and modifies each of the packages clients to use that doer for the Download operation. ## Test plan The unit tests --------- Co-authored-by: Noah Santschi-Cooney <noah@santschi-cooney.ch>
This commit is contained in:
parent
f2b84d90d6
commit
674217f83d
@ -264,10 +264,7 @@ func newTestClient(t testing.TB, name string, update bool) *pypi.Client {
|
||||
}
|
||||
})
|
||||
|
||||
doer, err := httpcli.NewFactory(nil, httptestutil.NewRecorderOpt(rec)).Doer()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
doer := httpcli.NewFactory(nil, httptestutil.NewRecorderOpt(rec))
|
||||
|
||||
c := pypi.NewClient("urn", []string{"https://pypi.org/simple"}, doer)
|
||||
return c
|
||||
|
||||
@ -57,20 +57,20 @@ func (rubyDependencySource) ParsePackageFromRepoName(repoName api.RepoName) (rep
|
||||
}
|
||||
|
||||
func (s *rubyDependencySource) Download(ctx context.Context, dir string, dep reposource.VersionedPackage) error {
|
||||
pkgContents, packageURL, err := s.client.GetPackageContents(ctx, dep)
|
||||
pkgContents, err := s.client.GetPackageContents(ctx, dep)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error downloading RubyGem with URL '%s'", packageURL)
|
||||
return errors.Wrapf(err, "error downloading RubyGem %q", dep.VersionedPackageSyntax())
|
||||
}
|
||||
defer pkgContents.Close()
|
||||
|
||||
if err = unpackRubyPackage(packageURL, pkgContents, dir); err != nil {
|
||||
return errors.Wrapf(err, "failed to unzip ruby module from URL %s", packageURL)
|
||||
if err = unpackRubyPackage(pkgContents, dir); err != nil {
|
||||
return errors.Wrapf(err, "failed to unzip ruby module %q", dep.VersionedPackageSyntax())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func unpackRubyPackage(packageURL string, pkg io.Reader, workDir string) error {
|
||||
func unpackRubyPackage(pkg io.Reader, workDir string) error {
|
||||
opts := unpack.Opts{
|
||||
SkipInvalid: true,
|
||||
SkipDuplicates: true,
|
||||
@ -86,10 +86,10 @@ func unpackRubyPackage(packageURL string, pkg io.Reader, workDir string) error {
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
if err := unpack.Tar(pkg, tmpDir, opts); err != nil {
|
||||
return errors.Wrapf(err, "failed to tar downloaded bytes from URL %s", packageURL)
|
||||
return errors.Wrap(err, "failed to unpack downloaded tar")
|
||||
}
|
||||
|
||||
err = unpackRubyDataTarGz(packageURL, filepath.Join(tmpDir, "data.tar.gz"), workDir)
|
||||
err = unpackRubyDataTarGz(filepath.Join(tmpDir, "data.tar.gz"), workDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -109,10 +109,10 @@ func unpackRubyPackage(packageURL string, pkg io.Reader, workDir string) error {
|
||||
}
|
||||
|
||||
// unpackRubyDataTarGz unpacks the given `data.tar.gz` from a downloaded RubyGem.
|
||||
func unpackRubyDataTarGz(packageURL, path string, workDir string) error {
|
||||
func unpackRubyDataTarGz(path string, workDir string) error {
|
||||
r, err := os.Open(path)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to read file from downloaded URL %s", packageURL)
|
||||
return errors.Wrapf(err, "failed to read data archive file %q", path)
|
||||
}
|
||||
defer r.Close()
|
||||
opts := unpack.Opts{
|
||||
|
||||
@ -491,7 +491,7 @@ func getVCSSyncer(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cli := npm.NewHTTPClient(urn, c.Registry, c.Credentials, httpcli.ExternalDoer)
|
||||
cli := npm.NewHTTPClient(urn, c.Registry, c.Credentials, httpcli.ExternalClientFactory)
|
||||
return server.NewNpmPackagesSyncer(c, depsSvc, cli), nil
|
||||
case extsvc.TypeGoModules:
|
||||
var c schema.GoModulesConnection
|
||||
@ -499,7 +499,7 @@ func getVCSSyncer(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cli := gomodproxy.NewClient(urn, c.Urls, httpcli.ExternalDoer)
|
||||
cli := gomodproxy.NewClient(urn, c.Urls, httpcli.ExternalClientFactory)
|
||||
return server.NewGoModulesSyncer(&c, depsSvc, cli), nil
|
||||
case extsvc.TypePythonPackages:
|
||||
var c schema.PythonPackagesConnection
|
||||
@ -507,7 +507,7 @@ func getVCSSyncer(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cli := pypi.NewClient(urn, c.Urls, httpcli.ExternalDoer)
|
||||
cli := pypi.NewClient(urn, c.Urls, httpcli.ExternalClientFactory)
|
||||
return server.NewPythonPackagesSyncer(&c, depsSvc, cli, reposDir), nil
|
||||
case extsvc.TypeRustPackages:
|
||||
var c schema.RustPackagesConnection
|
||||
@ -515,7 +515,7 @@ func getVCSSyncer(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cli := crates.NewClient(urn, httpcli.ExternalDoer)
|
||||
cli := crates.NewClient(urn, httpcli.ExternalClientFactory)
|
||||
return server.NewRustPackagesSyncer(&c, depsSvc, cli), nil
|
||||
case extsvc.TypeRubyPackages:
|
||||
var c schema.RubyPackagesConnection
|
||||
@ -523,7 +523,7 @@ func getVCSSyncer(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cli := rubygems.NewClient(urn, c.Repository, httpcli.ExternalDoer)
|
||||
cli := rubygems.NewClient(urn, c.Repository, httpcli.ExternalClientFactory)
|
||||
return server.NewRubyPackagesSyncer(&c, depsSvc, cli), nil
|
||||
}
|
||||
return &server.GitRepoSyncer{}, nil
|
||||
|
||||
@ -11,16 +11,17 @@ import (
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
cli httpcli.Doer
|
||||
uncachedClient httpcli.Doer
|
||||
|
||||
// Self-imposed rate-limiter.
|
||||
limiter *ratelimit.InstrumentedLimiter
|
||||
}
|
||||
|
||||
func NewClient(urn string, cli httpcli.Doer) *Client {
|
||||
func NewClient(urn string, httpfactory *httpcli.Factory) *Client {
|
||||
uncached, _ := httpfactory.Doer(httpcli.NewCachedTransportOpt(httpcli.NoopCache{}, false))
|
||||
return &Client{
|
||||
cli: cli,
|
||||
limiter: ratelimit.DefaultRegistry.Get(urn),
|
||||
uncachedClient: uncached,
|
||||
limiter: ratelimit.DefaultRegistry.Get(urn),
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,7 +36,7 @@ func (c *Client) Get(ctx context.Context, url string) (io.ReadCloser, error) {
|
||||
}
|
||||
req.Header.Add("User-Agent", "sourcegraph-crates-syncer (sourcegraph.com)")
|
||||
|
||||
b, err := c.do(req)
|
||||
b, err := c.do(c.uncachedClient, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -56,8 +57,8 @@ func (e *Error) NotFound() bool {
|
||||
return e.code == http.StatusNotFound
|
||||
}
|
||||
|
||||
func (c *Client) do(req *http.Request) (io.ReadCloser, error) {
|
||||
resp, err := c.cli.Do(req)
|
||||
func (c *Client) do(doer httpcli.Doer, req *http.Request) (io.ReadCloser, error) {
|
||||
resp, err := doer.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -9,9 +9,10 @@ import (
|
||||
"net/url"
|
||||
"path"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/reposource"
|
||||
"golang.org/x/mod/module"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/reposource"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/errcode"
|
||||
"github.com/sourcegraph/sourcegraph/internal/httpcli"
|
||||
"github.com/sourcegraph/sourcegraph/internal/ratelimit"
|
||||
@ -20,18 +21,22 @@ import (
|
||||
|
||||
// A Client to Go module proxies.
|
||||
type Client struct {
|
||||
urls []string // list of proxy URLs
|
||||
cli httpcli.Doer
|
||||
limiter *ratelimit.InstrumentedLimiter
|
||||
urls []string // list of proxy URLs
|
||||
uncachedClient httpcli.Doer
|
||||
cachedClient httpcli.Doer
|
||||
limiter *ratelimit.InstrumentedLimiter
|
||||
}
|
||||
|
||||
// NewClient returns a new Client for the given urls. urn represents the
|
||||
// unique urn of the external service this client's config is from.
|
||||
func NewClient(urn string, urls []string, cli httpcli.Doer) *Client {
|
||||
func NewClient(urn string, urls []string, httpfactory *httpcli.Factory) *Client {
|
||||
uncached, _ := httpfactory.Doer(httpcli.NewCachedTransportOpt(httpcli.NoopCache{}, false))
|
||||
cached, _ := httpfactory.Doer()
|
||||
return &Client{
|
||||
urls: urls,
|
||||
cli: cli,
|
||||
limiter: ratelimit.DefaultRegistry.Get(urn),
|
||||
urls: urls,
|
||||
cachedClient: cached,
|
||||
uncachedClient: uncached,
|
||||
limiter: ratelimit.DefaultRegistry.Get(urn),
|
||||
}
|
||||
}
|
||||
|
||||
@ -48,13 +53,13 @@ func (c *Client) GetVersion(ctx context.Context, mod reposource.PackageName, ver
|
||||
paths = []string{"@latest"}
|
||||
}
|
||||
|
||||
respBody, err := c.get(ctx, mod, paths...)
|
||||
respBody, err := c.get(ctx, c.cachedClient, mod, paths...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var v struct{ Version string }
|
||||
if err = json.Unmarshal(respBody, &v); err != nil {
|
||||
if err = json.NewDecoder(respBody).Decode(&v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -68,7 +73,13 @@ func (c *Client) GetZip(ctx context.Context, mod reposource.PackageName, version
|
||||
return nil, errors.Wrap(err, "failed to escape version")
|
||||
}
|
||||
|
||||
zipBytes, err := c.get(ctx, mod, "@v", escapedVersion+".zip")
|
||||
zip, err := c.get(ctx, c.uncachedClient, mod, "@v", escapedVersion+".zip")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: remove and return io.Reader
|
||||
zipBytes, err := io.ReadAll(zip)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -76,12 +87,13 @@ func (c *Client) GetZip(ctx context.Context, mod reposource.PackageName, version
|
||||
return zipBytes, nil
|
||||
}
|
||||
|
||||
func (c *Client) get(ctx context.Context, mod reposource.PackageName, paths ...string) (respBody []byte, err error) {
|
||||
func (c *Client) get(ctx context.Context, doer httpcli.Doer, mod reposource.PackageName, paths ...string) (respBody io.ReadCloser, err error) {
|
||||
escapedMod, err := module.EscapePath(string(mod))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to escape module path")
|
||||
}
|
||||
|
||||
// so err isnt shadowed below
|
||||
var (
|
||||
reqURL *url.URL
|
||||
req *http.Request
|
||||
@ -103,24 +115,19 @@ func (c *Client) get(ctx context.Context, mod reposource.PackageName, paths ...s
|
||||
return nil, err
|
||||
}
|
||||
|
||||
respBody, err = c.do(req)
|
||||
respBody, err = c.do(doer, req)
|
||||
if err == nil || !errcode.IsNotFound(err) {
|
||||
break
|
||||
} else if respBody != nil {
|
||||
respBody.Close()
|
||||
}
|
||||
}
|
||||
|
||||
return respBody, err
|
||||
}
|
||||
|
||||
func (c *Client) do(req *http.Request) ([]byte, error) {
|
||||
resp, err := c.cli.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
bs, err := io.ReadAll(resp.Body)
|
||||
func (c *Client) do(doer httpcli.Doer, req *http.Request) (io.ReadCloser, error) {
|
||||
resp, err := doer.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -133,10 +140,15 @@ func (c *Client) do(req *http.Request) ([]byte, error) {
|
||||
// Error responses should have content type text/plain with charset either utf-8 or us-ascii.
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
bs, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
bs = []byte(errors.Wrap(err, "failed to read body").Error())
|
||||
}
|
||||
resp.Body.Close()
|
||||
return nil, &Error{Path: req.URL.Path, Code: resp.StatusCode, Message: string(bs)}
|
||||
}
|
||||
|
||||
return bs, nil
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// Error returned from an HTTP request to a Go module proxy.
|
||||
|
||||
@ -15,9 +15,10 @@ import (
|
||||
|
||||
"github.com/grafana/regexp"
|
||||
"github.com/inconshreveable/log15"
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/reposource"
|
||||
"golang.org/x/mod/module"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/conf/reposource"
|
||||
|
||||
"github.com/sourcegraph/sourcegraph/internal/httpcli"
|
||||
"github.com/sourcegraph/sourcegraph/internal/httptestutil"
|
||||
"github.com/sourcegraph/sourcegraph/internal/lazyregexp"
|
||||
@ -131,10 +132,7 @@ func newTestClient(t testing.TB, name string, update bool) *Client {
|
||||
}
|
||||
})
|
||||
|
||||
hc, err := httpcli.NewFactory(nil, httptestutil.NewRecorderOpt(rec)).Doer()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
hc := httpcli.NewFactory(nil, httptestutil.NewRecorderOpt(rec))
|
||||
|
||||
c := &schema.GoModulesConnection{
|
||||
Urls: []string{"https://proxy.golang.org"},
|
||||
|
||||
@ -36,11 +36,6 @@ type Client interface {
|
||||
FetchTarball(ctx context.Context, dep *reposource.NpmVersionedPackage) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
func init() {
|
||||
// The HTTP client will transparently handle caching,
|
||||
// so we don't need to set up any on-disk caching here.
|
||||
}
|
||||
|
||||
func FetchSources(ctx context.Context, client Client, dependency *reposource.NpmVersionedPackage) (_ io.ReadCloser, err error) {
|
||||
operations := getOperations()
|
||||
|
||||
@ -53,20 +48,24 @@ func FetchSources(ctx context.Context, client Client, dependency *reposource.Npm
|
||||
}
|
||||
|
||||
type HTTPClient struct {
|
||||
registryURL string
|
||||
doer httpcli.Doer
|
||||
limiter *ratelimit.InstrumentedLimiter
|
||||
credentials string
|
||||
registryURL string
|
||||
uncachedClient httpcli.Doer
|
||||
cachedClient httpcli.Doer
|
||||
limiter *ratelimit.InstrumentedLimiter
|
||||
credentials string
|
||||
}
|
||||
|
||||
var _ Client = &HTTPClient{}
|
||||
|
||||
func NewHTTPClient(urn string, registryURL string, credentials string, doer httpcli.Doer) *HTTPClient {
|
||||
func NewHTTPClient(urn string, registryURL string, credentials string, httpfactory *httpcli.Factory) *HTTPClient {
|
||||
uncached, _ := httpfactory.Doer(httpcli.NewCachedTransportOpt(httpcli.NoopCache{}, false))
|
||||
cached, _ := httpfactory.Doer()
|
||||
return &HTTPClient{
|
||||
registryURL: registryURL,
|
||||
doer: doer,
|
||||
limiter: ratelimit.DefaultRegistry.Get(urn),
|
||||
credentials: credentials,
|
||||
registryURL: registryURL,
|
||||
uncachedClient: uncached,
|
||||
cachedClient: cached,
|
||||
limiter: ratelimit.DefaultRegistry.Get(urn),
|
||||
credentials: credentials,
|
||||
}
|
||||
}
|
||||
|
||||
@ -77,7 +76,7 @@ type PackageInfo struct {
|
||||
|
||||
func (client *HTTPClient) GetPackageInfo(ctx context.Context, pkg *reposource.NpmPackageName) (info *PackageInfo, err error) {
|
||||
url := fmt.Sprintf("%s/%s", client.registryURL, pkg.PackageSyntax())
|
||||
body, err := client.makeGetRequest(ctx, url)
|
||||
body, err := client.makeGetRequest(ctx, client.uncachedClient, url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -111,19 +110,6 @@ func (i illFormedJSONError) Error() string {
|
||||
return fmt.Sprintf("unexpected JSON output from npm request: url=%s", i.url)
|
||||
}
|
||||
|
||||
func (client *HTTPClient) do(ctx context.Context, req *http.Request) (*http.Response, error) {
|
||||
req, ht := nethttp.TraceRequest(ot.GetTracer(ctx), //nolint:staticcheck // Drop once we get rid of OpenTracing
|
||||
req.WithContext(ctx),
|
||||
nethttp.OperationName("npm"),
|
||||
nethttp.ClientTrace(false))
|
||||
defer ht.Finish()
|
||||
|
||||
if err := client.limiter.Wait(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return client.doer.Do(req)
|
||||
}
|
||||
|
||||
type npmError struct {
|
||||
statusCode int
|
||||
err error
|
||||
@ -140,7 +126,7 @@ func (n npmError) NotFound() bool {
|
||||
return n.statusCode == http.StatusNotFound
|
||||
}
|
||||
|
||||
func (client *HTTPClient) makeGetRequest(ctx context.Context, url string) (io.ReadCloser, error) {
|
||||
func (client *HTTPClient) makeGetRequest(ctx context.Context, doer httpcli.Doer, url string) (io.ReadCloser, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -150,7 +136,20 @@ func (client *HTTPClient) makeGetRequest(ctx context.Context, url string) (io.Re
|
||||
req.Header.Set("Authorization", "Bearer "+client.credentials)
|
||||
}
|
||||
|
||||
resp, err := client.do(ctx, req)
|
||||
do := func() (*http.Response, error) {
|
||||
req, ht := nethttp.TraceRequest(ot.GetTracer(ctx), //nolint:staticcheck // Drop once we get rid of OpenTracing
|
||||
req.WithContext(ctx),
|
||||
nethttp.OperationName("npm"),
|
||||
nethttp.ClientTrace(false))
|
||||
defer ht.Finish()
|
||||
|
||||
if err := client.limiter.Wait(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return doer.Do(req)
|
||||
}
|
||||
|
||||
resp, err := do()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -171,7 +170,7 @@ func (client *HTTPClient) makeGetRequest(ctx context.Context, url string) (io.Re
|
||||
func (client *HTTPClient) GetDependencyInfo(ctx context.Context, dep *reposource.NpmVersionedPackage) (*DependencyInfo, error) {
|
||||
// https://github.com/npm/registry/blob/master/docs/REGISTRY-API.md#getVersionedPackage
|
||||
url := fmt.Sprintf("%s/%s/%s", client.registryURL, dep.PackageSyntax(), dep.Version)
|
||||
body, err := client.makeGetRequest(ctx, url)
|
||||
body, err := client.makeGetRequest(ctx, client.cachedClient, url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -189,5 +188,7 @@ func (client *HTTPClient) FetchTarball(ctx context.Context, dep *reposource.NpmV
|
||||
if dep.TarballURL == "" {
|
||||
return nil, errors.New("empty TarballURL")
|
||||
}
|
||||
return client.makeGetRequest(ctx, dep.TarballURL)
|
||||
|
||||
// don't want to store these in redis
|
||||
return client.makeGetRequest(ctx, client.uncachedClient, dep.TarballURL)
|
||||
}
|
||||
|
||||
@ -34,10 +34,7 @@ func newTestHTTPClient(t *testing.T) (client *HTTPClient, stop func()) {
|
||||
t.Helper()
|
||||
recorderFactory, stop := httptestutil.NewRecorderFactory(t, *updateRecordings, t.Name())
|
||||
|
||||
doer, err := recorderFactory.Doer()
|
||||
require.Nil(t, err)
|
||||
|
||||
client = NewHTTPClient("urn", "https://registry.npmjs.org", "", doer)
|
||||
client = NewHTTPClient("urn", "https://registry.npmjs.org", "", recorderFactory)
|
||||
return client, stop
|
||||
}
|
||||
|
||||
@ -76,7 +73,7 @@ func TestCredentials(t *testing.T) {
|
||||
defer server.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
client := NewHTTPClient("urn", server.URL, credentials, httpcli.ExternalDoer)
|
||||
client := NewHTTPClient("urn", server.URL, credentials, httpcli.ExternalClientFactory)
|
||||
|
||||
presentDep, err := reposource.ParseNpmVersionedPackage("left-pad@1.3.0")
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -48,24 +48,28 @@ type Client struct {
|
||||
// A list of PyPI proxies. Each url should point to the root of the simple-API.
|
||||
// For example for pypi.org the url should be https://pypi.org/simple with or
|
||||
// without a trailing slash.
|
||||
urls []string
|
||||
cli httpcli.Doer
|
||||
urls []string
|
||||
uncachedClient httpcli.Doer
|
||||
cachedClient httpcli.Doer
|
||||
|
||||
// Self-imposed rate-limiter. pypi.org does not impose a rate limiting policy.
|
||||
limiter *ratelimit.InstrumentedLimiter
|
||||
}
|
||||
|
||||
func NewClient(urn string, urls []string, cli httpcli.Doer) *Client {
|
||||
func NewClient(urn string, urls []string, httpfactory *httpcli.Factory) *Client {
|
||||
uncached, _ := httpfactory.Doer(httpcli.NewCachedTransportOpt(httpcli.NoopCache{}, false))
|
||||
cached, _ := httpfactory.Doer()
|
||||
return &Client{
|
||||
urls: urls,
|
||||
cli: cli,
|
||||
limiter: ratelimit.DefaultRegistry.Get(urn),
|
||||
urls: urls,
|
||||
uncachedClient: uncached,
|
||||
cachedClient: cached,
|
||||
limiter: ratelimit.DefaultRegistry.Get(urn),
|
||||
}
|
||||
}
|
||||
|
||||
// Project returns the Files of the simple-API /<project>/ endpoint.
|
||||
func (c *Client) Project(ctx context.Context, project reposource.PackageName) ([]File, error) {
|
||||
data, err := c.get(ctx, reposource.PackageName(normalize(string(project))))
|
||||
data, err := c.get(ctx, c.cachedClient, reposource.PackageName(normalize(string(project))))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "PyPI")
|
||||
}
|
||||
@ -275,7 +279,7 @@ func (c *Client) Download(ctx context.Context, url string) (io.ReadCloser, error
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b, err := c.do(req)
|
||||
b, err := c.do(c.uncachedClient, req)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "PyPI")
|
||||
}
|
||||
@ -401,7 +405,7 @@ func ToWheel(f File) (*Wheel, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) get(ctx context.Context, project reposource.PackageName) (respBody io.ReadCloser, err error) {
|
||||
func (c *Client) get(ctx context.Context, doer httpcli.Doer, project reposource.PackageName) (respBody io.ReadCloser, err error) {
|
||||
var (
|
||||
reqURL *url.URL
|
||||
req *http.Request
|
||||
@ -429,7 +433,7 @@ func (c *Client) get(ctx context.Context, project reposource.PackageName) (respB
|
||||
return nil, err
|
||||
}
|
||||
|
||||
respBody, err = c.do(req)
|
||||
respBody, err = c.do(doer, req)
|
||||
if err == nil || !errcode.IsNotFound(err) {
|
||||
break
|
||||
}
|
||||
@ -438,8 +442,8 @@ func (c *Client) get(ctx context.Context, project reposource.PackageName) (respB
|
||||
return respBody, err
|
||||
}
|
||||
|
||||
func (c *Client) do(req *http.Request) (io.ReadCloser, error) {
|
||||
resp, err := c.cli.Do(req)
|
||||
func (c *Client) do(doer httpcli.Doer, req *http.Request) (io.ReadCloser, error) {
|
||||
resp, err := doer.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -446,10 +446,7 @@ func newTestClient(t testing.TB, name string, update bool) *Client {
|
||||
}
|
||||
})
|
||||
|
||||
doer, err := httpcli.NewFactory(nil, httptestutil.NewRecorderOpt(rec)).Doer()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
doer := httpcli.NewFactory(nil, httptestutil.NewRecorderOpt(rec))
|
||||
|
||||
c := NewClient("urn", []string{"https://pypi.org/simple"}, doer)
|
||||
return c
|
||||
|
||||
4
internal/extsvc/rubygems/BUILD.bazel
generated
4
internal/extsvc/rubygems/BUILD.bazel
generated
@ -14,9 +14,9 @@ go_library(
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "rubygems_test",
|
||||
name = "client_test",
|
||||
timeout = "short",
|
||||
srcs = ["rubygems_test.go"],
|
||||
srcs = ["client_test.go"],
|
||||
data = glob(["testdata/**"]),
|
||||
embed = [":rubygems"],
|
||||
deps = [
|
||||
|
||||
@ -16,38 +16,39 @@ import (
|
||||
type Client struct {
|
||||
registryURL string
|
||||
|
||||
cli httpcli.Doer
|
||||
uncachedClient httpcli.Doer
|
||||
|
||||
// Self-imposed rate-limiter.
|
||||
limiter *ratelimit.InstrumentedLimiter
|
||||
}
|
||||
|
||||
func NewClient(urn string, registryURL string, cli httpcli.Doer) *Client {
|
||||
func NewClient(urn string, registryURL string, httpfactory *httpcli.Factory) *Client {
|
||||
uncached, _ := httpfactory.Doer(httpcli.NewCachedTransportOpt(httpcli.NoopCache{}, false))
|
||||
return &Client{
|
||||
registryURL: registryURL,
|
||||
cli: cli,
|
||||
limiter: ratelimit.DefaultRegistry.Get(urn),
|
||||
registryURL: registryURL,
|
||||
uncachedClient: uncached,
|
||||
limiter: ratelimit.DefaultRegistry.Get(urn),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) GetPackageContents(ctx context.Context, dep reposource.VersionedPackage) (body io.ReadCloser, url string, err error) {
|
||||
url = fmt.Sprintf("%s/gems/%s-%s.gem", strings.TrimSuffix(c.registryURL, "/"), dep.PackageSyntax(), dep.PackageVersion())
|
||||
func (c *Client) GetPackageContents(ctx context.Context, dep reposource.VersionedPackage) (body io.ReadCloser, err error) {
|
||||
url := fmt.Sprintf("%s/gems/%s-%s.gem", strings.TrimSuffix(c.registryURL, "/"), dep.PackageSyntax(), dep.PackageVersion())
|
||||
|
||||
if err := c.limiter.Wait(ctx); err != nil {
|
||||
return nil, url, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, url, err
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Add("User-Agent", "sourcegraph-rubygems-syncer (sourcegraph.com)")
|
||||
|
||||
body, err = c.do(req)
|
||||
body, err = c.do(c.uncachedClient, req)
|
||||
if err != nil {
|
||||
return nil, url, err
|
||||
return nil, err
|
||||
}
|
||||
return body, url, nil
|
||||
return body, nil
|
||||
}
|
||||
|
||||
type Error struct {
|
||||
@ -64,8 +65,8 @@ func (e *Error) NotFound() bool {
|
||||
return e.code == http.StatusNotFound
|
||||
}
|
||||
|
||||
func (c *Client) do(req *http.Request) (io.ReadCloser, error) {
|
||||
resp, err := c.cli.Do(req)
|
||||
func (c *Client) do(doer httpcli.Doer, req *http.Request) (io.ReadCloser, error) {
|
||||
resp, err := doer.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -28,10 +28,7 @@ func newTestHTTPClient(t *testing.T) (client *Client, stop func()) {
|
||||
t.Helper()
|
||||
recorderFactory, stop := httptestutil.NewRecorderFactory(t, *updateRecordings, t.Name())
|
||||
|
||||
doer, err := recorderFactory.Doer()
|
||||
require.Nil(t, err)
|
||||
|
||||
return NewClient("rubygems_urn", "https://rubygems.org", doer), stop
|
||||
return NewClient("rubygems_urn", "https://rubygems.org", recorderFactory), stop
|
||||
}
|
||||
|
||||
func TestGetPackageContents(t *testing.T) {
|
||||
@ -39,7 +36,7 @@ func TestGetPackageContents(t *testing.T) {
|
||||
client, stop := newTestHTTPClient(t)
|
||||
defer stop()
|
||||
dep := reposource.ParseRubyVersionedPackage("hola@0.1.0")
|
||||
readCloser, _, err := client.GetPackageContents(ctx, dep)
|
||||
readCloser, err := client.GetPackageContents(ctx, dep)
|
||||
require.Nil(t, err)
|
||||
defer readCloser.Close()
|
||||
|
||||
@ -93,8 +93,16 @@ var CachedTransportOpt = NewCachedTransportOpt(redisCache, true)
|
||||
|
||||
// ExternalClientFactory is a httpcli.Factory with common options
|
||||
// and middleware pre-set for communicating with external services.
|
||||
// WARN: Clients from this factory cache entire responses for etag matching. Do not
|
||||
// use them for one-off requests if possible, and definitely not for larger payloads,
|
||||
// like downloading arbitrarily sized files! See UncachedExternalClientFactory instead.
|
||||
var ExternalClientFactory = NewExternalClientFactory()
|
||||
|
||||
// UncachedExternalClientFactory is a httpcli.Factory with common options
|
||||
// and middleware pre-set for communicating with external services, but with caching
|
||||
// responses disabled.
|
||||
var UncachedExternalClientFactory = newExternalClientFactory(false)
|
||||
|
||||
var (
|
||||
externalTimeout, _ = time.ParseDuration(env.Get("SRC_HTTP_CLI_EXTERNAL_TIMEOUT", "5m", "Timeout for external HTTP requests"))
|
||||
externalRetryDelayBase, _ = time.ParseDuration(env.Get("SRC_HTTP_CLI_EXTERNAL_RETRY_DELAY_BASE", "200ms", "Base retry delay duration for external HTTP requests"))
|
||||
@ -105,7 +113,19 @@ var (
|
||||
// NewExternalClientFactory returns a httpcli.Factory with common options
|
||||
// and middleware pre-set for communicating with external services. Additional
|
||||
// middleware can also be provided to e.g. enable logging with NewLoggingMiddleware.
|
||||
// WARN: Clients from this factory cache entire responses for etag matching. Do not
|
||||
// use them for one-off requests if possible, and definitely not for larger payloads,
|
||||
// like downloading arbitrarily sized files!
|
||||
func NewExternalClientFactory(middleware ...Middleware) *Factory {
|
||||
return newExternalClientFactory(true, middleware...)
|
||||
}
|
||||
|
||||
// NewExternalClientFactory returns a httpcli.Factory with common options
|
||||
// and middleware pre-set for communicating with external services. Additional
|
||||
// middleware can also be provided to e.g. enable logging with NewLoggingMiddleware.
|
||||
// If cache is true, responses will be cached in redis for improved rate limiting
|
||||
// and reduced byte transfer sizes.
|
||||
func newExternalClientFactory(cache bool, middleware ...Middleware) *Factory {
|
||||
mw := []Middleware{
|
||||
ContextErrorMiddleware,
|
||||
HeadersMiddleware("User-Agent", "Sourcegraph-Bot"),
|
||||
@ -113,8 +133,7 @@ func NewExternalClientFactory(middleware ...Middleware) *Factory {
|
||||
}
|
||||
mw = append(mw, middleware...)
|
||||
|
||||
return NewFactory(
|
||||
NewMiddleware(mw...),
|
||||
opts := []Opt{
|
||||
NewTimeoutOpt(externalTimeout),
|
||||
// ExternalTransportOpt needs to be before TracedTransportOpt and
|
||||
// NewCachedTransportOpt since it wants to extract a http.Transport,
|
||||
@ -125,16 +144,29 @@ func NewExternalClientFactory(middleware ...Middleware) *Factory {
|
||||
ExpJitterDelay(externalRetryDelayBase, externalRetryDelayMax),
|
||||
),
|
||||
TracedTransportOpt,
|
||||
CachedTransportOpt,
|
||||
}
|
||||
if cache {
|
||||
opts = append(opts, CachedTransportOpt)
|
||||
}
|
||||
|
||||
return NewFactory(
|
||||
NewMiddleware(mw...),
|
||||
opts...,
|
||||
)
|
||||
}
|
||||
|
||||
// ExternalDoer is a shared client for external communication. This is a
|
||||
// convenience for existing uses of http.DefaultClient.
|
||||
// WARN: This client caches entire responses for etag matching. Do not use it for
|
||||
// one-off requests if possible, and definitely not for larger payloads, like
|
||||
// downloading arbitrarily sized files! See UncachedExternalClient instead.
|
||||
var ExternalDoer, _ = ExternalClientFactory.Doer()
|
||||
|
||||
// ExternalClient returns a shared client for external communication. This is
|
||||
// a convenience for existing uses of http.DefaultClient.
|
||||
// WARN: This client caches entire responses for etag matching. Do not use it for
|
||||
// one-off requests if possible, and definitely not for larger payloads, like
|
||||
// downloading arbitrarily sized files! See UncachedExternalClient instead.
|
||||
var ExternalClient, _ = ExternalClientFactory.Client()
|
||||
|
||||
// InternalClientFactory is a httpcli.Factory with common options
|
||||
@ -518,7 +550,6 @@ func NewRetryPolicy(max int) rehttp.RetryFn {
|
||||
if retry || a.Error == nil || a.Index == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
if a.Response != nil {
|
||||
|
||||
7
internal/httpcli/noop_response_cache.go
Normal file
7
internal/httpcli/noop_response_cache.go
Normal file
@ -0,0 +1,7 @@
|
||||
package httpcli
|
||||
|
||||
type NoopCache struct{}
|
||||
|
||||
func (c NoopCache) Get(key string) (responseBytes []byte, ok bool) { return nil, false }
|
||||
func (c NoopCache) Set(key string, responseBytes []byte) {}
|
||||
func (c NoopCache) Delete(key string) {}
|
||||
@ -25,17 +25,12 @@ func NewGoPackagesSource(ctx context.Context, svc *types.ExternalService, cf *ht
|
||||
return nil, errors.Errorf("external service id=%d config error: %s", svc.ID, err)
|
||||
}
|
||||
|
||||
cli, err := cf.Doer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &PackagesSource{
|
||||
svc: svc,
|
||||
configDeps: c.Dependencies,
|
||||
scheme: dependencies.GoPackagesScheme,
|
||||
src: &goPackagesSource{
|
||||
client: gomodproxy.NewClient(svc.URN(), c.Urls, cli),
|
||||
client: gomodproxy.NewClient(svc.URN(), c.Urls, cf),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -27,18 +27,13 @@ func NewNpmPackagesSource(ctx context.Context, svc *types.ExternalService, cf *h
|
||||
return nil, errors.Errorf("external service id=%d config error: %s", svc.ID, err)
|
||||
}
|
||||
|
||||
cli, err := cf.Doer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &PackagesSource{
|
||||
svc: svc,
|
||||
configDeps: c.Dependencies,
|
||||
scheme: dependencies.NpmPackagesScheme,
|
||||
/* depsSvc initialized in SetDependenciesService */
|
||||
src: &npmPackagesSource{
|
||||
client: npm.NewHTTPClient(svc.URN(), c.Registry, c.Credentials, cli),
|
||||
client: npm.NewHTTPClient(svc.URN(), c.Registry, c.Credentials, cf),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -25,16 +25,11 @@ func NewPythonPackagesSource(ctx context.Context, svc *types.ExternalService, cf
|
||||
return nil, errors.Errorf("external service id=%d config error: %s", svc.ID, err)
|
||||
}
|
||||
|
||||
cli, err := cf.Doer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &PackagesSource{
|
||||
svc: svc,
|
||||
configDeps: c.Dependencies,
|
||||
scheme: dependencies.PythonPackagesScheme,
|
||||
src: &pythonPackagesSource{client: pypi.NewClient(svc.URN(), c.Urls, cli)},
|
||||
src: &pythonPackagesSource{client: pypi.NewClient(svc.URN(), c.Urls, cf)},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@ -25,16 +25,11 @@ func NewRubyPackagesSource(ctx context.Context, svc *types.ExternalService, cf *
|
||||
return nil, errors.Errorf("external service id=%d config error: %s", svc.ID, err)
|
||||
}
|
||||
|
||||
cli, err := cf.Doer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &PackagesSource{
|
||||
svc: svc,
|
||||
configDeps: c.Dependencies,
|
||||
scheme: dependencies.RubyPackagesScheme,
|
||||
src: &rubyPackagesSource{client: rubygems.NewClient(svc.URN(), c.Repository, cli)},
|
||||
src: &rubyPackagesSource{client: rubygems.NewClient(svc.URN(), c.Repository, cf)},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@ -25,16 +25,11 @@ func NewRustPackagesSource(ctx context.Context, svc *types.ExternalService, cf *
|
||||
return nil, errors.Errorf("external service id=%d config error: %s", svc.ID, err)
|
||||
}
|
||||
|
||||
cli, err := cf.Doer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &PackagesSource{
|
||||
svc: svc,
|
||||
configDeps: c.Dependencies,
|
||||
scheme: dependencies.RustPackagesScheme,
|
||||
src: &rustPackagesSource{client: crates.NewClient(svc.URN(), cli)},
|
||||
src: &rustPackagesSource{client: crates.NewClient(svc.URN(), cf)},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user