From bb2292de7f7e9bb08e1e692aaf7614921060962c Mon Sep 17 00:00:00 2001 From: sneak Date: Sat, 26 Jul 2025 02:38:50 +0200 Subject: [PATCH] Fix file content change handling and improve log messages - Delete old file_chunks and chunk_files when file content changes - Add DeleteByFileID method to ChunkFileRepository - Add tests to verify old chunks are properly disassociated - Make log messages more precise throughout scanner and snapshot - Support metadata-only snapshots when no files have changed - Add periodic status output during scan and snapshot operations - Improve scan summary output with clearer information --- internal/backup/file_change_test.go | 236 ++++++++++++++++++++++++++++ internal/backup/scanner.go | 121 +++++++++++--- internal/backup/snapshot.go | 56 +++---- internal/blob/packer.go | 14 +- internal/database/chunk_files.go | 18 +++ 5 files changed, 388 insertions(+), 57 deletions(-) create mode 100644 internal/backup/file_change_test.go diff --git a/internal/backup/file_change_test.go b/internal/backup/file_change_test.go new file mode 100644 index 0000000..2c5c0fd --- /dev/null +++ b/internal/backup/file_change_test.go @@ -0,0 +1,236 @@ +package backup_test + +import ( + "context" + "database/sql" + "testing" + "time" + + "git.eeqj.de/sneak/vaultik/internal/backup" + "git.eeqj.de/sneak/vaultik/internal/database" + "git.eeqj.de/sneak/vaultik/internal/log" + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestFileContentChange verifies that when a file's content changes, +// the old chunks are properly disassociated +func TestFileContentChange(t *testing.T) { + // Initialize logger for tests + log.Initialize(log.Config{}) + + // Create in-memory filesystem + fs := afero.NewMemMapFs() + + // Create initial file + err := afero.WriteFile(fs, "/test.txt", []byte("Initial content"), 0644) + require.NoError(t, err) + + // Create test database + db, err := database.NewTestDB() + require.NoError(t, err) + defer func() { + if err := db.Close(); err != nil { + t.Errorf("failed to close database: %v", err) + } + }() + + repos := database.NewRepositories(db) + + // Create scanner + scanner := backup.NewScanner(backup.ScannerConfig{ + FS: fs, + ChunkSize: int64(1024 * 16), // 16KB chunks for testing + Repositories: repos, + MaxBlobSize: int64(1024 * 1024), // 1MB blobs + CompressionLevel: 3, + AgeRecipients: []string{"age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"}, // Test public key + }) + + // Create first snapshot + ctx := context.Background() + snapshotID1 := "snapshot1" + err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error { + snapshot := &database.Snapshot{ + ID: snapshotID1, + Hostname: "test-host", + VaultikVersion: "test", + StartedAt: time.Now(), + } + return repos.Snapshots.Create(ctx, tx, snapshot) + }) + require.NoError(t, err) + + // First scan - should create chunks for initial content + result1, err := scanner.Scan(ctx, "/", snapshotID1) + require.NoError(t, err) + t.Logf("First scan: %d files scanned", result1.FilesScanned) + + // Get file chunks from first scan + fileChunks1, err := repos.FileChunks.GetByPath(ctx, "/test.txt") + require.NoError(t, err) + assert.Len(t, fileChunks1, 1) // Small file = 1 chunk + oldChunkHash := fileChunks1[0].ChunkHash + + // Get chunk files from first scan + chunkFiles1, err := repos.ChunkFiles.GetByFilePath(ctx, "/test.txt") + require.NoError(t, err) + assert.Len(t, chunkFiles1, 1) + + // Modify the file + time.Sleep(10 * time.Millisecond) // 
Ensure mtime changes + err = afero.WriteFile(fs, "/test.txt", []byte("Modified content with different data"), 0644) + require.NoError(t, err) + + // Create second snapshot + snapshotID2 := "snapshot2" + err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error { + snapshot := &database.Snapshot{ + ID: snapshotID2, + Hostname: "test-host", + VaultikVersion: "test", + StartedAt: time.Now(), + } + return repos.Snapshots.Create(ctx, tx, snapshot) + }) + require.NoError(t, err) + + // Second scan - should create new chunks and remove old associations + result2, err := scanner.Scan(ctx, "/", snapshotID2) + require.NoError(t, err) + t.Logf("Second scan: %d files scanned", result2.FilesScanned) + + // Get file chunks from second scan + fileChunks2, err := repos.FileChunks.GetByPath(ctx, "/test.txt") + require.NoError(t, err) + assert.Len(t, fileChunks2, 1) // Still 1 chunk but different hash + newChunkHash := fileChunks2[0].ChunkHash + + // Verify the chunk hashes are different + assert.NotEqual(t, oldChunkHash, newChunkHash, "Chunk hash should change when content changes") + + // Get chunk files from second scan + chunkFiles2, err := repos.ChunkFiles.GetByFilePath(ctx, "/test.txt") + require.NoError(t, err) + assert.Len(t, chunkFiles2, 1) + assert.Equal(t, newChunkHash, chunkFiles2[0].ChunkHash) + + // Verify old chunk still exists (it's still valid data) + oldChunk, err := repos.Chunks.GetByHash(ctx, oldChunkHash) + require.NoError(t, err) + assert.NotNil(t, oldChunk) + + // Verify new chunk exists + newChunk, err := repos.Chunks.GetByHash(ctx, newChunkHash) + require.NoError(t, err) + assert.NotNil(t, newChunk) + + // Verify that chunk_files for old chunk no longer references this file + oldChunkFiles, err := repos.ChunkFiles.GetByChunkHash(ctx, oldChunkHash) + require.NoError(t, err) + for _, cf := range oldChunkFiles { + file, err := repos.Files.GetByID(ctx, cf.FileID) + require.NoError(t, err) + assert.NotEqual(t, "/test.txt", file.Path, "Old chunk should not be associated with the modified file") + } +} + +// TestMultipleFileChanges verifies handling of multiple file changes in one scan +func TestMultipleFileChanges(t *testing.T) { + // Initialize logger for tests + log.Initialize(log.Config{}) + + // Create in-memory filesystem + fs := afero.NewMemMapFs() + + // Create initial files + files := map[string]string{ + "/file1.txt": "Content 1", + "/file2.txt": "Content 2", + "/file3.txt": "Content 3", + } + + for path, content := range files { + err := afero.WriteFile(fs, path, []byte(content), 0644) + require.NoError(t, err) + } + + // Create test database + db, err := database.NewTestDB() + require.NoError(t, err) + defer func() { + if err := db.Close(); err != nil { + t.Errorf("failed to close database: %v", err) + } + }() + + repos := database.NewRepositories(db) + + // Create scanner + scanner := backup.NewScanner(backup.ScannerConfig{ + FS: fs, + ChunkSize: int64(1024 * 16), // 16KB chunks for testing + Repositories: repos, + MaxBlobSize: int64(1024 * 1024), // 1MB blobs + CompressionLevel: 3, + AgeRecipients: []string{"age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"}, // Test public key + }) + + // Create first snapshot + ctx := context.Background() + snapshotID1 := "snapshot1" + err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error { + snapshot := &database.Snapshot{ + ID: snapshotID1, + Hostname: "test-host", + VaultikVersion: "test", + StartedAt: time.Now(), + } + return repos.Snapshots.Create(ctx, tx, snapshot) + }) + require.NoError(t,
err) + + // First scan + result1, err := scanner.Scan(ctx, "/", snapshotID1) + require.NoError(t, err) + // 4 files because root directory is also counted + assert.Equal(t, 4, result1.FilesScanned) + + // Modify two files + time.Sleep(10 * time.Millisecond) // Ensure mtime changes + err = afero.WriteFile(fs, "/file1.txt", []byte("Modified content 1"), 0644) + require.NoError(t, err) + err = afero.WriteFile(fs, "/file3.txt", []byte("Modified content 3"), 0644) + require.NoError(t, err) + + // Create second snapshot + snapshotID2 := "snapshot2" + err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error { + snapshot := &database.Snapshot{ + ID: snapshotID2, + Hostname: "test-host", + VaultikVersion: "test", + StartedAt: time.Now(), + } + return repos.Snapshots.Create(ctx, tx, snapshot) + }) + require.NoError(t, err) + + // Second scan + result2, err := scanner.Scan(ctx, "/", snapshotID2) + require.NoError(t, err) + // 4 files because root directory is also counted + assert.Equal(t, 4, result2.FilesScanned) + + // Verify each file has exactly one set of chunks + for path := range files { + fileChunks, err := repos.FileChunks.GetByPath(ctx, path) + require.NoError(t, err) + assert.Len(t, fileChunks, 1, "File %s should have exactly 1 chunk association", path) + + chunkFiles, err := repos.ChunkFiles.GetByFilePath(ctx, path) + require.NoError(t, err) + assert.Len(t, chunkFiles, 1, "File %s should have exactly 1 chunk-file association", path) + } +} diff --git a/internal/backup/scanner.go b/internal/backup/scanner.go index 1fc8e03..e4b5cd3 100644 --- a/internal/backup/scanner.go +++ b/internal/backup/scanner.go @@ -163,12 +163,28 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc "files_skipped", result.FilesSkipped, "bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped))) + // Print detailed scan summary + fmt.Printf("\n=== Scan Summary ===\n") + fmt.Printf("Total files examined: %d\n", result.FilesScanned) + fmt.Printf("Files with content changes: %d\n", len(filesToProcess)) + fmt.Printf("Files with unchanged content: %d\n", result.FilesSkipped) + fmt.Printf("Total size of changed files: %s\n", humanize.Bytes(uint64(totalSizeToProcess))) + fmt.Printf("Total size of unchanged files: %s\n", humanize.Bytes(uint64(result.BytesSkipped))) + if len(filesToProcess) > 0 { + fmt.Printf("\nStarting snapshot of %d changed files...\n\n", len(filesToProcess)) + } else { + fmt.Printf("\nNo file contents have changed.\n") + fmt.Printf("Creating metadata-only snapshot to capture current state...\n\n") + } + // Phase 2: Process files and create chunks if len(filesToProcess) > 0 { log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)") if err := s.processPhase(ctx, filesToProcess, result); err != nil { return nil, fmt.Errorf("process phase failed: %w", err) } + } else { + log.Info("Phase 2/3: Skipping (no file contents changed, metadata-only snapshot)") } // Get final stats from packer @@ -181,7 +197,7 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc if s.snapshotID != "" { uploadCount, err := s.repos.Uploads.GetCountBySnapshot(ctx, s.snapshotID) if err != nil { - log.Warn("Failed to get upload count from database", "error", err) + log.Warn("Failed to query upload count from database", "error", err) } else { result.BlobsCreated = int(uploadCount) } @@ -196,11 +212,17 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult var filesToProcess []*FileToProcess var mu 
sync.Mutex + // Set up periodic status output + lastStatusTime := time.Now() + statusInterval := 15 * time.Second + var filesScanned int64 + var bytesScanned int64 + log.Debug("Starting directory walk", "path", path) err := afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error { - log.Debug("Walking file", "path", path) + log.Debug("Scanning filesystem entry", "path", path) if err != nil { - log.Debug("Error walking path", "path", path, "error", err) + log.Debug("Error accessing filesystem entry", "path", path, "error", err) return err } @@ -232,6 +254,25 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult mu.Unlock() } + // Update scan statistics + if info.Mode().IsRegular() { + filesScanned++ + bytesScanned += info.Size() + } + + // Output periodic status + if time.Since(lastStatusTime) >= statusInterval { + mu.Lock() + changedCount := len(filesToProcess) + mu.Unlock() + + fmt.Printf("Scan progress: %d files examined, %s total size, %d files changed\n", + filesScanned, + humanize.Bytes(uint64(bytesScanned)), + changedCount) + lastStatusTime = time.Now() + } + return nil }) @@ -244,6 +285,13 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult // processPhase processes the files that need backing up func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error { + // Set up periodic status output + lastStatusTime := time.Now() + statusInterval := 15 * time.Second + startTime := time.Now() + filesProcessed := 0 + totalFiles := len(filesToProcess) + // Process each file for _, fileToProcess := range filesToProcess { // Update progress @@ -260,6 +308,26 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc if s.progress != nil { s.progress.GetStats().FilesProcessed.Add(1) } + + filesProcessed++ + + // Output periodic status + if time.Since(lastStatusTime) >= statusInterval { + elapsed := time.Since(startTime) + remaining := totalFiles - filesProcessed + var eta time.Duration + if filesProcessed > 0 { + eta = elapsed / time.Duration(filesProcessed) * time.Duration(remaining) + } + + fmt.Printf("Snapshot progress: %d/%d files processed, %d chunks created, %d blobs uploaded", + filesProcessed, totalFiles, result.ChunksCreated, result.BlobsCreated) + if remaining > 0 && eta > 0 { + fmt.Printf(", ETA: %s", eta.Round(time.Second)) + } + fmt.Println() + lastStatusTime = time.Now() + } } // Final flush (outside any transaction) @@ -338,7 +406,7 @@ func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo, } // Check if file has changed since last backup (no transaction needed for read) - log.Debug("Checking if file exists in database", "path", path) + log.Debug("Querying database for existing file record", "path", path) existingFile, err := s.repos.Files.GetByPath(ctx, path) if err != nil { return nil, false, fmt.Errorf("checking existing file: %w", err) @@ -347,7 +415,7 @@ func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo, fileChanged := existingFile == nil || s.hasFileChanged(existingFile, file) // Update file metadata and add to snapshot in a single transaction - log.Debug("Updating file metadata and adding to snapshot", "path", path, "changed", fileChanged, "snapshot", s.snapshotID) + log.Debug("Updating file record in database and adding to snapshot", "path", path, "changed", fileChanged, "snapshot", s.snapshotID) err = s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error { 
// First create/update the file if err := s.repos.Files.Create(ctx, tx, file); err != nil { @@ -362,7 +430,7 @@ func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo, if err != nil { return nil, false, err } - log.Debug("File added to snapshot", "path", path) + log.Debug("File record added to snapshot association", "path", path) result.FilesScanned++ @@ -383,11 +451,11 @@ func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo, stats.BytesSkipped.Add(info.Size()) } // File hasn't changed, but we still need to associate existing chunks with this snapshot - log.Debug("File hasn't changed, associating existing chunks", "path", path) + log.Debug("File content unchanged, reusing existing chunks and blobs", "path", path) if err := s.associateExistingChunks(ctx, path); err != nil { return nil, false, fmt.Errorf("associating existing chunks: %w", err) } - log.Debug("Existing chunks associated", "path", path) + log.Debug("Existing chunks and blobs associated with snapshot", "path", path) } else { // File changed or is not a regular file result.BytesScanned += info.Size() @@ -428,29 +496,29 @@ func (s *Scanner) associateExistingChunks(ctx context.Context, path string) erro log.Debug("associateExistingChunks start", "path", path) // Get existing file chunks (no transaction needed for read) - log.Debug("Getting existing file chunks", "path", path) + log.Debug("Querying database for file's chunk associations", "path", path) fileChunks, err := s.repos.FileChunks.GetByFile(ctx, path) if err != nil { return fmt.Errorf("getting existing file chunks: %w", err) } - log.Debug("Got file chunks", "path", path, "count", len(fileChunks)) + log.Debug("Retrieved file chunk associations from database", "path", path, "count", len(fileChunks)) // Collect unique blob IDs that need to be added to snapshot blobsToAdd := make(map[string]string) // blob ID -> blob hash for i, fc := range fileChunks { - log.Debug("Processing chunk", "path", path, "chunk_index", i, "chunk_hash", fc.ChunkHash) + log.Debug("Looking up blob containing chunk", "path", path, "chunk_index", i, "chunk_hash", fc.ChunkHash) // Find which blob contains this chunk (no transaction needed for read) - log.Debug("Finding blob for chunk", "chunk_hash", fc.ChunkHash) + log.Debug("Querying database for blob containing chunk", "chunk_hash", fc.ChunkHash) blobChunk, err := s.repos.BlobChunks.GetByChunkHash(ctx, fc.ChunkHash) if err != nil { return fmt.Errorf("finding blob for chunk %s: %w", fc.ChunkHash, err) } if blobChunk == nil { - log.Warn("Chunk exists but not in any blob", "chunk", fc.ChunkHash, "file", path) + log.Warn("Chunk record exists in database but not associated with any blob", "chunk", fc.ChunkHash, "file", path) continue } - log.Debug("Found blob for chunk", "chunk_hash", fc.ChunkHash, "blob_id", blobChunk.BlobID) + log.Debug("Found blob record containing chunk", "chunk_hash", fc.ChunkHash, "blob_id", blobChunk.BlobID) // Track blob ID for later processing if _, exists := blobsToAdd[blobChunk.BlobID]; !exists { @@ -465,7 +533,7 @@ func (s *Scanner) associateExistingChunks(ctx context.Context, path string) erro return fmt.Errorf("getting blob %s: %w", blobID, err) } if blob == nil { - log.Warn("Blob record not found", "blob_id", blobID) + log.Warn("Blob record missing from database", "blob_id", blobID) delete(blobsToAdd, blobID) continue } @@ -474,14 +542,14 @@ func (s *Scanner) associateExistingChunks(ctx context.Context, path string) erro // Add blobs to snapshot using short transactions for 
blobID, blobHash := range blobsToAdd { - log.Debug("Adding blob to snapshot", "blob_id", blobID, "blob_hash", blobHash, "snapshot", s.snapshotID) + log.Debug("Adding blob reference to snapshot association", "blob_id", blobID, "blob_hash", blobHash, "snapshot", s.snapshotID) err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error { return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobID, blobHash) }) if err != nil { return fmt.Errorf("adding existing blob to snapshot: %w", err) } - log.Debug("Added blob to snapshot", "blob_id", blobID) + log.Debug("Created snapshot-blob association in database", "blob_id", blobID) } log.Debug("associateExistingChunks complete", "path", path, "blobs_processed", len(blobsToAdd)) @@ -490,7 +558,7 @@ func (s *Scanner) associateExistingChunks(ctx context.Context, path string) erro // handleBlobReady is called by the packer when a blob is finalized func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { - log.Debug("Blob handler called", "blob_hash", blobWithReader.Hash[:8]+"...") + log.Debug("Invoking blob upload handler", "blob_hash", blobWithReader.Hash[:8]+"...") startTime := time.Now().UTC() finishedBlob := blobWithReader.FinishedBlob @@ -547,7 +615,7 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { // Log upload stats uploadSpeed := float64(finishedBlob.Compressed) * 8 / uploadDuration.Seconds() // bits per second - log.Info("Uploaded blob to S3", + log.Info("Successfully uploaded blob to S3 storage", "path", blobPath, "size", humanize.Bytes(uint64(finishedBlob.Compressed)), "duration", uploadDuration, @@ -639,9 +707,9 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT default: } - log.Debug("Processing chunk", + log.Debug("Processing content-defined chunk from file", "file", fileToProcess.Path, - "chunk", chunkIndex, + "chunk_index", chunkIndex, "hash", chunk.Hash, "size", chunk.Size) @@ -737,13 +805,22 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT return fmt.Errorf("chunking file: %w", err) } - log.Debug("Completed chunking file", + log.Debug("Completed snapshotting file", "path", fileToProcess.Path, "file_hash", fileHash, "chunks", len(chunks)) // Store file-chunk associations and chunk-file mappings in database err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { + // First, delete all existing file_chunks and chunk_files for this file + // This ensures old chunks are no longer associated when file content changes + if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil { + return fmt.Errorf("deleting old file chunks: %w", err) + } + if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil { + return fmt.Errorf("deleting old chunk files: %w", err) + } + for _, ci := range chunks { // Create file-chunk mapping if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil { diff --git a/internal/backup/snapshot.go b/internal/backup/snapshot.go index 5a15189..388bb3f 100644 --- a/internal/backup/snapshot.go +++ b/internal/backup/snapshot.go @@ -215,12 +215,12 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st log.Debug("Database copy complete", "size", getFileSize(tempDBPath)) // Step 2: Clean the temp database to only contain current snapshot data - log.Debug("Cleaning temporary snapshot database to contain only current snapshot", "snapshot_id", snapshotID, "db_path", 
tempDBPath) + log.Debug("Cleaning temporary database to contain only current snapshot data", "snapshot_id", snapshotID, "db_path", tempDBPath) stats, err := sm.cleanSnapshotDB(ctx, tempDBPath, snapshotID) if err != nil { return fmt.Errorf("cleaning snapshot database: %w", err) } - log.Info("Snapshot database cleanup complete", + log.Info("Temporary database cleanup complete", "db_path", tempDBPath, "size_after_clean", humanize.Bytes(uint64(getFileSize(tempDBPath))), "files", stats.FileCount, @@ -642,20 +642,20 @@ func (sm *SnapshotManager) CleanupIncompleteSnapshots(ctx context.Context, hostn if err != nil { // Metadata doesn't exist in S3 - this is an incomplete snapshot - log.Info("Cleaning up incomplete snapshot", "snapshot_id", snapshot.ID, "started_at", snapshot.StartedAt) + log.Info("Cleaning up incomplete snapshot record", "snapshot_id", snapshot.ID, "started_at", snapshot.StartedAt) // Delete the snapshot and all its associations if err := sm.deleteSnapshot(ctx, snapshot.ID); err != nil { return fmt.Errorf("deleting incomplete snapshot %s: %w", snapshot.ID, err) } - log.Info("Deleted incomplete snapshot", "snapshot_id", snapshot.ID) + log.Info("Deleted incomplete snapshot record and associated data", "snapshot_id", snapshot.ID) } else { // Metadata exists - this snapshot was completed but database wasn't updated // This shouldn't happen in normal operation, but mark it complete - log.Warn("Found snapshot with metadata but incomplete in DB", "snapshot_id", snapshot.ID) + log.Warn("Found snapshot with S3 metadata but incomplete in database", "snapshot_id", snapshot.ID) if err := sm.repos.Snapshots.MarkComplete(ctx, nil, snapshot.ID); err != nil { - log.Error("Failed to mark snapshot complete", "snapshot_id", snapshot.ID, "error", err) + log.Error("Failed to mark snapshot as complete in database", "snapshot_id", snapshot.ID, "error", err) } } } @@ -681,7 +681,7 @@ func (sm *SnapshotManager) deleteSnapshot(ctx context.Context, snapshotID string } // Clean up orphaned data - log.Debug("Cleaning up orphaned data in main database") + log.Debug("Cleaning up orphaned records in main database") if err := sm.cleanupOrphanedData(ctx); err != nil { return fmt.Errorf("cleaning up orphaned data: %w", err) } @@ -698,28 +698,28 @@ func (sm *SnapshotManager) cleanupOrphanedData(ctx context.Context) error { // 4. 
Delete orphaned chunks (now safe after all blob_chunks are gone) // Delete orphaned files (files not in any snapshot) - log.Debug("Deleting orphaned files") + log.Debug("Deleting orphaned file records from database") if err := sm.repos.Files.DeleteOrphaned(ctx); err != nil { return fmt.Errorf("deleting orphaned files: %w", err) } // Delete orphaned blobs (blobs not in any snapshot) // This will cascade delete blob_chunks for deleted blobs - log.Debug("Deleting orphaned blobs") + log.Debug("Deleting orphaned blob records from database") if err := sm.repos.Blobs.DeleteOrphaned(ctx); err != nil { return fmt.Errorf("deleting orphaned blobs: %w", err) } // Delete orphaned blob_chunks entries // This handles cases where the blob still exists but chunks were deleted - log.Debug("Deleting orphaned blob_chunks") + log.Debug("Deleting orphaned blob_chunks associations from database") if err := sm.repos.BlobChunks.DeleteOrphaned(ctx); err != nil { return fmt.Errorf("deleting orphaned blob_chunks: %w", err) } // Delete orphaned chunks (chunks not referenced by any file) // This must come after cleaning up blob_chunks to avoid foreign key violations - log.Debug("Deleting orphaned chunks") + log.Debug("Deleting orphaned chunk records from database") if err := sm.repos.Chunks.DeleteOrphaned(ctx); err != nil { return fmt.Errorf("deleting orphaned chunks: %w", err) } @@ -729,44 +729,44 @@ func (sm *SnapshotManager) cleanupOrphanedData(ctx context.Context) error { // deleteOtherSnapshots deletes all snapshots except the current one func (sm *SnapshotManager) deleteOtherSnapshots(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error { - log.Debug("[Temp DB Cleanup] Deleting other snapshots", "keeping", currentSnapshotID) + log.Debug("[Temp DB Cleanup] Deleting all snapshot records except current", "keeping", currentSnapshotID) database.LogSQL("Execute", "DELETE FROM snapshots WHERE id != ?", currentSnapshotID) result, err := tx.ExecContext(ctx, "DELETE FROM snapshots WHERE id != ?", currentSnapshotID) if err != nil { return fmt.Errorf("deleting other snapshots: %w", err) } rowsAffected, _ := result.RowsAffected() - log.Debug("[Temp DB Cleanup] Deleted snapshots", "count", rowsAffected) + log.Debug("[Temp DB Cleanup] Deleted snapshot records from database", "count", rowsAffected) return nil } // deleteOrphanedSnapshotAssociations deletes snapshot_files and snapshot_blobs for deleted snapshots func (sm *SnapshotManager) deleteOrphanedSnapshotAssociations(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error { // Delete orphaned snapshot_files - log.Debug("[Temp DB Cleanup] Deleting orphaned snapshot_files") + log.Debug("[Temp DB Cleanup] Deleting orphaned snapshot_files associations") database.LogSQL("Execute", "DELETE FROM snapshot_files WHERE snapshot_id != ?", currentSnapshotID) result, err := tx.ExecContext(ctx, "DELETE FROM snapshot_files WHERE snapshot_id != ?", currentSnapshotID) if err != nil { return fmt.Errorf("deleting orphaned snapshot_files: %w", err) } rowsAffected, _ := result.RowsAffected() - log.Debug("[Temp DB Cleanup] Deleted snapshot_files", "count", rowsAffected) + log.Debug("[Temp DB Cleanup] Deleted snapshot_files associations", "count", rowsAffected) // Delete orphaned snapshot_blobs - log.Debug("[Temp DB Cleanup] Deleting orphaned snapshot_blobs") + log.Debug("[Temp DB Cleanup] Deleting orphaned snapshot_blobs associations") database.LogSQL("Execute", "DELETE FROM snapshot_blobs WHERE snapshot_id != ?", currentSnapshotID) result, err = tx.ExecContext(ctx, "DELETE 
FROM snapshot_blobs WHERE snapshot_id != ?", currentSnapshotID) if err != nil { return fmt.Errorf("deleting orphaned snapshot_blobs: %w", err) } rowsAffected, _ = result.RowsAffected() - log.Debug("[Temp DB Cleanup] Deleted snapshot_blobs", "count", rowsAffected) + log.Debug("[Temp DB Cleanup] Deleted snapshot_blobs associations", "count", rowsAffected) return nil } // deleteOrphanedFiles deletes files not in the current snapshot func (sm *SnapshotManager) deleteOrphanedFiles(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error { - log.Debug("[Temp DB Cleanup] Deleting files not in current snapshot") + log.Debug("[Temp DB Cleanup] Deleting file records not referenced by current snapshot") database.LogSQL("Execute", `DELETE FROM files WHERE NOT EXISTS (SELECT 1 FROM snapshot_files WHERE snapshot_files.file_id = files.id AND snapshot_files.snapshot_id = ?)`, currentSnapshotID) result, err := tx.ExecContext(ctx, ` DELETE FROM files @@ -779,16 +779,16 @@ func (sm *SnapshotManager) deleteOrphanedFiles(ctx context.Context, tx *sql.Tx, return fmt.Errorf("deleting orphaned files: %w", err) } rowsAffected, _ := result.RowsAffected() - log.Debug("[Temp DB Cleanup] Deleted files", "count", rowsAffected) + log.Debug("[Temp DB Cleanup] Deleted file records from database", "count", rowsAffected) // Note: file_chunks will be deleted via CASCADE - log.Debug("[Temp DB Cleanup] file_chunks will be deleted via CASCADE") + log.Debug("[Temp DB Cleanup] file_chunks associations deleted via CASCADE") return nil } // deleteOrphanedChunkToFileMappings deletes chunk_files entries for deleted files func (sm *SnapshotManager) deleteOrphanedChunkToFileMappings(ctx context.Context, tx *sql.Tx) error { - log.Debug("[Temp DB Cleanup] Deleting orphaned chunk_files") + log.Debug("[Temp DB Cleanup] Deleting orphaned chunk_files associations") database.LogSQL("Execute", `DELETE FROM chunk_files WHERE NOT EXISTS (SELECT 1 FROM files WHERE files.id = chunk_files.file_id)`) result, err := tx.ExecContext(ctx, ` DELETE FROM chunk_files @@ -800,13 +800,13 @@ func (sm *SnapshotManager) deleteOrphanedChunkToFileMappings(ctx context.Context return fmt.Errorf("deleting orphaned chunk_files: %w", err) } rowsAffected, _ := result.RowsAffected() - log.Debug("[Temp DB Cleanup] Deleted chunk_files", "count", rowsAffected) + log.Debug("[Temp DB Cleanup] Deleted chunk_files associations", "count", rowsAffected) return nil } // deleteOrphanedBlobs deletes blobs not in the current snapshot func (sm *SnapshotManager) deleteOrphanedBlobs(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error { - log.Debug("[Temp DB Cleanup] Deleting blobs not in current snapshot") + log.Debug("[Temp DB Cleanup] Deleting blob records not referenced by current snapshot") database.LogSQL("Execute", `DELETE FROM blobs WHERE NOT EXISTS (SELECT 1 FROM snapshot_blobs WHERE snapshot_blobs.blob_hash = blobs.blob_hash AND snapshot_blobs.snapshot_id = ?)`, currentSnapshotID) result, err := tx.ExecContext(ctx, ` DELETE FROM blobs @@ -819,13 +819,13 @@ func (sm *SnapshotManager) deleteOrphanedBlobs(ctx context.Context, tx *sql.Tx, return fmt.Errorf("deleting orphaned blobs: %w", err) } rowsAffected, _ := result.RowsAffected() - log.Debug("[Temp DB Cleanup] Deleted blobs not in snapshot", "count", rowsAffected) + log.Debug("[Temp DB Cleanup] Deleted blob records from database", "count", rowsAffected) return nil } // deleteOrphanedBlobToChunkMappings deletes blob_chunks entries for deleted blobs func (sm *SnapshotManager) 
deleteOrphanedBlobToChunkMappings(ctx context.Context, tx *sql.Tx) error { - log.Debug("[Temp DB Cleanup] Deleting orphaned blob_chunks") + log.Debug("[Temp DB Cleanup] Deleting orphaned blob_chunks associations") database.LogSQL("Execute", `DELETE FROM blob_chunks WHERE NOT EXISTS (SELECT 1 FROM blobs WHERE blobs.id = blob_chunks.blob_id)`) result, err := tx.ExecContext(ctx, ` DELETE FROM blob_chunks @@ -837,13 +837,13 @@ func (sm *SnapshotManager) deleteOrphanedBlobToChunkMappings(ctx context.Context return fmt.Errorf("deleting orphaned blob_chunks: %w", err) } rowsAffected, _ := result.RowsAffected() - log.Debug("[Temp DB Cleanup] Deleted blob_chunks", "count", rowsAffected) + log.Debug("[Temp DB Cleanup] Deleted blob_chunks associations", "count", rowsAffected) return nil } // deleteOrphanedChunks deletes chunks not referenced by any file func (sm *SnapshotManager) deleteOrphanedChunks(ctx context.Context, tx *sql.Tx) error { - log.Debug("[Temp DB Cleanup] Deleting orphaned chunks") + log.Debug("[Temp DB Cleanup] Deleting orphaned chunk records") database.LogSQL("Execute", `DELETE FROM chunks WHERE NOT EXISTS (SELECT 1 FROM file_chunks WHERE file_chunks.chunk_hash = chunks.chunk_hash)`) result, err := tx.ExecContext(ctx, ` DELETE FROM chunks @@ -855,6 +855,6 @@ func (sm *SnapshotManager) deleteOrphanedChunks(ctx context.Context, tx *sql.Tx) return fmt.Errorf("deleting orphaned chunks: %w", err) } rowsAffected, _ := result.RowsAffected() - log.Debug("[Temp DB Cleanup] Deleted chunks", "count", rowsAffected) + log.Debug("[Temp DB Cleanup] Deleted chunk records from database", "count", rowsAffected) return nil } diff --git a/internal/blob/packer.go b/internal/blob/packer.go index 75b6ab4..bc462f1 100644 --- a/internal/blob/packer.go +++ b/internal/blob/packer.go @@ -278,7 +278,7 @@ func (p *Packer) startNewBlob() error { size: 0, } - log.Debug("Started new blob", "blob_id", blobID, "temp_file", tempFile.Name()) + log.Debug("Created new blob container", "blob_id", blobID, "temp_file", tempFile.Name()) return nil } @@ -286,7 +286,7 @@ func (p *Packer) startNewBlob() error { func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error { // Skip if chunk already in current blob if p.currentBlob.chunkSet[chunk.Hash] { - log.Debug("Skipping duplicate chunk in blob", "chunk_hash", chunk.Hash) + log.Debug("Skipping duplicate chunk already in current blob", "chunk_hash", chunk.Hash) return nil } @@ -320,7 +320,7 @@ func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error { return p.repos.BlobChunks.Create(ctx, tx, blobChunk) }) if err != nil { - log.Error("Failed to store blob-chunk association", "error", err, + log.Error("Failed to store blob-chunk association in database", "error", err, "blob_id", p.currentBlob.id, "chunk_hash", chunk.Hash) // Continue anyway - we can reconstruct this later if needed } @@ -329,7 +329,7 @@ func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error { // Update total size p.currentBlob.size += chunkSize - log.Debug("Added chunk to blob", + log.Debug("Added chunk to blob container", "blob_id", p.currentBlob.id, "chunk_hash", chunk.Hash, "chunk_size", len(chunk.Data), @@ -410,7 +410,7 @@ func (p *Packer) finalizeCurrentBlob() error { } compressionRatio := float64(finished.Compressed) / float64(finished.Uncompressed) - log.Info("Finalized blob", + log.Info("Finalized blob (compressed and encrypted)", "hash", blobHash, "chunks", len(chunkRefs), "uncompressed", finished.Uncompressed, @@ -420,7 +420,7 @@ func (p *Packer) finalizeCurrentBlob() error { // Call 
blob handler if set if p.blobHandler != nil { - log.Debug("Calling blob handler", "blob_hash", blobHash[:8]+"...") + log.Debug("Invoking blob handler callback", "blob_hash", blobHash[:8]+"...") // Reset file position for handler if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil { p.cleanupTempFile() @@ -441,7 +441,7 @@ func (p *Packer) finalizeCurrentBlob() error { // Note: blob handler is responsible for closing/cleaning up temp file p.currentBlob = nil } else { - log.Debug("No blob handler set", "blob_hash", blobHash[:8]+"...") + log.Debug("No blob handler callback configured", "blob_hash", blobHash[:8]+"...") // No handler, need to read data for legacy behavior if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil { p.cleanupTempFile() diff --git a/internal/database/chunk_files.go b/internal/database/chunk_files.go index 7c9d6f9..cb03d85 100644 --- a/internal/database/chunk_files.go +++ b/internal/database/chunk_files.go @@ -114,3 +114,21 @@ func (r *ChunkFileRepository) GetByFileID(ctx context.Context, fileID string) ([ return chunkFiles, rows.Err() } + +// DeleteByFileID deletes all chunk_files entries for a given file ID +func (r *ChunkFileRepository) DeleteByFileID(ctx context.Context, tx *sql.Tx, fileID string) error { + query := `DELETE FROM chunk_files WHERE file_id = ?` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, fileID) + } else { + _, err = r.db.ExecWithLog(ctx, query, fileID) + } + + if err != nil { + return fmt.Errorf("deleting chunk files: %w", err) + } + + return nil +}
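--

Reviewer notes (illustrative sketches, not part of the patch):

The heart of the content-change fix is that a file's new chunk associations must replace the old ones atomically: inside repos.WithTx the scanner first calls FileChunks.DeleteByFileID and ChunkFiles.DeleteByFileID, then creates the new rows, so a chunk from the file's previous content can never remain attached to it. A hedged sketch of the same pattern against plain database/sql — the file_chunks/chunk_files table names and the file_id/chunk_hash columns appear in the patch, but the idx column and the helper name are assumptions:

package example

import (
	"context"
	"database/sql"
	"fmt"
)

// replaceFileChunks swaps a file's chunk associations inside a single
// transaction, mirroring the delete-then-insert order used in
// processFileStreaming: stale rows are removed before new ones are written.
func replaceFileChunks(ctx context.Context, db *sql.DB, fileID string, chunkHashes []string) (err error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			_ = tx.Rollback() // undo everything if any step below fails
		}
	}()

	// Drop stale associations first, as the patch does.
	if _, err = tx.ExecContext(ctx, `DELETE FROM file_chunks WHERE file_id = ?`, fileID); err != nil {
		return fmt.Errorf("deleting old file chunks: %w", err)
	}
	if _, err = tx.ExecContext(ctx, `DELETE FROM chunk_files WHERE file_id = ?`, fileID); err != nil {
		return fmt.Errorf("deleting old chunk files: %w", err)
	}

	// Recreate associations for the new content, in chunk order.
	for idx, hash := range chunkHashes {
		if _, err = tx.ExecContext(ctx,
			`INSERT INTO file_chunks (file_id, idx, chunk_hash) VALUES (?, ?, ?)`,
			fileID, idx, hash); err != nil {
			return fmt.Errorf("inserting file chunk %d: %w", idx, err)
		}
	}
	return tx.Commit()
}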
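The periodic status output added to scanPhase and processPhase follows one time-throttled pattern: remember the last report time and print only when statusInterval has elapsed. The ETA shown during processPhase is a linear extrapolation — average time per file so far, multiplied by files remaining. A minimal standalone sketch of both, assuming nothing beyond the standard library (all names here are illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	const statusInterval = 15 * time.Second
	totalFiles := 1000
	startTime := time.Now()
	lastStatusTime := time.Now()

	for filesProcessed := 1; filesProcessed <= totalFiles; filesProcessed++ {
		// ... process one file here ...

		// Throttle: report at most once per statusInterval.
		if time.Since(lastStatusTime) >= statusInterval {
			elapsed := time.Since(startTime)
			remaining := totalFiles - filesProcessed
			// Linear extrapolation: average time per file so far
			// times the number of files still to process.
			eta := elapsed / time.Duration(filesProcessed) * time.Duration(remaining)
			fmt.Printf("progress: %d/%d files, ETA %s\n",
				filesProcessed, totalFiles, eta.Round(time.Second))
			lastStatusTime = time.Now()
		}
	}
}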