From 43a69c2cfb334947663e3990bbcb3ae28677b601 Mon Sep 17 00:00:00 2001
From: sneak
Date: Fri, 19 Dec 2025 19:48:48 +0700
Subject: [PATCH] Fix FK constraint errors in batched file insertion

Generate file UUIDs upfront in checkFileInMemory() rather than deferring
to Files.Create(). This ensures file_chunks and chunk_files records have
valid FileID values when constructed during file processing, before the
batch insert transaction.

Root cause: For new files, file.ID was empty when building the fileChunks
and chunkFiles slices. The ID was only generated later in Files.Create(),
but by then the slices already had empty FileID values, causing FK
constraint failures.

Also adds PROCESS.md documenting the snapshot creation lifecycle, database
transactions, and FK dependency ordering.
---
 CLAUDE.md                    |  10 +-
 PROCESS.md                   | 556 +++++++++++++++++++++++++++++++++++
 internal/snapshot/scanner.go | 187 ++++++++----
 3 files changed, 692 insertions(+), 61 deletions(-)
 create mode 100644 PROCESS.md

diff --git a/CLAUDE.md b/CLAUDE.md
index 42f26aa..cec4213 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -10,6 +10,9 @@ Read the rules in AGENTS.md and follow them.
   corporate advertising for Anthropic and is therefore completely
   unacceptable in commit messages.
 
+* NEVER use `git add -A`. Always add only the files you intentionally
+  changed.
+
 * Tests should always be run before committing code. No commits should be
   made that do not pass tests.
 
@@ -33,6 +36,9 @@ Read the rules in AGENTS.md and follow them.
 * When testing on a 2.5Gbit/s ethernet to an s3 server backed by 2000MB/sec
   SSD, estimate about 4 seconds per gigabyte of backup time.
 
-* When running tests, don't run individual tests, or grep the output. run the entire test suite every time and read the full output.
+* When running tests, don't run individual tests, or grep the output. Run
+  the entire test suite every time and read the full output.
 
-* When running tests, don't run individual tests, or try to grep the output. never run "go test". only ever run "make test" to run the full test suite, and examine the full output.
\ No newline at end of file
+* When running tests, don't run individual tests, or try to grep the output.
+  Never run "go test". Only ever run "make test" to run the full test
+  suite, and examine the full output.
diff --git a/PROCESS.md b/PROCESS.md
new file mode 100644
index 0000000..356b90e
--- /dev/null
+++ b/PROCESS.md
@@ -0,0 +1,556 @@
+# Vaultik Snapshot Creation Process
+
+This document describes the lifecycle of objects during snapshot creation,
+with a focus on database transactions and foreign key constraints.
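+
+SQLite only enforces foreign keys when the `foreign_keys` pragma is enabled
+on the connection, so the constraints discussed below are assumed to be
+active. A minimal sketch of a connection opened that way (illustrative only;
+vaultik's actual connection setup may differ, including the driver used):
+
+```go
+package database // hypothetical placement, for illustration
+
+import (
+	"database/sql"
+
+	_ "github.com/mattn/go-sqlite3" // assumed driver
+)
+
+// openIndexDB opens the SQLite index with FK enforcement turned on.
+func openIndexDB(path string) (*sql.DB, error) {
+	db, err := sql.Open("sqlite3", path)
+	if err != nil {
+		return nil, err
+	}
+	// A single connection serializes all transactions; this matches the
+	// MaxOpenConns(1) behaviour referenced later in this document.
+	db.SetMaxOpenConns(1)
+	// SQLite ships with FK enforcement off by default; without this pragma
+	// none of the FK errors described below would be raised at all.
+	if _, err := db.Exec(`PRAGMA foreign_keys = ON`); err != nil {
+		_ = db.Close()
+		return nil, err
+	}
+	return db, nil
+}
+```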
+
+## Database Schema Overview
+
+### Tables and Foreign Key Dependencies
+
+```
+                    FOREIGN KEY GRAPH
+                    (referencing table ──► referenced tables)
+
+  snapshot_files  ──►  snapshots, files
+  snapshot_blobs  ──►  snapshots, blobs
+  file_chunks     ──►  files, chunks
+  chunk_files     ──►  chunks, files
+  blob_chunks     ──►  blobs, chunks
+  uploads         ──►  blobs.blob_hash, snapshots.id
+```
+
+### Critical Constraint: `chunks` Must Exist First
+
+These tables reference `chunks.chunk_hash` **without CASCADE**:
+
+- `file_chunks.chunk_hash` → `chunks.chunk_hash`
+- `chunk_files.chunk_hash` → `chunks.chunk_hash`
+- `blob_chunks.chunk_hash` → `chunks.chunk_hash`
+
+**Implication**: A chunk record MUST be committed to the database BEFORE any
+of these referencing records can be created.
+
+### Order of Operations Required by Schema
+
+```
+ 1. snapshots       (created first, before scan)
+ 2. blobs           (created when packer starts new blob)
+ 3. chunks          (created during file processing)
+ 4. blob_chunks     (created immediately after chunk added to packer)
+ 5. files           (created after file fully chunked)
+ 6. file_chunks     (created with file record)
+ 7. chunk_files     (created with file record)
+ 8. snapshot_files  (created with file record)
+ 9. snapshot_blobs  (created after blob uploaded)
+10. uploads         (created after blob uploaded)
+```
+
+---
+
+## Snapshot Creation Phases
+
+### Phase 0: Initialization
+
+**Actions:**
+
+1. Snapshot record created in database (Transaction T0)
+2. Known files loaded into memory from `files` table
+3. Known chunks loaded into memory from `chunks` table
+
+**Transactions:**
+
+```
+T0: INSERT INTO snapshots (id, hostname, ...) VALUES (...)
+    COMMIT
+```
+
+---
+
+### Phase 1: Scan Directory
+
+**Actions:**
+
+1. Walk filesystem directory tree
+2. For each file, compare against in-memory `knownFiles` map
+3. Classify files as: unchanged, new, or modified
+4. Collect unchanged file IDs for later association
+5. Collect new/modified files for processing
+
+**Transactions:**
+
+```
+(None during scan - all in-memory)
+```
+
+---
+
+### Phase 1b: Associate Unchanged Files
+
+**Actions:**
+
+1. For unchanged files, add entries to `snapshot_files` table
+2. Done in batches of 1000
+
+**Transactions:**
+
+```
+For each batch of 1000 file IDs:
+  T: BEGIN
+     INSERT INTO snapshot_files (snapshot_id, file_id) VALUES (?, ?)
+     ... (up to 1000 inserts)
+     COMMIT
+```
+
+---
+
+### Phase 2: Process Files
+
+For each file that needs processing:
+
+#### Step 2a: Open and Chunk File
+
+**Location:** `processFileStreaming()`
+
+For each chunk produced by content-defined chunking:
+
+##### Step 2a-1: Check Chunk Existence
+
+```go
+chunkExists := s.chunkExists(chunk.Hash) // In-memory lookup
+```
+
+##### Step 2a-2: Create Chunk Record (if new)
+
+```go
+// TRANSACTION: Create chunk in database
+err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
+	dbChunk := &database.Chunk{ChunkHash: chunk.Hash, Size: chunk.Size}
+	return s.repos.Chunks.Create(txCtx, tx, dbChunk)
+})
+// COMMIT immediately after WithTx returns
+
+// Update in-memory cache
+s.addKnownChunk(chunk.Hash)
+```
+
+**Transaction:**
+
+```
+T_chunk: BEGIN
+         INSERT INTO chunks (chunk_hash, size) VALUES (?, ?)
+         COMMIT
+```
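+
+A note on `WithTx`, which appears throughout this document: it is the
+repository helper that opens a transaction, runs the callback, and commits
+when the callback returns nil (rolling back otherwise). The sketch below
+shows the *assumed* shape of that helper — the type name, fields, and error
+wrapping are illustrative, and vaultik's real implementation may differ:
+
+```go
+package database // illustrative sketch only
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+)
+
+// Repositories is assumed to hold the shared *sql.DB plus the individual
+// repositories (Files, Chunks, FileChunks, ...) referenced in this document.
+type Repositories struct {
+	db *sql.DB
+}
+
+// WithTx runs fn inside a single transaction. The commit happens before
+// WithTx returns, which is why Step 2a-2 above (and the later steps)
+// describe the commit as happening "immediately after WithTx returns".
+func (r *Repositories) WithTx(ctx context.Context, fn func(context.Context, *sql.Tx) error) error {
+	tx, err := r.db.BeginTx(ctx, nil)
+	if err != nil {
+		return fmt.Errorf("beginning transaction: %w", err)
+	}
+	if err := fn(ctx, tx); err != nil {
+		_ = tx.Rollback()
+		return err
+	}
+	return tx.Commit()
+}
+```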
+
+##### Step 2a-3: Add Chunk to Packer
+
+```go
+s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data})
+```
+
+**Inside packer.AddChunk → addChunkToCurrentBlob():**
+
+```go
+// TRANSACTION: Create blob_chunks record IMMEDIATELY
+if p.repos != nil {
+	blobChunk := &database.BlobChunk{
+		BlobID:    p.currentBlob.id,
+		ChunkHash: chunk.Hash,
+		Offset:    offset,
+		Length:    chunkSize,
+	}
+	err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
+		return p.repos.BlobChunks.Create(ctx, tx, blobChunk)
+	})
+	// COMMIT immediately
+}
+```
+
+**Transaction:**
+
+```
+T_blob_chunk: BEGIN
+              INSERT INTO blob_chunks (blob_id, chunk_hash, offset, length) VALUES (?, ?, ?, ?)
+              COMMIT
+```
+
+**⚠️ CRITICAL DEPENDENCY**: This transaction requires `chunks.chunk_hash` to
+exist (FK constraint). The chunk MUST be committed in Step 2a-2 BEFORE this
+can succeed.
+
+---
+
+#### Step 2b: Blob Size Limit Handling
+
+If adding a chunk would exceed the blob size limit:
+
+```go
+if err == blob.ErrBlobSizeLimitExceeded {
+	if err := s.packer.FinalizeBlob(); err != nil { ... }
+	// Retry adding the chunk
+	if err := s.packer.AddChunk(...); err != nil { ... }
+}
+```
+
+**FinalizeBlob() transactions:**
+
+```
+T_blob_finish: BEGIN
+               UPDATE blobs SET blob_hash=?, uncompressed_size=?, compressed_size=?, finished_ts=? WHERE id=?
+               COMMIT
+```
+
+Then the blob handler (handleBlobReady) is called:
+
+```
+(Upload to S3 - no transaction)
+
+T_blob_uploaded: BEGIN
+                 UPDATE blobs SET uploaded_ts=? WHERE id=?
+                 INSERT INTO snapshot_blobs (snapshot_id, blob_id, blob_hash) VALUES (?, ?, ?)
+                 INSERT INTO uploads (blob_hash, snapshot_id, uploaded_at, size, duration_ms) VALUES (?, ?, ?, ?, ?)
+                 COMMIT
+```
+
+---
+
+#### Step 2c: Queue File for Batch Insertion
+
+After all chunks for a file are processed:
+
+```go
+// Build file data (in-memory, no DB)
+fileChunks := make([]database.FileChunk, len(chunks))
+chunkFiles := make([]database.ChunkFile, len(chunks))
+
+// Queue for batch insertion
+return s.addPendingFile(ctx, pendingFileData{
+	file:       fileToProcess.File,
+	fileChunks: fileChunks,
+	chunkFiles: chunkFiles,
+})
+```
+
+**No transaction yet** - this only appends to the `pendingFiles` slice.
+
+Once `len(pendingFiles) >= fileBatchSize` (100), `flushPendingFiles()` is
+triggered.
+
+---
+
+#### Step 2d: Flush Pending Files
+
+**Location:** `flushPendingFiles()` - called when the batch is full, or at the
+end of processing
+
+```go
+return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
+	for _, data := range files {
+		// 1. Create file record
+		s.repos.Files.Create(txCtx, tx, data.file) // INSERT OR REPLACE
+
+		// 2. Delete old associations
+		s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID)
+		s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID)
+
+		// 3. Create file_chunks records
+		for _, fc := range data.fileChunks {
+			s.repos.FileChunks.Create(txCtx, tx, &fc) // FK: files, chunks
+		}
+
+		// 4. Create chunk_files records
+		for _, cf := range data.chunkFiles {
+			s.repos.ChunkFiles.Create(txCtx, tx, &cf) // FK: chunks, files
+		}
+
+		// 5. Add file to snapshot
+		s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID)
+	}
+	return nil
+})
+// COMMIT (all or nothing for the batch)
+```
+
+**Transaction:**
+
+```
+T_files_batch: BEGIN
+               -- For each file in batch:
+               INSERT OR REPLACE INTO files (...) VALUES (...)
+               DELETE FROM file_chunks WHERE file_id = ?
+               DELETE FROM chunk_files WHERE file_id = ?
+               INSERT INTO file_chunks (file_id, idx, chunk_hash) VALUES (?, ?, ?)    -- FK: files, chunks
+               INSERT INTO chunk_files (chunk_hash, file_id, ...) VALUES (?, ?, ...)  -- FK: chunks, files
+               INSERT INTO snapshot_files (snapshot_id, file_id) VALUES (?, ?)
+               -- Repeat for each file
+               COMMIT
+```
+
+**⚠️ CRITICAL DEPENDENCY**: `file_chunks` and `chunk_files` require both the
+referenced `chunks.chunk_hash` row and the referenced `files.id` row to exist
+within this transaction.
+
+---
+
+### Phase 2 End: Final Flush
+
+```go
+// Flush any remaining pending files
+if err := s.flushAllPending(ctx); err != nil { ... }
+
+// Final packer flush
+s.packer.Flush()
+```
+
+---
+
+## The Current Bug
+
+### Symptom
+
+With batched file insertion enabled, the test suite fails during the batch
+flush:
+
+```
+creating file chunk: inserting file_chunk: constraint failed: FOREIGN KEY constraint failed (787)
+```
+
+787 is SQLite's extended result code for a foreign key constraint violation
+(SQLITE_CONSTRAINT_FOREIGNKEY). The error does not identify *which* foreign
+key failed, and `file_chunks` carries two: `chunk_hash → chunks.chunk_hash`
+and `file_id → files.id`.
+
+### Why the `chunks` FK Is Not the Culprit
+
+The obvious suspect is the `chunk_hash` constraint, but chunk rows are
+demonstrably committed before the batch flush ever runs:
+
+1. Process file A, chunk X
+2. Create chunk X in DB (transaction commits)
+3. Add chunk X to packer
+4. Packer creates blob_chunks for chunk X (needs chunk X - OK, committed in step 2)
+5. Queue file A with chunk references
+6. Process file B, chunk Y
+7. Create chunk Y in DB (transaction commits)
+8. ... and so on ...
+9. At end: flushPendingFiles()
+10. Insert file_chunks for file A referencing chunk X (chunk X already committed)
+
+Each chunk gets its own `WithTx` transaction in `processFileStreaming()`
+(Step 2a-2) and is committed before the chunk is handed to the packer; the
+packer's `blob_chunks` insert already depends on that and succeeds.
+Transaction visibility is not a plausible explanation either: the pool is
+limited to a single connection (`MaxOpenConns(1)`), which serializes all
+transactions, and committed rows in SQLite (WAL mode included) are visible to
+every later transaction. The packer using `context.Background()` instead of
+the scanner's context affects cancellation, not visibility.
+
+### Root Cause: Missing File ID
+
+The failing constraint is the *other* foreign key on `file_chunks` and
+`chunk_files`: `file_id → files.id`. For new files, the ID is never set
+before the chunk associations are built.
+
+In `checkFileInMemory()`:
+
+```go
+// For new files:
+if !exists {
+	return file, true // file.ID is the empty string!
+}
+
+// For existing files:
+file.ID = existingFile.ID // Reuse existing ID
+```
+
+`processFileStreaming()` then builds the association slices from that struct:
+
+```go
+fileChunks[i] = database.FileChunk{
+	FileID:    fileToProcess.File.ID, // empty for new files!
+	Idx:       ci.fileChunk.Idx,
+	ChunkHash: ci.fileChunk.ChunkHash,
+}
+```
+
+Finally, `flushPendingFiles()` creates the file record - which is when the
+real ID is generated or assigned - but the queued `fileChunks` and
+`chunkFiles` entries still carry the empty `FileID`:
+
+```go
+s.repos.Files.Create(txCtx, tx, data.file) // generates/uses the ID
+// data.fileChunks and data.chunkFiles still hold the empty FileID!
+for i := range data.fileChunks {
+	s.repos.FileChunks.Create(txCtx, tx, &data.fileChunks[i]) // FK failure
+}
+```
+
+The old, non-batched code never hit this because the file was created and the
+chunk associations were built inside the same transaction, *after*
+`Files.Create` had run. With batching, the slices are built long before the
+file row exists, so the empty ID is baked in.
+
+---
+
+## Summary of Object Lifecycle
+
+| Object         | When Created                             | Transaction               | Dependencies     |
+|----------------|------------------------------------------|---------------------------|------------------|
+| snapshot       | Before scan                              | Individual tx             | None             |
+| blob           | When packer needs new blob               | Individual tx             | None             |
+| chunk          | During file chunking (each chunk)        | Individual tx             | None             |
+| blob_chunks    | Immediately after adding chunk to packer | Individual tx             | chunks, blobs    |
+| files          | Batched at end of processing             | Batch tx                  | None             |
+| file_chunks    | With file (batched)                      | Batch tx                  | files, chunks    |
+| chunk_files    | With file (batched)                      | Batch tx                  | files, chunks    |
+| snapshot_files | With file (batched)                      | Batch tx                  | snapshots, files |
+| snapshot_blobs | After blob upload                        | Individual tx             | snapshots, blobs |
+| uploads        | After blob upload                        | Same tx as snapshot_blobs | blobs, snapshots |
+
+---
+
+## Recommended Fix
+
+**Step 1: Generate file IDs upfront**
+
+In `checkFileInMemory()`, generate the UUID for new files immediately and
+reuse the existing ID for known files:
+
+```go
+file := &database.File{
+	ID:   uuid.New().String(), // Always generate ID for new files
+	Path: path,
+	...
+}
+```
+
+This ensures `file.ID` is already set when the `fileChunks` and `chunkFiles`
+slices are built, so the rows inserted by the batch flush reference the same
+ID as the file record created in that transaction.
+
+**Step 2: If in doubt, verify with per-file transactions**
+
+If Step 1 does not fix it, revert to non-batched file insertion to isolate
+the problem:
+
+```go
+// Instead of queuing:
+// return s.addPendingFile(ctx, pendingFileData{...})
+
+// Do immediate insertion:
+return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
+	// Create file
+	s.repos.Files.Create(txCtx, tx, fileToProcess.File)
+	// Delete old associations
+	s.repos.FileChunks.DeleteByFileID(...)
+	s.repos.ChunkFiles.DeleteByFileID(...)
+	// Create new associations
+	for _, fc := range fileChunks {
+		s.repos.FileChunks.Create(...)
+	}
+	for _, cf := range chunkFiles {
+		s.repos.ChunkFiles.Create(...)
+	}
+	// Add to snapshot
+	s.repos.Snapshots.AddFileByID(...)
+	return nil
+})
+```
+
+**Step 3: If batching is still desired**
+
+Once per-file transactions are confirmed to work, re-implement batching with
+the ID fix in place, and add debug logging that records the offending
+`file_id` and `chunk_hash` values so a future FK failure can be attributed to
+a specific constraint instead of being inferred.
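+
+## Appendix: Minimal FK Reproduction (illustrative)
+
+The failure mode and the fix can be demonstrated outside vaultik. The sketch
+below is illustrative only: it uses a deliberately simplified schema and the
+`mattn/go-sqlite3` driver directly, not vaultik's migrations or repository
+layer. It reproduces the 787 failure (a `file_chunks` row whose `file_id`
+points at nothing) and shows that generating the ID before building the
+association rows makes the same insert succeed.
+
+```go
+package main
+
+import (
+	"database/sql"
+	"fmt"
+	"log"
+
+	"github.com/google/uuid"
+	_ "github.com/mattn/go-sqlite3"
+)
+
+func main() {
+	db, err := sql.Open("sqlite3", ":memory:")
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer db.Close()
+	// One connection, so the :memory: database and the pragma are shared
+	// by every statement below.
+	db.SetMaxOpenConns(1)
+	mustExec(db, `PRAGMA foreign_keys = ON`)
+	mustExec(db, `CREATE TABLE files  (id TEXT PRIMARY KEY, path TEXT NOT NULL)`)
+	mustExec(db, `CREATE TABLE chunks (chunk_hash TEXT PRIMARY KEY, size INTEGER NOT NULL)`)
+	mustExec(db, `CREATE TABLE file_chunks (
+		file_id    TEXT    NOT NULL REFERENCES files(id),
+		idx        INTEGER NOT NULL,
+		chunk_hash TEXT    NOT NULL REFERENCES chunks(chunk_hash)
+	)`)
+
+	// Chunks are committed first, as in Step 2a-2.
+	mustExec(db, `INSERT INTO chunks VALUES ('abc123', 42)`)
+
+	// The bug: the association row was built before the file ID was
+	// generated, so file_id is "" and the FK to files(id) cannot match,
+	// even though a files row (with a different ID) exists.
+	mustExec(db, `INSERT INTO files VALUES ('some-other-uuid', '/etc/hosts')`)
+	_, err = db.Exec(`INSERT INTO file_chunks VALUES ('', 0, 'abc123')`)
+	fmt.Println("empty file_id:  ", err) // FOREIGN KEY constraint failed
+
+	// The fix: generate the ID upfront and use it in both rows.
+	id := uuid.New().String()
+	mustExec(db, `INSERT INTO files VALUES (?, '/etc/passwd')`, id)
+	_, err = db.Exec(`INSERT INTO file_chunks VALUES (?, 0, 'abc123')`, id)
+	fmt.Println("upfront file_id:", err) // <nil>
+}
+
+func mustExec(db *sql.DB, query string, args ...any) {
+	if _, err := db.Exec(query, args...); err != nil {
+		log.Fatal(err)
+	}
+}
+```
+
+Running it requires cgo (for this particular driver) and `go get` of the two
+modules; the driver and schema here are assumptions made for this appendix,
+not a description of vaultik's actual database layer.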
diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go
index 7022c7e..0507f5e 100644
--- a/internal/snapshot/scanner.go
+++ b/internal/snapshot/scanner.go
@@ -15,6 +15,7 @@ import (
 	"git.eeqj.de/sneak/vaultik/internal/log"
 	"git.eeqj.de/sneak/vaultik/internal/storage"
 	"github.com/dustin/go-humanize"
+	"github.com/google/uuid"
 	"github.com/spf13/afero"
 )
 
@@ -25,6 +26,13 @@ type FileToProcess struct {
 	File *database.File
 }
 
+// pendingFileData holds all data needed to commit a file to the database
+type pendingFileData struct {
+	file       *database.File
+	fileChunks []database.FileChunk
+	chunkFiles []database.ChunkFile
+}
+
 // Scanner scans directories and populates the database with file and chunk information
 type Scanner struct {
 	fs afero.Fs
@@ -42,6 +50,10 @@ type Scanner struct {
 	knownChunks   map[string]struct{}
 	knownChunksMu sync.RWMutex
 
+	// Pending file data buffer for batch insertion
+	pendingFiles   []pendingFileData
+	pendingFilesMu sync.Mutex
+
 	// Mutex for coordinating blob creation
 	packerMu sync.Mutex // Blocks chunk production during blob creation
 
@@ -49,6 +61,11 @@ type Scanner struct {
 	scanCtx context.Context
 }
 
+const (
+	// Batch size for file database operations
+	fileBatchSize = 100
+)
+
 // ScannerConfig contains configuration for the scanner
 type ScannerConfig struct {
 	FS afero.Fs
@@ -286,6 +303,73 @@ func (s *Scanner) addKnownChunk(hash string) {
 	s.knownChunksMu.Unlock()
 }
 
+// addPendingFile adds a file to the pending buffer and flushes if needed
+func (s *Scanner) addPendingFile(ctx context.Context, data pendingFileData) error {
+	s.pendingFilesMu.Lock()
+	s.pendingFiles = append(s.pendingFiles, data)
+	needsFlush := len(s.pendingFiles) >= fileBatchSize
+	s.pendingFilesMu.Unlock()
+
+	if needsFlush {
+		return s.flushPendingFiles(ctx)
+	}
+	return nil
+}
+
+// flushPendingFiles writes all pending files to the database in a single transaction
+func (s *Scanner) flushPendingFiles(ctx context.Context) error {
+	s.pendingFilesMu.Lock()
+	files := s.pendingFiles
+	s.pendingFiles = nil
+	s.pendingFilesMu.Unlock()
+
+	if len(files) == 0 {
+		return nil
+	}
+
+	return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
+		for _, data := range files {
+			// Create or update the file record
+			if err := s.repos.Files.Create(txCtx, tx, data.file); err != nil {
+				return fmt.Errorf("creating file record: %w", err)
+			}
+
+			// Delete any existing file_chunks and chunk_files for this file
+			if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
+				return fmt.Errorf("deleting old file chunks: %w", err)
+			}
+			if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
+				return fmt.Errorf("deleting old chunk files: %w", err)
+			}
+
+			// Create file-chunk mappings
+			for i := range data.fileChunks {
+				if err := s.repos.FileChunks.Create(txCtx, tx, &data.fileChunks[i]); err != nil {
+					return fmt.Errorf("creating file chunk: %w", err)
+				}
+			}
+
+			// Create chunk-file mappings
+			for i := range data.chunkFiles {
+				if err := s.repos.ChunkFiles.Create(txCtx, tx, &data.chunkFiles[i]); err != nil {
+					return fmt.Errorf("creating chunk file: %w", err)
+				}
+			}
+
+			// Add file to snapshot
+			if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID); err != nil {
+				return fmt.Errorf("adding file to snapshot: %w", err)
+			}
+		}
+		return nil
+	})
+}
+
+// flushAllPending flushes all pending files to the database
+func (s *Scanner) flushAllPending(ctx context.Context) error {
+	return s.flushPendingFiles(ctx)
+}
+
 // ScanPhaseResult contains the results of the scan phase
 type ScanPhaseResult struct {
 	FilesToProcess []*FileToProcess
@@ -429,8 +513,21 @@ func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles ma
 		gid = stat.Gid()
 	}
 
-	// Create file record
+	// Check against in-memory map first to get existing ID if available
+	existingFile, exists := knownFiles[path]
+
+	// Create file record with ID set upfront
+	// For new files, generate UUID immediately so it's available for chunk associations
+	// For existing files, reuse the existing ID
+	var fileID string
+	if exists {
+		fileID = existingFile.ID
+	} else {
+		fileID = uuid.New().String()
+	}
+
 	file := &database.File{
+		ID:    fileID,
 		Path:  path,
 		MTime: info.ModTime(),
 		CTime: info.ModTime(), // afero doesn't provide ctime
@@ -440,16 +537,11 @@ func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles ma
 		GID:   gid,
 	}
 
-	// Check against in-memory map
-	existingFile, exists := knownFiles[path]
+	// New file - needs processing
 	if !exists {
-		// New file
 		return file, true
 	}
 
-	// Reuse existing ID
-	file.ID = existingFile.ID
-
 	// Check if file has changed
 	if existingFile.Size != file.Size ||
 		existingFile.MTime.Unix() != file.MTime.Unix() ||
@@ -585,6 +677,11 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProc
 		}
 	}
 
+	// Flush any remaining pending chunks and files to database
+	if err := s.flushAllPending(ctx); err != nil {
+		return fmt.Errorf("flushing pending database operations: %w", err)
+	}
+
 	// Final flush (outside any transaction)
 	s.packerMu.Lock()
 	if err := s.packer.Flush(); err != nil {
@@ -779,20 +876,18 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileT
 		// Check if chunk already exists (fast in-memory lookup)
 		chunkExists := s.chunkExists(chunk.Hash)
 
-		// Store chunk if new
+		// Store chunk in database if new (must happen before packer.AddChunk
+		// because packer creates blob_chunk entries that reference chunks)
 		if !chunkExists {
 			err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
 				dbChunk := &database.Chunk{
 					ChunkHash: chunk.Hash,
 					Size:      chunk.Size,
 				}
-				if err := s.repos.Chunks.Create(txCtx, tx, dbChunk); err != nil {
-					return fmt.Errorf("creating chunk: %w", err)
-				}
-				return nil
+				return s.repos.Chunks.Create(txCtx, tx, dbChunk)
 			})
 			if err != nil {
-				return fmt.Errorf("storing chunk: %w", err)
+				return fmt.Errorf("creating chunk: %w", err)
 			}
 			// Add to in-memory cache for fast duplicate detection
 			s.addKnownChunk(chunk.Hash)
@@ -871,56 +966,30 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileT
 		"file_hash", fileHash,
 		"chunks", len(chunks))
 
-	// Store file record, chunk associations, and snapshot association in database
-	// This happens AFTER successful chunking to avoid orphaned records on interruption
-	err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
-		// Create or update the file record
-		// Files.Create uses INSERT OR REPLACE, so it handles both new and changed files
-		if err := s.repos.Files.Create(txCtx, tx, fileToProcess.File); err != nil {
-			return fmt.Errorf("creating file record: %w", err)
+	// Build file data for batch insertion
+	// Update chunk associations with the file ID
+	fileChunks := make([]database.FileChunk, len(chunks))
+	chunkFiles := make([]database.ChunkFile, len(chunks))
+	for i, ci := range chunks {
+		fileChunks[i] = database.FileChunk{
+			FileID:    fileToProcess.File.ID,
+			Idx:       ci.fileChunk.Idx,
+			ChunkHash: ci.fileChunk.ChunkHash,
 		}
-
-		// Delete any existing file_chunks and chunk_files for this file
-		// This ensures old chunks are no longer associated when file content changes
-		if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
-			return fmt.Errorf("deleting old file chunks: %w", err)
-		}
-		if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
-			return fmt.Errorf("deleting old chunk files: %w", err)
+		chunkFiles[i] = database.ChunkFile{
+			ChunkHash:  ci.fileChunk.ChunkHash,
+			FileID:     fileToProcess.File.ID,
+			FileOffset: ci.offset,
+			Length:     ci.size,
 		}
+	}
 
-		// Update chunk associations with the file ID (now that we have it)
-		for i := range chunks {
-			chunks[i].fileChunk.FileID = fileToProcess.File.ID
-		}
-
-		for _, ci := range chunks {
-			// Create file-chunk mapping
-			if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil {
-				return fmt.Errorf("creating file chunk: %w", err)
-			}
-
-			// Create chunk-file mapping
-			chunkFile := &database.ChunkFile{
-				ChunkHash:  ci.fileChunk.ChunkHash,
-				FileID:     fileToProcess.File.ID,
-				FileOffset: ci.offset,
-				Length:     ci.size,
-			}
-			if err := s.repos.ChunkFiles.Create(txCtx, tx, chunkFile); err != nil {
-				return fmt.Errorf("creating chunk file: %w", err)
-			}
-		}
-
-		// Add file to snapshot
-		if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, fileToProcess.File.ID); err != nil {
-			return fmt.Errorf("adding file to snapshot: %w", err)
-		}
-
-		return nil
+	// Queue file for batch insertion
+	return s.addPendingFile(ctx, pendingFileData{
+		file:       fileToProcess.File,
+		fileChunks: fileChunks,
+		chunkFiles: chunkFiles,
 	})
-
-	return err
 }
 
 // GetProgress returns the progress reporter for this scanner