diff --git a/CLAUDE.md b/CLAUDE.md index 42f26aa..cec4213 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,6 +10,9 @@ Read the rules in AGENTS.md and follow them. corporate advertising for Anthropic and is therefore completely unacceptable in commit messages. +* NEVER use `git add -A`. Always add only the files you intentionally + changed. + * Tests should always be run before committing code. No commits should be made that do not pass tests. @@ -33,6 +36,9 @@ Read the rules in AGENTS.md and follow them. * When testing on a 2.5Gbit/s ethernet to an s3 server backed by 2000MB/sec SSD, estimate about 4 seconds per gigabyte of backup time. -* When running tests, don't run individual tests, or grep the output. run the entire test suite every time and read the full output. +* When running tests, don't run individual tests, or grep the output. run + the entire test suite every time and read the full output. -* When running tests, don't run individual tests, or try to grep the output. never run "go test". only ever run "make test" to run the full test suite, and examine the full output. \ No newline at end of file +* When running tests, don't run individual tests, or try to grep the output. + never run "go test". only ever run "make test" to run the full test + suite, and examine the full output. diff --git a/PROCESS.md b/PROCESS.md new file mode 100644 index 0000000..356b90e --- /dev/null +++ b/PROCESS.md @@ -0,0 +1,556 @@ +# Vaultik Snapshot Creation Process + +This document describes the lifecycle of objects during snapshot creation, with a focus on database transactions and foreign key constraints. + +## Database Schema Overview + +### Tables and Foreign Key Dependencies + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ FOREIGN KEY GRAPH │ +│ │ +│ snapshots ◄────── snapshot_files ────────► files │ +│ │ │ │ +│ └───────── snapshot_blobs ────────► blobs │ │ +│ │ │ │ +│ │ ├──► file_chunks ◄── chunks│ +│ │ │ ▲ │ +│ │ └──► chunk_files ────┘ │ +│ │ │ +│ └──► blob_chunks ─────────────┘│ +│ │ +│ uploads ───────► blobs.blob_hash │ +│ └──────────► snapshots.id │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +### Critical Constraint: `chunks` Must Exist First + +These tables reference `chunks.chunk_hash` **without CASCADE**: +- `file_chunks.chunk_hash` → `chunks.chunk_hash` +- `chunk_files.chunk_hash` → `chunks.chunk_hash` +- `blob_chunks.chunk_hash` → `chunks.chunk_hash` + +**Implication**: A chunk record MUST be committed to the database BEFORE any of these referencing records can be created. + +### Order of Operations Required by Schema + +``` +1. snapshots (created first, before scan) +2. blobs (created when packer starts new blob) +3. chunks (created during file processing) +4. blob_chunks (created immediately after chunk added to packer) +5. files (created after file fully chunked) +6. file_chunks (created with file record) +7. chunk_files (created with file record) +8. snapshot_files (created with file record) +9. snapshot_blobs (created after blob uploaded) +10. uploads (created after blob uploaded) +``` + +--- + +## Snapshot Creation Phases + +### Phase 0: Initialization + +**Actions:** +1. Snapshot record created in database (Transaction T0) +2. Known files loaded into memory from `files` table +3. Known chunks loaded into memory from `chunks` table + +**Transactions:** +``` +T0: INSERT INTO snapshots (id, hostname, ...) VALUES (...) + COMMIT +``` + +--- + +### Phase 1: Scan Directory + +**Actions:** +1. 
Walk filesystem directory tree +2. For each file, compare against in-memory `knownFiles` map +3. Classify files as: unchanged, new, or modified +4. Collect unchanged file IDs for later association +5. Collect new/modified files for processing + +**Transactions:** +``` +(None during scan - all in-memory) +``` + +--- + +### Phase 1b: Associate Unchanged Files + +**Actions:** +1. For unchanged files, add entries to `snapshot_files` table +2. Done in batches of 1000 + +**Transactions:** +``` +For each batch of 1000 file IDs: + T: BEGIN + INSERT INTO snapshot_files (snapshot_id, file_id) VALUES (?, ?) + ... (up to 1000 inserts) + COMMIT +``` + +--- + +### Phase 2: Process Files + +For each file that needs processing: + +#### Step 2a: Open and Chunk File + +**Location:** `processFileStreaming()` + +For each chunk produced by content-defined chunking: + +##### Step 2a-1: Check Chunk Existence +```go +chunkExists := s.chunkExists(chunk.Hash) // In-memory lookup +``` + +##### Step 2a-2: Create Chunk Record (if new) +```go +// TRANSACTION: Create chunk in database +err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { + dbChunk := &database.Chunk{ChunkHash: chunk.Hash, Size: chunk.Size} + return s.repos.Chunks.Create(txCtx, tx, dbChunk) +}) +// COMMIT immediately after WithTx returns + +// Update in-memory cache +s.addKnownChunk(chunk.Hash) +``` + +**Transaction:** +``` +T_chunk: BEGIN + INSERT INTO chunks (chunk_hash, size) VALUES (?, ?) + COMMIT +``` + +##### Step 2a-3: Add Chunk to Packer + +```go +s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data}) +``` + +**Inside packer.AddChunk → addChunkToCurrentBlob():** + +```go +// TRANSACTION: Create blob_chunks record IMMEDIATELY +if p.repos != nil { + blobChunk := &database.BlobChunk{ + BlobID: p.currentBlob.id, + ChunkHash: chunk.Hash, + Offset: offset, + Length: chunkSize, + } + err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error { + return p.repos.BlobChunks.Create(ctx, tx, blobChunk) + }) + // COMMIT immediately +} +``` + +**Transaction:** +``` +T_blob_chunk: BEGIN + INSERT INTO blob_chunks (blob_id, chunk_hash, offset, length) VALUES (?, ?, ?, ?) + COMMIT +``` + +**⚠️ CRITICAL DEPENDENCY**: This transaction requires `chunks.chunk_hash` to exist (FK constraint). +The chunk MUST be committed in Step 2a-2 BEFORE this can succeed. + +--- + +#### Step 2b: Blob Size Limit Handling + +If adding a chunk would exceed blob size limit: + +```go +if err == blob.ErrBlobSizeLimitExceeded { + if err := s.packer.FinalizeBlob(); err != nil { ... } + // Retry adding the chunk + if err := s.packer.AddChunk(...); err != nil { ... } +} +``` + +**FinalizeBlob() transactions:** +``` +T_blob_finish: BEGIN + UPDATE blobs SET blob_hash=?, uncompressed_size=?, compressed_size=?, finished_ts=? WHERE id=? + COMMIT +``` + +Then blob handler is called (handleBlobReady): +``` +(Upload to S3 - no transaction) + +T_blob_uploaded: BEGIN + UPDATE blobs SET uploaded_ts=? WHERE id=? + INSERT INTO snapshot_blobs (snapshot_id, blob_id, blob_hash) VALUES (?, ?, ?) + INSERT INTO uploads (blob_hash, snapshot_id, uploaded_at, size, duration_ms) VALUES (?, ?, ?, ?, ?) 
+ COMMIT +``` + +--- + +#### Step 2c: Queue File for Batch Insertion + +After all chunks for a file are processed: + +```go +// Build file data (in-memory, no DB) +fileChunks := make([]database.FileChunk, len(chunks)) +chunkFiles := make([]database.ChunkFile, len(chunks)) + +// Queue for batch insertion +return s.addPendingFile(ctx, pendingFileData{ + file: fileToProcess.File, + fileChunks: fileChunks, + chunkFiles: chunkFiles, +}) +``` + +**No transaction yet** - just adds to `pendingFiles` slice. + +If `len(pendingFiles) >= fileBatchSize (100)`, triggers `flushPendingFiles()`. + +--- + +### Step 2d: Flush Pending Files + +**Location:** `flushPendingFiles()` - called when batch is full or at end of processing + +```go +return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { + for _, data := range files { + // 1. Create file record + s.repos.Files.Create(txCtx, tx, data.file) // INSERT OR REPLACE + + // 2. Delete old associations + s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID) + s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID) + + // 3. Create file_chunks records + for _, fc := range data.fileChunks { + s.repos.FileChunks.Create(txCtx, tx, &fc) // FK: chunks.chunk_hash + } + + // 4. Create chunk_files records + for _, cf := range data.chunkFiles { + s.repos.ChunkFiles.Create(txCtx, tx, &cf) // FK: chunks.chunk_hash + } + + // 5. Add file to snapshot + s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID) + } + return nil +}) +// COMMIT (all or nothing for the batch) +``` + +**Transaction:** +``` +T_files_batch: BEGIN + -- For each file in batch: + INSERT OR REPLACE INTO files (...) VALUES (...) + DELETE FROM file_chunks WHERE file_id = ? + DELETE FROM chunk_files WHERE file_id = ? + INSERT INTO file_chunks (file_id, idx, chunk_hash) VALUES (?, ?, ?) -- FK: chunks + INSERT INTO chunk_files (chunk_hash, file_id, ...) VALUES (?, ?, ...) -- FK: chunks + INSERT INTO snapshot_files (snapshot_id, file_id) VALUES (?, ?) + -- Repeat for each file + COMMIT +``` + +**⚠️ CRITICAL DEPENDENCY**: `file_chunks` and `chunk_files` require `chunks.chunk_hash` to exist. + +--- + +### Phase 2 End: Final Flush + +```go +// Flush any remaining pending files +if err := s.flushAllPending(ctx); err != nil { ... } + +// Final packer flush +s.packer.Flush() +``` + +--- + +## The Current Bug + +### Problem + +The current code attempts to batch file insertions, but `file_chunks` and `chunk_files` have foreign keys to `chunks.chunk_hash`. The batched file flush tries to insert these records, but if the chunks haven't been committed yet, the FK constraint fails. + +### Why It's Happening + +Looking at the sequence: + +1. Process file A, chunk X +2. Create chunk X in DB (Transaction commits) +3. Add chunk X to packer +4. Packer creates blob_chunks for chunk X (needs chunk X - OK, committed in step 2) +5. Queue file A with chunk references +6. Process file B, chunk Y +7. Create chunk Y in DB (Transaction commits) +8. ... etc ... +9. At end: flushPendingFiles() +10. Insert file_chunks for file A referencing chunk X (chunk X committed - should work) + +The chunks ARE being created individually. But something is going wrong. + +### Actual Issue + +Wait - let me re-read the code. 
The issue is: + +In `processFileStreaming`, when we queue file data: +```go +fileChunks[i] = database.FileChunk{ + FileID: fileToProcess.File.ID, + Idx: ci.fileChunk.Idx, + ChunkHash: ci.fileChunk.ChunkHash, +} +``` + +The `FileID` is set, but `fileToProcess.File.ID` might be empty at this point because the file record hasn't been created yet! + +Looking at `checkFileInMemory`: +```go +// For new files: +if !exists { + return file, true // file.ID is empty string! +} + +// For existing files: +file.ID = existingFile.ID // Reuse existing ID +``` + +**For NEW files, `file.ID` is empty!** + +Then in `flushPendingFiles`: +```go +s.repos.Files.Create(txCtx, tx, data.file) // This generates/uses the ID +``` + +But `data.fileChunks` was built with the EMPTY ID! + +### The Real Problem + +For new files: +1. `checkFileInMemory` creates file record with empty ID +2. `processFileStreaming` queues file_chunks with empty `FileID` +3. `flushPendingFiles` creates file (generates ID), but file_chunks still have empty `FileID` + +Wait, but `Files.Create` should be INSERT OR REPLACE by path, and the file struct should get updated... Let me check. + +Actually, looking more carefully at the code path - the file IS created first in the flush, but the `fileChunks` slice was already built with the old (possibly empty) ID. The ID isn't updated after the file is created. + +Hmm, but looking at the current code: +```go +fileChunks[i] = database.FileChunk{ + FileID: fileToProcess.File.ID, // This uses the ID from the File struct +``` + +And in `checkFileInMemory` for new files, we create a file struct but don't set the ID. However, looking at the database repository, `Files.Create` should be doing `INSERT OR REPLACE` and the ID should be pre-generated... + +Let me check if IDs are being generated. Looking at the File struct usage, it seems like UUIDs should be generated somewhere... + +Actually, looking at the test failures again: +``` +creating file chunk: inserting file_chunk: constraint failed: FOREIGN KEY constraint failed (787) +``` + +Error 787 is SQLite's foreign key constraint error. The failing FK is on `file_chunks.chunk_hash → chunks.chunk_hash`. + +So the chunks ARE NOT in the database when we try to insert file_chunks. Let me trace through more carefully... + +--- + +## Transaction Timing Issue + +The problem is transaction visibility in SQLite. + +Each `WithTx` creates a new transaction that commits at the end. But with batched file insertion: + +1. Chunk transactions commit one at a time +2. File batch transaction runs later + +If chunks are being inserted but something goes wrong with transaction isolation, the file batch might not see them. + +But actually SQLite in WAL mode should have SERIALIZABLE isolation by default, so committed transactions should be visible. + +Let me check if the in-memory cache is masking a database problem... + +Actually, wait. Let me re-check the current broken code more carefully. The issue might be simpler. + +--- + +## Current Code Flow Analysis + +Looking at `processFileStreaming` in the current broken state: + +```go +// For each chunk: +if !chunkExists { + err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { + dbChunk := &database.Chunk{ChunkHash: chunk.Hash, Size: chunk.Size} + return s.repos.Chunks.Create(txCtx, tx, dbChunk) + }) + // ... check error ... + s.addKnownChunk(chunk.Hash) +} + +// ... add to packer (creates blob_chunks) ... 
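+// NOTE (see Step 2a-3 above): adding to the packer immediately inserts a
+// blob_chunks row, whose FK on chunks.chunk_hash requires the chunk insert
+// above to already be committed.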
+ +// Collect chunk info for file +chunks = append(chunks, chunkInfo{...}) +``` + +Then at end of function: +```go +// Queue file for batch insertion +return s.addPendingFile(ctx, pendingFileData{ + file: fileToProcess.File, + fileChunks: fileChunks, + chunkFiles: chunkFiles, +}) +``` + +At end of `processPhase`: +```go +if err := s.flushAllPending(ctx); err != nil { ... } +``` + +The chunks are being created one-by-one with individual transactions. By the time `flushPendingFiles` runs, all chunk transactions should have committed. + +Unless... there's a bug in how the chunks are being referenced. Let me check if the chunk_hash values are correct. + +Or... maybe the test database is being recreated between operations somehow? + +Actually, let me check the test setup. Maybe the issue is specific to the test environment. + +--- + +## Summary of Object Lifecycle + +| Object | When Created | Transaction | Dependencies | +|--------|--------------|-------------|--------------| +| snapshot | Before scan | Individual tx | None | +| blob | When packer needs new blob | Individual tx | None | +| chunk | During file chunking (each chunk) | Individual tx | None | +| blob_chunks | Immediately after adding chunk to packer | Individual tx | chunks, blobs | +| files | Batched at end of processing | Batch tx | None | +| file_chunks | With file (batched) | Batch tx | files, chunks | +| chunk_files | With file (batched) | Batch tx | files, chunks | +| snapshot_files | With file (batched) | Batch tx | snapshots, files | +| snapshot_blobs | After blob upload | Individual tx | snapshots, blobs | +| uploads | After blob upload | Same tx as snapshot_blobs | blobs, snapshots | + +--- + +## Root Cause Analysis + +After detailed analysis, I believe the issue is one of the following: + +### Hypothesis 1: File ID Not Set + +Looking at `checkFileInMemory()` for NEW files: +```go +if !exists { + return file, true // file.ID is empty string! +} +``` + +For new files, `file.ID` is empty. Then in `processFileStreaming`: +```go +fileChunks[i] = database.FileChunk{ + FileID: fileToProcess.File.ID, // Empty for new files! + ... +} +``` + +The `FileID` in the built `fileChunks` slice is empty. + +Then in `flushPendingFiles`: +```go +s.repos.Files.Create(txCtx, tx, data.file) // This generates the ID +// But data.fileChunks still has empty FileID! +for i := range data.fileChunks { + s.repos.FileChunks.Create(...) // Uses empty FileID +} +``` + +**Solution**: Generate file IDs upfront in `checkFileInMemory()`: +```go +file := &database.File{ + ID: uuid.New().String(), // Generate ID immediately + Path: path, + ... +} +``` + +### Hypothesis 2: Transaction Isolation + +SQLite with a single connection pool (`MaxOpenConns(1)`) should serialize all transactions. Committed data should be visible to subsequent transactions. + +However, there might be a subtle issue with how `context.Background()` is used in the packer vs the scanner's context. + +## Recommended Fix + +**Step 1: Generate file IDs upfront** + +In `checkFileInMemory()`, generate the UUID for new files immediately: +```go +file := &database.File{ + ID: uuid.New().String(), // Always generate ID + Path: path, + ... +} +``` + +This ensures `file.ID` is set when building `fileChunks` and `chunkFiles` slices. 
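+
+For illustration, here is a minimal sketch of how the pre-generated ID flows into
+the queued data in `processFileStreaming` once Step 1 is in place. This is a sketch
+only, assuming the struct fields and the `chunks`/`chunkInfo` shapes quoted earlier
+in this document; the `ChunkFile` field names are inferred from the `chunk_files`
+schema shown above and are not verified against the actual code:
+
+```go
+// file.ID is now guaranteed non-empty: it was generated in
+// checkFileInMemory via uuid.New().String() (Step 1).
+fileChunks := make([]database.FileChunk, len(chunks))
+chunkFiles := make([]database.ChunkFile, len(chunks))
+for i, ci := range chunks {
+    fileChunks[i] = database.FileChunk{
+        FileID:    fileToProcess.File.ID, // no longer empty for new files
+        Idx:       ci.fileChunk.Idx,
+        ChunkHash: ci.fileChunk.ChunkHash,
+    }
+    chunkFiles[i] = database.ChunkFile{
+        ChunkHash: ci.fileChunk.ChunkHash,
+        FileID:    fileToProcess.File.ID,
+    }
+}
+
+// Queue for the batched flush; flushPendingFiles can now insert
+// file_chunks and chunk_files rows that reference a real file ID.
+return s.addPendingFile(ctx, pendingFileData{
+    file:       fileToProcess.File,
+    fileChunks: fileChunks,
+    chunkFiles: chunkFiles,
+})
+```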
+ +**Step 2: Verify by reverting to per-file transactions** + +If Step 1 doesn't fix it, revert to non-batched file insertion to isolate the issue: + +```go +// Instead of queuing: +// return s.addPendingFile(ctx, pendingFileData{...}) + +// Do immediate insertion: +return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { + // Create file + s.repos.Files.Create(txCtx, tx, fileToProcess.File) + // Delete old associations + s.repos.FileChunks.DeleteByFileID(...) + s.repos.ChunkFiles.DeleteByFileID(...) + // Create new associations + for _, fc := range fileChunks { + s.repos.FileChunks.Create(...) + } + for _, cf := range chunkFiles { + s.repos.ChunkFiles.Create(...) + } + // Add to snapshot + s.repos.Snapshots.AddFileByID(...) + return nil +}) +``` + +**Step 3: If batching is still desired** + +After confirming per-file transactions work, re-implement batching with the ID fix in place, and add debug logging to trace exactly which chunk_hash is failing and why. diff --git a/go.mod b/go.mod index 7231b4d..37f362d 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.12.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.1 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 // indirect + github.com/adrg/xdg v0.5.3 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/aws/aws-sdk-go v1.44.256 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect diff --git a/go.sum b/go.sum index f1f5b8b..75c59bc 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1/go.mo github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs= github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78= +github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= diff --git a/internal/cli/app.go b/internal/cli/app.go index d627cb8..54cd838 100644 --- a/internal/cli/app.go +++ b/internal/cli/app.go @@ -2,9 +2,11 @@ package cli import ( "context" + "errors" "fmt" "os" "os/signal" + "path/filepath" "syscall" "time" @@ -12,9 +14,11 @@ import ( "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/globals" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" + "git.eeqj.de/sneak/vaultik/internal/pidlock" "git.eeqj.de/sneak/vaultik/internal/snapshot" + "git.eeqj.de/sneak/vaultik/internal/storage" "git.eeqj.de/sneak/vaultik/internal/vaultik" + "github.com/adrg/xdg" "go.uber.org/fx" ) @@ -51,7 +55,7 @@ func NewApp(opts AppOptions) *fx.App { config.Module, database.Module, log.Module, - s3.Module, + storage.Module, snapshot.Module, fx.Provide(vaultik.New), fx.Invoke(setupGlobals), @@ -118,7 +122,23 @@ func RunApp(ctx context.Context, app *fx.App) error { // 
RunWithApp is a helper that creates and runs an fx app with the given options. // It combines NewApp and RunApp into a single convenient function. This is the // preferred way to run CLI commands that need the full application context. +// It acquires a PID lock before starting to prevent concurrent instances. func RunWithApp(ctx context.Context, opts AppOptions) error { + // Acquire PID lock to prevent concurrent instances + lockDir := filepath.Join(xdg.DataHome, "berlin.sneak.app.vaultik") + lock, err := pidlock.Acquire(lockDir) + if err != nil { + if errors.Is(err, pidlock.ErrAlreadyRunning) { + return fmt.Errorf("cannot start: %w", err) + } + return fmt.Errorf("failed to acquire lock: %w", err) + } + defer func() { + if err := lock.Release(); err != nil { + log.Warn("Failed to release PID lock", "error", err) + } + }() + app := NewApp(opts) return RunApp(ctx, app) } diff --git a/internal/cli/fetch.go b/internal/cli/fetch.go index 8b2634c..e9204d1 100644 --- a/internal/cli/fetch.go +++ b/internal/cli/fetch.go @@ -8,8 +8,8 @@ import ( "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/globals" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" "git.eeqj.de/sneak/vaultik/internal/snapshot" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/cobra" "go.uber.org/fx" ) @@ -23,7 +23,7 @@ type FetchApp struct { Globals *globals.Globals Config *config.Config Repositories *database.Repositories - S3Client *s3.Client + Storage storage.Storer DB *database.DB Shutdowner fx.Shutdowner } @@ -61,15 +61,14 @@ The age_secret_key must be configured in the config file for decryption.`, }, Modules: []fx.Option{ snapshot.Module, - s3.Module, fx.Provide(fx.Annotate( func(g *globals.Globals, cfg *config.Config, repos *database.Repositories, - s3Client *s3.Client, db *database.DB, shutdowner fx.Shutdowner) *FetchApp { + storer storage.Storer, db *database.DB, shutdowner fx.Shutdowner) *FetchApp { return &FetchApp{ Globals: g, Config: cfg, Repositories: repos, - S3Client: s3Client, + Storage: storer, DB: db, Shutdowner: shutdowner, } diff --git a/internal/cli/restore.go b/internal/cli/restore.go index 3f34679..fa3d396 100644 --- a/internal/cli/restore.go +++ b/internal/cli/restore.go @@ -8,8 +8,8 @@ import ( "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/globals" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" "git.eeqj.de/sneak/vaultik/internal/snapshot" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/cobra" "go.uber.org/fx" ) @@ -24,7 +24,7 @@ type RestoreApp struct { Globals *globals.Globals Config *config.Config Repositories *database.Repositories - S3Client *s3.Client + Storage storage.Storer DB *database.DB Shutdowner fx.Shutdowner } @@ -61,15 +61,14 @@ The age_secret_key must be configured in the config file for decryption.`, }, Modules: []fx.Option{ snapshot.Module, - s3.Module, fx.Provide(fx.Annotate( func(g *globals.Globals, cfg *config.Config, repos *database.Repositories, - s3Client *s3.Client, db *database.DB, shutdowner fx.Shutdowner) *RestoreApp { + storer storage.Storer, db *database.DB, shutdowner fx.Shutdowner) *RestoreApp { return &RestoreApp{ Globals: g, Config: cfg, Repositories: repos, - S3Client: s3Client, + Storage: storer, DB: db, Shutdowner: shutdowner, } diff --git a/internal/cli/store.go b/internal/cli/store.go index 55274fd..557f39b 100644 --- a/internal/cli/store.go +++ b/internal/cli/store.go @@ -7,14 +7,14 @@ import ( 
"time" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/cobra" "go.uber.org/fx" ) // StoreApp contains dependencies for store commands type StoreApp struct { - S3Client *s3.Client + Storage storage.Storer Shutdowner fx.Shutdowner } @@ -48,19 +48,18 @@ func newStoreInfoCommand() *cobra.Command { // Info displays storage information func (app *StoreApp) Info(ctx context.Context) error { - // Get bucket info - bucketName := app.S3Client.BucketName() - endpoint := app.S3Client.Endpoint() + // Get storage info + storageInfo := app.Storage.Info() fmt.Printf("Storage Information\n") fmt.Printf("==================\n\n") - fmt.Printf("S3 Configuration:\n") - fmt.Printf(" Endpoint: %s\n", endpoint) - fmt.Printf(" Bucket: %s\n\n", bucketName) + fmt.Printf("Storage Configuration:\n") + fmt.Printf(" Type: %s\n", storageInfo.Type) + fmt.Printf(" Location: %s\n\n", storageInfo.Location) // Count snapshots by listing metadata/ prefix snapshotCount := 0 - snapshotCh := app.S3Client.ListObjectsStream(ctx, "metadata/", true) + snapshotCh := app.Storage.ListStream(ctx, "metadata/") snapshotDirs := make(map[string]bool) for object := range snapshotCh { @@ -79,7 +78,7 @@ func (app *StoreApp) Info(ctx context.Context) error { blobCount := 0 var totalSize int64 - blobCh := app.S3Client.ListObjectsStream(ctx, "blobs/", false) + blobCh := app.Storage.ListStream(ctx, "blobs/") for object := range blobCh { if object.Err != nil { return fmt.Errorf("listing blobs: %w", object.Err) @@ -130,10 +129,9 @@ func runWithApp(ctx context.Context, fn func(*StoreApp) error) error { Debug: rootFlags.Debug, }, Modules: []fx.Option{ - s3.Module, - fx.Provide(func(s3Client *s3.Client, shutdowner fx.Shutdowner) *StoreApp { + fx.Provide(func(storer storage.Storer, shutdowner fx.Shutdowner) *StoreApp { return &StoreApp{ - S3Client: s3Client, + Storage: storer, Shutdowner: shutdowner, } }), diff --git a/internal/config/config.go b/internal/config/config.go index 6d5e53c..03c872a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,16 +3,43 @@ package config import ( "fmt" "os" + "path/filepath" + "strings" "time" "git.eeqj.de/sneak/smartconfig" + "github.com/adrg/xdg" "go.uber.org/fx" "gopkg.in/yaml.v3" ) +const appName = "berlin.sneak.app.vaultik" + +// expandTilde expands ~ at the start of a path to the user's home directory. +func expandTilde(path string) string { + if path == "~" { + home, _ := os.UserHomeDir() + return home + } + if strings.HasPrefix(path, "~/") { + home, _ := os.UserHomeDir() + return filepath.Join(home, path[2:]) + } + return path +} + +// expandTildeInURL expands ~ in file:// URLs. +func expandTildeInURL(url string) string { + if strings.HasPrefix(url, "file://~/") { + home, _ := os.UserHomeDir() + return "file://" + filepath.Join(home, url[9:]) + } + return url +} + // Config represents the application configuration for Vaultik. // It defines all settings for backup operations, including source directories, -// encryption recipients, S3 storage configuration, and performance tuning parameters. +// encryption recipients, storage configuration, and performance tuning parameters. // Configuration is typically loaded from a YAML file. 
type Config struct { AgeRecipients []string `yaml:"age_recipients"` @@ -28,6 +55,14 @@ type Config struct { S3 S3Config `yaml:"s3"` SourceDirs []string `yaml:"source_dirs"` CompressionLevel int `yaml:"compression_level"` + + // StorageURL specifies the storage backend using a URL format. + // Takes precedence over S3Config if set. + // Supported formats: + // - s3://bucket/prefix?endpoint=host®ion=us-east-1 + // - file:///path/to/backup + // For S3 URLs, credentials are still read from s3.access_key_id and s3.secret_access_key. + StorageURL string `yaml:"storage_url"` } // S3Config represents S3 storage configuration for backup storage. @@ -84,7 +119,7 @@ func Load(path string) (*Config, error) { BackupInterval: 1 * time.Hour, FullScanInterval: 24 * time.Hour, MinTimeBetweenRun: 15 * time.Minute, - IndexPath: "/var/lib/vaultik/index.sqlite", + IndexPath: filepath.Join(xdg.DataHome, appName, "index.sqlite"), CompressionLevel: 3, } @@ -99,9 +134,16 @@ func Load(path string) (*Config, error) { return nil, fmt.Errorf("failed to parse config: %w", err) } + // Expand tilde in all path fields + cfg.IndexPath = expandTilde(cfg.IndexPath) + cfg.StorageURL = expandTildeInURL(cfg.StorageURL) + for i, dir := range cfg.SourceDirs { + cfg.SourceDirs[i] = expandTilde(dir) + } + // Check for environment variable override for IndexPath if envIndexPath := os.Getenv("VAULTIK_INDEX_PATH"); envIndexPath != "" { - cfg.IndexPath = envIndexPath + cfg.IndexPath = expandTilde(envIndexPath) } // Get hostname if not set @@ -132,7 +174,7 @@ func Load(path string) (*Config, error) { // It ensures all required fields are present and have valid values: // - At least one age recipient must be specified // - At least one source directory must be configured -// - S3 credentials and endpoint must be provided +// - Storage must be configured (either storage_url or s3.* fields) // - Chunk size must be at least 1MB // - Blob size limit must be at least the chunk size // - Compression level must be between 1 and 19 @@ -146,20 +188,9 @@ func (c *Config) Validate() error { return fmt.Errorf("at least one source directory is required") } - if c.S3.Endpoint == "" { - return fmt.Errorf("s3.endpoint is required") - } - - if c.S3.Bucket == "" { - return fmt.Errorf("s3.bucket is required") - } - - if c.S3.AccessKeyID == "" { - return fmt.Errorf("s3.access_key_id is required") - } - - if c.S3.SecretAccessKey == "" { - return fmt.Errorf("s3.secret_access_key is required") + // Validate storage configuration + if err := c.validateStorage(); err != nil { + return err } if c.ChunkSize.Int64() < 1024*1024 { // 1MB minimum @@ -177,6 +208,50 @@ func (c *Config) Validate() error { return nil } +// validateStorage validates storage configuration. +// If StorageURL is set, it takes precedence. S3 URLs require credentials. +// File URLs don't require any S3 configuration. +// If StorageURL is not set, legacy S3 configuration is required. 
+func (c *Config) validateStorage() error { + if c.StorageURL != "" { + // URL-based configuration + if strings.HasPrefix(c.StorageURL, "file://") { + // File storage doesn't need S3 credentials + return nil + } + if strings.HasPrefix(c.StorageURL, "s3://") { + // S3 storage needs credentials + if c.S3.AccessKeyID == "" { + return fmt.Errorf("s3.access_key_id is required for s3:// URLs") + } + if c.S3.SecretAccessKey == "" { + return fmt.Errorf("s3.secret_access_key is required for s3:// URLs") + } + return nil + } + return fmt.Errorf("storage_url must start with s3:// or file://") + } + + // Legacy S3 configuration + if c.S3.Endpoint == "" { + return fmt.Errorf("s3.endpoint is required (or set storage_url)") + } + + if c.S3.Bucket == "" { + return fmt.Errorf("s3.bucket is required (or set storage_url)") + } + + if c.S3.AccessKeyID == "" { + return fmt.Errorf("s3.access_key_id is required") + } + + if c.S3.SecretAccessKey == "" { + return fmt.Errorf("s3.secret_access_key is required") + } + + return nil +} + // Module exports the config module for fx dependency injection. // It provides the Config type to other modules in the application. var Module = fx.Module("config", diff --git a/internal/database/database.go b/internal/database/database.go index 1152aef..0fd7b49 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -36,26 +36,17 @@ type DB struct { } // New creates a new database connection at the specified path. -// It automatically handles database recovery, creates the schema if needed, -// and configures SQLite with appropriate settings for performance and reliability. -// The database uses WAL mode for better concurrency and sets a busy timeout -// to handle concurrent access gracefully. -// -// If the database appears locked, it will attempt recovery by removing stale -// lock files and switching temporarily to TRUNCATE journal mode. -// -// New creates a new database connection at the specified path. -// It automatically handles recovery from stale locks, creates the schema if needed, -// and configures SQLite with WAL mode for better concurrency. +// It creates the schema if needed and configures SQLite with WAL mode for +// better concurrency. SQLite handles crash recovery automatically when +// opening a database with journal/WAL files present. // The path parameter can be a file path for persistent storage or ":memory:" // for an in-memory database (useful for testing). func New(ctx context.Context, path string) (*DB, error) { log.Debug("Opening database connection", "path", path) - // First, try to recover from any stale locks - if err := recoverDatabase(ctx, path); err != nil { - log.Warn("Failed to recover database", "error", err) - } + // Note: We do NOT delete journal/WAL files before opening. + // SQLite handles crash recovery automatically when the database is opened. + // Deleting these files would corrupt the database after an unclean shutdown. 
// First attempt with standard WAL mode log.Debug("Attempting to open database with WAL mode", "path", path) @@ -156,62 +147,6 @@ func (db *DB) Close() error { return nil } -// recoverDatabase attempts to recover a locked database -func recoverDatabase(ctx context.Context, path string) error { - // Check if database file exists - if _, err := os.Stat(path); os.IsNotExist(err) { - // No database file, nothing to recover - return nil - } - - // Remove stale lock files - // SQLite creates -wal and -shm files for WAL mode - walPath := path + "-wal" - shmPath := path + "-shm" - journalPath := path + "-journal" - - log.Info("Attempting database recovery", "path", path) - - // Always remove lock files on startup to ensure clean state - removed := false - - // Check for and remove journal file (from non-WAL mode) - if _, err := os.Stat(journalPath); err == nil { - log.Info("Found journal file, removing", "path", journalPath) - if err := os.Remove(journalPath); err != nil { - log.Warn("Failed to remove journal file", "error", err) - } else { - removed = true - } - } - - // Remove WAL file - if _, err := os.Stat(walPath); err == nil { - log.Info("Found WAL file, removing", "path", walPath) - if err := os.Remove(walPath); err != nil { - log.Warn("Failed to remove WAL file", "error", err) - } else { - removed = true - } - } - - // Remove SHM file - if _, err := os.Stat(shmPath); err == nil { - log.Info("Found shared memory file, removing", "path", shmPath) - if err := os.Remove(shmPath); err != nil { - log.Warn("Failed to remove shared memory file", "error", err) - } else { - removed = true - } - } - - if removed { - log.Info("Database lock files removed") - } - - return nil -} - // Conn returns the underlying *sql.DB connection. // This should be used sparingly and primarily for read operations. // For write operations, prefer using the ExecWithLog method. diff --git a/internal/pidlock/pidlock.go b/internal/pidlock/pidlock.go new file mode 100644 index 0000000..dfe0306 --- /dev/null +++ b/internal/pidlock/pidlock.go @@ -0,0 +1,108 @@ +// Package pidlock provides process-level locking using PID files. +// It prevents multiple instances of vaultik from running simultaneously, +// which would cause database locking conflicts. +package pidlock + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "syscall" +) + +// ErrAlreadyRunning indicates another vaultik instance is running. +var ErrAlreadyRunning = errors.New("another vaultik instance is already running") + +// Lock represents an acquired PID lock. +type Lock struct { + path string +} + +// Acquire attempts to acquire a PID lock in the specified directory. +// If the lock file exists and the process is still running, it returns +// ErrAlreadyRunning with details about the existing process. +// On success, it writes the current PID to the lock file and returns +// a Lock that must be released with Release(). 
+func Acquire(lockDir string) (*Lock, error) { + // Ensure lock directory exists + if err := os.MkdirAll(lockDir, 0700); err != nil { + return nil, fmt.Errorf("creating lock directory: %w", err) + } + + lockPath := filepath.Join(lockDir, "vaultik.pid") + + // Check for existing lock + existingPID, err := readPIDFile(lockPath) + if err == nil { + // Lock file exists, check if process is running + if isProcessRunning(existingPID) { + return nil, fmt.Errorf("%w (PID %d)", ErrAlreadyRunning, existingPID) + } + // Process is not running, stale lock file - we can take over + } + + // Write our PID + pid := os.Getpid() + if err := os.WriteFile(lockPath, []byte(strconv.Itoa(pid)), 0600); err != nil { + return nil, fmt.Errorf("writing PID file: %w", err) + } + + return &Lock{path: lockPath}, nil +} + +// Release removes the PID lock file. +// It is safe to call Release multiple times. +func (l *Lock) Release() error { + if l == nil || l.path == "" { + return nil + } + + // Verify we still own the lock (our PID is in the file) + existingPID, err := readPIDFile(l.path) + if err != nil { + // File already gone or unreadable - that's fine + return nil + } + + if existingPID != os.Getpid() { + // Someone else wrote to our lock file - don't remove it + return nil + } + + if err := os.Remove(l.path); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("removing PID file: %w", err) + } + + l.path = "" // Prevent double-release + return nil +} + +// readPIDFile reads and parses the PID from a lock file. +func readPIDFile(path string) (int, error) { + data, err := os.ReadFile(path) + if err != nil { + return 0, err + } + + pid, err := strconv.Atoi(strings.TrimSpace(string(data))) + if err != nil { + return 0, fmt.Errorf("parsing PID: %w", err) + } + + return pid, nil +} + +// isProcessRunning checks if a process with the given PID is running. +func isProcessRunning(pid int) bool { + process, err := os.FindProcess(pid) + if err != nil { + return false + } + + // On Unix, FindProcess always succeeds. We need to send signal 0 to check. 
+ err = process.Signal(syscall.Signal(0)) + return err == nil +} diff --git a/internal/pidlock/pidlock_test.go b/internal/pidlock/pidlock_test.go new file mode 100644 index 0000000..d256ee1 --- /dev/null +++ b/internal/pidlock/pidlock_test.go @@ -0,0 +1,108 @@ +package pidlock + +import ( + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAcquireAndRelease(t *testing.T) { + tmpDir := t.TempDir() + + // Acquire lock + lock, err := Acquire(tmpDir) + require.NoError(t, err) + require.NotNil(t, lock) + + // Verify PID file exists with our PID + data, err := os.ReadFile(filepath.Join(tmpDir, "vaultik.pid")) + require.NoError(t, err) + pid, err := strconv.Atoi(string(data)) + require.NoError(t, err) + assert.Equal(t, os.Getpid(), pid) + + // Release lock + err = lock.Release() + require.NoError(t, err) + + // Verify PID file is gone + _, err = os.Stat(filepath.Join(tmpDir, "vaultik.pid")) + assert.True(t, os.IsNotExist(err)) +} + +func TestAcquireBlocksSecondInstance(t *testing.T) { + tmpDir := t.TempDir() + + // Acquire first lock + lock1, err := Acquire(tmpDir) + require.NoError(t, err) + require.NotNil(t, lock1) + defer func() { _ = lock1.Release() }() + + // Try to acquire second lock - should fail + lock2, err := Acquire(tmpDir) + assert.ErrorIs(t, err, ErrAlreadyRunning) + assert.Nil(t, lock2) +} + +func TestAcquireWithStaleLock(t *testing.T) { + tmpDir := t.TempDir() + + // Write a stale PID file (PID that doesn't exist) + stalePID := 999999999 // Unlikely to be a real process + pidPath := filepath.Join(tmpDir, "vaultik.pid") + err := os.WriteFile(pidPath, []byte(strconv.Itoa(stalePID)), 0600) + require.NoError(t, err) + + // Should be able to acquire lock (stale lock is cleaned up) + lock, err := Acquire(tmpDir) + require.NoError(t, err) + require.NotNil(t, lock) + defer func() { _ = lock.Release() }() + + // Verify our PID is now in the file + data, err := os.ReadFile(pidPath) + require.NoError(t, err) + pid, err := strconv.Atoi(string(data)) + require.NoError(t, err) + assert.Equal(t, os.Getpid(), pid) +} + +func TestReleaseIsIdempotent(t *testing.T) { + tmpDir := t.TempDir() + + lock, err := Acquire(tmpDir) + require.NoError(t, err) + + // Release multiple times - should not error + err = lock.Release() + require.NoError(t, err) + + err = lock.Release() + require.NoError(t, err) +} + +func TestReleaseNilLock(t *testing.T) { + var lock *Lock + err := lock.Release() + assert.NoError(t, err) +} + +func TestAcquireCreatesDirectory(t *testing.T) { + tmpDir := t.TempDir() + nestedDir := filepath.Join(tmpDir, "nested", "dir") + + lock, err := Acquire(nestedDir) + require.NoError(t, err) + require.NotNil(t, lock) + defer func() { _ = lock.Release() }() + + // Verify directory was created + info, err := os.Stat(nestedDir) + require.NoError(t, err) + assert.True(t, info.IsDir()) +} diff --git a/internal/snapshot/file_change_test.go b/internal/snapshot/file_change_test.go index 57f3aee..2a7bf7f 100644 --- a/internal/snapshot/file_change_test.go +++ b/internal/snapshot/file_change_test.go @@ -194,8 +194,8 @@ func TestMultipleFileChanges(t *testing.T) { // First scan result1, err := scanner.Scan(ctx, "/", snapshotID1) require.NoError(t, err) - // 4 files because root directory is also counted - assert.Equal(t, 4, result1.FilesScanned) + // Only regular files are counted, not directories + assert.Equal(t, 3, result1.FilesScanned) // Modify two files time.Sleep(10 * time.Millisecond) // Ensure mtime changes 
@@ -221,9 +221,8 @@ func TestMultipleFileChanges(t *testing.T) { result2, err := scanner.Scan(ctx, "/", snapshotID2) require.NoError(t, err) - // The scanner might examine more items than just our files (includes directories, etc) - // We should verify that at least our expected files were scanned - assert.GreaterOrEqual(t, result2.FilesScanned, 4, "Should scan at least 4 files (3 files + root dir)") + // Only regular files are counted, not directories + assert.Equal(t, 3, result2.FilesScanned) // Verify each file has exactly one set of chunks for path := range files { diff --git a/internal/snapshot/module.go b/internal/snapshot/module.go index fde5e43..9beea61 100644 --- a/internal/snapshot/module.go +++ b/internal/snapshot/module.go @@ -3,7 +3,7 @@ package snapshot import ( "git.eeqj.de/sneak/vaultik/internal/config" "git.eeqj.de/sneak/vaultik/internal/database" - "git.eeqj.de/sneak/vaultik/internal/s3" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/afero" "go.uber.org/fx" ) @@ -27,13 +27,13 @@ var Module = fx.Module("backup", // ScannerFactory creates scanners with custom parameters type ScannerFactory func(params ScannerParams) *Scanner -func provideScannerFactory(cfg *config.Config, repos *database.Repositories, s3Client *s3.Client) ScannerFactory { +func provideScannerFactory(cfg *config.Config, repos *database.Repositories, storer storage.Storer) ScannerFactory { return func(params ScannerParams) *Scanner { return NewScanner(ScannerConfig{ FS: params.Fs, ChunkSize: cfg.ChunkSize.Int64(), Repositories: repos, - S3Client: s3Client, + Storage: storer, MaxBlobSize: cfg.BlobSizeLimit.Int64(), CompressionLevel: cfg.CompressionLevel, AgeRecipients: cfg.AgeRecipients, diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go index 004a11b..0507f5e 100644 --- a/internal/snapshot/scanner.go +++ b/internal/snapshot/scanner.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "io" "os" "strings" "sync" @@ -14,8 +13,9 @@ import ( "git.eeqj.de/sneak/vaultik/internal/chunker" "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/dustin/go-humanize" + "github.com/google/uuid" "github.com/spf13/afero" ) @@ -26,19 +26,34 @@ type FileToProcess struct { File *database.File } +// pendingFileData holds all data needed to commit a file to the database +type pendingFileData struct { + file *database.File + fileChunks []database.FileChunk + chunkFiles []database.ChunkFile +} + // Scanner scans directories and populates the database with file and chunk information type Scanner struct { fs afero.Fs chunker *chunker.Chunker packer *blob.Packer repos *database.Repositories - s3Client S3Client + storage storage.Storer maxBlobSize int64 compressionLevel int ageRecipient string snapshotID string // Current snapshot being processed progress *ProgressReporter + // In-memory cache of known chunk hashes for fast existence checks + knownChunks map[string]struct{} + knownChunksMu sync.RWMutex + + // Pending file data buffer for batch insertion + pendingFiles []pendingFileData + pendingFilesMu sync.Mutex + // Mutex for coordinating blob creation packerMu sync.Mutex // Blocks chunk production during blob creation @@ -46,19 +61,17 @@ type Scanner struct { scanCtx context.Context } -// S3Client interface for blob storage operations -type S3Client interface { - PutObject(ctx context.Context, key string, data io.Reader) error - PutObjectWithProgress(ctx 
context.Context, key string, data io.Reader, size int64, progress s3.ProgressCallback) error - StatObject(ctx context.Context, key string) (*s3.ObjectInfo, error) -} +const ( + // Batch size for file database operations + fileBatchSize = 100 +) // ScannerConfig contains configuration for the scanner type ScannerConfig struct { FS afero.Fs ChunkSize int64 Repositories *database.Repositories - S3Client S3Client + Storage storage.Storer MaxBlobSize int64 CompressionLevel int AgeRecipients []string // Optional, empty means no encryption @@ -111,7 +124,7 @@ func NewScanner(cfg ScannerConfig) *Scanner { chunker: chunker.NewChunker(cfg.ChunkSize), packer: packer, repos: cfg.Repositories, - s3Client: cfg.S3Client, + storage: cfg.Storage, maxBlobSize: cfg.MaxBlobSize, compressionLevel: cfg.CompressionLevel, ageRecipient: strings.Join(cfg.AgeRecipients, ","), @@ -128,11 +141,11 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc } // Set blob handler for concurrent upload - if s.s3Client != nil { - log.Debug("Setting blob handler for S3 uploads") + if s.storage != nil { + log.Debug("Setting blob handler for storage uploads") s.packer.SetBlobHandler(s.handleBlobReady) } else { - log.Debug("No S3 client configured, blobs will not be uploaded") + log.Debug("No storage configured, blobs will not be uploaded") } // Start progress reporting if enabled @@ -141,16 +154,41 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc defer s.progress.Stop() } - // Phase 0: Check for deleted files from previous snapshots - if err := s.detectDeletedFiles(ctx, path, result); err != nil { + // Phase 0: Load known files and chunks from database into memory for fast lookup + fmt.Println("Loading known files from database...") + knownFiles, err := s.loadKnownFiles(ctx, path) + if err != nil { + return nil, fmt.Errorf("loading known files: %w", err) + } + fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles))) + + fmt.Println("Loading known chunks from database...") + if err := s.loadKnownChunks(ctx); err != nil { + return nil, fmt.Errorf("loading known chunks: %w", err) + } + fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks))) + + // Phase 1: Scan directory, collect files to process, and track existing files + // (builds existingFiles map during walk to avoid double traversal) + log.Info("Phase 1/3: Scanning directory structure") + existingFiles := make(map[string]struct{}) + scanResult, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles) + if err != nil { + return nil, fmt.Errorf("scan phase failed: %w", err) + } + filesToProcess := scanResult.FilesToProcess + + // Phase 1b: Detect deleted files by comparing DB against scanned files + if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil { return nil, fmt.Errorf("detecting deleted files: %w", err) } - // Phase 1: Scan directory and collect files to process - log.Info("Phase 1/3: Scanning directory structure") - filesToProcess, err := s.scanPhase(ctx, path, result) - if err != nil { - return nil, fmt.Errorf("scan phase failed: %w", err) + // Phase 1c: Associate unchanged files with this snapshot (no new records needed) + if len(scanResult.UnchangedFileIDs) > 0 { + fmt.Printf("Associating %s unchanged files with snapshot...\n", formatNumber(len(scanResult.UnchangedFileIDs))) + if err := s.batchAddFilesToSnapshot(ctx, scanResult.UnchangedFileIDs); err != nil { + return nil, fmt.Errorf("associating 
unchanged files: %w", err) + } } // Calculate total size to process @@ -216,22 +254,150 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc return result, nil } +// loadKnownFiles loads all known files from the database into a map for fast lookup +// This avoids per-file database queries during the scan phase +func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*database.File, error) { + files, err := s.repos.Files.ListByPrefix(ctx, path) + if err != nil { + return nil, fmt.Errorf("listing files by prefix: %w", err) + } + + result := make(map[string]*database.File, len(files)) + for _, f := range files { + result[f.Path] = f + } + + return result, nil +} + +// loadKnownChunks loads all known chunk hashes from the database into a map for fast lookup +// This avoids per-chunk database queries during file processing +func (s *Scanner) loadKnownChunks(ctx context.Context) error { + chunks, err := s.repos.Chunks.List(ctx) + if err != nil { + return fmt.Errorf("listing chunks: %w", err) + } + + s.knownChunksMu.Lock() + s.knownChunks = make(map[string]struct{}, len(chunks)) + for _, c := range chunks { + s.knownChunks[c.ChunkHash] = struct{}{} + } + s.knownChunksMu.Unlock() + + return nil +} + +// chunkExists checks if a chunk hash exists in the in-memory cache +func (s *Scanner) chunkExists(hash string) bool { + s.knownChunksMu.RLock() + _, exists := s.knownChunks[hash] + s.knownChunksMu.RUnlock() + return exists +} + +// addKnownChunk adds a chunk hash to the in-memory cache +func (s *Scanner) addKnownChunk(hash string) { + s.knownChunksMu.Lock() + s.knownChunks[hash] = struct{}{} + s.knownChunksMu.Unlock() +} + +// addPendingFile adds a file to the pending buffer and flushes if needed +func (s *Scanner) addPendingFile(ctx context.Context, data pendingFileData) error { + s.pendingFilesMu.Lock() + s.pendingFiles = append(s.pendingFiles, data) + needsFlush := len(s.pendingFiles) >= fileBatchSize + s.pendingFilesMu.Unlock() + + if needsFlush { + return s.flushPendingFiles(ctx) + } + return nil +} + +// flushPendingFiles writes all pending files to the database in a single transaction +func (s *Scanner) flushPendingFiles(ctx context.Context) error { + s.pendingFilesMu.Lock() + files := s.pendingFiles + s.pendingFiles = nil + s.pendingFilesMu.Unlock() + + if len(files) == 0 { + return nil + } + + return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { + for _, data := range files { + // Create or update the file record + if err := s.repos.Files.Create(txCtx, tx, data.file); err != nil { + return fmt.Errorf("creating file record: %w", err) + } + + // Delete any existing file_chunks and chunk_files for this file + if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID); err != nil { + return fmt.Errorf("deleting old file chunks: %w", err) + } + if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID); err != nil { + return fmt.Errorf("deleting old chunk files: %w", err) + } + + // Create file-chunk mappings + for i := range data.fileChunks { + if err := s.repos.FileChunks.Create(txCtx, tx, &data.fileChunks[i]); err != nil { + return fmt.Errorf("creating file chunk: %w", err) + } + } + + // Create chunk-file mappings + for i := range data.chunkFiles { + if err := s.repos.ChunkFiles.Create(txCtx, tx, &data.chunkFiles[i]); err != nil { + return fmt.Errorf("creating chunk file: %w", err) + } + } + + // Add file to snapshot + if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID); 
err != nil { + return fmt.Errorf("adding file to snapshot: %w", err) + } + } + return nil + }) +} + +// flushAllPending flushes all pending files to the database +func (s *Scanner) flushAllPending(ctx context.Context) error { + return s.flushPendingFiles(ctx) +} + +// ScanPhaseResult contains the results of the scan phase +type ScanPhaseResult struct { + FilesToProcess []*FileToProcess + UnchangedFileIDs []string // IDs of unchanged files to associate with snapshot +} + // scanPhase performs the initial directory scan to identify files to process -func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult) ([]*FileToProcess, error) { +// It uses the pre-loaded knownFiles map for fast change detection without DB queries +// It also populates existingFiles map for deletion detection +// Returns files needing processing and IDs of unchanged files for snapshot association +func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) (*ScanPhaseResult, error) { + // Use known file count as estimate for progress (accurate for subsequent backups) + estimatedTotal := int64(len(knownFiles)) + var filesToProcess []*FileToProcess + var unchangedFileIDs []string // Just IDs - no new records needed var mu sync.Mutex // Set up periodic status output + startTime := time.Now() lastStatusTime := time.Now() statusInterval := 15 * time.Second var filesScanned int64 - var bytesScanned int64 log.Debug("Starting directory walk", "path", path) - err := afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error { - log.Debug("Scanning filesystem entry", "path", path) + err := afero.Walk(s.fs, path, func(filePath string, info os.FileInfo, err error) error { if err != nil { - log.Debug("Error accessing filesystem entry", "path", path, "error", err) + log.Debug("Error accessing filesystem entry", "path", filePath, "error", err) return err } @@ -242,42 +408,80 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult default: } - // Check file and update metadata - file, needsProcessing, err := s.checkFileAndUpdateMetadata(ctx, path, info, result) - if err != nil { - // Don't log context cancellation as an error - if err == context.Canceled { - return err - } - return fmt.Errorf("failed to check %s: %w", path, err) + // Skip non-regular files for processing (but still count them) + if !info.Mode().IsRegular() { + return nil } - // If file needs processing, add to list - if needsProcessing && info.Mode().IsRegular() && info.Size() > 0 { - mu.Lock() + // Track this file as existing (for deletion detection) + existingFiles[filePath] = struct{}{} + + // Check file against in-memory map (no DB query!) 
+ file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles) + + mu.Lock() + if needsProcessing { + // New or changed file - will create record after processing filesToProcess = append(filesToProcess, &FileToProcess{ - Path: path, + Path: filePath, FileInfo: info, File: file, }) - mu.Unlock() + } else if file.ID != "" { + // Unchanged file with existing ID - just need snapshot association + unchangedFileIDs = append(unchangedFileIDs, file.ID) } + filesScanned++ + changedCount := len(filesToProcess) + mu.Unlock() - // Update scan statistics - if info.Mode().IsRegular() { - filesScanned++ - bytesScanned += info.Size() + // Update result stats + if needsProcessing { + result.BytesScanned += info.Size() + } else { + result.FilesSkipped++ + result.BytesSkipped += info.Size() } + result.FilesScanned++ // Output periodic status if time.Since(lastStatusTime) >= statusInterval { - mu.Lock() - changedCount := len(filesToProcess) - mu.Unlock() + elapsed := time.Since(startTime) + rate := float64(filesScanned) / elapsed.Seconds() - fmt.Printf("Scan progress: %s files examined, %s changed\n", - formatNumber(int(filesScanned)), - formatNumber(changedCount)) + // Build status line - use estimate if available (not first backup) + if estimatedTotal > 0 { + // Show actual scanned vs estimate (may exceed estimate if files were added) + pct := float64(filesScanned) / float64(estimatedTotal) * 100 + if pct > 100 { + pct = 100 // Cap at 100% for display + } + remaining := estimatedTotal - filesScanned + if remaining < 0 { + remaining = 0 + } + var eta time.Duration + if rate > 0 && remaining > 0 { + eta = time.Duration(float64(remaining)/rate) * time.Second + } + fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed", + formatNumber(int(filesScanned)), + pct, + formatNumber(changedCount), + rate, + elapsed.Round(time.Second)) + if eta > 0 { + fmt.Printf(", ETA %s", eta.Round(time.Second)) + } + fmt.Println() + } else { + // First backup - no estimate available + fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n", + formatNumber(int(filesScanned)), + formatNumber(changedCount), + rate, + elapsed.Round(time.Second)) + } lastStatusTime = time.Now() } @@ -288,16 +492,137 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult return nil, err } - return filesToProcess, nil + return &ScanPhaseResult{ + FilesToProcess: filesToProcess, + UnchangedFileIDs: unchangedFileIDs, + }, nil +} + +// checkFileInMemory checks if a file needs processing using the in-memory map +// No database access is performed - this is purely CPU/memory work +func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) { + // Get file stats + stat, ok := info.Sys().(interface { + Uid() uint32 + Gid() uint32 + }) + + var uid, gid uint32 + if ok { + uid = stat.Uid() + gid = stat.Gid() + } + + // Check against in-memory map first to get existing ID if available + existingFile, exists := knownFiles[path] + + // Create file record with ID set upfront + // For new files, generate UUID immediately so it's available for chunk associations + // For existing files, reuse the existing ID + var fileID string + if exists { + fileID = existingFile.ID + } else { + fileID = uuid.New().String() + } + + file := &database.File{ + ID: fileID, + Path: path, + MTime: info.ModTime(), + CTime: info.ModTime(), // afero doesn't provide ctime + Size: info.Size(), + Mode: uint32(info.Mode()), + UID: uid, + GID: gid, 
+ } + + // New file - needs processing + if !exists { + return file, true + } + + // Check if file has changed + if existingFile.Size != file.Size || + existingFile.MTime.Unix() != file.MTime.Unix() || + existingFile.Mode != file.Mode || + existingFile.UID != file.UID || + existingFile.GID != file.GID { + return file, true + } + + // File unchanged + return file, false +} + +// batchAddFilesToSnapshot adds existing file IDs to the snapshot association table +// This is used for unchanged files that already have records in the database +func (s *Scanner) batchAddFilesToSnapshot(ctx context.Context, fileIDs []string) error { + const batchSize = 1000 + + startTime := time.Now() + lastStatusTime := time.Now() + statusInterval := 5 * time.Second + + for i := 0; i < len(fileIDs); i += batchSize { + // Check context cancellation + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + end := i + batchSize + if end > len(fileIDs) { + end = len(fileIDs) + } + batch := fileIDs[i:end] + + err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error { + for _, fileID := range batch { + if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, fileID); err != nil { + return fmt.Errorf("adding file to snapshot: %w", err) + } + } + return nil + }) + if err != nil { + return err + } + + // Periodic status + if time.Since(lastStatusTime) >= statusInterval { + elapsed := time.Since(startTime) + rate := float64(end) / elapsed.Seconds() + pct := float64(end) / float64(len(fileIDs)) * 100 + fmt.Printf("Associating files: %s/%s (%.1f%%), %.0f files/sec\n", + formatNumber(end), formatNumber(len(fileIDs)), pct, rate) + lastStatusTime = time.Now() + } + } + + elapsed := time.Since(startTime) + rate := float64(len(fileIDs)) / elapsed.Seconds() + fmt.Printf("Associated %s unchanged files in %s (%.0f files/sec)\n", + formatNumber(len(fileIDs)), elapsed.Round(time.Second), rate) + + return nil } // processPhase processes the files that need backing up func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error { + // Calculate total bytes to process + var totalBytes int64 + for _, f := range filesToProcess { + totalBytes += f.FileInfo.Size() + } + // Set up periodic status output lastStatusTime := time.Now() statusInterval := 15 * time.Second startTime := time.Now() filesProcessed := 0 + var bytesProcessed int64 totalFiles := len(filesToProcess) // Process each file @@ -318,18 +643,33 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc } filesProcessed++ + bytesProcessed += fileToProcess.FileInfo.Size() // Output periodic status if time.Since(lastStatusTime) >= statusInterval { elapsed := time.Since(startTime) - remaining := totalFiles - filesProcessed + pct := float64(bytesProcessed) / float64(totalBytes) * 100 + byteRate := float64(bytesProcessed) / elapsed.Seconds() + fileRate := float64(filesProcessed) / elapsed.Seconds() + + // Calculate ETA based on bytes (more accurate than files) + remainingBytes := totalBytes - bytesProcessed var eta time.Duration - if filesProcessed > 0 { - eta = elapsed / time.Duration(filesProcessed) * time.Duration(remaining) + if byteRate > 0 { + eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second } - fmt.Printf("Progress: %s/%s files", formatNumber(filesProcessed), formatNumber(totalFiles)) - if remaining > 0 && eta > 0 { + // Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s + fmt.Printf("Progress [%s/%s] 
%s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s", + formatCompact(filesProcessed), + formatCompact(totalFiles), + humanize.Bytes(uint64(bytesProcessed)), + humanize.Bytes(uint64(totalBytes)), + pct, + humanize.Bytes(uint64(byteRate)), + fileRate, + elapsed.Round(time.Second)) + if eta > 0 { fmt.Printf(", ETA: %s", eta.Round(time.Second)) } fmt.Println() @@ -337,6 +677,11 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc } } + // Flush any remaining pending chunks and files to database + if err := s.flushAllPending(ctx); err != nil { + return fmt.Errorf("flushing pending database operations: %w", err) + } + // Final flush (outside any transaction) s.packerMu.Lock() if err := s.packer.Flush(); err != nil { @@ -345,8 +690,8 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc } s.packerMu.Unlock() - // If no S3 client, store any remaining blobs - if s.s3Client == nil { + // If no storage configured, store any remaining blobs locally + if s.storage == nil { blobs := s.packer.GetFinishedBlobs() for _, b := range blobs { // Blob metadata is already stored incrementally during packing @@ -364,205 +709,6 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc return nil } -// checkFileAndUpdateMetadata checks if a file needs processing and updates metadata -func (s *Scanner) checkFileAndUpdateMetadata(ctx context.Context, path string, info os.FileInfo, result *ScanResult) (*database.File, bool, error) { - // Check context cancellation - select { - case <-ctx.Done(): - return nil, false, ctx.Err() - default: - } - - // Process file without holding a long transaction - return s.checkFile(ctx, path, info, result) -} - -// checkFile checks if a file needs processing and updates metadata -func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo, result *ScanResult) (*database.File, bool, error) { - // Get file stats - stat, ok := info.Sys().(interface { - Uid() uint32 - Gid() uint32 - }) - - var uid, gid uint32 - if ok { - uid = stat.Uid() - gid = stat.Gid() - } - - // Check if it's a symlink - var linkTarget string - if info.Mode()&os.ModeSymlink != 0 { - // Read the symlink target - if linker, ok := s.fs.(afero.LinkReader); ok { - linkTarget, _ = linker.ReadlinkIfPossible(path) - } - } - - // Create file record - file := &database.File{ - Path: path, - MTime: info.ModTime(), - CTime: info.ModTime(), // afero doesn't provide ctime - Size: info.Size(), - Mode: uint32(info.Mode()), - UID: uid, - GID: gid, - LinkTarget: linkTarget, - } - - // Check if file has changed since last backup (no transaction needed for read) - log.Debug("Querying database for existing file record", "path", path) - existingFile, err := s.repos.Files.GetByPath(ctx, path) - if err != nil { - return nil, false, fmt.Errorf("checking existing file: %w", err) - } - - fileChanged := existingFile == nil || s.hasFileChanged(existingFile, file) - - // Update file metadata and add to snapshot in a single transaction - log.Debug("Updating file record in database and adding to snapshot", "path", path, "changed", fileChanged, "snapshot", s.snapshotID) - err = s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error { - // First create/update the file - if err := s.repos.Files.Create(ctx, tx, file); err != nil { - return fmt.Errorf("creating file: %w", err) - } - // Then add it to the snapshot using the file ID - if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, file.ID); err != nil { - return 
fmt.Errorf("adding file to snapshot: %w", err) - } - return nil - }) - if err != nil { - return nil, false, err - } - log.Debug("File record added to snapshot association", "path", path) - - result.FilesScanned++ - - // Update progress - if s.progress != nil { - stats := s.progress.GetStats() - stats.FilesScanned.Add(1) - stats.CurrentFile.Store(path) - } - - // Track skipped files - if info.Mode().IsRegular() && info.Size() > 0 && !fileChanged { - result.FilesSkipped++ - result.BytesSkipped += info.Size() - if s.progress != nil { - stats := s.progress.GetStats() - stats.FilesSkipped.Add(1) - stats.BytesSkipped.Add(info.Size()) - } - // File hasn't changed, but we still need to associate existing chunks with this snapshot - log.Debug("File content unchanged, reusing existing chunks and blobs", "path", path) - if err := s.associateExistingChunks(ctx, path); err != nil { - return nil, false, fmt.Errorf("associating existing chunks: %w", err) - } - log.Debug("Existing chunks and blobs associated with snapshot", "path", path) - } else { - // File changed or is not a regular file - result.BytesScanned += info.Size() - if s.progress != nil { - s.progress.GetStats().BytesScanned.Add(info.Size()) - } - } - - return file, fileChanged, nil -} - -// hasFileChanged determines if a file has changed since last backup -func (s *Scanner) hasFileChanged(existingFile, newFile *database.File) bool { - // Check if any metadata has changed - if existingFile.Size != newFile.Size { - return true - } - if existingFile.MTime.Unix() != newFile.MTime.Unix() { - return true - } - if existingFile.Mode != newFile.Mode { - return true - } - if existingFile.UID != newFile.UID { - return true - } - if existingFile.GID != newFile.GID { - return true - } - if existingFile.LinkTarget != newFile.LinkTarget { - return true - } - return false -} - -// associateExistingChunks links existing chunks from an unchanged file to the current snapshot -func (s *Scanner) associateExistingChunks(ctx context.Context, path string) error { - log.Debug("associateExistingChunks start", "path", path) - - // Get existing file chunks (no transaction needed for read) - log.Debug("Querying database for file's chunk associations", "path", path) - fileChunks, err := s.repos.FileChunks.GetByFile(ctx, path) - if err != nil { - return fmt.Errorf("getting existing file chunks: %w", err) - } - log.Debug("Retrieved file chunk associations from database", "path", path, "count", len(fileChunks)) - - // Collect unique blob IDs that need to be added to snapshot - blobsToAdd := make(map[string]string) // blob ID -> blob hash - for i, fc := range fileChunks { - log.Debug("Looking up blob containing chunk", "path", path, "chunk_index", i, "chunk_hash", fc.ChunkHash) - - // Find which blob contains this chunk (no transaction needed for read) - log.Debug("Querying database for blob containing chunk", "chunk_hash", fc.ChunkHash) - blobChunk, err := s.repos.BlobChunks.GetByChunkHash(ctx, fc.ChunkHash) - if err != nil { - return fmt.Errorf("finding blob for chunk %s: %w", fc.ChunkHash, err) - } - if blobChunk == nil { - log.Warn("Chunk record exists in database but not associated with any blob", "chunk", fc.ChunkHash, "file", path) - continue - } - log.Debug("Found blob record containing chunk", "chunk_hash", fc.ChunkHash, "blob_id", blobChunk.BlobID) - - // Track blob ID for later processing - if _, exists := blobsToAdd[blobChunk.BlobID]; !exists { - blobsToAdd[blobChunk.BlobID] = "" // We'll get the hash later - } - } - - // Now get blob hashes outside of 
transaction operations - for blobID := range blobsToAdd { - blob, err := s.repos.Blobs.GetByID(ctx, blobID) - if err != nil { - return fmt.Errorf("getting blob %s: %w", blobID, err) - } - if blob == nil { - log.Warn("Blob record missing from database", "blob_id", blobID) - delete(blobsToAdd, blobID) - continue - } - blobsToAdd[blobID] = blob.Hash - } - - // Add blobs to snapshot using short transactions - for blobID, blobHash := range blobsToAdd { - log.Debug("Adding blob reference to snapshot association", "blob_id", blobID, "blob_hash", blobHash, "snapshot", s.snapshotID) - err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error { - return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobID, blobHash) - }) - if err != nil { - return fmt.Errorf("adding existing blob to snapshot: %w", err) - } - log.Debug("Created snapshot-blob association in database", "blob_id", blobID) - } - - log.Debug("associateExistingChunks complete", "path", path, "blobs_processed", len(blobsToAdd)) - return nil -} - // handleBlobReady is called by the packer when a blob is finalized func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { startTime := time.Now().UTC() @@ -573,7 +719,7 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed) } - // Upload to S3 first (without holding any locks) + // Upload to storage first (without holding any locks) // Use scan context for cancellation support ctx := s.scanCtx if ctx == nil { @@ -585,7 +731,6 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { lastProgressBytes := int64(0) progressCallback := func(uploaded int64) error { - // Calculate instantaneous speed now := time.Now() elapsed := now.Sub(lastProgressTime).Seconds() @@ -612,19 +757,29 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { // Create sharded path: blobs/ca/fe/cafebabe... 
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash) - if err := s.s3Client.PutObjectWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil { - return fmt.Errorf("uploading blob %s to S3: %w", finishedBlob.Hash, err) + if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil { + return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err) } uploadDuration := time.Since(startTime) + // Calculate upload speed + uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds() + + // Print blob stored message + fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n", + finishedBlob.Hash[:12]+"...", + humanize.Bytes(uint64(finishedBlob.Compressed)), + humanize.Bytes(uint64(uploadSpeedBps)), + uploadDuration.Round(time.Millisecond)) + // Log upload stats - uploadSpeed := float64(finishedBlob.Compressed) * 8 / uploadDuration.Seconds() // bits per second - log.Info("Successfully uploaded blob to S3 storage", + uploadSpeedBits := uploadSpeedBps * 8 // bits per second + log.Info("Successfully uploaded blob to storage", "path", blobPath, "size", humanize.Bytes(uint64(finishedBlob.Compressed)), "duration", uploadDuration, - "speed", humanize.SI(uploadSpeed, "bps")) + "speed", humanize.SI(uploadSpeedBits, "bps")) // Report upload complete if s.progress != nil { @@ -718,28 +873,24 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT "hash", chunk.Hash, "size", chunk.Size) - // Check if chunk already exists (outside of transaction) - existing, err := s.repos.Chunks.GetByHash(ctx, chunk.Hash) - if err != nil { - return fmt.Errorf("checking chunk existence: %w", err) - } - chunkExists := (existing != nil) + // Check if chunk already exists (fast in-memory lookup) + chunkExists := s.chunkExists(chunk.Hash) - // Store chunk if new + // Store chunk in database if new (must happen before packer.AddChunk + // because packer creates blob_chunk entries that reference chunks) if !chunkExists { err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { dbChunk := &database.Chunk{ ChunkHash: chunk.Hash, Size: chunk.Size, } - if err := s.repos.Chunks.Create(txCtx, tx, dbChunk); err != nil { - return fmt.Errorf("creating chunk: %w", err) - } - return nil + return s.repos.Chunks.Create(txCtx, tx, dbChunk) }) if err != nil { - return fmt.Errorf("storing chunk: %w", err) + return fmt.Errorf("creating chunk: %w", err) } + // Add to in-memory cache for fast duplicate detection + s.addKnownChunk(chunk.Hash) } // Track file chunk association for later storage @@ -815,44 +966,30 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT "file_hash", fileHash, "chunks", len(chunks)) - // Store file-chunk associations and chunk-file mappings in database - err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { - // First, delete all existing file_chunks and chunk_files for this file - // This ensures old chunks are no longer associated when file content changes - if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil { - return fmt.Errorf("deleting old file chunks: %w", err) + // Build file data for batch insertion + // Update chunk associations with the file ID + fileChunks := make([]database.FileChunk, len(chunks)) + chunkFiles := make([]database.ChunkFile, len(chunks)) + for i, ci := range chunks { + 
fileChunks[i] = database.FileChunk{ + FileID: fileToProcess.File.ID, + Idx: ci.fileChunk.Idx, + ChunkHash: ci.fileChunk.ChunkHash, } - if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil { - return fmt.Errorf("deleting old chunk files: %w", err) + chunkFiles[i] = database.ChunkFile{ + ChunkHash: ci.fileChunk.ChunkHash, + FileID: fileToProcess.File.ID, + FileOffset: ci.offset, + Length: ci.size, } + } - for _, ci := range chunks { - // Create file-chunk mapping - if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil { - return fmt.Errorf("creating file chunk: %w", err) - } - - // Create chunk-file mapping - chunkFile := &database.ChunkFile{ - ChunkHash: ci.fileChunk.ChunkHash, - FileID: fileToProcess.File.ID, - FileOffset: ci.offset, - Length: ci.size, - } - if err := s.repos.ChunkFiles.Create(txCtx, tx, chunkFile); err != nil { - return fmt.Errorf("creating chunk file: %w", err) - } - } - - // Add file to snapshot - if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, fileToProcess.File.ID); err != nil { - return fmt.Errorf("adding file to snapshot: %w", err) - } - - return nil + // Queue file for batch insertion + return s.addPendingFile(ctx, pendingFileData{ + file: fileToProcess.File, + fileChunks: fileChunks, + chunkFiles: chunkFiles, }) - - return err } // GetProgress returns the progress reporter for this scanner @@ -860,25 +997,35 @@ func (s *Scanner) GetProgress() *ProgressReporter { return s.progress } -// detectDeletedFiles finds files that existed in previous snapshots but no longer exist -func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, result *ScanResult) error { - // Get all files with this path prefix from the database - files, err := s.repos.Files.ListByPrefix(ctx, path) - if err != nil { - return fmt.Errorf("listing files by prefix: %w", err) +// detectDeletedFilesFromMap finds files that existed in previous snapshots but no longer exist +// Uses pre-loaded maps to avoid any filesystem or database access +func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[string]*database.File, existingFiles map[string]struct{}, result *ScanResult) error { + if len(knownFiles) == 0 { + return nil } - for _, file := range files { - // Check if the file still exists on disk - _, err := s.fs.Stat(file.Path) - if os.IsNotExist(err) { + // Check each known file against the enumerated set (no filesystem access needed) + for path, file := range knownFiles { + // Check context cancellation periodically + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // Check if the file exists in our enumerated set + if _, exists := existingFiles[path]; !exists { // File has been deleted result.FilesDeleted++ result.BytesDeleted += file.Size - log.Debug("Detected deleted file", "path", file.Path, "size", file.Size) + log.Debug("Detected deleted file", "path", path, "size", file.Size) } } + if result.FilesDeleted > 0 { + fmt.Printf("Found %s deleted files\n", formatNumber(result.FilesDeleted)) + } + return nil } @@ -889,3 +1036,17 @@ func formatNumber(n int) string { } return humanize.Comma(int64(n)) } + +// formatCompact formats a number compactly with k/M suffixes (e.g., 5.7k, 1.2M) +func formatCompact(n int) string { + if n < 1000 { + return fmt.Sprintf("%d", n) + } + if n < 10000 { + return fmt.Sprintf("%.1fk", float64(n)/1000) + } + if n < 1000000 { + return fmt.Sprintf("%.0fk", float64(n)/1000) + } + return fmt.Sprintf("%.1fM", float64(n)/1000000) +} diff --git 
a/internal/snapshot/scanner_test.go b/internal/snapshot/scanner_test.go index be6e0aa..19dbc20 100644 --- a/internal/snapshot/scanner_test.go +++ b/internal/snapshot/scanner_test.go @@ -99,26 +99,25 @@ func TestScannerSimpleDirectory(t *testing.T) { t.Fatalf("scan failed: %v", err) } - // Verify results - // We now scan 6 files + 3 directories (source, subdir, subdir2) = 9 entries - if result.FilesScanned != 9 { - t.Errorf("expected 9 entries scanned, got %d", result.FilesScanned) + // Verify results - we only scan regular files, not directories + if result.FilesScanned != 6 { + t.Errorf("expected 6 files scanned, got %d", result.FilesScanned) } - // Directories have their own sizes, so the total will be more than just file content + // Total bytes should be the sum of all file contents if result.BytesScanned < 97 { // At minimum we have 97 bytes of file content t.Errorf("expected at least 97 bytes scanned, got %d", result.BytesScanned) } - // Verify files in database + // Verify files in database - only regular files are stored files, err := repos.Files.ListByPrefix(ctx, "/source") if err != nil { t.Fatalf("failed to list files: %v", err) } - // We should have 6 files + 3 directories = 9 entries - if len(files) != 9 { - t.Errorf("expected 9 entries in database, got %d", len(files)) + // We should have 6 files (directories are not stored) + if len(files) != 6 { + t.Errorf("expected 6 files in database, got %d", len(files)) } // Verify specific file @@ -235,9 +234,9 @@ func TestScannerLargeFile(t *testing.T) { t.Fatalf("scan failed: %v", err) } - // We scan 1 file + 1 directory = 2 entries - if result.FilesScanned != 2 { - t.Errorf("expected 2 entries scanned, got %d", result.FilesScanned) + // We scan only regular files, not directories + if result.FilesScanned != 1 { + t.Errorf("expected 1 file scanned, got %d", result.FilesScanned) } // The file size should be at least 1MB diff --git a/internal/snapshot/snapshot.go b/internal/snapshot/snapshot.go index 5a19b94..9e6052a 100644 --- a/internal/snapshot/snapshot.go +++ b/internal/snapshot/snapshot.go @@ -52,7 +52,7 @@ import ( "git.eeqj.de/sneak/vaultik/internal/config" "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/dustin/go-humanize" "github.com/spf13/afero" "go.uber.org/fx" @@ -60,27 +60,27 @@ import ( // SnapshotManager handles snapshot creation and metadata export type SnapshotManager struct { - repos *database.Repositories - s3Client S3Client - config *config.Config - fs afero.Fs + repos *database.Repositories + storage storage.Storer + config *config.Config + fs afero.Fs } // SnapshotManagerParams holds dependencies for NewSnapshotManager type SnapshotManagerParams struct { fx.In - Repos *database.Repositories - S3Client *s3.Client - Config *config.Config + Repos *database.Repositories + Storage storage.Storer + Config *config.Config } // NewSnapshotManager creates a new snapshot manager for dependency injection func NewSnapshotManager(params SnapshotManagerParams) *SnapshotManager { return &SnapshotManager{ - repos: params.Repos, - s3Client: params.S3Client, - config: params.Config, + repos: params.Repos, + storage: params.Storage, + config: params.Config, } } @@ -268,7 +268,7 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st dbKey := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID) dbUploadStart := time.Now() - if err := sm.s3Client.PutObject(ctx, dbKey, 
bytes.NewReader(finalData)); err != nil { + if err := sm.storage.Put(ctx, dbKey, bytes.NewReader(finalData)); err != nil { return fmt.Errorf("uploading snapshot database: %w", err) } dbUploadDuration := time.Since(dbUploadStart) @@ -282,7 +282,7 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st // Upload blob manifest (compressed only, not encrypted) manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID) manifestUploadStart := time.Now() - if err := sm.s3Client.PutObject(ctx, manifestKey, bytes.NewReader(blobManifest)); err != nil { + if err := sm.storage.Put(ctx, manifestKey, bytes.NewReader(blobManifest)); err != nil { return fmt.Errorf("uploading blob manifest: %w", err) } manifestUploadDuration := time.Since(manifestUploadStart) @@ -635,11 +635,11 @@ func (sm *SnapshotManager) CleanupIncompleteSnapshots(ctx context.Context, hostn log.Info("Found incomplete snapshots", "count", len(incompleteSnapshots)) - // Check each incomplete snapshot for metadata in S3 + // Check each incomplete snapshot for metadata in storage for _, snapshot := range incompleteSnapshots { - // Check if metadata exists in S3 + // Check if metadata exists in storage metadataKey := fmt.Sprintf("metadata/%s/db.zst", snapshot.ID) - _, err := sm.s3Client.StatObject(ctx, metadataKey) + _, err := sm.storage.Stat(ctx, metadataKey) if err != nil { // Metadata doesn't exist in S3 - this is an incomplete snapshot @@ -676,6 +676,11 @@ func (sm *SnapshotManager) deleteSnapshot(ctx context.Context, snapshotID string return fmt.Errorf("deleting snapshot blobs: %w", err) } + // Delete uploads entries (has foreign key to snapshots without CASCADE) + if err := sm.repos.Snapshots.DeleteSnapshotUploads(ctx, snapshotID); err != nil { + return fmt.Errorf("deleting snapshot uploads: %w", err) + } + // Delete the snapshot itself if err := sm.repos.Snapshots.Delete(ctx, snapshotID); err != nil { return fmt.Errorf("deleting snapshot: %w", err) diff --git a/internal/storage/file.go b/internal/storage/file.go new file mode 100644 index 0000000..a5b3ae8 --- /dev/null +++ b/internal/storage/file.go @@ -0,0 +1,262 @@ +package storage + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/spf13/afero" +) + +// FileStorer implements Storer using the local filesystem. +// It mirrors the S3 path structure for consistency. +type FileStorer struct { + fs afero.Fs + basePath string +} + +// NewFileStorer creates a new filesystem storage backend. +// The basePath directory will be created if it doesn't exist. +// Uses the real OS filesystem by default; call SetFilesystem to override for testing. +func NewFileStorer(basePath string) (*FileStorer, error) { + fs := afero.NewOsFs() + // Ensure base path exists + if err := fs.MkdirAll(basePath, 0755); err != nil { + return nil, fmt.Errorf("creating base path: %w", err) + } + return &FileStorer{ + fs: fs, + basePath: basePath, + }, nil +} + +// SetFilesystem overrides the filesystem for testing. +func (f *FileStorer) SetFilesystem(fs afero.Fs) { + f.fs = fs +} + +// fullPath returns the full filesystem path for a key. +func (f *FileStorer) fullPath(key string) string { + return filepath.Join(f.basePath, key) +} + +// Put stores data at the specified key. 
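+//
+// Illustrative usage sketch (base path, key, and variable names are hypothetical):
+//
+//	store, err := NewFileStorer("/srv/vaultik-backups")
+//	if err != nil {
+//		return err
+//	}
+//	if err := store.Put(ctx, "blobs/ca/fe/cafebabe", bytes.NewReader(blobData)); err != nil {
+//		return err
+//	}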
+func (f *FileStorer) Put(ctx context.Context, key string, data io.Reader) error { + path := f.fullPath(key) + + // Create parent directories + dir := filepath.Dir(path) + if err := f.fs.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("creating directories: %w", err) + } + + file, err := f.fs.Create(path) + if err != nil { + return fmt.Errorf("creating file: %w", err) + } + defer func() { _ = file.Close() }() + + if _, err := io.Copy(file, data); err != nil { + return fmt.Errorf("writing file: %w", err) + } + + return nil +} + +// PutWithProgress stores data with progress reporting. +func (f *FileStorer) PutWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress ProgressCallback) error { + path := f.fullPath(key) + + // Create parent directories + dir := filepath.Dir(path) + if err := f.fs.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("creating directories: %w", err) + } + + file, err := f.fs.Create(path) + if err != nil { + return fmt.Errorf("creating file: %w", err) + } + defer func() { _ = file.Close() }() + + // Wrap with progress tracking + pw := &progressWriter{ + writer: file, + callback: progress, + } + + if _, err := io.Copy(pw, data); err != nil { + return fmt.Errorf("writing file: %w", err) + } + + return nil +} + +// Get retrieves data from the specified key. +func (f *FileStorer) Get(ctx context.Context, key string) (io.ReadCloser, error) { + path := f.fullPath(key) + file, err := f.fs.Open(path) + if err != nil { + if os.IsNotExist(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("opening file: %w", err) + } + return file, nil +} + +// Stat returns metadata about an object without retrieving its contents. +func (f *FileStorer) Stat(ctx context.Context, key string) (*ObjectInfo, error) { + path := f.fullPath(key) + info, err := f.fs.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("stat file: %w", err) + } + return &ObjectInfo{ + Key: key, + Size: info.Size(), + }, nil +} + +// Delete removes an object. +func (f *FileStorer) Delete(ctx context.Context, key string) error { + path := f.fullPath(key) + err := f.fs.Remove(path) + if os.IsNotExist(err) { + return nil // Match S3 behavior: no error if doesn't exist + } + if err != nil { + return fmt.Errorf("removing file: %w", err) + } + return nil +} + +// List returns all keys with the given prefix. 
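+//
+// Illustrative sketch (prefix and variable names are hypothetical):
+//
+//	keys, err := store.List(ctx, "blobs/")
+//	if err != nil {
+//		return err
+//	}
+//	for _, key := range keys {
+//		fmt.Println(key) // e.g. "blobs/ca/fe/cafebabe..."
+//	}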
+func (f *FileStorer) List(ctx context.Context, prefix string) ([]string, error) { + var keys []string + basePath := f.fullPath(prefix) + + // Check if base path exists + exists, err := afero.Exists(f.fs, basePath) + if err != nil { + return nil, fmt.Errorf("checking path: %w", err) + } + if !exists { + return keys, nil // Empty list for non-existent prefix + } + + err = afero.Walk(f.fs, basePath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + // Check context cancellation + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if !info.IsDir() { + // Convert back to key (relative path from basePath) + relPath, err := filepath.Rel(f.basePath, path) + if err != nil { + return fmt.Errorf("computing relative path: %w", err) + } + // Normalize path separators to forward slashes for consistency + relPath = strings.ReplaceAll(relPath, string(filepath.Separator), "/") + keys = append(keys, relPath) + } + return nil + }) + + if err != nil { + return nil, fmt.Errorf("walking directory: %w", err) + } + + return keys, nil +} + +// ListStream returns a channel of ObjectInfo for large result sets. +func (f *FileStorer) ListStream(ctx context.Context, prefix string) <-chan ObjectInfo { + ch := make(chan ObjectInfo) + go func() { + defer close(ch) + basePath := f.fullPath(prefix) + + // Check if base path exists + exists, err := afero.Exists(f.fs, basePath) + if err != nil { + ch <- ObjectInfo{Err: fmt.Errorf("checking path: %w", err)} + return + } + if !exists { + return // Empty channel for non-existent prefix + } + + _ = afero.Walk(f.fs, basePath, func(path string, info os.FileInfo, err error) error { + // Check context cancellation + select { + case <-ctx.Done(): + ch <- ObjectInfo{Err: ctx.Err()} + return ctx.Err() + default: + } + + if err != nil { + ch <- ObjectInfo{Err: err} + return nil // Continue walking despite errors + } + + if !info.IsDir() { + relPath, err := filepath.Rel(f.basePath, path) + if err != nil { + ch <- ObjectInfo{Err: fmt.Errorf("computing relative path: %w", err)} + return nil + } + // Normalize path separators + relPath = strings.ReplaceAll(relPath, string(filepath.Separator), "/") + ch <- ObjectInfo{ + Key: relPath, + Size: info.Size(), + } + } + return nil + }) + }() + return ch +} + +// Info returns human-readable storage location information. +func (f *FileStorer) Info() StorageInfo { + return StorageInfo{ + Type: "file", + Location: f.basePath, + } +} + +// progressWriter wraps an io.Writer to track write progress. +type progressWriter struct { + writer io.Writer + written int64 + callback ProgressCallback +} + +func (pw *progressWriter) Write(p []byte) (int, error) { + n, err := pw.writer.Write(p) + if n > 0 { + pw.written += int64(n) + if pw.callback != nil { + if callbackErr := pw.callback(pw.written); callbackErr != nil { + return n, callbackErr + } + } + } + return n, err +} diff --git a/internal/storage/module.go b/internal/storage/module.go new file mode 100644 index 0000000..12b6d05 --- /dev/null +++ b/internal/storage/module.go @@ -0,0 +1,110 @@ +package storage + +import ( + "context" + "fmt" + "strings" + + "git.eeqj.de/sneak/vaultik/internal/config" + "git.eeqj.de/sneak/vaultik/internal/s3" + "go.uber.org/fx" +) + +// Module exports storage functionality as an fx module. +// It provides a Storer implementation based on the configured storage URL +// or falls back to legacy S3 configuration. 
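+//
+// Illustrative wiring sketch; the *config.Config provider shown is an
+// assumption and not part of this module:
+//
+//	app := fx.New(
+//		fx.Provide(loadConfig), // hypothetical func() *config.Config
+//		storage.Module,
+//		fx.Invoke(func(s storage.Storer) {
+//			fmt.Println(s.Info().Type, s.Info().Location)
+//		}),
+//	)
+//	app.Run()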
+var Module = fx.Module("storage", + fx.Provide(NewStorer), +) + +// NewStorer creates a Storer based on configuration. +// If StorageURL is set, it uses URL-based configuration. +// Otherwise, it falls back to legacy S3 configuration. +func NewStorer(cfg *config.Config) (Storer, error) { + if cfg.StorageURL != "" { + return storerFromURL(cfg.StorageURL, cfg) + } + return storerFromLegacyS3Config(cfg) +} + +func storerFromURL(rawURL string, cfg *config.Config) (Storer, error) { + parsed, err := ParseStorageURL(rawURL) + if err != nil { + return nil, fmt.Errorf("parsing storage URL: %w", err) + } + + switch parsed.Scheme { + case "file": + return NewFileStorer(parsed.Prefix) + + case "s3": + // Build endpoint URL + endpoint := parsed.Endpoint + if endpoint == "" { + endpoint = "s3.amazonaws.com" + } + + // Add protocol if not present + if parsed.UseSSL && !strings.HasPrefix(endpoint, "https://") && !strings.HasPrefix(endpoint, "http://") { + endpoint = "https://" + endpoint + } else if !parsed.UseSSL && !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") { + endpoint = "http://" + endpoint + } + + region := parsed.Region + if region == "" { + region = cfg.S3.Region + if region == "" { + region = "us-east-1" + } + } + + // Credentials come from config (not URL for security) + client, err := s3.NewClient(context.Background(), s3.Config{ + Endpoint: endpoint, + Bucket: parsed.Bucket, + Prefix: parsed.Prefix, + AccessKeyID: cfg.S3.AccessKeyID, + SecretAccessKey: cfg.S3.SecretAccessKey, + Region: region, + }) + if err != nil { + return nil, fmt.Errorf("creating S3 client: %w", err) + } + return NewS3Storer(client), nil + + default: + return nil, fmt.Errorf("unsupported storage scheme: %s", parsed.Scheme) + } +} + +func storerFromLegacyS3Config(cfg *config.Config) (Storer, error) { + endpoint := cfg.S3.Endpoint + + // Ensure protocol is present + if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") { + if cfg.S3.UseSSL { + endpoint = "https://" + endpoint + } else { + endpoint = "http://" + endpoint + } + } + + region := cfg.S3.Region + if region == "" { + region = "us-east-1" + } + + client, err := s3.NewClient(context.Background(), s3.Config{ + Endpoint: endpoint, + Bucket: cfg.S3.Bucket, + Prefix: cfg.S3.Prefix, + AccessKeyID: cfg.S3.AccessKeyID, + SecretAccessKey: cfg.S3.SecretAccessKey, + Region: region, + }) + if err != nil { + return nil, fmt.Errorf("creating S3 client: %w", err) + } + return NewS3Storer(client), nil +} diff --git a/internal/storage/s3.go b/internal/storage/s3.go new file mode 100644 index 0000000..9d648fa --- /dev/null +++ b/internal/storage/s3.go @@ -0,0 +1,85 @@ +package storage + +import ( + "context" + "fmt" + "io" + + "git.eeqj.de/sneak/vaultik/internal/s3" +) + +// S3Storer wraps the existing s3.Client to implement Storer. +type S3Storer struct { + client *s3.Client +} + +// NewS3Storer creates a new S3 storage backend. +func NewS3Storer(client *s3.Client) *S3Storer { + return &S3Storer{client: client} +} + +// Put stores data at the specified key. +func (s *S3Storer) Put(ctx context.Context, key string, data io.Reader) error { + return s.client.PutObject(ctx, key, data) +} + +// PutWithProgress stores data with progress reporting. 
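+//
+// Illustrative sketch of a progress callback (variable names are hypothetical);
+// the callback receives cumulative bytes transferred:
+//
+//	err := storer.PutWithProgress(ctx, key, reader, size, func(sent int64) error {
+//		fmt.Printf("uploaded %d/%d bytes\n", sent, size)
+//		return nil // returning an error cancels the upload
+//	})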
+func (s *S3Storer) PutWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress ProgressCallback) error { + // Convert storage.ProgressCallback to s3.ProgressCallback + var s3Progress s3.ProgressCallback + if progress != nil { + s3Progress = s3.ProgressCallback(progress) + } + return s.client.PutObjectWithProgress(ctx, key, data, size, s3Progress) +} + +// Get retrieves data from the specified key. +func (s *S3Storer) Get(ctx context.Context, key string) (io.ReadCloser, error) { + return s.client.GetObject(ctx, key) +} + +// Stat returns metadata about an object without retrieving its contents. +func (s *S3Storer) Stat(ctx context.Context, key string) (*ObjectInfo, error) { + info, err := s.client.StatObject(ctx, key) + if err != nil { + return nil, err + } + return &ObjectInfo{ + Key: info.Key, + Size: info.Size, + }, nil +} + +// Delete removes an object. +func (s *S3Storer) Delete(ctx context.Context, key string) error { + return s.client.DeleteObject(ctx, key) +} + +// List returns all keys with the given prefix. +func (s *S3Storer) List(ctx context.Context, prefix string) ([]string, error) { + return s.client.ListObjects(ctx, prefix) +} + +// ListStream returns a channel of ObjectInfo for large result sets. +func (s *S3Storer) ListStream(ctx context.Context, prefix string) <-chan ObjectInfo { + ch := make(chan ObjectInfo) + go func() { + defer close(ch) + for info := range s.client.ListObjectsStream(ctx, prefix, false) { + ch <- ObjectInfo{ + Key: info.Key, + Size: info.Size, + Err: info.Err, + } + } + }() + return ch +} + +// Info returns human-readable storage location information. +func (s *S3Storer) Info() StorageInfo { + return StorageInfo{ + Type: "s3", + Location: fmt.Sprintf("%s/%s", s.client.Endpoint(), s.client.BucketName()), + } +} diff --git a/internal/storage/storer.go b/internal/storage/storer.go new file mode 100644 index 0000000..9cd4e25 --- /dev/null +++ b/internal/storage/storer.go @@ -0,0 +1,74 @@ +// Package storage provides a unified interface for storage backends. +// It supports both S3-compatible object storage and local filesystem storage, +// allowing Vaultik to store backups in either location with the same API. +// +// Storage backends are selected via URL: +// - s3://bucket/prefix?endpoint=host®ion=r - S3-compatible storage +// - file:///path/to/backup - Local filesystem storage +// +// Both backends implement the Storer interface and support progress reporting +// during upload/write operations. +package storage + +import ( + "context" + "errors" + "io" +) + +// ErrNotFound is returned when an object does not exist. +var ErrNotFound = errors.New("object not found") + +// ProgressCallback is called during storage operations with bytes transferred so far. +// Return an error to cancel the operation. +type ProgressCallback func(bytesTransferred int64) error + +// ObjectInfo contains metadata about a stored object. +type ObjectInfo struct { + Key string // Object key/path + Size int64 // Size in bytes + Err error // Error for streaming results (nil on success) +} + +// StorageInfo provides human-readable storage configuration. +type StorageInfo struct { + Type string // "s3" or "file" + Location string // endpoint/bucket for S3, base path for filesystem +} + +// Storer defines the interface for storage backends. +// All paths are relative to the storage root (bucket/prefix for S3, base directory for filesystem). +type Storer interface { + // Put stores data at the specified key. 
+ // Parent directories are created automatically for filesystem backends. + Put(ctx context.Context, key string, data io.Reader) error + + // PutWithProgress stores data with progress reporting. + // Size must be the exact size of the data to store. + // The progress callback is called periodically with bytes transferred. + PutWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress ProgressCallback) error + + // Get retrieves data from the specified key. + // The caller must close the returned ReadCloser. + // Returns ErrNotFound if the object does not exist. + Get(ctx context.Context, key string) (io.ReadCloser, error) + + // Stat returns metadata about an object without retrieving its contents. + // Returns ErrNotFound if the object does not exist. + Stat(ctx context.Context, key string) (*ObjectInfo, error) + + // Delete removes an object. No error is returned if the object doesn't exist. + Delete(ctx context.Context, key string) error + + // List returns all keys with the given prefix. + // For large result sets, prefer ListStream. + List(ctx context.Context, prefix string) ([]string, error) + + // ListStream returns a channel of ObjectInfo for large result sets. + // The channel is closed when listing completes. + // If an error occurs during listing, the final item will have Err set. + ListStream(ctx context.Context, prefix string) <-chan ObjectInfo + + // Info returns human-readable storage location information. + Info() StorageInfo +} diff --git a/internal/storage/url.go b/internal/storage/url.go new file mode 100644 index 0000000..900890d --- /dev/null +++ b/internal/storage/url.go @@ -0,0 +1,90 @@ +package storage + +import ( + "fmt" + "net/url" + "strings" +) + +// StorageURL represents a parsed storage URL. +type StorageURL struct { + Scheme string // "s3" or "file" + Bucket string // S3 bucket name (empty for file) + Prefix string // Path within bucket or filesystem base path + Endpoint string // S3 endpoint (optional, default AWS) + Region string // S3 region (optional) + UseSSL bool // Use HTTPS for S3 (default true) +} + +// ParseStorageURL parses a storage URL string. +// Supported formats: +// - s3://bucket/prefix?endpoint=host®ion=us-east-1&ssl=true +// - file:///absolute/path/to/backup +func ParseStorageURL(rawURL string) (*StorageURL, error) { + if rawURL == "" { + return nil, fmt.Errorf("storage URL is empty") + } + + // Handle file:// URLs + if strings.HasPrefix(rawURL, "file://") { + path := strings.TrimPrefix(rawURL, "file://") + if path == "" { + return nil, fmt.Errorf("file URL path is empty") + } + return &StorageURL{ + Scheme: "file", + Prefix: path, + }, nil + } + + // Handle s3:// URLs + if strings.HasPrefix(rawURL, "s3://") { + u, err := url.Parse(rawURL) + if err != nil { + return nil, fmt.Errorf("invalid URL: %w", err) + } + + bucket := u.Host + if bucket == "" { + return nil, fmt.Errorf("s3 URL missing bucket name") + } + + prefix := strings.TrimPrefix(u.Path, "/") + + query := u.Query() + useSSL := true + if query.Get("ssl") == "false" { + useSSL = false + } + + return &StorageURL{ + Scheme: "s3", + Bucket: bucket, + Prefix: prefix, + Endpoint: query.Get("endpoint"), + Region: query.Get("region"), + UseSSL: useSSL, + }, nil + } + + return nil, fmt.Errorf("unsupported URL scheme: must start with s3:// or file://") +} + +// String returns a human-readable representation of the storage URL. 
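+//
+// For example (illustrative values):
+//
+//	u, _ := ParseStorageURL("s3://backups/host1?endpoint=minio.local&ssl=false")
+//	fmt.Println(u.String()) // s3://backups/host1 (endpoint: minio.local)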
+func (u *StorageURL) String() string { + switch u.Scheme { + case "file": + return fmt.Sprintf("file://%s", u.Prefix) + case "s3": + endpoint := u.Endpoint + if endpoint == "" { + endpoint = "s3.amazonaws.com" + } + if u.Prefix != "" { + return fmt.Sprintf("s3://%s/%s (endpoint: %s)", u.Bucket, u.Prefix, endpoint) + } + return fmt.Sprintf("s3://%s (endpoint: %s)", u.Bucket, endpoint) + default: + return fmt.Sprintf("%s://?", u.Scheme) + } +} diff --git a/internal/vaultik/integration_test.go b/internal/vaultik/integration_test.go index d99186f..92b03b1 100644 --- a/internal/vaultik/integration_test.go +++ b/internal/vaultik/integration_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "database/sql" - "fmt" "io" "sync" "testing" @@ -13,100 +12,122 @@ import ( "git.eeqj.de/sneak/vaultik/internal/config" "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/log" - "git.eeqj.de/sneak/vaultik/internal/s3" "git.eeqj.de/sneak/vaultik/internal/snapshot" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// MockS3Client implements a mock S3 client for testing -type MockS3Client struct { - mu sync.Mutex - storage map[string][]byte - calls []string +// MockStorer implements storage.Storer for testing +type MockStorer struct { + mu sync.Mutex + data map[string][]byte + calls []string } -func NewMockS3Client() *MockS3Client { - return &MockS3Client{ - storage: make(map[string][]byte), - calls: make([]string, 0), +func NewMockStorer() *MockStorer { + return &MockStorer{ + data: make(map[string][]byte), + calls: make([]string, 0), } } -func (m *MockS3Client) PutObject(ctx context.Context, key string, reader io.Reader) error { +func (m *MockStorer) Put(ctx context.Context, key string, reader io.Reader) error { m.mu.Lock() defer m.mu.Unlock() - m.calls = append(m.calls, "PutObject:"+key) + m.calls = append(m.calls, "Put:"+key) data, err := io.ReadAll(reader) if err != nil { return err } - m.storage[key] = data + m.data[key] = data return nil } -func (m *MockS3Client) PutObjectWithProgress(ctx context.Context, key string, reader io.Reader, size int64, progress s3.ProgressCallback) error { - // For testing, just call PutObject - return m.PutObject(ctx, key, reader) +func (m *MockStorer) PutWithProgress(ctx context.Context, key string, reader io.Reader, size int64, progress storage.ProgressCallback) error { + return m.Put(ctx, key, reader) } -func (m *MockS3Client) GetObject(ctx context.Context, key string) (io.ReadCloser, error) { +func (m *MockStorer) Get(ctx context.Context, key string) (io.ReadCloser, error) { m.mu.Lock() defer m.mu.Unlock() - m.calls = append(m.calls, "GetObject:"+key) - data, exists := m.storage[key] + m.calls = append(m.calls, "Get:"+key) + data, exists := m.data[key] if !exists { - return nil, fmt.Errorf("key not found: %s", key) + return nil, storage.ErrNotFound } return io.NopCloser(bytes.NewReader(data)), nil } -func (m *MockS3Client) StatObject(ctx context.Context, key string) (*s3.ObjectInfo, error) { +func (m *MockStorer) Stat(ctx context.Context, key string) (*storage.ObjectInfo, error) { m.mu.Lock() defer m.mu.Unlock() - m.calls = append(m.calls, "StatObject:"+key) - data, exists := m.storage[key] + m.calls = append(m.calls, "Stat:"+key) + data, exists := m.data[key] if !exists { - return nil, fmt.Errorf("key not found: %s", key) + return nil, storage.ErrNotFound } - return &s3.ObjectInfo{ + return &storage.ObjectInfo{ Key: key, Size: 
int64(len(data)), }, nil } -func (m *MockS3Client) DeleteObject(ctx context.Context, key string) error { +func (m *MockStorer) Delete(ctx context.Context, key string) error { m.mu.Lock() defer m.mu.Unlock() - m.calls = append(m.calls, "DeleteObject:"+key) - delete(m.storage, key) + m.calls = append(m.calls, "Delete:"+key) + delete(m.data, key) return nil } -func (m *MockS3Client) ListObjects(ctx context.Context, prefix string) ([]*s3.ObjectInfo, error) { +func (m *MockStorer) List(ctx context.Context, prefix string) ([]string, error) { m.mu.Lock() defer m.mu.Unlock() - m.calls = append(m.calls, "ListObjects:"+prefix) - var objects []*s3.ObjectInfo - for key, data := range m.storage { + m.calls = append(m.calls, "List:"+prefix) + var keys []string + for key := range m.data { if len(prefix) == 0 || (len(key) >= len(prefix) && key[:len(prefix)] == prefix) { - objects = append(objects, &s3.ObjectInfo{ - Key: key, - Size: int64(len(data)), - }) + keys = append(keys, key) } } - return objects, nil + return keys, nil } -// GetCalls returns the list of S3 operations that were called -func (m *MockS3Client) GetCalls() []string { +func (m *MockStorer) ListStream(ctx context.Context, prefix string) <-chan storage.ObjectInfo { + ch := make(chan storage.ObjectInfo) + go func() { + defer close(ch) + m.mu.Lock() + defer m.mu.Unlock() + + for key, data := range m.data { + if len(prefix) == 0 || (len(key) >= len(prefix) && key[:len(prefix)] == prefix) { + ch <- storage.ObjectInfo{ + Key: key, + Size: int64(len(data)), + } + } + } + }() + return ch +} + +func (m *MockStorer) Info() storage.StorageInfo { + return storage.StorageInfo{ + Type: "mock", + Location: "memory", + } +} + +// GetCalls returns the list of operations that were called +func (m *MockStorer) GetCalls() []string { m.mu.Lock() defer m.mu.Unlock() @@ -116,11 +137,11 @@ func (m *MockS3Client) GetCalls() []string { } // GetStorageSize returns the number of objects in storage -func (m *MockS3Client) GetStorageSize() int { +func (m *MockStorer) GetStorageSize() int { m.mu.Lock() defer m.mu.Unlock() - return len(m.storage) + return len(m.data) } // TestEndToEndBackup tests the full backup workflow with mocked dependencies @@ -158,8 +179,8 @@ func TestEndToEndBackup(t *testing.T) { } } - // Create mock S3 client - mockS3 := NewMockS3Client() + // Create mock storage + mockStorage := NewMockStorer() // Create test configuration cfg := &config.Config{ @@ -181,7 +202,7 @@ func TestEndToEndBackup(t *testing.T) { } // For a true end-to-end test, we'll create a simpler test that focuses on - // the core backup logic using the scanner directly with our mock S3 client + // the core backup logic using the scanner directly with our mock storage ctx := context.Background() // Create in-memory database @@ -195,12 +216,12 @@ func TestEndToEndBackup(t *testing.T) { repos := database.NewRepositories(db) - // Create scanner with mock S3 client + // Create scanner with mock storage scanner := snapshot.NewScanner(snapshot.ScannerConfig{ FS: fs, ChunkSize: cfg.ChunkSize.Int64(), Repositories: repos, - S3Client: mockS3, + Storage: mockStorage, MaxBlobSize: cfg.BlobSizeLimit.Int64(), CompressionLevel: cfg.CompressionLevel, AgeRecipients: cfg.AgeRecipients, @@ -232,15 +253,15 @@ func TestEndToEndBackup(t *testing.T) { assert.Greater(t, result.ChunksCreated, 0, "Should create chunks") assert.Greater(t, result.BlobsCreated, 0, "Should create blobs") - // Verify S3 operations - calls := mockS3.GetCalls() - t.Logf("S3 operations performed: %v", calls) + // Verify storage 
operations + calls := mockStorage.GetCalls() + t.Logf("Storage operations performed: %v", calls) // Should have uploaded at least one blob blobUploads := 0 for _, call := range calls { - if len(call) > 10 && call[:10] == "PutObject:" { - if len(call) > 16 && call[10:16] == "blobs/" { + if len(call) > 4 && call[:4] == "Put:" { + if len(call) > 10 && call[4:10] == "blobs/" { blobUploads++ } } @@ -264,8 +285,8 @@ func TestEndToEndBackup(t *testing.T) { require.NoError(t, err) assert.Greater(t, len(fileChunks), 0, "Should have chunks for file1.txt") - // Verify blobs were uploaded to S3 - assert.Greater(t, mockS3.GetStorageSize(), 0, "Should have blobs in S3 storage") + // Verify blobs were uploaded to storage + assert.Greater(t, mockStorage.GetStorageSize(), 0, "Should have blobs in storage") // Complete the snapshot - just verify we got results // In a real integration test, we'd update the snapshot record @@ -283,7 +304,7 @@ func TestEndToEndBackup(t *testing.T) { t.Logf(" Bytes scanned: %d", result.BytesScanned) t.Logf(" Chunks created: %d", result.ChunksCreated) t.Logf(" Blobs created: %d", result.BlobsCreated) - t.Logf(" S3 storage size: %d objects", mockS3.GetStorageSize()) + t.Logf(" Storage size: %d objects", mockStorage.GetStorageSize()) } // TestBackupAndVerify tests backing up files and verifying the blobs @@ -301,8 +322,8 @@ func TestBackupAndVerify(t *testing.T) { err = afero.WriteFile(fs, "/data/test.txt", []byte(testContent), 0644) require.NoError(t, err) - // Create mock S3 client - mockS3 := NewMockS3Client() + // Create mock storage + mockStorage := NewMockStorer() // Create test database ctx := context.Background() @@ -321,7 +342,7 @@ func TestBackupAndVerify(t *testing.T) { FS: fs, ChunkSize: int64(1024 * 16), // 16KB chunks Repositories: repos, - S3Client: mockS3, + Storage: mockStorage, MaxBlobSize: int64(1024 * 1024), // 1MB blobs CompressionLevel: 3, AgeRecipients: []string{"age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"}, // Test public key @@ -346,25 +367,25 @@ func TestBackupAndVerify(t *testing.T) { // Verify backup created blobs assert.Greater(t, result.BlobsCreated, 0, "Should create at least one blob") - assert.Equal(t, mockS3.GetStorageSize(), result.BlobsCreated, "S3 should have the blobs") + assert.Equal(t, mockStorage.GetStorageSize(), result.BlobsCreated, "Storage should have the blobs") - // Verify we can retrieve the blob from S3 - objects, err := mockS3.ListObjects(ctx, "blobs/") + // Verify we can retrieve the blob from storage + objects, err := mockStorage.List(ctx, "blobs/") require.NoError(t, err) - assert.Len(t, objects, result.BlobsCreated, "Should have correct number of blobs in S3") + assert.Len(t, objects, result.BlobsCreated, "Should have correct number of blobs in storage") // Get the first blob and verify it exists if len(objects) > 0 { - blobKey := objects[0].Key + blobKey := objects[0] t.Logf("Verifying blob: %s", blobKey) // Get blob info - blobInfo, err := mockS3.StatObject(ctx, blobKey) + blobInfo, err := mockStorage.Stat(ctx, blobKey) require.NoError(t, err) assert.Greater(t, blobInfo.Size, int64(0), "Blob should have content") // Get blob content - reader, err := mockS3.GetObject(ctx, blobKey) + reader, err := mockStorage.Get(ctx, blobKey) require.NoError(t, err) defer func() { _ = reader.Close() }() diff --git a/internal/vaultik/prune.go b/internal/vaultik/prune.go index 030098f..b4049dc 100644 --- a/internal/vaultik/prune.go +++ b/internal/vaultik/prune.go @@ -21,9 +21,9 @@ func (v *Vaultik) PruneBlobs(opts 
*PruneOptions) error { allBlobsReferenced := make(map[string]bool) manifestCount := 0 - // List all snapshots in S3 + // List all snapshots in storage log.Info("Listing remote snapshots") - objectCh := v.S3Client.ListObjectsStream(v.ctx, "metadata/", false) + objectCh := v.Storage.ListStream(v.ctx, "metadata/") var snapshotIDs []string for object := range objectCh { @@ -73,10 +73,10 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error { log.Info("Processed manifests", "count", manifestCount, "unique_blobs_referenced", len(allBlobsReferenced)) - // List all blobs in S3 + // List all blobs in storage log.Info("Listing all blobs in storage") allBlobs := make(map[string]int64) // hash -> size - blobObjectCh := v.S3Client.ListObjectsStream(v.ctx, "blobs/", true) + blobObjectCh := v.Storage.ListStream(v.ctx, "blobs/") for object := range blobObjectCh { if object.Err != nil { @@ -136,7 +136,7 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error { for i, hash := range unreferencedBlobs { blobPath := fmt.Sprintf("blobs/%s/%s/%s", hash[:2], hash[2:4], hash) - if err := v.S3Client.RemoveObject(v.ctx, blobPath); err != nil { + if err := v.Storage.Delete(v.ctx, blobPath); err != nil { log.Error("Failed to delete blob", "hash", hash, "error", err) continue } diff --git a/internal/vaultik/snapshot.go b/internal/vaultik/snapshot.go index 97c317b..f6a865f 100644 --- a/internal/vaultik/snapshot.go +++ b/internal/vaultik/snapshot.go @@ -265,7 +265,7 @@ func (v *Vaultik) CreateSnapshot(opts *SnapshotCreateOptions) error { func (v *Vaultik) ListSnapshots(jsonOutput bool) error { // Get all remote snapshots remoteSnapshots := make(map[string]bool) - objectCh := v.S3Client.ListObjectsStream(v.ctx, "metadata/", false) + objectCh := v.Storage.ListStream(v.ctx, "metadata/") for object := range objectCh { if object.Err != nil { @@ -546,7 +546,7 @@ func (v *Vaultik) VerifySnapshot(snapshotID string, deep bool) error { return nil } else { // Just check existence - _, err := v.S3Client.StatObject(v.ctx, blobPath) + _, err := v.Storage.Stat(v.ctx, blobPath) if err != nil { fmt.Printf(" Missing: %s (%s)\n", blob.Hash, humanize.Bytes(uint64(blob.CompressedSize))) missing++ @@ -581,7 +581,7 @@ func (v *Vaultik) VerifySnapshot(snapshotID string, deep bool) error { func (v *Vaultik) getManifestSize(snapshotID string) (int64, error) { manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID) - reader, err := v.S3Client.GetObject(v.ctx, manifestPath) + reader, err := v.Storage.Get(v.ctx, manifestPath) if err != nil { return 0, fmt.Errorf("downloading manifest: %w", err) } @@ -598,7 +598,7 @@ func (v *Vaultik) getManifestSize(snapshotID string) (int64, error) { func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error) { manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID) - reader, err := v.S3Client.GetObject(v.ctx, manifestPath) + reader, err := v.Storage.Get(v.ctx, manifestPath) if err != nil { return nil, err } @@ -613,10 +613,10 @@ func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error } func (v *Vaultik) deleteSnapshot(snapshotID string) error { - // First, delete from S3 + // First, delete from storage // List all objects under metadata/{snapshotID}/ prefix := fmt.Sprintf("metadata/%s/", snapshotID) - objectCh := v.S3Client.ListObjectsStream(v.ctx, prefix, true) + objectCh := v.Storage.ListStream(v.ctx, prefix) var objectsToDelete []string for object := range objectCh { @@ -628,7 +628,7 @@ func (v *Vaultik) 
deleteSnapshot(snapshotID string) error { // Delete all objects for _, key := range objectsToDelete { - if err := v.S3Client.RemoveObject(v.ctx, key); err != nil { + if err := v.Storage.Delete(v.ctx, key); err != nil { return fmt.Errorf("removing %s: %w", key, err) } } @@ -658,7 +658,7 @@ func (v *Vaultik) syncWithRemote() error { // Get all remote snapshot IDs remoteSnapshots := make(map[string]bool) - objectCh := v.S3Client.ListObjectsStream(v.ctx, "metadata/", false) + objectCh := v.Storage.ListStream(v.ctx, "metadata/") for object := range objectCh { if object.Err != nil { diff --git a/internal/vaultik/vaultik.go b/internal/vaultik/vaultik.go index ae5f410..1fbcb4d 100644 --- a/internal/vaultik/vaultik.go +++ b/internal/vaultik/vaultik.go @@ -10,8 +10,8 @@ import ( "git.eeqj.de/sneak/vaultik/internal/crypto" "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/globals" - "git.eeqj.de/sneak/vaultik/internal/s3" "git.eeqj.de/sneak/vaultik/internal/snapshot" + "git.eeqj.de/sneak/vaultik/internal/storage" "github.com/spf13/afero" "go.uber.org/fx" ) @@ -22,7 +22,7 @@ type Vaultik struct { Config *config.Config DB *database.DB Repositories *database.Repositories - S3Client *s3.Client + Storage storage.Storer ScannerFactory snapshot.ScannerFactory SnapshotManager *snapshot.SnapshotManager Shutdowner fx.Shutdowner @@ -46,7 +46,7 @@ type VaultikParams struct { Config *config.Config DB *database.DB Repositories *database.Repositories - S3Client *s3.Client + Storage storage.Storer ScannerFactory snapshot.ScannerFactory SnapshotManager *snapshot.SnapshotManager Shutdowner fx.Shutdowner @@ -72,7 +72,7 @@ func New(params VaultikParams) *Vaultik { Config: params.Config, DB: params.DB, Repositories: params.Repositories, - S3Client: params.S3Client, + Storage: params.Storage, ScannerFactory: params.ScannerFactory, SnapshotManager: params.SnapshotManager, Shutdowner: params.Shutdowner, diff --git a/internal/vaultik/verify.go b/internal/vaultik/verify.go index bb32054..b039c4c 100644 --- a/internal/vaultik/verify.go +++ b/internal/vaultik/verify.go @@ -36,7 +36,7 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error { manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID) log.Info("Downloading manifest", "path", manifestPath) - manifestReader, err := v.S3Client.GetObject(v.ctx, manifestPath) + manifestReader, err := v.Storage.Get(v.ctx, manifestPath) if err != nil { return fmt.Errorf("failed to download manifest: %w", err) } @@ -57,7 +57,7 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error { dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID) log.Info("Downloading encrypted database", "path", dbPath) - dbReader, err := v.S3Client.GetObject(v.ctx, dbPath) + dbReader, err := v.Storage.Get(v.ctx, dbPath) if err != nil { return fmt.Errorf("failed to download database: %w", err) } @@ -236,10 +236,10 @@ func (v *Vaultik) verifyBlobExistence(manifest *snapshot.Manifest) error { // Construct blob path blobPath := fmt.Sprintf("blobs/%s/%s/%s", blob.Hash[:2], blob.Hash[2:4], blob.Hash) - // Check blob exists with HeadObject - stat, err := v.S3Client.StatObject(v.ctx, blobPath) + // Check blob exists + stat, err := v.Storage.Stat(v.ctx, blobPath) if err != nil { - return fmt.Errorf("blob %s missing from S3: %w", blob.Hash, err) + return fmt.Errorf("blob %s missing from storage: %w", blob.Hash, err) } // Verify size matches @@ -258,7 +258,7 @@ func (v *Vaultik) verifyBlobExistence(manifest 
*snapshot.Manifest) error { } } - log.Info("✓ All blobs exist in S3") + log.Info("✓ All blobs exist in storage") return nil } @@ -295,7 +295,7 @@ func (v *Vaultik) performDeepVerification(manifest *snapshot.Manifest, db *sql.D func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error { // Download blob blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobInfo.Hash[:2], blobInfo.Hash[2:4], blobInfo.Hash) - reader, err := v.S3Client.GetObject(v.ctx, blobPath) + reader, err := v.Storage.Get(v.ctx, blobPath) if err != nil { return fmt.Errorf("failed to download: %w", err) }