From 24c5e8c5a6b2c6276bc1aaa636eb260befec70f9 Mon Sep 17 00:00:00 2001 From: sneak Date: Fri, 19 Dec 2025 12:40:45 +0700 Subject: [PATCH] Refactor: Create file records only after successful chunking - Scan phase now only collects files to process, no DB writes - Unchanged files get snapshot_files associations via batch (no new records) - New/changed files get records created during processing after chunking - Reduces DB writes significantly (only changed files need new records) - Avoids orphaned file records if backup is interrupted mid-way --- internal/snapshot/scanner.go | 90 +++++++++++++++++++++++------------- 1 file changed, 57 insertions(+), 33 deletions(-) diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go index db752b2..ba7b109 100644 --- a/internal/snapshot/scanner.go +++ b/internal/snapshot/scanner.go @@ -145,16 +145,25 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc // (builds existingFiles map during walk to avoid double traversal) log.Info("Phase 1/3: Scanning directory structure") existingFiles := make(map[string]struct{}) - filesToProcess, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles) + scanResult, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles) if err != nil { return nil, fmt.Errorf("scan phase failed: %w", err) } + filesToProcess := scanResult.FilesToProcess // Phase 1b: Detect deleted files by comparing DB against scanned files if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil { return nil, fmt.Errorf("detecting deleted files: %w", err) } + // Phase 1c: Associate unchanged files with this snapshot (no new records needed) + if len(scanResult.UnchangedFileIDs) > 0 { + fmt.Printf("Associating %s unchanged files with snapshot...\n", formatNumber(len(scanResult.UnchangedFileIDs))) + if err := s.batchAddFilesToSnapshot(ctx, scanResult.UnchangedFileIDs); err != nil { + return nil, fmt.Errorf("associating unchanged files: %w", err) + } + } + // Calculate total size to process var totalSizeToProcess int64 for _, file := range filesToProcess { @@ -234,15 +243,22 @@ func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]* return result, nil } +// ScanPhaseResult contains the results of the scan phase +type ScanPhaseResult struct { + FilesToProcess []*FileToProcess + UnchangedFileIDs []string // IDs of unchanged files to associate with snapshot +} + // scanPhase performs the initial directory scan to identify files to process // It uses the pre-loaded knownFiles map for fast change detection without DB queries // It also populates existingFiles map for deletion detection -func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) ([]*FileToProcess, error) { +// Returns files needing processing and IDs of unchanged files for snapshot association +func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) (*ScanPhaseResult, error) { // Use known file count as estimate for progress (accurate for subsequent backups) estimatedTotal := int64(len(knownFiles)) var filesToProcess []*FileToProcess - var allFiles []*database.File // Collect all files for batch insert + var unchangedFileIDs []string // Just IDs - no new records needed var mu sync.Mutex // Set up periodic status output @@ -277,13 +293,16 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles) mu.Lock() - allFiles = append(allFiles, file) - if needsProcessing && info.Size() > 0 { + if needsProcessing { + // New or changed file - will create record after processing filesToProcess = append(filesToProcess, &FileToProcess{ Path: filePath, FileInfo: info, File: file, }) + } else if file.ID != "" { + // Unchanged file with existing ID - just need snapshot association + unchangedFileIDs = append(unchangedFileIDs, file.ID) } filesScanned++ changedCount := len(filesToProcess) @@ -346,15 +365,10 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult return nil, err } - // Batch insert all files and snapshot associations - if len(allFiles) > 0 { - fmt.Printf("Writing %s file records to database...\n", formatNumber(len(allFiles))) - if err := s.batchInsertFiles(ctx, allFiles); err != nil { - return nil, fmt.Errorf("batch inserting files: %w", err) - } - } - - return filesToProcess, nil + return &ScanPhaseResult{ + FilesToProcess: filesToProcess, + UnchangedFileIDs: unchangedFileIDs, + }, nil } // checkFileInMemory checks if a file needs processing using the in-memory map @@ -406,15 +420,16 @@ func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles ma return file, false } -// batchInsertFiles inserts files and snapshot associations in batches -func (s *Scanner) batchInsertFiles(ctx context.Context, files []*database.File) error { +// batchAddFilesToSnapshot adds existing file IDs to the snapshot association table +// This is used for unchanged files that already have records in the database +func (s *Scanner) batchAddFilesToSnapshot(ctx context.Context, fileIDs []string) error { const batchSize = 1000 startTime := time.Now() lastStatusTime := time.Now() statusInterval := 5 * time.Second - for i := 0; i < len(files); i += batchSize { + for i := 0; i < len(fileIDs); i += batchSize { // Check context cancellation select { case <-ctx.Done(): @@ -423,17 +438,14 @@ func (s *Scanner) batchInsertFiles(ctx context.Context, files []*database.File) } end := i + batchSize - if end > len(files) { - end = len(files) + if end > len(fileIDs) { + end = len(fileIDs) } - batch := files[i:end] + batch := fileIDs[i:end] err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error { - for _, file := range batch { - if err := s.repos.Files.Create(ctx, tx, file); err != nil { - return fmt.Errorf("creating file %s: %w", file.Path, err) - } - if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, file.ID); err != nil { + for _, fileID := range batch { + if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, fileID); err != nil { return fmt.Errorf("adding file to snapshot: %w", err) } } @@ -447,17 +459,17 @@ func (s *Scanner) batchInsertFiles(ctx context.Context, files []*database.File) if time.Since(lastStatusTime) >= statusInterval { elapsed := time.Since(startTime) rate := float64(end) / elapsed.Seconds() - pct := float64(end) / float64(len(files)) * 100 - fmt.Printf("Database write: %s/%s files (%.1f%%), %.0f files/sec\n", - formatNumber(end), formatNumber(len(files)), pct, rate) + pct := float64(end) / float64(len(fileIDs)) * 100 + fmt.Printf("Associating files: %s/%s (%.1f%%), %.0f files/sec\n", + formatNumber(end), formatNumber(len(fileIDs)), pct, rate) lastStatusTime = time.Now() } } elapsed := time.Since(startTime) - rate := float64(len(files)) / elapsed.Seconds() - fmt.Printf("Database write complete: %s files in %s (%.0f files/sec)\n", - formatNumber(len(files)), elapsed.Round(time.Second), rate) + rate := float64(len(fileIDs)) / elapsed.Seconds() + fmt.Printf("Associated %s unchanged files in %s (%.0f files/sec)\n", + formatNumber(len(fileIDs)), elapsed.Round(time.Second), rate) return nil } @@ -818,9 +830,16 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT "file_hash", fileHash, "chunks", len(chunks)) - // Store file-chunk associations and chunk-file mappings in database + // Store file record, chunk associations, and snapshot association in database + // This happens AFTER successful chunking to avoid orphaned records on interruption err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { - // First, delete all existing file_chunks and chunk_files for this file + // Create or update the file record + // Files.Create uses INSERT OR REPLACE, so it handles both new and changed files + if err := s.repos.Files.Create(txCtx, tx, fileToProcess.File); err != nil { + return fmt.Errorf("creating file record: %w", err) + } + + // Delete any existing file_chunks and chunk_files for this file // This ensures old chunks are no longer associated when file content changes if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil { return fmt.Errorf("deleting old file chunks: %w", err) @@ -829,6 +848,11 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT return fmt.Errorf("deleting old chunk files: %w", err) } + // Update chunk associations with the file ID (now that we have it) + for i := range chunks { + chunks[i].fileChunk.FileID = fileToProcess.File.ID + } + for _, ci := range chunks { // Create file-chunk mapping if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil {