sourcegraph/internal/object/s3_client_test.go
Varun Gandhi 9145768648
chore: Rename uploadstore packages for clarity (#63931)
- The `internal/uploadstore` package is renamed to `object` indicating
   that it is meant to provide a generic object storage wrapper.
- The `search/exhaustive/uploadstore` package is used in very few places
  so I've merged into the `internal/search` package similar to
  `internal/embeddings`.

There are a few reasons to do the renaming.

1. The word `upload` in a more general context is ambiguous (just in
    `internal/`) - in the codeintel context, it means "SCIP index" but it
    can also be interpreted generically ("upload of _some_ data").
2. Better readability - `object.Storage` is much shorter than
    `uploadstore.Store`. Additionally, we use the term `Store` A LOT
    in the codebase, and usually, these refer to wrappers over some
    tables in some DB.

    Making things worse, some of our code also has:

    ```
    uploadsstore
"github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/internal/store"
    ```

And code which says `uploadsstore.Store` (notice the extra `s` 😢), which
is actually a wrapper over some key DB tables like `lsif_uploads`.
2024-07-22 08:57:56 +08:00

373 lines
12 KiB
Go

package object
import (
"bytes"
"context"
"fmt"
"io"
"sort"
"strconv"
"testing"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/google/go-cmp/cmp"
"github.com/grafana/regexp"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// TestS3Init checks that initializing the client built by testS3Client issues
// exactly one CreateBucket request and that the request names "test-bucket".
func TestS3Init(t *testing.T) {
	mockAPI := NewMockS3API()
	store := testS3Client(mockAPI, nil)

	err := store.Init(context.Background())
	if err != nil {
		t.Fatalf("unexpected error initializing client: %s", err)
	}

	history := mockAPI.CreateBucketFunc.History()
	if len(history) != 1 {
		t.Fatalf("unexpected number of CreateBucket calls. want=%d have=%d", 1, len(history))
	}
	if bucket := *history[0].Arg1.Bucket; bucket != "test-bucket" {
		t.Errorf("unexpected bucket argument. want=%s have=%s", "test-bucket", bucket)
	}
}
// TestS3InitBucketExists checks that Init reports no error when CreateBucket
// fails with BucketAlreadyExists or BucketAlreadyOwnedByYou, and that
// CreateBucket is still attempted exactly once against "test-bucket".
func TestS3InitBucketExists(t *testing.T) {
	alreadyExistsErrs := []error{
		&s3types.BucketAlreadyExists{},
		&s3types.BucketAlreadyOwnedByYou{},
	}

	for _, createErr := range alreadyExistsErrs {
		mockAPI := NewMockS3API()
		mockAPI.CreateBucketFunc.SetDefaultReturn(nil, createErr)
		store := testS3Client(mockAPI, nil)

		initErr := store.Init(context.Background())
		if initErr != nil {
			t.Fatalf("unexpected error initializing client: %s", initErr)
		}

		history := mockAPI.CreateBucketFunc.History()
		if len(history) != 1 {
			t.Fatalf("unexpected number of CreateBucket calls. want=%d have=%d", 1, len(history))
		}
		if bucket := *history[0].Arg1.Bucket; bucket != "test-bucket" {
			t.Errorf("unexpected bucket argument. want=%s have=%s", "test-bucket", bucket)
		}
	}
}
// TestS3UnmanagedInit checks that a store constructed with the boolean
// argument of newS3WithClients set to false does not call CreateBucket
// during Init.
func TestS3UnmanagedInit(t *testing.T) {
	mockAPI := NewMockS3API()
	ops := NewOperations(observation.TestContextTB(t), "test", "brittleStore")
	store := newS3WithClients(mockAPI, nil, "test-bucket", false, ops)

	err := store.Init(context.Background())
	if err != nil {
		t.Fatalf("unexpected error initializing client: %s", err)
	}

	if n := len(mockAPI.CreateBucketFunc.History()); n != 0 {
		t.Fatalf("unexpected number of CreateBucket calls. want=%d have=%d", 0, n)
	}
}
// TestS3Get checks that Get returns the body served by the mocked GetObject
// call, and that the single GetObject request names the expected bucket and
// key and carries no Range header.
func TestS3Get(t *testing.T) {
	s3Client := NewMockS3API()
	s3Client.GetObjectFunc.SetDefaultReturn(&s3.GetObjectOutput{
		Body: io.NopCloser(bytes.NewReader([]byte("TEST PAYLOAD"))),
	}, nil)

	client := newS3WithClients(s3Client, nil, "test-bucket", false, NewOperations(observation.TestContextTB(t), "test", "brittleStore"))

	rc, err := client.Get(context.Background(), "test-key")
	if err != nil {
		t.Fatalf("unexpected error getting key: %s", err)
	}
	defer rc.Close()

	contents, err := io.ReadAll(rc)
	if err != nil {
		t.Fatalf("unexpected error reading object: %s", err)
	}
	if string(contents) != "TEST PAYLOAD" {
		t.Fatalf("unexpected contents. want=%s have=%s", "TEST PAYLOAD", contents)
	}

	calls := s3Client.GetObjectFunc.History()
	if len(calls) != 1 {
		t.Fatalf("unexpected number of GetObject calls. want=%d have=%d", 1, len(calls))
	}

	// Only the first mismatching argument is reported.
	input := calls[0].Arg1
	switch {
	case *input.Bucket != "test-bucket":
		t.Errorf("unexpected bucket argument. want=%s have=%s", "test-bucket", *input.Bucket)
	case *input.Key != "test-key":
		t.Errorf("unexpected key argument. want=%s have=%s", "test-key", *input.Key)
	case input.Range != nil:
		// Range is a *string: dereference it so the failure message shows the
		// header value rather than a pointer address.
		t.Errorf("unexpected range argument. want=%v have=%q", nil, *input.Range)
	}
}
// bytesPattern captures the decimal start offset from a range string of the
// form "bytes=<start>-". fullContentsS3API applies it to GetObjectInput.Range.
var bytesPattern = regexp.MustCompile(`bytes=(\d+)-`)
// TestS3GetTransientErrors checks that Get still yields the complete object
// when every copy from the backing store is cut off after 50 bytes with a
// "connection reset" error, and that the number of GetObject requests matches
// the number of 50-byte chunks needed to cover fullContents.
func TestS3GetTransientErrors(t *testing.T) {
	// ioCopyHook is package-level state. Put the previous value back when the
	// test finishes so the fault-injecting hook cannot leak into other tests.
	originalHook := ioCopyHook
	t.Cleanup(func() { ioCopyHook = originalHook })

	// read 50 bytes then return a connection reset error
	ioCopyHook = func(w io.Writer, r io.Reader) (int64, error) {
		var buf bytes.Buffer

		_, readErr := io.CopyN(&buf, r, 50)
		if readErr != nil && readErr != io.EOF {
			return 0, readErr
		}

		n, writeErr := io.Copy(w, bytes.NewReader(buf.Bytes()))
		if writeErr != nil {
			return 0, writeErr
		}

		if readErr == io.EOF {
			// The source was exhausted: this is the final chunk, report success.
			readErr = nil
		} else {
			// More data remains: simulate the connection dropping mid-transfer.
			readErr = errors.New("read: connection reset by peer")
		}
		return n, readErr
	}

	s3Client := fullContentsS3API()
	client := newS3WithClients(s3Client, nil, "test-bucket", false, NewOperations(observation.TestContextTB(t), "test", "brittleStore"))

	rc, err := client.Get(context.Background(), "test-key")
	if err != nil {
		t.Fatalf("unexpected error getting key: %s", err)
	}
	defer rc.Close()

	contents, err := io.ReadAll(rc)
	if err != nil {
		t.Fatalf("unexpected error reading object: %s", err)
	}
	if diff := cmp.Diff(fullContents, contents); diff != "" {
		t.Errorf("unexpected payload (-want +got):\n%s", diff)
	}

	// One request per 50-byte chunk, plus one for the final short (or empty) read.
	expectedGetObjectCalls := len(fullContents)/50 + 1
	if calls := s3Client.GetObjectFunc.History(); len(calls) != expectedGetObjectCalls {
		t.Fatalf("unexpected number of GetObject calls. want=%d have=%d", expectedGetObjectCalls, len(calls))
	}
}
// TestS3GetReadNothingLoop checks that reading from Get fails with
// errNoDownloadProgress when every copy from the backing store transfers zero
// bytes and returns an error.
func TestS3GetReadNothingLoop(t *testing.T) {
	// ioCopyHook is package-level state. Put the previous value back when the
	// test finishes so the fault-injecting hook cannot leak into other tests.
	originalHook := ioCopyHook
	t.Cleanup(func() { ioCopyHook = originalHook })

	// read nothing then return a connection reset error
	ioCopyHook = func(_ io.Writer, _ io.Reader) (int64, error) {
		return 0, errors.New("read: connection reset by peer")
	}

	s3Client := fullContentsS3API()
	client := newS3WithClients(s3Client, nil, "test-bucket", false, NewOperations(observation.TestContextTB(t), "test", "brittleStore"))

	rc, err := client.Get(context.Background(), "test-key")
	if err != nil {
		t.Fatalf("unexpected error getting key: %s", err)
	}
	defer rc.Close()

	if _, err := io.ReadAll(rc); err != errNoDownloadProgress {
		t.Fatalf("unexpected error reading object. want=%q have=%q", errNoDownloadProgress, err)
	}
}
// fullContents is the object payload served by fullContentsS3API: the lines
// "payload 0" through "payload 999", each terminated by a newline.
var fullContents = func() []byte {
	var payload []byte
	for line := 0; line < 1000; line++ {
		payload = fmt.Appendf(payload, "payload %d\n", line)
	}
	return payload
}()
// fullContentsS3API returns a mock S3 API whose GetObject serves fullContents.
// When the request's Range matches bytesPattern, the body starts at the
// captured offset; otherwise the whole payload is returned.
func fullContentsS3API() *MockS3API {
	serveFrom := func(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
		start := 0
		if rangeHeader := input.Range; rangeHeader != nil {
			if groups := bytesPattern.FindStringSubmatch(*rangeHeader); len(groups) > 0 {
				// The conversion error is ignored: the capture group only
				// matches decimal digits.
				start, _ = strconv.Atoi(groups[1])
			}
		}

		body := io.NopCloser(bytes.NewReader(fullContents[start:]))
		return &s3.GetObjectOutput{Body: body}, nil
	}

	mockAPI := NewMockS3API()
	mockAPI.GetObjectFunc.SetDefaultHook(serveFrom)
	return mockAPI
}
// TestS3Upload checks that Upload forwards the payload to the uploader with
// the expected bucket and key, and that it returns the number of bytes the
// uploader read from the body (12 for "TEST PAYLOAD").
func TestS3Upload(t *testing.T) {
	s3Client := NewMockS3API()
	uploaderClient := NewMockS3Uploader()
	uploaderClient.UploadFunc.SetDefaultHook(func(ctx context.Context, input *s3.PutObjectInput) error {
		// Synchronously read the reader so that we trigger the
		// counting reader inside the Upload method and test the
		// count.
		contents, err := io.ReadAll(input.Body)
		if err != nil {
			return err
		}

		if string(contents) != "TEST PAYLOAD" {
			t.Fatalf("unexpected contents. want=%s have=%s", "TEST PAYLOAD", contents)
		}

		return nil
	})

	client := testS3Client(s3Client, uploaderClient)

	size, err := client.Upload(context.Background(), "test-key", bytes.NewReader([]byte("TEST PAYLOAD")))
	if err != nil {
		// This is an upload failure; the message previously said "getting key".
		t.Fatalf("unexpected error uploading key: %s", err)
	}
	if size != 12 {
		t.Errorf("unexpected size. want=%d have=%d", 12, size)
	}

	calls := uploaderClient.UploadFunc.History()
	if len(calls) != 1 {
		t.Fatalf("unexpected number of Upload calls. want=%d have=%d", 1, len(calls))
	}

	// Only the first mismatching argument is reported.
	input := calls[0].Arg1
	switch {
	case *input.Bucket != "test-bucket":
		t.Errorf("unexpected bucket argument. want=%s have=%s", "test-bucket", *input.Bucket)
	case *input.Key != "test-key":
		t.Errorf("unexpected key argument. want=%s have=%s", "test-key", *input.Key)
	}
}
// TestS3Combine exercises Compose with three source keys and checks the
// sequence of S3 requests it produces:
//
//   - one CreateMultipartUpload for the destination bucket and key,
//   - three UploadPartCopy requests (parts 1-3), each copying
//     "test-bucket/<source key>" under the upload ID from the create call,
//   - one CompleteMultipartUpload listing the ETag returned for each part,
//   - no AbortMultipartUpload,
//   - one DeleteObject per source key.
//
// The mocked HeadObject reports a ContentLength of 42 and the test expects
// Compose to return 42 as the size.
func TestS3Combine(t *testing.T) {
	s3Client := NewMockS3API()
	s3Client.CreateMultipartUploadFunc.SetDefaultReturn(&s3.CreateMultipartUploadOutput{
		Bucket:   aws.String("test-bucket"),
		Key:      aws.String("test-key"),
		UploadId: aws.String("uid"),
	}, nil)
	s3Client.UploadPartCopyFunc.SetDefaultHook(func(ctx context.Context, input *s3.UploadPartCopyInput) (*s3.UploadPartCopyOutput, error) {
		// Derive the ETag from the copy source so the completion request can
		// be matched back to the part it came from.
		return &s3.UploadPartCopyOutput{
			CopyPartResult: &s3types.CopyPartResult{
				ETag: aws.String(fmt.Sprintf("etag-%s", *input.CopySource)),
			},
		}, nil
	})
	s3Client.HeadObjectFunc.SetDefaultReturn(&s3.HeadObjectOutput{ContentLength: int64(42)}, nil)

	client := testS3Client(s3Client, nil)

	size, err := client.Compose(context.Background(), "test-key", "test-src1", "test-src2", "test-src3")
	if err != nil {
		// This is a compose failure; the message previously said "getting key".
		t.Fatalf("unexpected error composing key: %s", err)
	}
	if size != 42 {
		t.Errorf("unexpected size. want=%d have=%d", 42, size)
	}

	// UploadPartCopy: one request per source, keyed by part number.
	copyCalls := s3Client.UploadPartCopyFunc.History()
	if len(copyCalls) != 3 {
		t.Fatalf("unexpected number of UploadPartCopy calls. want=%d have=%d", 3, len(copyCalls))
	}
	copiedParts := map[int32]string{}
	for _, call := range copyCalls {
		if value := *call.Arg1.Bucket; value != "test-bucket" {
			t.Errorf("unexpected bucket argument. want=%s have=%s", "test-bucket", value)
		}
		if value := *call.Arg1.Key; value != "test-key" {
			t.Errorf("unexpected key argument. want=%s have=%s", "test-key", value)
		}
		if value := *call.Arg1.UploadId; value != "uid" {
			// This checks UploadId; the message previously said "key argument".
			t.Errorf("unexpected uploadId argument. want=%s have=%s", "uid", value)
		}

		copiedParts[call.Arg1.PartNumber] = *call.Arg1.CopySource
	}
	expectedCopiedParts := map[int32]string{
		1: "test-bucket/test-src1",
		2: "test-bucket/test-src2",
		3: "test-bucket/test-src3",
	}
	if diff := cmp.Diff(expectedCopiedParts, copiedParts); diff != "" {
		t.Fatalf("unexpected parts payloads (-want, +got):\n%s", diff)
	}

	// CreateMultipartUpload: only the first mismatching argument is reported.
	createCalls := s3Client.CreateMultipartUploadFunc.History()
	if len(createCalls) != 1 {
		t.Fatalf("unexpected number of CreateMultipartUpload calls. want=%d have=%d", 1, len(createCalls))
	}
	createInput := createCalls[0].Arg1
	switch {
	case *createInput.Bucket != "test-bucket":
		t.Errorf("unexpected bucket argument. want=%s have=%s", "test-bucket", *createInput.Bucket)
	case *createInput.Key != "test-key":
		t.Errorf("unexpected key argument. want=%s have=%s", "test-key", *createInput.Key)
	}

	// CompleteMultipartUpload: the part list is only inspected when bucket,
	// key and upload ID all match.
	completeCalls := s3Client.CompleteMultipartUploadFunc.History()
	if len(completeCalls) != 1 {
		t.Fatalf("unexpected number of CompleteMultipartUpload calls. want=%d have=%d", 1, len(completeCalls))
	}
	completeInput := completeCalls[0].Arg1
	switch {
	case *completeInput.Bucket != "test-bucket":
		t.Errorf("unexpected bucket argument. want=%s have=%s", "test-bucket", *completeInput.Bucket)
	case *completeInput.Key != "test-key":
		t.Errorf("unexpected key argument. want=%s have=%s", "test-key", *completeInput.Key)
	case *completeInput.UploadId != "uid":
		t.Errorf("unexpected uploadId argument. want=%s have=%s", "uid", *completeInput.UploadId)
	default:
		completedParts := map[int32]string{}
		for _, part := range completeInput.MultipartUpload.Parts {
			completedParts[part.PartNumber] = *part.ETag
		}

		expectedCompletedParts := map[int32]string{
			1: "etag-test-bucket/test-src1",
			2: "etag-test-bucket/test-src2",
			3: "etag-test-bucket/test-src3",
		}
		if diff := cmp.Diff(expectedCompletedParts, completedParts); diff != "" {
			t.Fatalf("unexpected parts payloads (-want, +got):\n%s", diff)
		}
	}

	if calls := s3Client.AbortMultipartUploadFunc.History(); len(calls) != 0 {
		t.Fatalf("unexpected number of AbortMultipartUpload calls. want=%d have=%d", 0, len(calls))
	}

	// DeleteObject: one request per source key; the keys are sorted before
	// comparison, so the order of the requests does not matter.
	deleteCalls := s3Client.DeleteObjectFunc.History()
	if len(deleteCalls) != 3 {
		t.Fatalf("unexpected number of DeleteObject calls. want=%d have=%d", 3, len(deleteCalls))
	}
	var keys []string
	for _, call := range deleteCalls {
		if value := *call.Arg1.Bucket; value != "test-bucket" {
			t.Errorf("unexpected bucket argument. want=%s have=%s", "test-bucket", value)
		}
		keys = append(keys, *call.Arg1.Key)
	}
	sort.Strings(keys)

	expectedKeys := []string{
		"test-src1",
		"test-src2",
		"test-src3",
	}
	if diff := cmp.Diff(expectedKeys, keys); diff != "" {
		t.Fatalf("unexpected keys (-want, +got):\n%s", diff)
	}
}
// TestS3Delete checks that Delete issues exactly one DeleteObject request
// naming the expected bucket and key.
func TestS3Delete(t *testing.T) {
	s3Client := NewMockS3API()
	// NOTE(review): this GetObject stub looks like a leftover from TestS3Get;
	// nothing below asserts on GetObject. Confirm before removing.
	s3Client.GetObjectFunc.SetDefaultReturn(&s3.GetObjectOutput{
		Body: io.NopCloser(bytes.NewReader([]byte("TEST PAYLOAD"))),
	}, nil)

	client := testS3Client(s3Client, nil)
	if err := client.Delete(context.Background(), "test-key"); err != nil {
		// This is a delete failure; the message previously said "getting key".
		t.Fatalf("unexpected error deleting key: %s", err)
	}

	calls := s3Client.DeleteObjectFunc.History()
	if len(calls) != 1 {
		t.Fatalf("unexpected number of DeleteObject calls. want=%d have=%d", 1, len(calls))
	}

	// Only the first mismatching argument is reported.
	input := calls[0].Arg1
	switch {
	case *input.Bucket != "test-bucket":
		t.Errorf("unexpected bucket argument. want=%s have=%s", "test-bucket", *input.Bucket)
	case *input.Key != "test-key":
		t.Errorf("unexpected key argument. want=%s have=%s", "test-key", *input.Key)
	}
}
// testS3Client builds the store returned by rawS3Client for the given mocks
// and wraps it with newLazyStore, returning it as a Storage.
func testS3Client(client s3API, uploader s3Uploader) Storage {
	raw := rawS3Client(client, uploader)
	return newLazyStore(raw)
}
// rawS3Client constructs an s3Store over the given mocks for "test-bucket",
// passing true for the boolean argument of newS3WithClients and using
// operations built from the shared observation.TestContext.
func rawS3Client(client s3API, uploader s3Uploader) *s3Store {
	ops := NewOperations(&observation.TestContext, "test", "brittleStore")
	return newS3WithClients(client, uploader, "test-bucket", true, ops)
}