package s3

import (
	"context"
	"io"
	"sync/atomic"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// Client is a thin wrapper around the AWS SDK S3 client used by vaultik.
// Every operation targets the single configured bucket, and every key passed
// to a method is prepended with the configured prefix before it is sent.
type Client struct {
	s3Client *s3.Client // underlying SDK client
	bucket   string     // bucket all operations are issued against
	prefix   string     // string prepended verbatim to every key
	endpoint string     // custom endpoint as configured; empty when none was given
}

// Config carries the settings NewClient needs to build a Client.
// Prefix may be empty, in which case keys are used unchanged.
// Endpoint may also be empty; when it is set it is passed to the SDK as the
// base endpoint and should include the protocol (http:// or https://).
type Config struct {
	Endpoint        string
	Bucket          string
	Prefix          string
	AccessKeyID     string
	SecretAccessKey string
	Region          string
}

// NewClient builds a Client from cfg.
// The AWS configuration is loaded with the region from cfg and a static
// credentials provider built from AccessKeyID and SecretAccessKey (with an
// empty session token). When cfg.Endpoint is non-empty it is installed as the
// SDK base endpoint and path-style addressing is switched on, which is what
// most S3-compatible services expect. Any error from loading the AWS
// configuration is returned unchanged.
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
	staticCreds := credentials.NewStaticCredentialsProvider(
		cfg.AccessKeyID,
		cfg.SecretAccessKey,
		"",
	)

	loaded, err := config.LoadDefaultConfig(
		ctx,
		config.WithRegion(cfg.Region),
		config.WithCredentialsProvider(staticCreds),
	)
	if err != nil {
		return nil, err
	}

	client := &Client{
		bucket:   cfg.Bucket,
		prefix:   cfg.Prefix,
		endpoint: cfg.Endpoint,
	}
	client.s3Client = s3.NewFromConfig(loaded, func(o *s3.Options) {
		// Leave the SDK defaults alone unless a custom endpoint was requested.
		if cfg.Endpoint == "" {
			return
		}
		o.BaseEndpoint = aws.String(cfg.Endpoint)
		o.UsePathStyle = true
	})

	return client, nil
}

