All checks were successful
check / check (push) Successful in 2m38s
Scanner now writes all user-facing output to an io.Writer (os.Stdout when progress is enabled, io.Discard in --cron mode). This fixes the long-standing issue where --cron still printed progress lines. S3 HeadObject now properly distinguishes not-found from other errors instead of swallowing all errors as not-found. Config/CLI error messages include actionable hints (where to find the config, how to generate keys, what storage options exist).
340 lines
9.9 KiB
Go
340 lines
9.9 KiB
Go
package s3
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"sync/atomic"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
"github.com/aws/smithy-go/logging"
|
|
)
|
|
|
|
// Client wraps the AWS S3 client for vaultik operations.
|
|
// It provides a simplified interface for S3 operations with automatic
|
|
// prefix handling and connection management. All operations are performed
|
|
// within the configured bucket and prefix.
|
|
type Client struct {
|
|
s3Client *s3.Client
|
|
bucket string
|
|
prefix string
|
|
endpoint string
|
|
}
|
|
|
|
// Config holds the settings NewClient needs to build a Client.
//
// Endpoint is optional. When it is empty, the SDK's default AWS endpoint
// resolution is used. When it is set, it must include the scheme (http:// or
// https://), and path-style addressing is enabled.
//
// Prefix is optional and defaults to the empty string. It is prepended
// verbatim to every object key, so include a trailing "/" if one is wanted.
//
// Bucket, AccessKeyID, SecretAccessKey and Region are expected to be set.
type Config struct {
	Endpoint        string // S3-compatible endpoint URL; empty means AWS defaults
	Bucket          string // bucket that all operations target
	Prefix          string // key prefix prepended to every object key
	AccessKeyID     string // static access key ID
	SecretAccessKey string // static secret access key
	Region          string // region handed to the AWS SDK
}
|
|
|
|
// nopLogger is a logger that discards all output.
|
|
// Used to suppress SDK warnings about checksums.
|
|
type nopLogger struct{}
|
|
|
|
func (nopLogger) Logf(classification logging.Classification, format string, v ...interface{}) {}
|
|
|
|
// NewClient creates a new S3 client with the provided configuration.
|
|
// It establishes a connection to the S3-compatible storage service and
|
|
// validates the credentials. The client uses static credentials and
|
|
// path-style URLs for compatibility with various S3-compatible services.
|
|
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
|
// Create AWS config with a nop logger to suppress SDK warnings
|
|
awsCfg, err := config.LoadDefaultConfig(ctx,
|
|
config.WithRegion(cfg.Region),
|
|
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
|
cfg.AccessKeyID,
|
|
cfg.SecretAccessKey,
|
|
"",
|
|
)),
|
|
config.WithLogger(nopLogger{}),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Configure custom endpoint if provided
|
|
s3Opts := func(o *s3.Options) {
|
|
if cfg.Endpoint != "" {
|
|
o.BaseEndpoint = aws.String(cfg.Endpoint)
|
|
o.UsePathStyle = true
|
|
}
|
|
}
|
|
|
|
s3Client := s3.NewFromConfig(awsCfg, s3Opts)
|
|
|
|
return &Client{
|
|
s3Client: s3Client,
|
|
bucket: cfg.Bucket,
|
|
prefix: cfg.Prefix,
|
|
endpoint: cfg.Endpoint,
|
|
}, nil
|
|
}
|
|
|
|
// PutObject uploads an object to S3 with the specified key.
|
|
// The key is automatically prefixed with the configured prefix.
|
|
// The data parameter should be a reader containing the object data.
|
|
// Returns an error if the upload fails.
|
|
func (c *Client) PutObject(ctx context.Context, key string, data io.Reader) error {
|
|
fullKey := c.prefix + key
|
|
_, err := c.s3Client.PutObject(ctx, &s3.PutObjectInput{
|
|
Bucket: aws.String(c.bucket),
|
|
Key: aws.String(fullKey),
|
|
Body: data,
|
|
})
|
|
return err
|
|
}
|
|
|
|
// ProgressCallback receives the running total of bytes consumed so far by an
// upload. Returning a non-nil error makes the upload's reader fail, which
// cancels the upload.
type ProgressCallback func(totalBytes int64) error
|
|
|
|
// PutObjectWithProgress uploads an object to S3 with progress tracking.
|
|
// The key is automatically prefixed with the configured prefix.
|
|
// The size parameter must be the exact size of the data to upload.
|
|
// The progress callback is called periodically with the number of bytes uploaded.
|
|
// Returns an error if the upload fails.
|
|
func (c *Client) PutObjectWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress ProgressCallback) error {
|
|
fullKey := c.prefix + key
|
|
|
|
// Create an uploader with the S3 client
|
|
uploader := manager.NewUploader(c.s3Client, func(u *manager.Uploader) {
|
|
// Set part size to 10MB for better progress granularity
|
|
u.PartSize = 10 * 1024 * 1024
|
|
})
|
|
|
|
// Create a progress reader that tracks upload progress
|
|
pr := &progressReader{
|
|
reader: data,
|
|
size: size,
|
|
callback: progress,
|
|
read: 0,
|
|
}
|
|
|
|
// Upload the file
|
|
_, err := uploader.Upload(ctx, &s3.PutObjectInput{
|
|
Bucket: aws.String(c.bucket),
|
|
Key: aws.String(fullKey),
|
|
Body: pr,
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
// GetObject downloads an object from S3 with the specified key.
|
|
// The key is automatically prefixed with the configured prefix.
|
|
// Returns a ReadCloser containing the object data. The caller must
|
|
// close the returned reader when done to avoid resource leaks.
|
|
func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, error) {
|
|
fullKey := c.prefix + key
|
|
result, err := c.s3Client.GetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(c.bucket),
|
|
Key: aws.String(fullKey),
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result.Body, nil
|
|
}
|
|
|
|
// DeleteObject removes an object from S3 with the specified key.
|
|
// The key is automatically prefixed with the configured prefix.
|
|
// No error is returned if the object doesn't exist.
|
|
func (c *Client) DeleteObject(ctx context.Context, key string) error {
|
|
fullKey := c.prefix + key
|
|
_, err := c.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
|
Bucket: aws.String(c.bucket),
|
|
Key: aws.String(fullKey),
|
|
})
|
|
return err
|
|
}
|
|
|
|
// ListObjects lists all objects with the given prefix.
|
|
// The prefix is combined with the client's configured prefix.
|
|
// Returns a slice of object keys with the base prefix removed.
|
|
// This method loads all matching keys into memory, so use
|
|
// ListObjectsStream for large result sets.
|
|
func (c *Client) ListObjects(ctx context.Context, prefix string) ([]string, error) {
|
|
fullPrefix := c.prefix + prefix
|
|
|
|
var keys []string
|
|
paginator := s3.NewListObjectsV2Paginator(c.s3Client, &s3.ListObjectsV2Input{
|
|
Bucket: aws.String(c.bucket),
|
|
Prefix: aws.String(fullPrefix),
|
|
})
|
|
|
|
for paginator.HasMorePages() {
|
|
page, err := paginator.NextPage(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, obj := range page.Contents {
|
|
if obj.Key != nil {
|
|
// Remove the base prefix from the key
|
|
key := *obj.Key
|
|
if len(key) > len(c.prefix) {
|
|
key = key[len(c.prefix):]
|
|
}
|
|
keys = append(keys, key)
|
|
}
|
|
}
|
|
}
|
|
|
|
return keys, nil
|
|
}
|
|
|
|
// HeadObject checks if an object exists in S3.
|
|
// Returns true if the object exists, false otherwise.
|
|
// The key is automatically prefixed with the configured prefix.
|
|
// Note: This method returns false for any error, not just "not found".
|
|
func (c *Client) HeadObject(ctx context.Context, key string) (bool, error) {
|
|
fullKey := c.prefix + key
|
|
_, err := c.s3Client.HeadObject(ctx, &s3.HeadObjectInput{
|
|
Bucket: aws.String(c.bucket),
|
|
Key: aws.String(fullKey),
|
|
})
|
|
if err != nil {
|
|
var notFound *s3types.NotFound
|
|
var noSuchKey *s3types.NoSuchKey
|
|
if errors.As(err, ¬Found) || errors.As(err, &noSuchKey) {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// ObjectInfo describes a single object in a listing or stat result.
//
// ListObjectsStream also uses it to carry a terminal error. An item whose
// Err is non-nil has an empty Key and a zero Size.
type ObjectInfo struct {
	Key  string // object key, relative to the client's prefix
	Size int64  // object size in bytes
	Err  error  // non-nil only on the error item sent by ListObjectsStream
}
|
|
|
|
// ListObjectsStream lists objects with the given prefix and returns a channel.
|
|
// This method is preferred for large result sets as it streams results
|
|
// instead of loading everything into memory. The channel is closed when
|
|
// listing is complete or an error occurs. If an error occurs, it will be
|
|
// sent as the last item with the Err field set. The recursive parameter
|
|
// is currently unused but reserved for future use.
|
|
func (c *Client) ListObjectsStream(ctx context.Context, prefix string, recursive bool) <-chan ObjectInfo {
|
|
ch := make(chan ObjectInfo)
|
|
|
|
go func() {
|
|
defer close(ch)
|
|
|
|
fullPrefix := c.prefix + prefix
|
|
|
|
paginator := s3.NewListObjectsV2Paginator(c.s3Client, &s3.ListObjectsV2Input{
|
|
Bucket: aws.String(c.bucket),
|
|
Prefix: aws.String(fullPrefix),
|
|
})
|
|
|
|
for paginator.HasMorePages() {
|
|
page, err := paginator.NextPage(ctx)
|
|
if err != nil {
|
|
ch <- ObjectInfo{Err: err}
|
|
return
|
|
}
|
|
|
|
for _, obj := range page.Contents {
|
|
if obj.Key != nil && obj.Size != nil {
|
|
// Remove the base prefix from the key
|
|
key := *obj.Key
|
|
if len(key) > len(c.prefix) {
|
|
key = key[len(c.prefix):]
|
|
}
|
|
ch <- ObjectInfo{
|
|
Key: key,
|
|
Size: *obj.Size,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
}
|
|
|
|
// StatObject returns information about an object without downloading it.
|
|
// The key is automatically prefixed with the configured prefix.
|
|
// Returns an ObjectInfo struct with the object's metadata.
|
|
// Returns an error if the object doesn't exist or if the operation fails.
|
|
func (c *Client) StatObject(ctx context.Context, key string) (*ObjectInfo, error) {
|
|
fullKey := c.prefix + key
|
|
result, err := c.s3Client.HeadObject(ctx, &s3.HeadObjectInput{
|
|
Bucket: aws.String(c.bucket),
|
|
Key: aws.String(fullKey),
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
size := int64(0)
|
|
if result.ContentLength != nil {
|
|
size = *result.ContentLength
|
|
}
|
|
|
|
return &ObjectInfo{
|
|
Key: key,
|
|
Size: size,
|
|
}, nil
|
|
}
|
|
|
|
// RemoveObject deletes an object from S3 (alias for DeleteObject).
|
|
// This method exists for API compatibility and simply calls DeleteObject.
|
|
func (c *Client) RemoveObject(ctx context.Context, key string) error {
|
|
return c.DeleteObject(ctx, key)
|
|
}
|
|
|
|
// BucketName returns the configured S3 bucket name.
|
|
// This is useful for displaying configuration information.
|
|
func (c *Client) BucketName() string {
|
|
return c.bucket
|
|
}
|
|
|
|
// Endpoint returns the S3 endpoint URL.
|
|
// If no custom endpoint was configured, returns the default AWS S3 endpoint.
|
|
// This is useful for displaying configuration information.
|
|
func (c *Client) Endpoint() string {
|
|
if c.endpoint == "" {
|
|
return "s3.amazonaws.com"
|
|
}
|
|
return c.endpoint
|
|
}
|
|
|
|
// progressReader wraps an io.Reader to track reading progress
|
|
type progressReader struct {
|
|
reader io.Reader
|
|
size int64
|
|
read int64
|
|
callback ProgressCallback
|
|
}
|
|
|
|
// Read implements io.Reader
|
|
func (pr *progressReader) Read(p []byte) (int, error) {
|
|
n, err := pr.reader.Read(p)
|
|
if n > 0 {
|
|
atomic.AddInt64(&pr.read, int64(n))
|
|
if pr.callback != nil {
|
|
if callbackErr := pr.callback(atomic.LoadInt64(&pr.read)); callbackErr != nil {
|
|
return n, callbackErr
|
|
}
|
|
}
|
|
}
|
|
return n, err
|
|
}
|