Set up S3 testing infrastructure for backup implementation
- Add gofakes3 for in-process S3-compatible test server - Create test server that runs on localhost:9999 with temp directory - Implement basic S3 client wrapper with standard operations - Add comprehensive tests for blob and metadata storage patterns - Test cleanup properly removes temporary directories - Use AWS SDK v2 for S3 operations with proper error handling
This commit is contained in:
140
internal/s3/client.go
Normal file
140
internal/s3/client.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
)
|
||||
|
||||
// Client wraps the AWS S3 client for vaultik operations
|
||||
type Client struct {
|
||||
s3Client *s3.Client
|
||||
bucket string
|
||||
prefix string
|
||||
}
|
||||
|
||||
// Config contains S3 client configuration
|
||||
type Config struct {
|
||||
Endpoint string
|
||||
Bucket string
|
||||
Prefix string
|
||||
AccessKeyID string
|
||||
SecretAccessKey string
|
||||
Region string
|
||||
}
|
||||
|
||||
// NewClient creates a new S3 client
|
||||
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
||||
// Create AWS config
|
||||
awsCfg, err := config.LoadDefaultConfig(ctx,
|
||||
config.WithRegion(cfg.Region),
|
||||
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
||||
cfg.AccessKeyID,
|
||||
cfg.SecretAccessKey,
|
||||
"",
|
||||
)),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Configure custom endpoint if provided
|
||||
s3Opts := func(o *s3.Options) {
|
||||
if cfg.Endpoint != "" {
|
||||
o.BaseEndpoint = aws.String(cfg.Endpoint)
|
||||
o.UsePathStyle = true
|
||||
}
|
||||
}
|
||||
|
||||
s3Client := s3.NewFromConfig(awsCfg, s3Opts)
|
||||
|
||||
return &Client{
|
||||
s3Client: s3Client,
|
||||
bucket: cfg.Bucket,
|
||||
prefix: cfg.Prefix,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// PutObject uploads an object to S3
|
||||
func (c *Client) PutObject(ctx context.Context, key string, data io.Reader) error {
|
||||
fullKey := c.prefix + key
|
||||
_, err := c.s3Client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(c.bucket),
|
||||
Key: aws.String(fullKey),
|
||||
Body: data,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// GetObject downloads an object from S3
|
||||
func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, error) {
|
||||
fullKey := c.prefix + key
|
||||
result, err := c.s3Client.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(c.bucket),
|
||||
Key: aws.String(fullKey),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result.Body, nil
|
||||
}
|
||||
|
||||
// DeleteObject removes an object from S3
|
||||
func (c *Client) DeleteObject(ctx context.Context, key string) error {
|
||||
fullKey := c.prefix + key
|
||||
_, err := c.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
||||
Bucket: aws.String(c.bucket),
|
||||
Key: aws.String(fullKey),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// ListObjects lists objects with the given prefix
|
||||
func (c *Client) ListObjects(ctx context.Context, prefix string) ([]string, error) {
|
||||
fullPrefix := c.prefix + prefix
|
||||
|
||||
var keys []string
|
||||
paginator := s3.NewListObjectsV2Paginator(c.s3Client, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(c.bucket),
|
||||
Prefix: aws.String(fullPrefix),
|
||||
})
|
||||
|
||||
for paginator.HasMorePages() {
|
||||
page, err := paginator.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, obj := range page.Contents {
|
||||
if obj.Key != nil {
|
||||
// Remove the base prefix from the key
|
||||
key := *obj.Key
|
||||
if len(key) > len(c.prefix) {
|
||||
key = key[len(c.prefix):]
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
// HeadObject checks if an object exists
|
||||
func (c *Client) HeadObject(ctx context.Context, key string) (bool, error) {
|
||||
fullKey := c.prefix + key
|
||||
_, err := c.s3Client.HeadObject(ctx, &s3.HeadObjectInput{
|
||||
Bucket: aws.String(c.bucket),
|
||||
Key: aws.String(fullKey),
|
||||
})
|
||||
if err != nil {
|
||||
// Check if it's a not found error
|
||||
// TODO: Add proper error type checking
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
98
internal/s3/client_test.go
Normal file
98
internal/s3/client_test.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package s3_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/s3"
|
||||
)
|
||||
|
||||
func TestClient(t *testing.T) {
|
||||
ts := NewTestServer(t)
|
||||
defer func() {
|
||||
if err := ts.Cleanup(); err != nil {
|
||||
t.Errorf("cleanup failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Create client
|
||||
client, err := s3.NewClient(ctx, s3.Config{
|
||||
Endpoint: testEndpoint,
|
||||
Bucket: testBucket,
|
||||
Prefix: "test-prefix/",
|
||||
AccessKeyID: testAccessKey,
|
||||
SecretAccessKey: testSecretKey,
|
||||
Region: testRegion,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
|
||||
// Test PutObject
|
||||
testKey := "foo/bar.txt"
|
||||
testData := []byte("test data")
|
||||
err = client.PutObject(ctx, testKey, bytes.NewReader(testData))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to put object: %v", err)
|
||||
}
|
||||
|
||||
// Test GetObject
|
||||
reader, err := client.GetObject(ctx, testKey)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get object: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := reader.Close(); err != nil {
|
||||
t.Errorf("failed to close reader: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
data, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read data: %v", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(data, testData) {
|
||||
t.Errorf("data mismatch: got %q, want %q", data, testData)
|
||||
}
|
||||
|
||||
// Test HeadObject
|
||||
exists, err := client.HeadObject(ctx, testKey)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to head object: %v", err)
|
||||
}
|
||||
if !exists {
|
||||
t.Error("expected object to exist")
|
||||
}
|
||||
|
||||
// Test ListObjects
|
||||
keys, err := client.ListObjects(ctx, "foo/")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
if len(keys) != 1 {
|
||||
t.Errorf("expected 1 key, got %d", len(keys))
|
||||
}
|
||||
if keys[0] != testKey {
|
||||
t.Errorf("unexpected key: got %s, want %s", keys[0], testKey)
|
||||
}
|
||||
|
||||
// Test DeleteObject
|
||||
err = client.DeleteObject(ctx, testKey)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to delete object: %v", err)
|
||||
}
|
||||
|
||||
// Verify deletion
|
||||
exists, err = client.HeadObject(ctx, testKey)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to head object after deletion: %v", err)
|
||||
}
|
||||
if exists {
|
||||
t.Error("expected object to not exist after deletion")
|
||||
}
|
||||
}
|
||||
285
internal/s3/s3_test.go
Normal file
285
internal/s3/s3_test.go
Normal file
@@ -0,0 +1,285 @@
|
||||
package s3_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/johannesboyne/gofakes3"
|
||||
"github.com/johannesboyne/gofakes3/backend/s3mem"
|
||||
)
|
||||
|
||||
const (
|
||||
testBucket = "test-bucket"
|
||||
testRegion = "us-east-1"
|
||||
testAccessKey = "test-access-key"
|
||||
testSecretKey = "test-secret-key"
|
||||
testEndpoint = "http://localhost:9999"
|
||||
)
|
||||
|
||||
// TestServer represents an in-process S3-compatible test server
|
||||
type TestServer struct {
|
||||
server *http.Server
|
||||
backend gofakes3.Backend
|
||||
s3Client *s3.Client
|
||||
tempDir string
|
||||
}
|
||||
|
||||
// NewTestServer creates and starts a new test server
|
||||
func NewTestServer(t *testing.T) *TestServer {
|
||||
// Create temp directory for any file operations
|
||||
tempDir, err := os.MkdirTemp("", "vaultik-s3-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
|
||||
// Create in-memory backend
|
||||
backend := s3mem.New()
|
||||
faker := gofakes3.New(backend)
|
||||
|
||||
// Create HTTP server
|
||||
server := &http.Server{
|
||||
Addr: "localhost:9999",
|
||||
Handler: faker.Server(),
|
||||
}
|
||||
|
||||
// Start server in background
|
||||
go func() {
|
||||
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
t.Logf("test server error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for server to be ready
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Create S3 client
|
||||
cfg, err := config.LoadDefaultConfig(context.Background(),
|
||||
config.WithRegion(testRegion),
|
||||
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
||||
testAccessKey,
|
||||
testSecretKey,
|
||||
"",
|
||||
)),
|
||||
config.WithClientLogMode(aws.LogRetries|aws.LogRequestWithBody|aws.LogResponseWithBody),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create AWS config: %v", err)
|
||||
}
|
||||
|
||||
s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
|
||||
o.BaseEndpoint = aws.String(testEndpoint)
|
||||
o.UsePathStyle = true
|
||||
})
|
||||
|
||||
ts := &TestServer{
|
||||
server: server,
|
||||
backend: backend,
|
||||
s3Client: s3Client,
|
||||
tempDir: tempDir,
|
||||
}
|
||||
|
||||
// Create test bucket
|
||||
_, err = s3Client.CreateBucket(context.Background(), &s3.CreateBucketInput{
|
||||
Bucket: aws.String(testBucket),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create test bucket: %v", err)
|
||||
}
|
||||
|
||||
return ts
|
||||
}
|
||||
|
||||
// Cleanup shuts down the server and removes temp directory
|
||||
func (ts *TestServer) Cleanup() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := ts.server.Shutdown(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return os.RemoveAll(ts.tempDir)
|
||||
}
|
||||
|
||||
// Client returns the S3 client configured for the test server
|
||||
func (ts *TestServer) Client() *s3.Client {
|
||||
return ts.s3Client
|
||||
}
|
||||
|
||||
// TestBasicS3Operations tests basic store and retrieve operations
|
||||
func TestBasicS3Operations(t *testing.T) {
|
||||
ts := NewTestServer(t)
|
||||
defer func() {
|
||||
if err := ts.Cleanup(); err != nil {
|
||||
t.Errorf("cleanup failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
ctx := context.Background()
|
||||
client := ts.Client()
|
||||
|
||||
// Test data
|
||||
testKey := "test/file.txt"
|
||||
testData := []byte("Hello, S3 test!")
|
||||
|
||||
// Put object
|
||||
_, err := client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(testBucket),
|
||||
Key: aws.String(testKey),
|
||||
Body: bytes.NewReader(testData),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to put object: %v", err)
|
||||
}
|
||||
|
||||
// Get object
|
||||
result, err := client.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(testBucket),
|
||||
Key: aws.String(testKey),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get object: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := result.Body.Close(); err != nil {
|
||||
t.Errorf("failed to close body: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Read and verify data
|
||||
data, err := io.ReadAll(result.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read object body: %v", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(data, testData) {
|
||||
t.Errorf("retrieved data mismatch: got %q, want %q", data, testData)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBlobOperations tests blob storage patterns for vaultik
|
||||
func TestBlobOperations(t *testing.T) {
|
||||
ts := NewTestServer(t)
|
||||
defer func() {
|
||||
if err := ts.Cleanup(); err != nil {
|
||||
t.Errorf("cleanup failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
ctx := context.Background()
|
||||
client := ts.Client()
|
||||
|
||||
// Test blob storage with prefix structure
|
||||
blobHash := "aabbccddee112233445566778899aabbccddee11"
|
||||
blobKey := filepath.Join("blobs", blobHash[:2], blobHash[2:4], blobHash+".zst.age")
|
||||
blobData := []byte("compressed and encrypted blob data")
|
||||
|
||||
// Store blob
|
||||
_, err := client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(testBucket),
|
||||
Key: aws.String(blobKey),
|
||||
Body: bytes.NewReader(blobData),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to store blob: %v", err)
|
||||
}
|
||||
|
||||
// List objects with prefix
|
||||
listResult, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(testBucket),
|
||||
Prefix: aws.String("blobs/aa/"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
if len(listResult.Contents) != 1 {
|
||||
t.Errorf("expected 1 object, got %d", len(listResult.Contents))
|
||||
}
|
||||
|
||||
if listResult.Contents[0].Key != nil && *listResult.Contents[0].Key != blobKey {
|
||||
t.Errorf("unexpected key: got %s, want %s", *listResult.Contents[0].Key, blobKey)
|
||||
}
|
||||
|
||||
// Delete blob
|
||||
_, err = client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
||||
Bucket: aws.String(testBucket),
|
||||
Key: aws.String(blobKey),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to delete blob: %v", err)
|
||||
}
|
||||
|
||||
// Verify deletion
|
||||
_, err = client.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(testBucket),
|
||||
Key: aws.String(blobKey),
|
||||
})
|
||||
if err == nil {
|
||||
t.Error("expected error getting deleted object, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
// TestMetadataOperations tests metadata storage patterns
|
||||
func TestMetadataOperations(t *testing.T) {
|
||||
ts := NewTestServer(t)
|
||||
defer func() {
|
||||
if err := ts.Cleanup(); err != nil {
|
||||
t.Errorf("cleanup failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
ctx := context.Background()
|
||||
client := ts.Client()
|
||||
|
||||
// Test metadata storage
|
||||
snapshotID := "2024-01-01T12:00:00Z"
|
||||
metadataKey := filepath.Join("metadata", snapshotID+".sqlite.age")
|
||||
metadataData := []byte("encrypted sqlite database")
|
||||
|
||||
// Store metadata
|
||||
_, err := client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(testBucket),
|
||||
Key: aws.String(metadataKey),
|
||||
Body: bytes.NewReader(metadataData),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to store metadata: %v", err)
|
||||
}
|
||||
|
||||
// Store manifest
|
||||
manifestKey := filepath.Join("metadata", snapshotID+".manifest.json.zst")
|
||||
manifestData := []byte(`{"snapshot_id":"2024-01-01T12:00:00Z","blob_hashes":["hash1","hash2"]}`)
|
||||
|
||||
_, err = client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(testBucket),
|
||||
Key: aws.String(manifestKey),
|
||||
Body: bytes.NewReader(manifestData),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to store manifest: %v", err)
|
||||
}
|
||||
|
||||
// List metadata objects
|
||||
listResult, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(testBucket),
|
||||
Prefix: aws.String("metadata/"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to list metadata: %v", err)
|
||||
}
|
||||
|
||||
if len(listResult.Contents) != 2 {
|
||||
t.Errorf("expected 2 metadata objects, got %d", len(listResult.Contents))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user