// PutObject uploads an object to S3 with the specified key.
// The key is automatically prefixed with the configured prefix.
// The data parameter should be a reader containing the object data. // Returns an error if the upload fails. func (c *Client) PutObject(ctx context.Context, key string, data io.Reader) error { fullKey := c.prefix + key _, err := c.s3Client.PutObject(ctx, &s3.PutObjectInput{ Bucket: aws.String(c.bucket), Key: aws.String(fullKey), Body: data, }) return err } // ProgressCallback is called during upload progress with bytes uploaded so far. // The callback should return an error to cancel the upload. type ProgressCallback func(bytesUploaded int64) error // PutObjectWithProgress uploads an object to S3 with progress tracking. // The key is automatically prefixed with the configured prefix. // The size parameter must be the exact size of the data to upload. // The progress callback is called periodically with the number of bytes uploaded. // Returns an error if the upload fails. func (c *Client) PutObjectWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress ProgressCallback) error { fullKey := c.prefix + key // Create an uploader with the S3 client uploader := manager.NewUploader(c.s3Client, func(u *manager.Uploader) { // Set part size to 10MB for better progress granularity u.PartSize = 10 * 1024 * 1024 }) // Create a progress reader that tracks upload progress pr := &progressReader{ reader: data, size: size, callback: progress, read: 0, } // Upload the file _, err := uploader.Upload(ctx, &s3.PutObjectInput{ Bucket: aws.String(c.bucket), Key: aws.String(fullKey), Body: pr, }) return err } // GetObject downloads an object from S3 with the specified key. // The key is automatically prefixed with the configured prefix. // Returns a ReadCloser containing the object data. The caller must // close the returned reader when done to avoid resource leaks. 
func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, error) { fullKey := c.prefix + key result, err := c.s3Client.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(c.bucket), Key: aws.String(fullKey), }) if err != nil { return nil, err } return result.Body, nil } // DeleteObject removes an object from S3 with the specified key. // The key is automatically prefixed with the configured prefix. // No error is returned if the object doesn't exist. func (c *Client) DeleteObject(ctx context.Context, key string) error { fullKey := c.prefix + key _, err := c.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: aws.String(c.bucket), Key: aws.String(fullKey), }) return err } // ListObjects lists all objects with the given prefix. // The prefix is combined with the client's configured prefix. // Returns a slice of object keys with the base prefix removed. // This method loads all matching keys into memory, so use // ListObjectsStream for large result sets. func (c *Client) ListObjects(ctx context.Context, prefix string) ([]string, error) { fullPrefix := c.prefix + prefix var keys []string paginator := s3.NewListObjectsV2Paginator(c.s3Client, &s3.ListObjectsV2Input{ Bucket: aws.String(c.bucket), Prefix: aws.String(fullPrefix), }) for paginator.HasMorePages() { page, err := paginator.NextPage(ctx) if err != nil { return nil, err } for _, obj := range page.Contents { if obj.Key != nil { // Remove the base prefix from the key key := *obj.Key if len(key) > len(c.prefix) { key = key[len(c.prefix):] } keys = append(keys, key) } } } return keys, nil } // HeadObject checks if an object exists in S3. // Returns true if the object exists, false otherwise. // The key is automatically prefixed with the configured prefix. // Note: This method returns false for any error, not just "not found". 
func (c *Client) HeadObject(ctx context.Context, key string) (bool, error) { fullKey := c.prefix + key _, err := c.s3Client.HeadObject(ctx, &s3.HeadObjectInput{ Bucket: aws.String(c.bucket), Key: aws.String(fullKey), }) if err != nil { // Check if it's a not found error // TODO: Add proper error type checking return false, nil } return true, nil } // ObjectInfo contains information about an S3 object. // It is used by ListObjectsStream to return object metadata // along with any errors encountered during listing. type ObjectInfo struct { Key string Size int64 Err error } // ListObjectsStream lists objects with the given prefix and returns a channel. // This method is preferred for large result sets as it streams results // instead of loading everything into memory. The channel is closed when // listing is complete or an error occurs. If an error occurs, it will be // sent as the last item with the Err field set. The recursive parameter // is currently unused but reserved for future use. func (c *Client) ListObjectsStream(ctx context.Context, prefix string, recursive bool) <-chan ObjectInfo { ch := make(chan ObjectInfo) go func() { defer close(ch) fullPrefix := c.prefix + prefix paginator := s3.NewListObjectsV2Paginator(c.s3Client, &s3.ListObjectsV2Input{ Bucket: aws.String(c.bucket), Prefix: aws.String(fullPrefix), }) for paginator.HasMorePages() { page, err := paginator.NextPage(ctx) if err != nil { ch <- ObjectInfo{Err: err} return } for _, obj := range page.Contents { if obj.Key != nil && obj.Size != nil { // Remove the base prefix from the key key := *obj.Key if len(key) > len(c.prefix) { key = key[len(c.prefix):] } ch <- ObjectInfo{ Key: key, Size: *obj.Size, } } } } }() return ch } // StatObject returns information about an object without downloading it. // The key is automatically prefixed with the configured prefix. // Returns an ObjectInfo struct with the object's metadata. // Returns an error if the object doesn't exist or if the operation fails. 
func (c *Client) StatObject(ctx context.Context, key string) (*ObjectInfo, error) { fullKey := c.prefix + key result, err := c.s3Client.HeadObject(ctx, &s3.HeadObjectInput{ Bucket: aws.String(c.bucket), Key: aws.String(fullKey), }) if err != nil { return nil, err } size := int64(0) if result.ContentLength != nil { size = *result.ContentLength } return &ObjectInfo{ Key: key, Size: size, }, nil } // RemoveObject deletes an object from S3 (alias for DeleteObject). // This method exists for API compatibility and simply calls DeleteObject. func (c *Client) RemoveObject(ctx context.Context, key string) error { return c.DeleteObject(ctx, key) } // BucketName returns the configured S3 bucket name. // This is useful for displaying configuration information. func (c *Client) BucketName() string { return c.bucket } // Endpoint returns the S3 endpoint URL. // If no custom endpoint was configured, returns the default AWS S3 endpoint. // This is useful for displaying configuration information. func (c *Client) Endpoint() string { if c.endpoint == "" { return "s3.amazonaws.com" } return c.endpoint } // progressReader wraps an io.Reader to track reading progress type progressReader struct { reader io.Reader size int64 read int64 callback ProgressCallback } // Read implements io.Reader func (pr *progressReader) Read(p []byte) (int, error) { n, err := pr.reader.Read(p) if n > 0 { atomic.AddInt64(&pr.read, int64(n)) if pr.callback != nil { if callbackErr := pr.callback(atomic.LoadInt64(&pr.read)); callbackErr != nil { return n, callbackErr } } } return n, err }