diff --git a/go.mod b/go.mod index 7231b4d..37f362d 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.12.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.1 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 // indirect + github.com/adrg/xdg v0.5.3 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/aws/aws-sdk-go v1.44.256 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect diff --git a/go.sum b/go.sum index f1f5b8b..75c59bc 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1/go.mo github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs= github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78= +github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= diff --git a/internal/cli/app.go b/internal/cli/app.go index d627cb8..54cd838 100644 --- a/internal/cli/app.go +++ b/internal/cli/app.go @@ -2,9 +2,11 @@ package cli import ( "context" + "errors" "fmt" "os" "os/signal" + "path/filepath" "syscall" "time" @@ -12,9 +14,11 @@ import ( "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/globals" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" + "git.eeqj.de/sneak/vaultik/internal/pidlock" "git.eeqj.de/sneak/vaultik/internal/snapshot" + "git.eeqj.de/sneak/vaultik/internal/storage" "git.eeqj.de/sneak/vaultik/internal/vaultik" + "github.com/adrg/xdg" "go.uber.org/fx" ) @@ -51,7 +55,7 @@ func NewApp(opts AppOptions) *fx.App { config.Module, database.Module, log.Module, - s3.Module, + storage.Module, snapshot.Module, fx.Provide(vaultik.New), fx.Invoke(setupGlobals), @@ -118,7 +122,23 @@ func RunApp(ctx context.Context, app *fx.App) error { // RunWithApp is a helper that creates and runs an fx app with the given options. // It combines NewApp and RunApp into a single convenient function. This is the // preferred way to run CLI commands that need the full application context. +// It acquires a PID lock before starting to prevent concurrent instances. 
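+// The lock file lives in the XDG data directory for the app identifier
+// (typically ~/.local/share/berlin.sneak.app.vaultik/vaultik.pid).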
func RunWithApp(ctx context.Context, opts AppOptions) error { + // Acquire PID lock to prevent concurrent instances + lockDir := filepath.Join(xdg.DataHome, "berlin.sneak.app.vaultik") + lock, err := pidlock.Acquire(lockDir) + if err != nil { + if errors.Is(err, pidlock.ErrAlreadyRunning) { + return fmt.Errorf("cannot start: %w", err) + } + return fmt.Errorf("failed to acquire lock: %w", err) + } + defer func() { + if err := lock.Release(); err != nil { + log.Warn("Failed to release PID lock", "error", err) + } + }() + app := NewApp(opts) return RunApp(ctx, app) } diff --git a/internal/cli/fetch.go b/internal/cli/fetch.go index 8b2634c..e9204d1 100644 --- a/internal/cli/fetch.go +++ b/internal/cli/fetch.go @@ -8,8 +8,8 @@ import ( "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/globals" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" "git.eeqj.de/sneak/vaultik/internal/snapshot" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/cobra" "go.uber.org/fx" ) @@ -23,7 +23,7 @@ type FetchApp struct { Globals *globals.Globals Config *config.Config Repositories *database.Repositories - S3Client *s3.Client + Storage storage.Storer DB *database.DB Shutdowner fx.Shutdowner } @@ -61,15 +61,14 @@ The age_secret_key must be configured in the config file for decryption.`, }, Modules: []fx.Option{ snapshot.Module, - s3.Module, fx.Provide(fx.Annotate( func(g *globals.Globals, cfg *config.Config, repos *database.Repositories, - s3Client *s3.Client, db *database.DB, shutdowner fx.Shutdowner) *FetchApp { + storer storage.Storer, db *database.DB, shutdowner fx.Shutdowner) *FetchApp { return &FetchApp{ Globals: g, Config: cfg, Repositories: repos, - S3Client: s3Client, + Storage: storer, DB: db, Shutdowner: shutdowner, } diff --git a/internal/cli/restore.go b/internal/cli/restore.go index 3f34679..fa3d396 100644 --- a/internal/cli/restore.go +++ b/internal/cli/restore.go @@ -8,8 +8,8 @@ import ( "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/globals" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" "git.eeqj.de/sneak/vaultik/internal/snapshot" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/cobra" "go.uber.org/fx" ) @@ -24,7 +24,7 @@ type RestoreApp struct { Globals *globals.Globals Config *config.Config Repositories *database.Repositories - S3Client *s3.Client + Storage storage.Storer DB *database.DB Shutdowner fx.Shutdowner } @@ -61,15 +61,14 @@ The age_secret_key must be configured in the config file for decryption.`, }, Modules: []fx.Option{ snapshot.Module, - s3.Module, fx.Provide(fx.Annotate( func(g *globals.Globals, cfg *config.Config, repos *database.Repositories, - s3Client *s3.Client, db *database.DB, shutdowner fx.Shutdowner) *RestoreApp { + storer storage.Storer, db *database.DB, shutdowner fx.Shutdowner) *RestoreApp { return &RestoreApp{ Globals: g, Config: cfg, Repositories: repos, - S3Client: s3Client, + Storage: storer, DB: db, Shutdowner: shutdowner, } diff --git a/internal/cli/store.go b/internal/cli/store.go index 55274fd..557f39b 100644 --- a/internal/cli/store.go +++ b/internal/cli/store.go @@ -7,14 +7,14 @@ import ( "time" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/cobra" "go.uber.org/fx" ) // StoreApp contains dependencies for store commands type StoreApp struct { - S3Client *s3.Client + Storage storage.Storer 
Shutdowner fx.Shutdowner } @@ -48,19 +48,18 @@ func newStoreInfoCommand() *cobra.Command { // Info displays storage information func (app *StoreApp) Info(ctx context.Context) error { - // Get bucket info - bucketName := app.S3Client.BucketName() - endpoint := app.S3Client.Endpoint() + // Get storage info + storageInfo := app.Storage.Info() fmt.Printf("Storage Information\n") fmt.Printf("==================\n\n") - fmt.Printf("S3 Configuration:\n") - fmt.Printf(" Endpoint: %s\n", endpoint) - fmt.Printf(" Bucket: %s\n\n", bucketName) + fmt.Printf("Storage Configuration:\n") + fmt.Printf(" Type: %s\n", storageInfo.Type) + fmt.Printf(" Location: %s\n\n", storageInfo.Location) // Count snapshots by listing metadata/ prefix snapshotCount := 0 - snapshotCh := app.S3Client.ListObjectsStream(ctx, "metadata/", true) + snapshotCh := app.Storage.ListStream(ctx, "metadata/") snapshotDirs := make(map[string]bool) for object := range snapshotCh { @@ -79,7 +78,7 @@ func (app *StoreApp) Info(ctx context.Context) error { blobCount := 0 var totalSize int64 - blobCh := app.S3Client.ListObjectsStream(ctx, "blobs/", false) + blobCh := app.Storage.ListStream(ctx, "blobs/") for object := range blobCh { if object.Err != nil { return fmt.Errorf("listing blobs: %w", object.Err) @@ -130,10 +129,9 @@ func runWithApp(ctx context.Context, fn func(*StoreApp) error) error { Debug: rootFlags.Debug, }, Modules: []fx.Option{ - s3.Module, - fx.Provide(func(s3Client *s3.Client, shutdowner fx.Shutdowner) *StoreApp { + fx.Provide(func(storer storage.Storer, shutdowner fx.Shutdowner) *StoreApp { return &StoreApp{ - S3Client: s3Client, + Storage: storer, Shutdowner: shutdowner, } }), diff --git a/internal/config/config.go b/internal/config/config.go index 6d5e53c..03c872a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,16 +3,43 @@ package config import ( "fmt" "os" + "path/filepath" + "strings" "time" "git.eeqj.de/sneak/smartconfig" + "github.com/adrg/xdg" "go.uber.org/fx" "gopkg.in/yaml.v3" ) +const appName = "berlin.sneak.app.vaultik" + +// expandTilde expands ~ at the start of a path to the user's home directory. +func expandTilde(path string) string { + if path == "~" { + home, _ := os.UserHomeDir() + return home + } + if strings.HasPrefix(path, "~/") { + home, _ := os.UserHomeDir() + return filepath.Join(home, path[2:]) + } + return path +} + +// expandTildeInURL expands ~ in file:// URLs. +func expandTildeInURL(url string) string { + if strings.HasPrefix(url, "file://~/") { + home, _ := os.UserHomeDir() + return "file://" + filepath.Join(home, url[9:]) + } + return url +} + // Config represents the application configuration for Vaultik. // It defines all settings for backup operations, including source directories, -// encryption recipients, S3 storage configuration, and performance tuning parameters. +// encryption recipients, storage configuration, and performance tuning parameters. // Configuration is typically loaded from a YAML file. type Config struct { AgeRecipients []string `yaml:"age_recipients"` @@ -28,6 +55,14 @@ type Config struct { S3 S3Config `yaml:"s3"` SourceDirs []string `yaml:"source_dirs"` CompressionLevel int `yaml:"compression_level"` + + // StorageURL specifies the storage backend using a URL format. + // Takes precedence over S3Config if set. + // Supported formats: + // - s3://bucket/prefix?endpoint=host®ion=us-east-1 + // - file:///path/to/backup + // For S3 URLs, credentials are still read from s3.access_key_id and s3.secret_access_key. 
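+	// Examples (values are illustrative, not defaults):
+	//
+	//	storage_url: "file:///mnt/backup/vaultik"
+	//	storage_url: "s3://my-bucket/backups?endpoint=s3.example.com&region=eu-west-1"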
+ StorageURL string `yaml:"storage_url"` } // S3Config represents S3 storage configuration for backup storage. @@ -84,7 +119,7 @@ func Load(path string) (*Config, error) { BackupInterval: 1 * time.Hour, FullScanInterval: 24 * time.Hour, MinTimeBetweenRun: 15 * time.Minute, - IndexPath: "/var/lib/vaultik/index.sqlite", + IndexPath: filepath.Join(xdg.DataHome, appName, "index.sqlite"), CompressionLevel: 3, } @@ -99,9 +134,16 @@ func Load(path string) (*Config, error) { return nil, fmt.Errorf("failed to parse config: %w", err) } + // Expand tilde in all path fields + cfg.IndexPath = expandTilde(cfg.IndexPath) + cfg.StorageURL = expandTildeInURL(cfg.StorageURL) + for i, dir := range cfg.SourceDirs { + cfg.SourceDirs[i] = expandTilde(dir) + } + // Check for environment variable override for IndexPath if envIndexPath := os.Getenv("VAULTIK_INDEX_PATH"); envIndexPath != "" { - cfg.IndexPath = envIndexPath + cfg.IndexPath = expandTilde(envIndexPath) } // Get hostname if not set @@ -132,7 +174,7 @@ func Load(path string) (*Config, error) { // It ensures all required fields are present and have valid values: // - At least one age recipient must be specified // - At least one source directory must be configured -// - S3 credentials and endpoint must be provided +// - Storage must be configured (either storage_url or s3.* fields) // - Chunk size must be at least 1MB // - Blob size limit must be at least the chunk size // - Compression level must be between 1 and 19 @@ -146,20 +188,9 @@ func (c *Config) Validate() error { return fmt.Errorf("at least one source directory is required") } - if c.S3.Endpoint == "" { - return fmt.Errorf("s3.endpoint is required") - } - - if c.S3.Bucket == "" { - return fmt.Errorf("s3.bucket is required") - } - - if c.S3.AccessKeyID == "" { - return fmt.Errorf("s3.access_key_id is required") - } - - if c.S3.SecretAccessKey == "" { - return fmt.Errorf("s3.secret_access_key is required") + // Validate storage configuration + if err := c.validateStorage(); err != nil { + return err } if c.ChunkSize.Int64() < 1024*1024 { // 1MB minimum @@ -177,6 +208,50 @@ func (c *Config) Validate() error { return nil } +// validateStorage validates storage configuration. +// If StorageURL is set, it takes precedence. S3 URLs require credentials. +// File URLs don't require any S3 configuration. +// If StorageURL is not set, legacy S3 configuration is required. +func (c *Config) validateStorage() error { + if c.StorageURL != "" { + // URL-based configuration + if strings.HasPrefix(c.StorageURL, "file://") { + // File storage doesn't need S3 credentials + return nil + } + if strings.HasPrefix(c.StorageURL, "s3://") { + // S3 storage needs credentials + if c.S3.AccessKeyID == "" { + return fmt.Errorf("s3.access_key_id is required for s3:// URLs") + } + if c.S3.SecretAccessKey == "" { + return fmt.Errorf("s3.secret_access_key is required for s3:// URLs") + } + return nil + } + return fmt.Errorf("storage_url must start with s3:// or file://") + } + + // Legacy S3 configuration + if c.S3.Endpoint == "" { + return fmt.Errorf("s3.endpoint is required (or set storage_url)") + } + + if c.S3.Bucket == "" { + return fmt.Errorf("s3.bucket is required (or set storage_url)") + } + + if c.S3.AccessKeyID == "" { + return fmt.Errorf("s3.access_key_id is required") + } + + if c.S3.SecretAccessKey == "" { + return fmt.Errorf("s3.secret_access_key is required") + } + + return nil +} + // Module exports the config module for fx dependency injection. // It provides the Config type to other modules in the application. 
var Module = fx.Module("config", diff --git a/internal/pidlock/pidlock.go b/internal/pidlock/pidlock.go new file mode 100644 index 0000000..dfe0306 --- /dev/null +++ b/internal/pidlock/pidlock.go @@ -0,0 +1,108 @@ +// Package pidlock provides process-level locking using PID files. +// It prevents multiple instances of vaultik from running simultaneously, +// which would cause database locking conflicts. +package pidlock + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "syscall" +) + +// ErrAlreadyRunning indicates another vaultik instance is running. +var ErrAlreadyRunning = errors.New("another vaultik instance is already running") + +// Lock represents an acquired PID lock. +type Lock struct { + path string +} + +// Acquire attempts to acquire a PID lock in the specified directory. +// If the lock file exists and the process is still running, it returns +// ErrAlreadyRunning with details about the existing process. +// On success, it writes the current PID to the lock file and returns +// a Lock that must be released with Release(). +func Acquire(lockDir string) (*Lock, error) { + // Ensure lock directory exists + if err := os.MkdirAll(lockDir, 0700); err != nil { + return nil, fmt.Errorf("creating lock directory: %w", err) + } + + lockPath := filepath.Join(lockDir, "vaultik.pid") + + // Check for existing lock + existingPID, err := readPIDFile(lockPath) + if err == nil { + // Lock file exists, check if process is running + if isProcessRunning(existingPID) { + return nil, fmt.Errorf("%w (PID %d)", ErrAlreadyRunning, existingPID) + } + // Process is not running, stale lock file - we can take over + } + + // Write our PID + pid := os.Getpid() + if err := os.WriteFile(lockPath, []byte(strconv.Itoa(pid)), 0600); err != nil { + return nil, fmt.Errorf("writing PID file: %w", err) + } + + return &Lock{path: lockPath}, nil +} + +// Release removes the PID lock file. +// It is safe to call Release multiple times. +func (l *Lock) Release() error { + if l == nil || l.path == "" { + return nil + } + + // Verify we still own the lock (our PID is in the file) + existingPID, err := readPIDFile(l.path) + if err != nil { + // File already gone or unreadable - that's fine + return nil + } + + if existingPID != os.Getpid() { + // Someone else wrote to our lock file - don't remove it + return nil + } + + if err := os.Remove(l.path); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("removing PID file: %w", err) + } + + l.path = "" // Prevent double-release + return nil +} + +// readPIDFile reads and parses the PID from a lock file. +func readPIDFile(path string) (int, error) { + data, err := os.ReadFile(path) + if err != nil { + return 0, err + } + + pid, err := strconv.Atoi(strings.TrimSpace(string(data))) + if err != nil { + return 0, fmt.Errorf("parsing PID: %w", err) + } + + return pid, nil +} + +// isProcessRunning checks if a process with the given PID is running. +func isProcessRunning(pid int) bool { + process, err := os.FindProcess(pid) + if err != nil { + return false + } + + // On Unix, FindProcess always succeeds. We need to send signal 0 to check. 
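+	// A nil error from Signal(0) means the process exists and is signalable.
+	// Caveat: a live process owned by another user typically yields EPERM,
+	// which this check treats as not running; the stale-lock takeover above
+	// therefore assumes all vaultik instances run as the same user.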
+ err = process.Signal(syscall.Signal(0)) + return err == nil +} diff --git a/internal/pidlock/pidlock_test.go b/internal/pidlock/pidlock_test.go new file mode 100644 index 0000000..d256ee1 --- /dev/null +++ b/internal/pidlock/pidlock_test.go @@ -0,0 +1,108 @@ +package pidlock + +import ( + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAcquireAndRelease(t *testing.T) { + tmpDir := t.TempDir() + + // Acquire lock + lock, err := Acquire(tmpDir) + require.NoError(t, err) + require.NotNil(t, lock) + + // Verify PID file exists with our PID + data, err := os.ReadFile(filepath.Join(tmpDir, "vaultik.pid")) + require.NoError(t, err) + pid, err := strconv.Atoi(string(data)) + require.NoError(t, err) + assert.Equal(t, os.Getpid(), pid) + + // Release lock + err = lock.Release() + require.NoError(t, err) + + // Verify PID file is gone + _, err = os.Stat(filepath.Join(tmpDir, "vaultik.pid")) + assert.True(t, os.IsNotExist(err)) +} + +func TestAcquireBlocksSecondInstance(t *testing.T) { + tmpDir := t.TempDir() + + // Acquire first lock + lock1, err := Acquire(tmpDir) + require.NoError(t, err) + require.NotNil(t, lock1) + defer func() { _ = lock1.Release() }() + + // Try to acquire second lock - should fail + lock2, err := Acquire(tmpDir) + assert.ErrorIs(t, err, ErrAlreadyRunning) + assert.Nil(t, lock2) +} + +func TestAcquireWithStaleLock(t *testing.T) { + tmpDir := t.TempDir() + + // Write a stale PID file (PID that doesn't exist) + stalePID := 999999999 // Unlikely to be a real process + pidPath := filepath.Join(tmpDir, "vaultik.pid") + err := os.WriteFile(pidPath, []byte(strconv.Itoa(stalePID)), 0600) + require.NoError(t, err) + + // Should be able to acquire lock (stale lock is cleaned up) + lock, err := Acquire(tmpDir) + require.NoError(t, err) + require.NotNil(t, lock) + defer func() { _ = lock.Release() }() + + // Verify our PID is now in the file + data, err := os.ReadFile(pidPath) + require.NoError(t, err) + pid, err := strconv.Atoi(string(data)) + require.NoError(t, err) + assert.Equal(t, os.Getpid(), pid) +} + +func TestReleaseIsIdempotent(t *testing.T) { + tmpDir := t.TempDir() + + lock, err := Acquire(tmpDir) + require.NoError(t, err) + + // Release multiple times - should not error + err = lock.Release() + require.NoError(t, err) + + err = lock.Release() + require.NoError(t, err) +} + +func TestReleaseNilLock(t *testing.T) { + var lock *Lock + err := lock.Release() + assert.NoError(t, err) +} + +func TestAcquireCreatesDirectory(t *testing.T) { + tmpDir := t.TempDir() + nestedDir := filepath.Join(tmpDir, "nested", "dir") + + lock, err := Acquire(nestedDir) + require.NoError(t, err) + require.NotNil(t, lock) + defer func() { _ = lock.Release() }() + + // Verify directory was created + info, err := os.Stat(nestedDir) + require.NoError(t, err) + assert.True(t, info.IsDir()) +} diff --git a/internal/snapshot/module.go b/internal/snapshot/module.go index fde5e43..9beea61 100644 --- a/internal/snapshot/module.go +++ b/internal/snapshot/module.go @@ -3,7 +3,7 @@ package snapshot import ( "git.eeqj.de/sneak/vaultik/internal/config" "git.eeqj.de/sneak/vaultik/internal/database" - "git.eeqj.de/sneak/vaultik/internal/s3" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/afero" "go.uber.org/fx" ) @@ -27,13 +27,13 @@ var Module = fx.Module("backup", // ScannerFactory creates scanners with custom parameters type ScannerFactory func(params ScannerParams) *Scanner -func provideScannerFactory(cfg 
*config.Config, repos *database.Repositories, s3Client *s3.Client) ScannerFactory { +func provideScannerFactory(cfg *config.Config, repos *database.Repositories, storer storage.Storer) ScannerFactory { return func(params ScannerParams) *Scanner { return NewScanner(ScannerConfig{ FS: params.Fs, ChunkSize: cfg.ChunkSize.Int64(), Repositories: repos, - S3Client: s3Client, + Storage: storer, MaxBlobSize: cfg.BlobSizeLimit.Int64(), CompressionLevel: cfg.CompressionLevel, AgeRecipients: cfg.AgeRecipients, diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go index 004a11b..007d2a2 100644 --- a/internal/snapshot/scanner.go +++ b/internal/snapshot/scanner.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "io" "os" "strings" "sync" @@ -14,7 +13,7 @@ import ( "git.eeqj.de/sneak/vaultik/internal/chunker" "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/dustin/go-humanize" "github.com/spf13/afero" ) @@ -32,7 +31,7 @@ type Scanner struct { chunker *chunker.Chunker packer *blob.Packer repos *database.Repositories - s3Client S3Client + storage storage.Storer maxBlobSize int64 compressionLevel int ageRecipient string @@ -46,19 +45,12 @@ type Scanner struct { scanCtx context.Context } -// S3Client interface for blob storage operations -type S3Client interface { - PutObject(ctx context.Context, key string, data io.Reader) error - PutObjectWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress s3.ProgressCallback) error - StatObject(ctx context.Context, key string) (*s3.ObjectInfo, error) -} - // ScannerConfig contains configuration for the scanner type ScannerConfig struct { FS afero.Fs ChunkSize int64 Repositories *database.Repositories - S3Client S3Client + Storage storage.Storer MaxBlobSize int64 CompressionLevel int AgeRecipients []string // Optional, empty means no encryption @@ -111,7 +103,7 @@ func NewScanner(cfg ScannerConfig) *Scanner { chunker: chunker.NewChunker(cfg.ChunkSize), packer: packer, repos: cfg.Repositories, - s3Client: cfg.S3Client, + storage: cfg.Storage, maxBlobSize: cfg.MaxBlobSize, compressionLevel: cfg.CompressionLevel, ageRecipient: strings.Join(cfg.AgeRecipients, ","), @@ -128,11 +120,11 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc } // Set blob handler for concurrent upload - if s.s3Client != nil { - log.Debug("Setting blob handler for S3 uploads") + if s.storage != nil { + log.Debug("Setting blob handler for storage uploads") s.packer.SetBlobHandler(s.handleBlobReady) } else { - log.Debug("No S3 client configured, blobs will not be uploaded") + log.Debug("No storage configured, blobs will not be uploaded") } // Start progress reporting if enabled @@ -141,14 +133,23 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc defer s.progress.Stop() } - // Phase 0: Check for deleted files from previous snapshots - if err := s.detectDeletedFiles(ctx, path, result); err != nil { + // Phase 0: Quick enumeration of all files on disk + fmt.Println("Enumerating files...") + existingFiles, err := s.enumerateFiles(ctx, path) + if err != nil && err != context.Canceled { + log.Warn("Failed to enumerate files", "error", err) + existingFiles = make(map[string]struct{}) + } + fmt.Printf("Found %s files\n", formatNumber(len(existingFiles))) + + // Phase 0b: Check for deleted files by comparing DB against enumerated set (no filesystem 
access) + if err := s.detectDeletedFiles(ctx, path, existingFiles, result); err != nil { return nil, fmt.Errorf("detecting deleted files: %w", err) } // Phase 1: Scan directory and collect files to process log.Info("Phase 1/3: Scanning directory structure") - filesToProcess, err := s.scanPhase(ctx, path, result) + filesToProcess, err := s.scanPhase(ctx, path, result, existingFiles) if err != nil { return nil, fmt.Errorf("scan phase failed: %w", err) } @@ -216,16 +217,78 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc return result, nil } +// enumerateFiles performs a quick enumeration to get all file paths without expensive stat() calls +// Returns a set of all file paths found on disk +func (s *Scanner) enumerateFiles(ctx context.Context, path string) (map[string]struct{}, error) { + files := make(map[string]struct{}) + startTime := time.Now() + lastStatusTime := time.Now() + statusInterval := 5 * time.Second + + var enumDir func(dirPath string) error + enumDir = func(dirPath string) error { + // Check context cancellation + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + f, err := s.fs.Open(dirPath) + if err != nil { + return nil // Skip directories we can't open + } + defer func() { _ = f.Close() }() + + for { + // Read directory entries in batches + entries, err := f.Readdir(1000) + if err != nil { + break // End of directory or error + } + + for _, entry := range entries { + fullPath := dirPath + "/" + entry.Name() + if entry.IsDir() { + if err := enumDir(fullPath); err != nil { + return err + } + } else if entry.Mode().IsRegular() { + files[fullPath] = struct{}{} + } + } + + // Periodic status update + if time.Since(lastStatusTime) >= statusInterval { + elapsed := time.Since(startTime).Round(time.Second) + fmt.Printf("Enumerating files: %s found (%s elapsed)\n", + formatNumber(len(files)), elapsed) + lastStatusTime = time.Now() + } + } + + return nil + } + + if err := enumDir(path); err != nil { + return files, err + } + + return files, nil +} + // scanPhase performs the initial directory scan to identify files to process -func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult) ([]*FileToProcess, error) { +func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}) ([]*FileToProcess, error) { + totalFiles := int64(len(existingFiles)) + var filesToProcess []*FileToProcess var mu sync.Mutex // Set up periodic status output + startTime := time.Now() lastStatusTime := time.Now() statusInterval := 15 * time.Second var filesScanned int64 - var bytesScanned int64 log.Debug("Starting directory walk", "path", path) err := afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error { @@ -266,7 +329,6 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult // Update scan statistics if info.Mode().IsRegular() { filesScanned++ - bytesScanned += info.Size() } // Output periodic status @@ -275,9 +337,35 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult changedCount := len(filesToProcess) mu.Unlock() - fmt.Printf("Scan progress: %s files examined, %s changed\n", - formatNumber(int(filesScanned)), - formatNumber(changedCount)) + elapsed := time.Since(startTime) + rate := float64(filesScanned) / elapsed.Seconds() + + // Build status line + if totalFiles > 0 { + pct := float64(filesScanned) / float64(totalFiles) * 100 + remaining := totalFiles - filesScanned + var eta time.Duration + if 
rate > 0 { + eta = time.Duration(float64(remaining)/rate) * time.Second + } + fmt.Printf("Scan: %s/%s files (%.1f%%), %s changed/new, %.0f files/sec, %s elapsed", + formatNumber(int(filesScanned)), + formatNumber(int(totalFiles)), + pct, + formatNumber(changedCount), + rate, + elapsed.Round(time.Second)) + if eta > 0 { + fmt.Printf(", ETA %s", eta.Round(time.Second)) + } + fmt.Println() + } else { + fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n", + formatNumber(int(filesScanned)), + formatNumber(changedCount), + rate, + elapsed.Round(time.Second)) + } lastStatusTime = time.Now() } @@ -345,8 +433,8 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc } s.packerMu.Unlock() - // If no S3 client, store any remaining blobs - if s.s3Client == nil { + // If no storage configured, store any remaining blobs locally + if s.storage == nil { blobs := s.packer.GetFinishedBlobs() for _, b := range blobs { // Blob metadata is already stored incrementally during packing @@ -573,7 +661,7 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed) } - // Upload to S3 first (without holding any locks) + // Upload to storage first (without holding any locks) // Use scan context for cancellation support ctx := s.scanCtx if ctx == nil { @@ -585,7 +673,6 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { lastProgressBytes := int64(0) progressCallback := func(uploaded int64) error { - // Calculate instantaneous speed now := time.Now() elapsed := now.Sub(lastProgressTime).Seconds() @@ -612,15 +699,15 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { // Create sharded path: blobs/ca/fe/cafebabe... 
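+	// The two-level fan-out on the first two bytes of the hex hash keeps any
+	// single directory from accumulating an unbounded number of blob objects.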
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash) - if err := s.s3Client.PutObjectWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil { - return fmt.Errorf("uploading blob %s to S3: %w", finishedBlob.Hash, err) + if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil { + return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err) } uploadDuration := time.Since(startTime) // Log upload stats uploadSpeed := float64(finishedBlob.Compressed) * 8 / uploadDuration.Seconds() // bits per second - log.Info("Successfully uploaded blob to S3 storage", + log.Info("Successfully uploaded blob to storage", "path", blobPath, "size", humanize.Bytes(uint64(finishedBlob.Compressed)), "duration", uploadDuration, @@ -861,17 +948,31 @@ func (s *Scanner) GetProgress() *ProgressReporter { } // detectDeletedFiles finds files that existed in previous snapshots but no longer exist -func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, result *ScanResult) error { +// Uses the pre-enumerated existingFiles set to avoid additional filesystem access +func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, existingFiles map[string]struct{}, result *ScanResult) error { // Get all files with this path prefix from the database - files, err := s.repos.Files.ListByPrefix(ctx, path) + knownFiles, err := s.repos.Files.ListByPrefix(ctx, path) if err != nil { return fmt.Errorf("listing files by prefix: %w", err) } - for _, file := range files { - // Check if the file still exists on disk - _, err := s.fs.Stat(file.Path) - if os.IsNotExist(err) { + if len(knownFiles) == 0 { + return nil + } + + fmt.Printf("Checking %s known files for deletions...\n", formatNumber(len(knownFiles))) + + // Check each known file against the enumerated set (no filesystem access needed) + for _, file := range knownFiles { + // Check context cancellation + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // Check if the file exists in our enumerated set + if _, exists := existingFiles[file.Path]; !exists { // File has been deleted result.FilesDeleted++ result.BytesDeleted += file.Size @@ -879,6 +980,10 @@ func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, result *S } } + if result.FilesDeleted > 0 { + fmt.Printf("Found %s deleted files\n", formatNumber(result.FilesDeleted)) + } + return nil } diff --git a/internal/snapshot/snapshot.go b/internal/snapshot/snapshot.go index 5a19b94..7a6e769 100644 --- a/internal/snapshot/snapshot.go +++ b/internal/snapshot/snapshot.go @@ -52,7 +52,7 @@ import ( "git.eeqj.de/sneak/vaultik/internal/config" "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/dustin/go-humanize" "github.com/spf13/afero" "go.uber.org/fx" @@ -60,27 +60,27 @@ import ( // SnapshotManager handles snapshot creation and metadata export type SnapshotManager struct { - repos *database.Repositories - s3Client S3Client - config *config.Config - fs afero.Fs + repos *database.Repositories + storage storage.Storer + config *config.Config + fs afero.Fs } // SnapshotManagerParams holds dependencies for NewSnapshotManager type SnapshotManagerParams struct { fx.In - Repos *database.Repositories - S3Client *s3.Client - Config *config.Config + Repos 
*database.Repositories + Storage storage.Storer + Config *config.Config } // NewSnapshotManager creates a new snapshot manager for dependency injection func NewSnapshotManager(params SnapshotManagerParams) *SnapshotManager { return &SnapshotManager{ - repos: params.Repos, - s3Client: params.S3Client, - config: params.Config, + repos: params.Repos, + storage: params.Storage, + config: params.Config, } } @@ -268,7 +268,7 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st dbKey := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID) dbUploadStart := time.Now() - if err := sm.s3Client.PutObject(ctx, dbKey, bytes.NewReader(finalData)); err != nil { + if err := sm.storage.Put(ctx, dbKey, bytes.NewReader(finalData)); err != nil { return fmt.Errorf("uploading snapshot database: %w", err) } dbUploadDuration := time.Since(dbUploadStart) @@ -282,7 +282,7 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st // Upload blob manifest (compressed only, not encrypted) manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID) manifestUploadStart := time.Now() - if err := sm.s3Client.PutObject(ctx, manifestKey, bytes.NewReader(blobManifest)); err != nil { + if err := sm.storage.Put(ctx, manifestKey, bytes.NewReader(blobManifest)); err != nil { return fmt.Errorf("uploading blob manifest: %w", err) } manifestUploadDuration := time.Since(manifestUploadStart) @@ -635,11 +635,11 @@ func (sm *SnapshotManager) CleanupIncompleteSnapshots(ctx context.Context, hostn log.Info("Found incomplete snapshots", "count", len(incompleteSnapshots)) - // Check each incomplete snapshot for metadata in S3 + // Check each incomplete snapshot for metadata in storage for _, snapshot := range incompleteSnapshots { - // Check if metadata exists in S3 + // Check if metadata exists in storage metadataKey := fmt.Sprintf("metadata/%s/db.zst", snapshot.ID) - _, err := sm.s3Client.StatObject(ctx, metadataKey) + _, err := sm.storage.Stat(ctx, metadataKey) if err != nil { // Metadata doesn't exist in S3 - this is an incomplete snapshot diff --git a/internal/storage/file.go b/internal/storage/file.go new file mode 100644 index 0000000..a5b3ae8 --- /dev/null +++ b/internal/storage/file.go @@ -0,0 +1,262 @@ +package storage + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/spf13/afero" +) + +// FileStorer implements Storer using the local filesystem. +// It mirrors the S3 path structure for consistency. +type FileStorer struct { + fs afero.Fs + basePath string +} + +// NewFileStorer creates a new filesystem storage backend. +// The basePath directory will be created if it doesn't exist. +// Uses the real OS filesystem by default; call SetFilesystem to override for testing. +func NewFileStorer(basePath string) (*FileStorer, error) { + fs := afero.NewOsFs() + // Ensure base path exists + if err := fs.MkdirAll(basePath, 0755); err != nil { + return nil, fmt.Errorf("creating base path: %w", err) + } + return &FileStorer{ + fs: fs, + basePath: basePath, + }, nil +} + +// SetFilesystem overrides the filesystem for testing. +func (f *FileStorer) SetFilesystem(fs afero.Fs) { + f.fs = fs +} + +// fullPath returns the full filesystem path for a key. +func (f *FileStorer) fullPath(key string) string { + return filepath.Join(f.basePath, key) +} + +// Put stores data at the specified key. 
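+// Parent directories are created as needed, and any existing object at the
+// same key is overwritten, mirroring S3 put semantics.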
+func (f *FileStorer) Put(ctx context.Context, key string, data io.Reader) error { + path := f.fullPath(key) + + // Create parent directories + dir := filepath.Dir(path) + if err := f.fs.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("creating directories: %w", err) + } + + file, err := f.fs.Create(path) + if err != nil { + return fmt.Errorf("creating file: %w", err) + } + defer func() { _ = file.Close() }() + + if _, err := io.Copy(file, data); err != nil { + return fmt.Errorf("writing file: %w", err) + } + + return nil +} + +// PutWithProgress stores data with progress reporting. +func (f *FileStorer) PutWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress ProgressCallback) error { + path := f.fullPath(key) + + // Create parent directories + dir := filepath.Dir(path) + if err := f.fs.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("creating directories: %w", err) + } + + file, err := f.fs.Create(path) + if err != nil { + return fmt.Errorf("creating file: %w", err) + } + defer func() { _ = file.Close() }() + + // Wrap with progress tracking + pw := &progressWriter{ + writer: file, + callback: progress, + } + + if _, err := io.Copy(pw, data); err != nil { + return fmt.Errorf("writing file: %w", err) + } + + return nil +} + +// Get retrieves data from the specified key. +func (f *FileStorer) Get(ctx context.Context, key string) (io.ReadCloser, error) { + path := f.fullPath(key) + file, err := f.fs.Open(path) + if err != nil { + if os.IsNotExist(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("opening file: %w", err) + } + return file, nil +} + +// Stat returns metadata about an object without retrieving its contents. +func (f *FileStorer) Stat(ctx context.Context, key string) (*ObjectInfo, error) { + path := f.fullPath(key) + info, err := f.fs.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("stat file: %w", err) + } + return &ObjectInfo{ + Key: key, + Size: info.Size(), + }, nil +} + +// Delete removes an object. +func (f *FileStorer) Delete(ctx context.Context, key string) error { + path := f.fullPath(key) + err := f.fs.Remove(path) + if os.IsNotExist(err) { + return nil // Match S3 behavior: no error if doesn't exist + } + if err != nil { + return fmt.Errorf("removing file: %w", err) + } + return nil +} + +// List returns all keys with the given prefix. 
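+// Returned keys are relative to the base path and use forward slashes on
+// every platform, so they are interchangeable with S3 object keys.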
+func (f *FileStorer) List(ctx context.Context, prefix string) ([]string, error) { + var keys []string + basePath := f.fullPath(prefix) + + // Check if base path exists + exists, err := afero.Exists(f.fs, basePath) + if err != nil { + return nil, fmt.Errorf("checking path: %w", err) + } + if !exists { + return keys, nil // Empty list for non-existent prefix + } + + err = afero.Walk(f.fs, basePath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + // Check context cancellation + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if !info.IsDir() { + // Convert back to key (relative path from basePath) + relPath, err := filepath.Rel(f.basePath, path) + if err != nil { + return fmt.Errorf("computing relative path: %w", err) + } + // Normalize path separators to forward slashes for consistency + relPath = strings.ReplaceAll(relPath, string(filepath.Separator), "/") + keys = append(keys, relPath) + } + return nil + }) + + if err != nil { + return nil, fmt.Errorf("walking directory: %w", err) + } + + return keys, nil +} + +// ListStream returns a channel of ObjectInfo for large result sets. +func (f *FileStorer) ListStream(ctx context.Context, prefix string) <-chan ObjectInfo { + ch := make(chan ObjectInfo) + go func() { + defer close(ch) + basePath := f.fullPath(prefix) + + // Check if base path exists + exists, err := afero.Exists(f.fs, basePath) + if err != nil { + ch <- ObjectInfo{Err: fmt.Errorf("checking path: %w", err)} + return + } + if !exists { + return // Empty channel for non-existent prefix + } + + _ = afero.Walk(f.fs, basePath, func(path string, info os.FileInfo, err error) error { + // Check context cancellation + select { + case <-ctx.Done(): + ch <- ObjectInfo{Err: ctx.Err()} + return ctx.Err() + default: + } + + if err != nil { + ch <- ObjectInfo{Err: err} + return nil // Continue walking despite errors + } + + if !info.IsDir() { + relPath, err := filepath.Rel(f.basePath, path) + if err != nil { + ch <- ObjectInfo{Err: fmt.Errorf("computing relative path: %w", err)} + return nil + } + // Normalize path separators + relPath = strings.ReplaceAll(relPath, string(filepath.Separator), "/") + ch <- ObjectInfo{ + Key: relPath, + Size: info.Size(), + } + } + return nil + }) + }() + return ch +} + +// Info returns human-readable storage location information. +func (f *FileStorer) Info() StorageInfo { + return StorageInfo{ + Type: "file", + Location: f.basePath, + } +} + +// progressWriter wraps an io.Writer to track write progress. +type progressWriter struct { + writer io.Writer + written int64 + callback ProgressCallback +} + +func (pw *progressWriter) Write(p []byte) (int, error) { + n, err := pw.writer.Write(p) + if n > 0 { + pw.written += int64(n) + if pw.callback != nil { + if callbackErr := pw.callback(pw.written); callbackErr != nil { + return n, callbackErr + } + } + } + return n, err +} diff --git a/internal/storage/module.go b/internal/storage/module.go new file mode 100644 index 0000000..12b6d05 --- /dev/null +++ b/internal/storage/module.go @@ -0,0 +1,110 @@ +package storage + +import ( + "context" + "fmt" + "strings" + + "git.eeqj.de/sneak/vaultik/internal/config" + "git.eeqj.de/sneak/vaultik/internal/s3" + "go.uber.org/fx" +) + +// Module exports storage functionality as an fx module. +// It provides a Storer implementation based on the configured storage URL +// or falls back to legacy S3 configuration. 
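+//
+// Resolution order, sketched (paths are illustrative):
+//
+//	storage_url: file:///mnt/backup     -> FileStorer
+//	storage_url: s3://bucket/prefix     -> S3Storer (credentials from s3.* config)
+//	no storage_url                      -> S3Storer from legacy s3.* fields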
+var Module = fx.Module("storage", + fx.Provide(NewStorer), +) + +// NewStorer creates a Storer based on configuration. +// If StorageURL is set, it uses URL-based configuration. +// Otherwise, it falls back to legacy S3 configuration. +func NewStorer(cfg *config.Config) (Storer, error) { + if cfg.StorageURL != "" { + return storerFromURL(cfg.StorageURL, cfg) + } + return storerFromLegacyS3Config(cfg) +} + +func storerFromURL(rawURL string, cfg *config.Config) (Storer, error) { + parsed, err := ParseStorageURL(rawURL) + if err != nil { + return nil, fmt.Errorf("parsing storage URL: %w", err) + } + + switch parsed.Scheme { + case "file": + return NewFileStorer(parsed.Prefix) + + case "s3": + // Build endpoint URL + endpoint := parsed.Endpoint + if endpoint == "" { + endpoint = "s3.amazonaws.com" + } + + // Add protocol if not present + if parsed.UseSSL && !strings.HasPrefix(endpoint, "https://") && !strings.HasPrefix(endpoint, "http://") { + endpoint = "https://" + endpoint + } else if !parsed.UseSSL && !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") { + endpoint = "http://" + endpoint + } + + region := parsed.Region + if region == "" { + region = cfg.S3.Region + if region == "" { + region = "us-east-1" + } + } + + // Credentials come from config (not URL for security) + client, err := s3.NewClient(context.Background(), s3.Config{ + Endpoint: endpoint, + Bucket: parsed.Bucket, + Prefix: parsed.Prefix, + AccessKeyID: cfg.S3.AccessKeyID, + SecretAccessKey: cfg.S3.SecretAccessKey, + Region: region, + }) + if err != nil { + return nil, fmt.Errorf("creating S3 client: %w", err) + } + return NewS3Storer(client), nil + + default: + return nil, fmt.Errorf("unsupported storage scheme: %s", parsed.Scheme) + } +} + +func storerFromLegacyS3Config(cfg *config.Config) (Storer, error) { + endpoint := cfg.S3.Endpoint + + // Ensure protocol is present + if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") { + if cfg.S3.UseSSL { + endpoint = "https://" + endpoint + } else { + endpoint = "http://" + endpoint + } + } + + region := cfg.S3.Region + if region == "" { + region = "us-east-1" + } + + client, err := s3.NewClient(context.Background(), s3.Config{ + Endpoint: endpoint, + Bucket: cfg.S3.Bucket, + Prefix: cfg.S3.Prefix, + AccessKeyID: cfg.S3.AccessKeyID, + SecretAccessKey: cfg.S3.SecretAccessKey, + Region: region, + }) + if err != nil { + return nil, fmt.Errorf("creating S3 client: %w", err) + } + return NewS3Storer(client), nil +} diff --git a/internal/storage/s3.go b/internal/storage/s3.go new file mode 100644 index 0000000..9d648fa --- /dev/null +++ b/internal/storage/s3.go @@ -0,0 +1,85 @@ +package storage + +import ( + "context" + "fmt" + "io" + + "git.eeqj.de/sneak/vaultik/internal/s3" +) + +// S3Storer wraps the existing s3.Client to implement Storer. +type S3Storer struct { + client *s3.Client +} + +// NewS3Storer creates a new S3 storage backend. +func NewS3Storer(client *s3.Client) *S3Storer { + return &S3Storer{client: client} +} + +// Put stores data at the specified key. +func (s *S3Storer) Put(ctx context.Context, key string, data io.Reader) error { + return s.client.PutObject(ctx, key, data) +} + +// PutWithProgress stores data with progress reporting. 
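+// The size and callback are forwarded to the wrapped s3.Client; the callback
+// is converted because storage.ProgressCallback and s3.ProgressCallback are
+// distinct named types sharing the same signature.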
+func (s *S3Storer) PutWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress ProgressCallback) error { + // Convert storage.ProgressCallback to s3.ProgressCallback + var s3Progress s3.ProgressCallback + if progress != nil { + s3Progress = s3.ProgressCallback(progress) + } + return s.client.PutObjectWithProgress(ctx, key, data, size, s3Progress) +} + +// Get retrieves data from the specified key. +func (s *S3Storer) Get(ctx context.Context, key string) (io.ReadCloser, error) { + return s.client.GetObject(ctx, key) +} + +// Stat returns metadata about an object without retrieving its contents. +func (s *S3Storer) Stat(ctx context.Context, key string) (*ObjectInfo, error) { + info, err := s.client.StatObject(ctx, key) + if err != nil { + return nil, err + } + return &ObjectInfo{ + Key: info.Key, + Size: info.Size, + }, nil +} + +// Delete removes an object. +func (s *S3Storer) Delete(ctx context.Context, key string) error { + return s.client.DeleteObject(ctx, key) +} + +// List returns all keys with the given prefix. +func (s *S3Storer) List(ctx context.Context, prefix string) ([]string, error) { + return s.client.ListObjects(ctx, prefix) +} + +// ListStream returns a channel of ObjectInfo for large result sets. +func (s *S3Storer) ListStream(ctx context.Context, prefix string) <-chan ObjectInfo { + ch := make(chan ObjectInfo) + go func() { + defer close(ch) + for info := range s.client.ListObjectsStream(ctx, prefix, false) { + ch <- ObjectInfo{ + Key: info.Key, + Size: info.Size, + Err: info.Err, + } + } + }() + return ch +} + +// Info returns human-readable storage location information. +func (s *S3Storer) Info() StorageInfo { + return StorageInfo{ + Type: "s3", + Location: fmt.Sprintf("%s/%s", s.client.Endpoint(), s.client.BucketName()), + } +} diff --git a/internal/storage/storer.go b/internal/storage/storer.go new file mode 100644 index 0000000..9cd4e25 --- /dev/null +++ b/internal/storage/storer.go @@ -0,0 +1,74 @@ +// Package storage provides a unified interface for storage backends. +// It supports both S3-compatible object storage and local filesystem storage, +// allowing Vaultik to store backups in either location with the same API. +// +// Storage backends are selected via URL: +// - s3://bucket/prefix?endpoint=host®ion=r - S3-compatible storage +// - file:///path/to/backup - Local filesystem storage +// +// Both backends implement the Storer interface and support progress reporting +// during upload/write operations. +package storage + +import ( + "context" + "errors" + "io" +) + +// ErrNotFound is returned when an object does not exist. +var ErrNotFound = errors.New("object not found") + +// ProgressCallback is called during storage operations with bytes transferred so far. +// Return an error to cancel the operation. +type ProgressCallback func(bytesTransferred int64) error + +// ObjectInfo contains metadata about a stored object. +type ObjectInfo struct { + Key string // Object key/path + Size int64 // Size in bytes + Err error // Error for streaming results (nil on success) +} + +// StorageInfo provides human-readable storage configuration. +type StorageInfo struct { + Type string // "s3" or "file" + Location string // endpoint/bucket for S3, base path for filesystem +} + +// Storer defines the interface for storage backends. +// All paths are relative to the storage root (bucket/prefix for S3, base directory for filesystem). +type Storer interface { + // Put stores data at the specified key. 
+ // Parent directories are created automatically for filesystem backends. + Put(ctx context.Context, key string, data io.Reader) error + + // PutWithProgress stores data with progress reporting. + // Size must be the exact size of the data to store. + // The progress callback is called periodically with bytes transferred. + PutWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress ProgressCallback) error + + // Get retrieves data from the specified key. + // The caller must close the returned ReadCloser. + // Returns ErrNotFound if the object does not exist. + Get(ctx context.Context, key string) (io.ReadCloser, error) + + // Stat returns metadata about an object without retrieving its contents. + // Returns ErrNotFound if the object does not exist. + Stat(ctx context.Context, key string) (*ObjectInfo, error) + + // Delete removes an object. No error is returned if the object doesn't exist. + Delete(ctx context.Context, key string) error + + // List returns all keys with the given prefix. + // For large result sets, prefer ListStream. + List(ctx context.Context, prefix string) ([]string, error) + + // ListStream returns a channel of ObjectInfo for large result sets. + // The channel is closed when listing completes. + // If an error occurs during listing, the final item will have Err set. + ListStream(ctx context.Context, prefix string) <-chan ObjectInfo + + // Info returns human-readable storage location information. + Info() StorageInfo +} diff --git a/internal/storage/url.go b/internal/storage/url.go new file mode 100644 index 0000000..900890d --- /dev/null +++ b/internal/storage/url.go @@ -0,0 +1,90 @@ +package storage + +import ( + "fmt" + "net/url" + "strings" +) + +// StorageURL represents a parsed storage URL. +type StorageURL struct { + Scheme string // "s3" or "file" + Bucket string // S3 bucket name (empty for file) + Prefix string // Path within bucket or filesystem base path + Endpoint string // S3 endpoint (optional, default AWS) + Region string // S3 region (optional) + UseSSL bool // Use HTTPS for S3 (default true) +} + +// ParseStorageURL parses a storage URL string. +// Supported formats: +// - s3://bucket/prefix?endpoint=host®ion=us-east-1&ssl=true +// - file:///absolute/path/to/backup +func ParseStorageURL(rawURL string) (*StorageURL, error) { + if rawURL == "" { + return nil, fmt.Errorf("storage URL is empty") + } + + // Handle file:// URLs + if strings.HasPrefix(rawURL, "file://") { + path := strings.TrimPrefix(rawURL, "file://") + if path == "" { + return nil, fmt.Errorf("file URL path is empty") + } + return &StorageURL{ + Scheme: "file", + Prefix: path, + }, nil + } + + // Handle s3:// URLs + if strings.HasPrefix(rawURL, "s3://") { + u, err := url.Parse(rawURL) + if err != nil { + return nil, fmt.Errorf("invalid URL: %w", err) + } + + bucket := u.Host + if bucket == "" { + return nil, fmt.Errorf("s3 URL missing bucket name") + } + + prefix := strings.TrimPrefix(u.Path, "/") + + query := u.Query() + useSSL := true + if query.Get("ssl") == "false" { + useSSL = false + } + + return &StorageURL{ + Scheme: "s3", + Bucket: bucket, + Prefix: prefix, + Endpoint: query.Get("endpoint"), + Region: query.Get("region"), + UseSSL: useSSL, + }, nil + } + + return nil, fmt.Errorf("unsupported URL scheme: must start with s3:// or file://") +} + +// String returns a human-readable representation of the storage URL. 
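+// Sample renderings (assuming the AWS default when no endpoint is set):
+//
+//	file:///mnt/backup     -> "file:///mnt/backup"
+//	s3://bucket/prefix     -> "s3://bucket/prefix (endpoint: s3.amazonaws.com)"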
+func (u *StorageURL) String() string { + switch u.Scheme { + case "file": + return fmt.Sprintf("file://%s", u.Prefix) + case "s3": + endpoint := u.Endpoint + if endpoint == "" { + endpoint = "s3.amazonaws.com" + } + if u.Prefix != "" { + return fmt.Sprintf("s3://%s/%s (endpoint: %s)", u.Bucket, u.Prefix, endpoint) + } + return fmt.Sprintf("s3://%s (endpoint: %s)", u.Bucket, endpoint) + default: + return fmt.Sprintf("%s://?", u.Scheme) + } +} diff --git a/internal/vaultik/integration_test.go b/internal/vaultik/integration_test.go index d99186f..92b03b1 100644 --- a/internal/vaultik/integration_test.go +++ b/internal/vaultik/integration_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "database/sql" - "fmt" "io" "sync" "testing" @@ -13,100 +12,122 @@ import ( "git.eeqj.de/sneak/vaultik/internal/config" "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" "git.eeqj.de/sneak/vaultik/internal/snapshot" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// MockS3Client implements a mock S3 client for testing -type MockS3Client struct { - mu sync.Mutex - storage map[string][]byte - calls []string +// MockStorer implements storage.Storer for testing +type MockStorer struct { + mu sync.Mutex + data map[string][]byte + calls []string } -func NewMockS3Client() *MockS3Client { - return &MockS3Client{ - storage: make(map[string][]byte), - calls: make([]string, 0), +func NewMockStorer() *MockStorer { + return &MockStorer{ + data: make(map[string][]byte), + calls: make([]string, 0), } } -func (m *MockS3Client) PutObject(ctx context.Context, key string, reader io.Reader) error { +func (m *MockStorer) Put(ctx context.Context, key string, reader io.Reader) error { m.mu.Lock() defer m.mu.Unlock() - m.calls = append(m.calls, "PutObject:"+key) + m.calls = append(m.calls, "Put:"+key) data, err := io.ReadAll(reader) if err != nil { return err } - m.storage[key] = data + m.data[key] = data return nil } -func (m *MockS3Client) PutObjectWithProgress(ctx context.Context, key string, reader io.Reader, size int64, progress s3.ProgressCallback) error { - // For testing, just call PutObject - return m.PutObject(ctx, key, reader) +func (m *MockStorer) PutWithProgress(ctx context.Context, key string, reader io.Reader, size int64, progress storage.ProgressCallback) error { + return m.Put(ctx, key, reader) } -func (m *MockS3Client) GetObject(ctx context.Context, key string) (io.ReadCloser, error) { +func (m *MockStorer) Get(ctx context.Context, key string) (io.ReadCloser, error) { m.mu.Lock() defer m.mu.Unlock() - m.calls = append(m.calls, "GetObject:"+key) - data, exists := m.storage[key] + m.calls = append(m.calls, "Get:"+key) + data, exists := m.data[key] if !exists { - return nil, fmt.Errorf("key not found: %s", key) + return nil, storage.ErrNotFound } return io.NopCloser(bytes.NewReader(data)), nil } -func (m *MockS3Client) StatObject(ctx context.Context, key string) (*s3.ObjectInfo, error) { +func (m *MockStorer) Stat(ctx context.Context, key string) (*storage.ObjectInfo, error) { m.mu.Lock() defer m.mu.Unlock() - m.calls = append(m.calls, "StatObject:"+key) - data, exists := m.storage[key] + m.calls = append(m.calls, "Stat:"+key) + data, exists := m.data[key] if !exists { - return nil, fmt.Errorf("key not found: %s", key) + return nil, storage.ErrNotFound } - return &s3.ObjectInfo{ + return &storage.ObjectInfo{ Key: key, Size: 
int64(len(data)), }, nil } -func (m *MockS3Client) DeleteObject(ctx context.Context, key string) error { +func (m *MockStorer) Delete(ctx context.Context, key string) error { m.mu.Lock() defer m.mu.Unlock() - m.calls = append(m.calls, "DeleteObject:"+key) - delete(m.storage, key) + m.calls = append(m.calls, "Delete:"+key) + delete(m.data, key) return nil } -func (m *MockS3Client) ListObjects(ctx context.Context, prefix string) ([]*s3.ObjectInfo, error) { +func (m *MockStorer) List(ctx context.Context, prefix string) ([]string, error) { m.mu.Lock() defer m.mu.Unlock() - m.calls = append(m.calls, "ListObjects:"+prefix) - var objects []*s3.ObjectInfo - for key, data := range m.storage { + m.calls = append(m.calls, "List:"+prefix) + var keys []string + for key := range m.data { if len(prefix) == 0 || (len(key) >= len(prefix) && key[:len(prefix)] == prefix) { - objects = append(objects, &s3.ObjectInfo{ - Key: key, - Size: int64(len(data)), - }) + keys = append(keys, key) } } - return objects, nil + return keys, nil } -// GetCalls returns the list of S3 operations that were called -func (m *MockS3Client) GetCalls() []string { +func (m *MockStorer) ListStream(ctx context.Context, prefix string) <-chan storage.ObjectInfo { + ch := make(chan storage.ObjectInfo) + go func() { + defer close(ch) + m.mu.Lock() + defer m.mu.Unlock() + + for key, data := range m.data { + if len(prefix) == 0 || (len(key) >= len(prefix) && key[:len(prefix)] == prefix) { + ch <- storage.ObjectInfo{ + Key: key, + Size: int64(len(data)), + } + } + } + }() + return ch +} + +func (m *MockStorer) Info() storage.StorageInfo { + return storage.StorageInfo{ + Type: "mock", + Location: "memory", + } +} + +// GetCalls returns the list of operations that were called +func (m *MockStorer) GetCalls() []string { m.mu.Lock() defer m.mu.Unlock() @@ -116,11 +137,11 @@ func (m *MockS3Client) GetCalls() []string { } // GetStorageSize returns the number of objects in storage -func (m *MockS3Client) GetStorageSize() int { +func (m *MockStorer) GetStorageSize() int { m.mu.Lock() defer m.mu.Unlock() - return len(m.storage) + return len(m.data) } // TestEndToEndBackup tests the full backup workflow with mocked dependencies @@ -158,8 +179,8 @@ func TestEndToEndBackup(t *testing.T) { } } - // Create mock S3 client - mockS3 := NewMockS3Client() + // Create mock storage + mockStorage := NewMockStorer() // Create test configuration cfg := &config.Config{ @@ -181,7 +202,7 @@ func TestEndToEndBackup(t *testing.T) { } // For a true end-to-end test, we'll create a simpler test that focuses on - // the core backup logic using the scanner directly with our mock S3 client + // the core backup logic using the scanner directly with our mock storage ctx := context.Background() // Create in-memory database @@ -195,12 +216,12 @@ func TestEndToEndBackup(t *testing.T) { repos := database.NewRepositories(db) - // Create scanner with mock S3 client + // Create scanner with mock storage scanner := snapshot.NewScanner(snapshot.ScannerConfig{ FS: fs, ChunkSize: cfg.ChunkSize.Int64(), Repositories: repos, - S3Client: mockS3, + Storage: mockStorage, MaxBlobSize: cfg.BlobSizeLimit.Int64(), CompressionLevel: cfg.CompressionLevel, AgeRecipients: cfg.AgeRecipients, @@ -232,15 +253,15 @@ func TestEndToEndBackup(t *testing.T) { assert.Greater(t, result.ChunksCreated, 0, "Should create chunks") assert.Greater(t, result.BlobsCreated, 0, "Should create blobs") - // Verify S3 operations - calls := mockS3.GetCalls() - t.Logf("S3 operations performed: %v", calls) + // Verify storage 
+func (m *MockStorer) Delete(ctx context.Context, key string) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()

-	m.calls = append(m.calls, "DeleteObject:"+key)
-	delete(m.storage, key)
+	m.calls = append(m.calls, "Delete:"+key)
+	delete(m.data, key)
 	return nil
 }

-func (m *MockS3Client) ListObjects(ctx context.Context, prefix string) ([]*s3.ObjectInfo, error) {
+func (m *MockStorer) List(ctx context.Context, prefix string) ([]string, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()

-	m.calls = append(m.calls, "ListObjects:"+prefix)
-	var objects []*s3.ObjectInfo
-	for key, data := range m.storage {
+	m.calls = append(m.calls, "List:"+prefix)
+	var keys []string
+	for key := range m.data {
 		if len(prefix) == 0 || (len(key) >= len(prefix) && key[:len(prefix)] == prefix) {
-			objects = append(objects, &s3.ObjectInfo{
-				Key:  key,
-				Size: int64(len(data)),
-			})
+			keys = append(keys, key)
 		}
 	}
-	return objects, nil
+	return keys, nil
 }

-// GetCalls returns the list of S3 operations that were called
-func (m *MockS3Client) GetCalls() []string {
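+// ListStream streams every stored object whose key matches prefix, closing
+// the channel once the listing is complete.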
+func (m *MockStorer) ListStream(ctx context.Context, prefix string) <-chan storage.ObjectInfo {
+	ch := make(chan storage.ObjectInfo)
+	go func() {
+		defer close(ch)
+		m.mu.Lock()
+		defer m.mu.Unlock()
+
+		for key, data := range m.data {
+			if len(prefix) == 0 || (len(key) >= len(prefix) && key[:len(prefix)] == prefix) {
+				ch <- storage.ObjectInfo{
+					Key:  key,
+					Size: int64(len(data)),
+				}
+			}
+		}
+	}()
+	return ch
+}
+
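+// Info identifies the mock as an in-memory backend.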
+func (m *MockStorer) Info() storage.StorageInfo {
+	return storage.StorageInfo{
+		Type:     "mock",
+		Location: "memory",
+	}
+}
+
+// GetCalls returns the list of operations that were called
+func (m *MockStorer) GetCalls() []string {
 	m.mu.Lock()
 	defer m.mu.Unlock()

@@ -116,11 +137,11 @@ func (m *MockS3Client) GetCalls() []string {
 }

 // GetStorageSize returns the number of objects in storage
-func (m *MockS3Client) GetStorageSize() int {
+func (m *MockStorer) GetStorageSize() int {
 	m.mu.Lock()
 	defer m.mu.Unlock()

-	return len(m.storage)
+	return len(m.data)
 }

 // TestEndToEndBackup tests the full backup workflow with mocked dependencies
@@ -158,8 +179,8 @@ func TestEndToEndBackup(t *testing.T) {
 		}
 	}

-	// Create mock S3 client
-	mockS3 := NewMockS3Client()
+	// Create mock storage
+	mockStorage := NewMockStorer()

 	// Create test configuration
 	cfg := &config.Config{
@@ -181,7 +202,7 @@
 	}

 	// For a true end-to-end test, we'll create a simpler test that focuses on
-	// the core backup logic using the scanner directly with our mock S3 client
+	// the core backup logic using the scanner directly with our mock storage
 	ctx := context.Background()

 	// Create in-memory database
@@ -195,12 +216,12 @@
 	repos := database.NewRepositories(db)

-	// Create scanner with mock S3 client
+	// Create scanner with mock storage
 	scanner := snapshot.NewScanner(snapshot.ScannerConfig{
 		FS:               fs,
 		ChunkSize:        cfg.ChunkSize.Int64(),
 		Repositories:     repos,
-		S3Client:         mockS3,
+		Storage:          mockStorage,
 		MaxBlobSize:      cfg.BlobSizeLimit.Int64(),
 		CompressionLevel: cfg.CompressionLevel,
 		AgeRecipients:    cfg.AgeRecipients,
@@ -232,15 +253,15 @@
 	assert.Greater(t, result.ChunksCreated, 0, "Should create chunks")
 	assert.Greater(t, result.BlobsCreated, 0, "Should create blobs")

-	// Verify S3 operations
-	calls := mockS3.GetCalls()
-	t.Logf("S3 operations performed: %v", calls)
+	// Verify storage operations
+	calls := mockStorage.GetCalls()
+	t.Logf("Storage operations performed: %v", calls)

 	// Should have uploaded at least one blob
 	blobUploads := 0
 	for _, call := range calls {
-		if len(call) > 10 && call[:10] == "PutObject:" {
-			if len(call) > 16 && call[10:16] == "blobs/" {
+		if len(call) > 4 && call[:4] == "Put:" {
+			if len(call) > 10 && call[4:10] == "blobs/" {
 				blobUploads++
 			}
 		}
@@ -264,8 +285,8 @@
 	require.NoError(t, err)
 	assert.Greater(t, len(fileChunks), 0, "Should have chunks for file1.txt")

-	// Verify blobs were uploaded to S3
-	assert.Greater(t, mockS3.GetStorageSize(), 0, "Should have blobs in S3 storage")
+	// Verify blobs were uploaded to storage
+	assert.Greater(t, mockStorage.GetStorageSize(), 0, "Should have blobs in storage")

 	// Complete the snapshot - just verify we got results
 	// In a real integration test, we'd update the snapshot record
@@ -283,7 +304,7 @@
 	t.Logf("  Bytes scanned: %d", result.BytesScanned)
 	t.Logf("  Chunks created: %d", result.ChunksCreated)
 	t.Logf("  Blobs created: %d", result.BlobsCreated)
-	t.Logf("  S3 storage size: %d objects", mockS3.GetStorageSize())
+	t.Logf("  Storage size: %d objects", mockStorage.GetStorageSize())
 }

 // TestBackupAndVerify tests backing up files and verifying the blobs
@@ -301,8 +322,8 @@
 	err = afero.WriteFile(fs, "/data/test.txt", []byte(testContent), 0644)
 	require.NoError(t, err)

-	// Create mock S3 client
-	mockS3 := NewMockS3Client()
+	// Create mock storage
+	mockStorage := NewMockStorer()

 	// Create test database
 	ctx := context.Background()
@@ -321,7 +342,7 @@
 		FS:               fs,
 		ChunkSize:        int64(1024 * 16), // 16KB chunks
 		Repositories:     repos,
-		S3Client:         mockS3,
+		Storage:          mockStorage,
 		MaxBlobSize:      int64(1024 * 1024), // 1MB blobs
 		CompressionLevel: 3,
 		AgeRecipients:    []string{"age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"}, // Test public key
@@ -346,25 +367,25 @@
 	// Verify backup created blobs
 	assert.Greater(t, result.BlobsCreated, 0, "Should create at least one blob")
-	assert.Equal(t, mockS3.GetStorageSize(), result.BlobsCreated, "S3 should have the blobs")
+	assert.Equal(t, mockStorage.GetStorageSize(), result.BlobsCreated, "Storage should have the blobs")

-	// Verify we can retrieve the blob from S3
-	objects, err := mockS3.ListObjects(ctx, "blobs/")
+	// Verify we can retrieve the blob from storage
+	objects, err := mockStorage.List(ctx, "blobs/")
 	require.NoError(t, err)
-	assert.Len(t, objects, result.BlobsCreated, "Should have correct number of blobs in S3")
+	assert.Len(t, objects, result.BlobsCreated, "Should have correct number of blobs in storage")

 	// Get the first blob and verify it exists
 	if len(objects) > 0 {
-		blobKey := objects[0].Key
+		blobKey := objects[0]
 		t.Logf("Verifying blob: %s", blobKey)

 		// Get blob info
-		blobInfo, err := mockS3.StatObject(ctx, blobKey)
+		blobInfo, err := mockStorage.Stat(ctx, blobKey)
 		require.NoError(t, err)
 		assert.Greater(t, blobInfo.Size, int64(0), "Blob should have content")

 		// Get blob content
-		reader, err := mockS3.GetObject(ctx, blobKey)
+		reader, err := mockStorage.Get(ctx, blobKey)
 		require.NoError(t, err)
 		defer func() { _ = reader.Close() }()
diff --git a/internal/vaultik/prune.go b/internal/vaultik/prune.go
index 030098f..b4049dc 100644
--- a/internal/vaultik/prune.go
+++ b/internal/vaultik/prune.go
@@ -21,9 +21,9 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
 	allBlobsReferenced := make(map[string]bool)
 	manifestCount := 0

-	// List all snapshots in S3
+	// List all snapshots in storage
 	log.Info("Listing remote snapshots")
-	objectCh := v.S3Client.ListObjectsStream(v.ctx, "metadata/", false)
+	objectCh := v.Storage.ListStream(v.ctx, "metadata/")

 	var snapshotIDs []string
 	for object := range objectCh {
@@ -73,10 +73,10 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
 	log.Info("Processed manifests", "count", manifestCount, "unique_blobs_referenced", len(allBlobsReferenced))

-	// List all blobs in S3
+	// List all blobs in storage
 	log.Info("Listing all blobs in storage")
 	allBlobs := make(map[string]int64) // hash -> size
-	blobObjectCh := v.S3Client.ListObjectsStream(v.ctx, "blobs/", true)
+	blobObjectCh := v.Storage.ListStream(v.ctx, "blobs/")

 	for object := range blobObjectCh {
 		if object.Err != nil {
@@ -136,7 +136,7 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
 	for i, hash := range unreferencedBlobs {
 		blobPath := fmt.Sprintf("blobs/%s/%s/%s", hash[:2], hash[2:4], hash)

-		if err := v.S3Client.RemoveObject(v.ctx, blobPath); err != nil {
+		if err := v.Storage.Delete(v.ctx, blobPath); err != nil {
 			log.Error("Failed to delete blob", "hash", hash, "error", err)
 			continue
 		}
diff --git a/internal/vaultik/snapshot.go b/internal/vaultik/snapshot.go
index 97c317b..f6a865f 100644
--- a/internal/vaultik/snapshot.go
+++ b/internal/vaultik/snapshot.go
@@ -265,7 +265,7 @@ func (v *Vaultik) CreateSnapshot(opts *SnapshotCreateOptions) error {
 func (v *Vaultik) ListSnapshots(jsonOutput bool) error {
 	// Get all remote snapshots
 	remoteSnapshots := make(map[string]bool)
-	objectCh := v.S3Client.ListObjectsStream(v.ctx, "metadata/", false)
+	objectCh := v.Storage.ListStream(v.ctx, "metadata/")

 	for object := range objectCh {
 		if object.Err != nil {
@@ -546,7 +546,7 @@ func (v *Vaultik) VerifySnapshot(snapshotID string, deep bool) error {
 		return nil
 	} else {
 		// Just check existence
-		_, err := v.S3Client.StatObject(v.ctx, blobPath)
+		_, err := v.Storage.Stat(v.ctx, blobPath)
 		if err != nil {
 			fmt.Printf("  Missing: %s (%s)\n", blob.Hash, humanize.Bytes(uint64(blob.CompressedSize)))
 			missing++
@@ -581,7 +581,7 @@ func (v *Vaultik) VerifySnapshot(snapshotID string, deep bool) error {
 func (v *Vaultik) getManifestSize(snapshotID string) (int64, error) {
 	manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)

-	reader, err := v.S3Client.GetObject(v.ctx, manifestPath)
+	reader, err := v.Storage.Get(v.ctx, manifestPath)
 	if err != nil {
 		return 0, fmt.Errorf("downloading manifest: %w", err)
 	}
@@ -598,7 +598,7 @@ func (v *Vaultik) getManifestSize(snapshotID string) (int64, error) {
 func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error) {
 	manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)

-	reader, err := v.S3Client.GetObject(v.ctx, manifestPath)
+	reader, err := v.Storage.Get(v.ctx, manifestPath)
 	if err != nil {
 		return nil, err
 	}
@@ -613,10 +613,10 @@ func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error
 }

 func (v *Vaultik) deleteSnapshot(snapshotID string) error {
-	// First, delete from S3
+	// First, delete from storage
 	// List all objects under metadata/{snapshotID}/
 	prefix := fmt.Sprintf("metadata/%s/", snapshotID)
-	objectCh := v.S3Client.ListObjectsStream(v.ctx, prefix, true)
+	objectCh := v.Storage.ListStream(v.ctx, prefix)

 	var objectsToDelete []string
 	for object := range objectCh {
@@ -628,7 +628,7 @@ func (v *Vaultik) deleteSnapshot(snapshotID string) error {
 	// Delete all objects
 	for _, key := range objectsToDelete {
-		if err := v.S3Client.RemoveObject(v.ctx, key); err != nil {
+		if err := v.Storage.Delete(v.ctx, key); err != nil {
 			return fmt.Errorf("removing %s: %w", key, err)
 		}
 	}
@@ -658,7 +658,7 @@ func (v *Vaultik) syncWithRemote() error {

 	// Get all remote snapshot IDs
 	remoteSnapshots := make(map[string]bool)
-	objectCh := v.S3Client.ListObjectsStream(v.ctx, "metadata/", false)
+	objectCh := v.Storage.ListStream(v.ctx, "metadata/")

 	for object := range objectCh {
 		if object.Err != nil {
diff --git a/internal/vaultik/vaultik.go b/internal/vaultik/vaultik.go
index ae5f410..1fbcb4d 100644
--- a/internal/vaultik/vaultik.go
+++ b/internal/vaultik/vaultik.go
@@ -10,8 +10,8 @@ import (
 	"git.eeqj.de/sneak/vaultik/internal/crypto"
 	"git.eeqj.de/sneak/vaultik/internal/database"
 	"git.eeqj.de/sneak/vaultik/internal/globals"
-	"git.eeqj.de/sneak/vaultik/internal/s3"
 	"git.eeqj.de/sneak/vaultik/internal/snapshot"
+	"git.eeqj.de/sneak/vaultik/internal/storage"
 	"github.com/spf13/afero"
 	"go.uber.org/fx"
 )
@@ -22,7 +22,7 @@ type Vaultik struct {
 	Config          *config.Config
 	DB              *database.DB
 	Repositories    *database.Repositories
-	S3Client        *s3.Client
+	Storage         storage.Storer
 	ScannerFactory  snapshot.ScannerFactory
 	SnapshotManager *snapshot.SnapshotManager
 	Shutdowner      fx.Shutdowner
@@ -46,7 +46,7 @@ type VaultikParams struct {
 	Config          *config.Config
 	DB              *database.DB
 	Repositories    *database.Repositories
-	S3Client        *s3.Client
+	Storage         storage.Storer
 	ScannerFactory  snapshot.ScannerFactory
 	SnapshotManager *snapshot.SnapshotManager
 	Shutdowner      fx.Shutdowner
@@ -72,7 +72,7 @@ func New(params VaultikParams) *Vaultik {
 		Config:          params.Config,
 		DB:              params.DB,
 		Repositories:    params.Repositories,
-		S3Client:        params.S3Client,
+		Storage:         params.Storage,
 		ScannerFactory:  params.ScannerFactory,
 		SnapshotManager: params.SnapshotManager,
 		Shutdowner:      params.Shutdowner,
diff --git a/internal/vaultik/verify.go b/internal/vaultik/verify.go
index bb32054..b039c4c 100644
--- a/internal/vaultik/verify.go
+++ b/internal/vaultik/verify.go
@@ -36,7 +36,7 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
 	manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
 	log.Info("Downloading manifest", "path", manifestPath)

-	manifestReader, err := v.S3Client.GetObject(v.ctx, manifestPath)
+	manifestReader, err := v.Storage.Get(v.ctx, manifestPath)
 	if err != nil {
 		return fmt.Errorf("failed to download manifest: %w", err)
 	}
@@ -57,7 +57,7 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
 	dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
 	log.Info("Downloading encrypted database", "path", dbPath)

-	dbReader, err := v.S3Client.GetObject(v.ctx, dbPath)
+	dbReader, err := v.Storage.Get(v.ctx, dbPath)
 	if err != nil {
 		return fmt.Errorf("failed to download database: %w", err)
 	}
@@ -236,10 +236,10 @@ func (v *Vaultik) verifyBlobExistence(manifest *snapshot.Manifest) error {
 		// Construct blob path
 		blobPath := fmt.Sprintf("blobs/%s/%s/%s", blob.Hash[:2], blob.Hash[2:4], blob.Hash)

-		// Check blob exists with HeadObject
-		stat, err := v.S3Client.StatObject(v.ctx, blobPath)
+		// Check blob exists
+		stat, err := v.Storage.Stat(v.ctx, blobPath)
 		if err != nil {
-			return fmt.Errorf("blob %s missing from S3: %w", blob.Hash, err)
+			return fmt.Errorf("blob %s missing from storage: %w", blob.Hash, err)
 		}

 		// Verify size matches
@@ -258,7 +258,7 @@ func (v *Vaultik) verifyBlobExistence(manifest *snapshot.Manifest) error {
 		}
 	}

-	log.Info("✓ All blobs exist in S3")
+	log.Info("✓ All blobs exist in storage")
 	return nil
 }

@@ -295,7 +295,7 @@ func (v *Vaultik) performDeepVerification(manifest *snapshot.Manifest, db *sql.D
 func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
 	// Download blob
 	blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobInfo.Hash[:2], blobInfo.Hash[2:4], blobInfo.Hash)
-	reader, err := v.S3Client.GetObject(v.ctx, blobPath)
+	reader, err := v.Storage.Get(v.ctx, blobPath)
 	if err != nil {
 		return fmt.Errorf("failed to download: %w", err)
 	}