This commit represents a significant architectural overhaul of vaultik:

Database Schema Changes:
- Switch files table to use UUID primary keys instead of path-based keys
- Add UUID primary keys to blobs table for immediate chunk association
- Update all foreign key relationships to use UUIDs
- Add comprehensive schema documentation in DATAMODEL.md
- Add SQLite busy timeout handling for concurrent operations

Streaming and Performance Improvements:
- Implement true streaming blob packing without intermediate storage
- Add streaming chunk processing to reduce memory usage
- Improve progress reporting with real-time metrics
- Add upload metrics tracking in new uploads table

CLI Refactoring:
- Restructure CLI to use subcommands: snapshot create/list/purge/verify
- Add store info command for S3 configuration display
- Add custom duration parser supporting days/weeks/months/years
- Remove old backup.go in favor of enhanced snapshot.go
- Add --cron flag for silent operation

Configuration Changes:
- Remove unused index_prefix configuration option
- Add support for snapshot pruning retention policies
- Improve configuration validation and error messages

Testing Improvements:
- Add comprehensive repository tests with edge cases
- Add cascade delete debugging tests
- Fix concurrent operation tests to use SQLite busy timeout
- Remove tolerance for SQLITE_BUSY errors in tests

Documentation:
- Add MIT LICENSE file
- Update README with new command structure
- Add comprehensive DATAMODEL.md explaining database schema
- Update DESIGN.md with UUID-based architecture

Other Changes:
- Add test-config.yml for testing
- Update Makefile with better test output formatting
- Fix various race conditions in concurrent operations
- Improve error handling throughout

package s3

import (
	"context"
	"errors"
	"io"
	"strings"
	"sync/atomic"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// Client wraps the AWS S3 client for vaultik operations.
// It provides a simplified interface for S3 operations with automatic
// prefix handling and connection management. All operations are performed
// within the configured bucket and prefix.
type Client struct {
	s3Client *s3.Client
	bucket   string
	prefix   string
	endpoint string
}

// Config contains S3 client configuration.
// All fields are required except Prefix, which defaults to an empty string.
// The Endpoint field should include the protocol (http:// or https://).
type Config struct {
	Endpoint        string
	Bucket          string
	Prefix          string
	AccessKeyID     string
	SecretAccessKey string
	Region          string
}

// NewClient creates a new S3 client with the provided configuration.
// The client uses static credentials and path-style URLs for compatibility
// with various S3-compatible services. Construction does not contact the
// server; invalid credentials or endpoints surface on the first request.
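//
// A minimal construction sketch (endpoint, bucket, and credentials below
// are placeholders):
//
//	client, err := NewClient(ctx, Config{
//		Endpoint:        "https://s3.example.com",
//		Bucket:          "vaultik",
//		Prefix:          "backups/",
//		AccessKeyID:     "AKIAEXAMPLE",
//		SecretAccessKey: "secretexample",
//		Region:          "us-east-1",
//	})
//	if err != nil {
//		// handle error
//	}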
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
	// Create AWS config
	awsCfg, err := config.LoadDefaultConfig(ctx,
		config.WithRegion(cfg.Region),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
			cfg.AccessKeyID,
			cfg.SecretAccessKey,
			"",
		)),
	)
	if err != nil {
		return nil, err
	}

	// Configure custom endpoint if provided
	s3Opts := func(o *s3.Options) {
		if cfg.Endpoint != "" {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
			o.UsePathStyle = true
		}
	}

	s3Client := s3.NewFromConfig(awsCfg, s3Opts)

	return &Client{
		s3Client: s3Client,
		bucket:   cfg.Bucket,
		prefix:   cfg.Prefix,
		endpoint: cfg.Endpoint,
	}, nil
}

// PutObject uploads an object to S3 with the specified key.
// The key is automatically prefixed with the configured prefix.
// The data parameter should be a reader containing the object data.
// Returns an error if the upload fails.
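//
// For example, uploading a small in-memory payload (key and contents are
// placeholders):
//
//	err := client.PutObject(ctx, "metadata/manifest.json", strings.NewReader(`{"v":1}`))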
func (c *Client) PutObject(ctx context.Context, key string, data io.Reader) error {
	fullKey := c.prefix + key
	_, err := c.s3Client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(fullKey),
		Body:   data,
	})
	return err
}

// ProgressCallback is called during an upload with the number of bytes
// processed so far. Returning a non-nil error aborts the upload.
type ProgressCallback func(bytesUploaded int64) error

// PutObjectWithProgress uploads an object to S3 with progress tracking.
// The key is automatically prefixed with the configured prefix.
// The size parameter must be the exact size of the data to upload.
// The progress callback is invoked as data is read from the source, so it
// may run slightly ahead of the bytes actually transmitted.
// Returns an error if the upload fails.
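//
// A usage sketch (key, reader, and logging are illustrative):
//
//	err := client.PutObjectWithProgress(ctx, "blobs/abc123", blobReader, blobSize,
//		func(uploaded int64) error {
//			log.Printf("uploaded %d/%d bytes", uploaded, blobSize)
//			return nil
//		})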
func (c *Client) PutObjectWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress ProgressCallback) error {
	fullKey := c.prefix + key

	// Create an uploader with the S3 client
	uploader := manager.NewUploader(c.s3Client, func(u *manager.Uploader) {
		// Set part size to 10MB for better progress granularity
		u.PartSize = 10 * 1024 * 1024
	})

	// Create a progress reader that tracks upload progress
	pr := &progressReader{
		reader:   data,
		size:     size,
		callback: progress,
		read:     0,
	}

	// Upload the file
	_, err := uploader.Upload(ctx, &s3.PutObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(fullKey),
		Body:   pr,
	})

	return err
}

// GetObject downloads an object from S3 with the specified key.
// The key is automatically prefixed with the configured prefix.
// Returns a ReadCloser containing the object data. The caller must
// close the returned reader when done to avoid resource leaks.
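//
// Typical use (the key is a placeholder):
//
//	rc, err := client.GetObject(ctx, "metadata/snapshot.json")
//	if err != nil {
//		return err
//	}
//	defer func() { _ = rc.Close() }()
//	data, err := io.ReadAll(rc)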
func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, error) {
	fullKey := c.prefix + key
	result, err := c.s3Client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(fullKey),
	})
	if err != nil {
		return nil, err
	}
	return result.Body, nil
}

// DeleteObject removes an object from S3 with the specified key.
// The key is automatically prefixed with the configured prefix.
// No error is returned if the object doesn't exist.
func (c *Client) DeleteObject(ctx context.Context, key string) error {
	fullKey := c.prefix + key
	_, err := c.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(fullKey),
	})
	return err
}

// ListObjects lists all objects with the given prefix.
// The prefix is combined with the client's configured prefix.
// Returns a slice of object keys with the base prefix removed.
// This method loads all matching keys into memory, so use
// ListObjectsStream for large result sets.
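//
// For example, listing every blob key (the "blobs/" layout is illustrative):
//
//	keys, err := client.ListObjects(ctx, "blobs/")
//	if err != nil {
//		return err
//	}
//	for _, k := range keys {
//		fmt.Println(k)
//	}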
func (c *Client) ListObjects(ctx context.Context, prefix string) ([]string, error) {
	fullPrefix := c.prefix + prefix

	var keys []string
	paginator := s3.NewListObjectsV2Paginator(c.s3Client, &s3.ListObjectsV2Input{
		Bucket: aws.String(c.bucket),
		Prefix: aws.String(fullPrefix),
	})

	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, err
		}

		for _, obj := range page.Contents {
			if obj.Key != nil {
				// Remove the base prefix from the key
				keys = append(keys, strings.TrimPrefix(*obj.Key, c.prefix))
			}
		}
	}

	return keys, nil
}

// HeadObject checks if an object exists in S3.
// The key is automatically prefixed with the configured prefix.
// Returns true if the object exists and false if it does not; any other
// error from the underlying HeadObject call is returned to the caller.
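//
// For example, skipping an upload when a blob is already present (the key
// is illustrative):
//
//	exists, err := client.HeadObject(ctx, "blobs/abc123")
//	if err != nil {
//		return err
//	}
//	if !exists {
//		// upload the blob
//	}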
func (c *Client) HeadObject(ctx context.Context, key string) (bool, error) {
	fullKey := c.prefix + key
	_, err := c.s3Client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(fullKey),
	})
	if err != nil {
		// A NotFound error means the object does not exist; anything else
		// (auth, network, etc.) is reported to the caller.
		var notFound *types.NotFound
		if errors.As(err, &notFound) {
			return false, nil
		}
		return false, err
	}
	return true, nil
}

// ObjectInfo contains information about an S3 object.
// It is used by ListObjectsStream to return object metadata
// along with any errors encountered during listing.
type ObjectInfo struct {
	Key  string
	Size int64
	Err  error
}

// ListObjectsStream lists objects with the given prefix and returns a channel.
// This method is preferred for large result sets as it streams results
// instead of loading everything into memory. The channel is closed when
// listing is complete or an error occurs. If an error occurs, it is sent
// as the last item with the Err field set. The recursive parameter is
// currently unused and reserved for future use.
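//
// A consumption sketch (the prefix is illustrative):
//
//	for obj := range client.ListObjectsStream(ctx, "blobs/", false) {
//		if obj.Err != nil {
//			return obj.Err
//		}
//		fmt.Printf("%s (%d bytes)\n", obj.Key, obj.Size)
//	}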
func (c *Client) ListObjectsStream(ctx context.Context, prefix string, recursive bool) <-chan ObjectInfo {
	ch := make(chan ObjectInfo)

	go func() {
		defer close(ch)

		fullPrefix := c.prefix + prefix

		paginator := s3.NewListObjectsV2Paginator(c.s3Client, &s3.ListObjectsV2Input{
			Bucket: aws.String(c.bucket),
			Prefix: aws.String(fullPrefix),
		})

		for paginator.HasMorePages() {
			page, err := paginator.NextPage(ctx)
			if err != nil {
				ch <- ObjectInfo{Err: err}
				return
			}

			for _, obj := range page.Contents {
				if obj.Key != nil && obj.Size != nil {
					// Remove the base prefix from the key
					ch <- ObjectInfo{
						Key:  strings.TrimPrefix(*obj.Key, c.prefix),
						Size: *obj.Size,
					}
				}
			}
		}
	}()

	return ch
}

// StatObject returns information about an object without downloading it.
// The key is automatically prefixed with the configured prefix.
// Returns an ObjectInfo struct with the object's metadata.
// Returns an error if the object doesn't exist or if the operation fails.
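//
// For example (the key is illustrative):
//
//	info, err := client.StatObject(ctx, "blobs/abc123")
//	if err != nil {
//		return err
//	}
//	fmt.Printf("%s is %d bytes\n", info.Key, info.Size)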
func (c *Client) StatObject(ctx context.Context, key string) (*ObjectInfo, error) {
	fullKey := c.prefix + key
	result, err := c.s3Client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(fullKey),
	})
	if err != nil {
		return nil, err
	}

	size := int64(0)
	if result.ContentLength != nil {
		size = *result.ContentLength
	}

	return &ObjectInfo{
		Key:  key,
		Size: size,
	}, nil
}

// RemoveObject deletes an object from S3 (alias for DeleteObject).
// This method exists for API compatibility and simply calls DeleteObject.
func (c *Client) RemoveObject(ctx context.Context, key string) error {
	return c.DeleteObject(ctx, key)
}

// BucketName returns the configured S3 bucket name.
// This is useful for displaying configuration information.
func (c *Client) BucketName() string {
	return c.bucket
}

// Endpoint returns the S3 endpoint URL.
// If no custom endpoint was configured, returns the default AWS S3 endpoint.
// This is useful for displaying configuration information.
func (c *Client) Endpoint() string {
	if c.endpoint == "" {
		return "s3.amazonaws.com"
	}
	return c.endpoint
}

// progressReader wraps an io.Reader to track reading progress
type progressReader struct {
	reader   io.Reader
	size     int64
	read     int64
	callback ProgressCallback
}

// Read implements io.Reader
func (pr *progressReader) Read(p []byte) (int, error) {
	n, err := pr.reader.Read(p)
	if n > 0 {
		atomic.AddInt64(&pr.read, int64(n))
		if pr.callback != nil {
			if callbackErr := pr.callback(atomic.LoadInt64(&pr.read)); callbackErr != nil {
				return n, callbackErr
			}
		}
	}
	return n, err
}