From 75c25280c30688b9a960c8f6e90090e92d1cb2ef Mon Sep 17 00:00:00 2001 From: user Date: Tue, 17 Mar 2026 02:38:55 -0700 Subject: [PATCH] refactor: break up scanner.go methods into smaller helpers --- internal/snapshot/scanner.go | 393 +++++++++++++++++++++-------------- 1 file changed, 238 insertions(+), 155 deletions(-) diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go index 5c8fa88..8804dc2 100644 --- a/internal/snapshot/scanner.go +++ b/internal/snapshot/scanner.go @@ -180,18 +180,10 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc } // Phase 0: Load known files and chunks from database into memory for fast lookup - fmt.Println("Loading known files from database...") - knownFiles, err := s.loadKnownFiles(ctx, path) + knownFiles, err := s.loadDatabaseState(ctx, path) if err != nil { - return nil, fmt.Errorf("loading known files: %w", err) + return nil, err } - fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles))) - - fmt.Println("Loading known chunks from database...") - if err := s.loadKnownChunks(ctx); err != nil { - return nil, fmt.Errorf("loading known chunks: %w", err) - } - fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks))) // Phase 1: Scan directory, collect files to process, and track existing files // (builds existingFiles map during walk to avoid double traversal) @@ -216,36 +208,8 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc } } - // Calculate total size to process - var totalSizeToProcess int64 - for _, file := range filesToProcess { - totalSizeToProcess += file.FileInfo.Size() - } - - // Update progress with total size and file count - if s.progress != nil { - s.progress.SetTotalSize(totalSizeToProcess) - s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess))) - } - - log.Info("Phase 1 complete", - "total_files", len(filesToProcess), - "total_size", humanize.Bytes(uint64(totalSizeToProcess)), - "files_skipped", result.FilesSkipped, - "bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped))) - - // Print scan summary - fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)", - formatNumber(result.FilesScanned), - humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)), - formatNumber(len(filesToProcess)), - humanize.Bytes(uint64(totalSizeToProcess))) - if result.FilesDeleted > 0 { - fmt.Printf(", %s deleted (%s)", - formatNumber(result.FilesDeleted), - humanize.Bytes(uint64(result.BytesDeleted))) - } - fmt.Println() + // Summarize scan phase results and update progress + s.summarizeScanPhase(result, filesToProcess) // Phase 2: Process files and create chunks if len(filesToProcess) > 0 { @@ -259,7 +223,66 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc log.Info("Phase 2/3: Skipping (no files need processing, metadata-only snapshot)") } - // Get final stats from packer + // Finalize result with blob statistics + s.finalizeScanResult(ctx, result) + + return result, nil +} + +// loadDatabaseState loads known files and chunks from the database into memory for fast lookup +// This avoids per-file and per-chunk database queries during the scan and process phases +func (s *Scanner) loadDatabaseState(ctx context.Context, path string) (map[string]*database.File, error) { + fmt.Println("Loading known files from database...") + knownFiles, err := s.loadKnownFiles(ctx, path) + if err != nil { + return nil, fmt.Errorf("loading known files: %w", err) + } + fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles))) + + fmt.Println("Loading known chunks from database...") + if err := s.loadKnownChunks(ctx); err != nil { + return nil, fmt.Errorf("loading known chunks: %w", err) + } + fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks))) + + return knownFiles, nil +} + +// summarizeScanPhase calculates total size to process, updates progress tracking, +// and prints the scan phase summary with file counts and sizes +func (s *Scanner) summarizeScanPhase(result *ScanResult, filesToProcess []*FileToProcess) { + var totalSizeToProcess int64 + for _, file := range filesToProcess { + totalSizeToProcess += file.FileInfo.Size() + } + + if s.progress != nil { + s.progress.SetTotalSize(totalSizeToProcess) + s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess))) + } + + log.Info("Phase 1 complete", + "total_files", len(filesToProcess), + "total_size", humanize.Bytes(uint64(totalSizeToProcess)), + "files_skipped", result.FilesSkipped, + "bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped))) + + fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)", + formatNumber(result.FilesScanned), + humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)), + formatNumber(len(filesToProcess)), + humanize.Bytes(uint64(totalSizeToProcess))) + if result.FilesDeleted > 0 { + fmt.Printf(", %s deleted (%s)", + formatNumber(result.FilesDeleted), + humanize.Bytes(uint64(result.BytesDeleted))) + } + fmt.Println() +} + +// finalizeScanResult populates final blob statistics in the scan result +// by querying the packer and database for blob/upload counts +func (s *Scanner) finalizeScanResult(ctx context.Context, result *ScanResult) { blobs := s.packer.GetFinishedBlobs() result.BlobsCreated += len(blobs) @@ -276,7 +299,6 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc } result.EndTime = time.Now().UTC() - return result, nil } // loadKnownFiles loads all known files from the database into a map for fast lookup @@ -424,12 +446,38 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error { flushStart := time.Now() log.Debug("flushCompletedPendingFiles: starting") + // Partition pending files into those ready to flush and those still waiting + canFlush, stillPendingCount := s.partitionPendingByChunkStatus() + + if len(canFlush) == 0 { + log.Debug("flushCompletedPendingFiles: nothing to flush") + return nil + } + + log.Debug("Flushing completed files after blob finalize", + "files_to_flush", len(canFlush), + "files_still_pending", stillPendingCount) + + // Collect all data for batch operations + allFiles, allFileIDs, allFileChunks, allChunkFiles := s.collectBatchFlushData(canFlush) + + // Execute the batch flush in a single transaction + log.Debug("flushCompletedPendingFiles: starting transaction") + txStart := time.Now() + err := s.executeBatchFileFlush(ctx, allFiles, allFileIDs, allFileChunks, allChunkFiles) + log.Debug("flushCompletedPendingFiles: transaction done", "duration", time.Since(txStart)) + log.Debug("flushCompletedPendingFiles: total duration", "duration", time.Since(flushStart)) + return err +} + +// partitionPendingByChunkStatus separates pending files into those whose chunks +// are all committed to DB (ready to flush) and those still waiting on pending chunks. +// Updates s.pendingFiles to contain only the still-pending files. +func (s *Scanner) partitionPendingByChunkStatus() (canFlush []pendingFileData, stillPendingCount int) { log.Debug("flushCompletedPendingFiles: acquiring pendingFilesMu lock") s.pendingFilesMu.Lock() log.Debug("flushCompletedPendingFiles: acquired lock", "pending_files", len(s.pendingFiles)) - // Separate files into complete (can flush) and incomplete (keep pending) - var canFlush []pendingFileData var stillPending []pendingFileData log.Debug("flushCompletedPendingFiles: checking which files can flush") @@ -454,18 +502,15 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error { s.pendingFilesMu.Unlock() log.Debug("flushCompletedPendingFiles: released lock") - if len(canFlush) == 0 { - log.Debug("flushCompletedPendingFiles: nothing to flush") - return nil - } + return canFlush, len(stillPending) +} - log.Debug("Flushing completed files after blob finalize", - "files_to_flush", len(canFlush), - "files_still_pending", len(stillPending)) - - // Collect all data for batch operations +// collectBatchFlushData aggregates file records, IDs, file-chunk mappings, and chunk-file +// mappings from the given pending file data for efficient batch database operations +func (s *Scanner) collectBatchFlushData(canFlush []pendingFileData) ([]*database.File, []types.FileID, []database.FileChunk, []database.ChunkFile) { log.Debug("flushCompletedPendingFiles: collecting data for batch ops") collectStart := time.Now() + var allFileChunks []database.FileChunk var allChunkFiles []database.ChunkFile var allFileIDs []types.FileID @@ -477,16 +522,20 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error { allFileIDs = append(allFileIDs, data.file.ID) allFiles = append(allFiles, data.file) } + log.Debug("flushCompletedPendingFiles: collected data", "duration", time.Since(collectStart), "file_chunks", len(allFileChunks), "chunk_files", len(allChunkFiles), "files", len(allFiles)) - // Flush the complete files using batch operations - log.Debug("flushCompletedPendingFiles: starting transaction") - txStart := time.Now() - err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { + return allFiles, allFileIDs, allFileChunks, allChunkFiles +} + +// executeBatchFileFlush writes all collected file data to the database in a single transaction, +// including deleting old mappings, creating file records, and adding snapshot associations +func (s *Scanner) executeBatchFileFlush(ctx context.Context, allFiles []*database.File, allFileIDs []types.FileID, allFileChunks []database.FileChunk, allChunkFiles []database.ChunkFile) error { + return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { log.Debug("flushCompletedPendingFiles: inside transaction") // Batch delete old file_chunks and chunk_files @@ -539,9 +588,6 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error { log.Debug("flushCompletedPendingFiles: transaction complete") return nil }) - log.Debug("flushCompletedPendingFiles: transaction done", "duration", time.Since(txStart)) - log.Debug("flushCompletedPendingFiles: total duration", "duration", time.Since(flushStart)) - return err } // ScanPhaseResult contains the results of the scan phase @@ -623,62 +669,11 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult mu.Unlock() // Update result stats - if needsProcessing { - result.BytesScanned += info.Size() - if s.progress != nil { - s.progress.GetStats().BytesScanned.Add(info.Size()) - } - } else { - result.FilesSkipped++ - result.BytesSkipped += info.Size() - if s.progress != nil { - s.progress.GetStats().FilesSkipped.Add(1) - s.progress.GetStats().BytesSkipped.Add(info.Size()) - } - } - result.FilesScanned++ - if s.progress != nil { - s.progress.GetStats().FilesScanned.Add(1) - } + s.updateScanEntryStats(result, needsProcessing, info) // Output periodic status if time.Since(lastStatusTime) >= statusInterval { - elapsed := time.Since(startTime) - rate := float64(filesScanned) / elapsed.Seconds() - - // Build status line - use estimate if available (not first backup) - if estimatedTotal > 0 { - // Show actual scanned vs estimate (may exceed estimate if files were added) - pct := float64(filesScanned) / float64(estimatedTotal) * 100 - if pct > 100 { - pct = 100 // Cap at 100% for display - } - remaining := estimatedTotal - filesScanned - if remaining < 0 { - remaining = 0 - } - var eta time.Duration - if rate > 0 && remaining > 0 { - eta = time.Duration(float64(remaining)/rate) * time.Second - } - fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed", - formatNumber(int(filesScanned)), - pct, - formatNumber(changedCount), - rate, - elapsed.Round(time.Second)) - if eta > 0 { - fmt.Printf(", ETA %s", eta.Round(time.Second)) - } - fmt.Println() - } else { - // First backup - no estimate available - fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n", - formatNumber(int(filesScanned)), - formatNumber(changedCount), - rate, - elapsed.Round(time.Second)) - } + printScanProgressLine(filesScanned, changedCount, estimatedTotal, startTime) lastStatusTime = time.Now() } @@ -695,6 +690,68 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult }, nil } +// updateScanEntryStats updates the scan result and progress reporter statistics +// for a single scanned file entry based on whether it needs processing +func (s *Scanner) updateScanEntryStats(result *ScanResult, needsProcessing bool, info os.FileInfo) { + if needsProcessing { + result.BytesScanned += info.Size() + if s.progress != nil { + s.progress.GetStats().BytesScanned.Add(info.Size()) + } + } else { + result.FilesSkipped++ + result.BytesSkipped += info.Size() + if s.progress != nil { + s.progress.GetStats().FilesSkipped.Add(1) + s.progress.GetStats().BytesSkipped.Add(info.Size()) + } + } + result.FilesScanned++ + if s.progress != nil { + s.progress.GetStats().FilesScanned.Add(1) + } +} + +// printScanProgressLine prints a periodic progress line during the scan phase, +// showing files scanned, percentage complete (if estimate available), and ETA +func printScanProgressLine(filesScanned int64, changedCount int, estimatedTotal int64, startTime time.Time) { + elapsed := time.Since(startTime) + rate := float64(filesScanned) / elapsed.Seconds() + + if estimatedTotal > 0 { + // Show actual scanned vs estimate (may exceed estimate if files were added) + pct := float64(filesScanned) / float64(estimatedTotal) * 100 + if pct > 100 { + pct = 100 // Cap at 100% for display + } + remaining := estimatedTotal - filesScanned + if remaining < 0 { + remaining = 0 + } + var eta time.Duration + if rate > 0 && remaining > 0 { + eta = time.Duration(float64(remaining)/rate) * time.Second + } + fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed", + formatNumber(int(filesScanned)), + pct, + formatNumber(changedCount), + rate, + elapsed.Round(time.Second)) + if eta > 0 { + fmt.Printf(", ETA %s", eta.Round(time.Second)) + } + fmt.Println() + } else { + // First backup - no estimate available + fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n", + formatNumber(int(filesScanned)), + formatNumber(changedCount), + rate, + elapsed.Round(time.Second)) + } +} + // checkFileInMemory checks if a file needs processing using the in-memory map // No database access is performed - this is purely CPU/memory work func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) { @@ -830,22 +887,13 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc s.progress.GetStats().CurrentFile.Store(fileToProcess.Path) } - // Process file in streaming fashion - if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil { - // Handle files that were deleted between scan and process phases - if errors.Is(err, os.ErrNotExist) { - log.Warn("File was deleted during backup, skipping", "path", fileToProcess.Path) - result.FilesSkipped++ - continue - } - // Skip file read errors if --skip-errors is enabled - if s.skipErrors { - log.Error("ERROR: Failed to process file (skipping due to --skip-errors)", "path", fileToProcess.Path, "error", err) - fmt.Printf("ERROR: Failed to process %s: %v (skipping)\n", fileToProcess.Path, err) - result.FilesSkipped++ - continue - } - return fmt.Errorf("processing file %s: %w", fileToProcess.Path, err) + // Process file with error handling for deleted files and skip-errors mode + skipped, err := s.processFileWithErrorHandling(ctx, fileToProcess, result) + if err != nil { + return err + } + if skipped { + continue } // Update files processed counter @@ -858,36 +906,71 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc // Output periodic status if time.Since(lastStatusTime) >= statusInterval { - elapsed := time.Since(startTime) - pct := float64(bytesProcessed) / float64(totalBytes) * 100 - byteRate := float64(bytesProcessed) / elapsed.Seconds() - fileRate := float64(filesProcessed) / elapsed.Seconds() - - // Calculate ETA based on bytes (more accurate than files) - remainingBytes := totalBytes - bytesProcessed - var eta time.Duration - if byteRate > 0 { - eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second - } - - // Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s - fmt.Printf("Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s", - formatCompact(filesProcessed), - formatCompact(totalFiles), - humanize.Bytes(uint64(bytesProcessed)), - humanize.Bytes(uint64(totalBytes)), - pct, - humanize.Bytes(uint64(byteRate)), - fileRate, - elapsed.Round(time.Second)) - if eta > 0 { - fmt.Printf(", ETA: %s", eta.Round(time.Second)) - } - fmt.Println() + printProcessingProgress(filesProcessed, totalFiles, bytesProcessed, totalBytes, startTime) lastStatusTime = time.Now() } } + // Finalize: flush packer, pending files, and handle local blobs + return s.finalizeProcessPhase(ctx, result) +} + +// processFileWithErrorHandling wraps processFileStreaming with error recovery for +// deleted files and skip-errors mode. Returns (skipped, error). +func (s *Scanner) processFileWithErrorHandling(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) (bool, error) { + if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil { + // Handle files that were deleted between scan and process phases + if errors.Is(err, os.ErrNotExist) { + log.Warn("File was deleted during backup, skipping", "path", fileToProcess.Path) + result.FilesSkipped++ + return true, nil + } + // Skip file read errors if --skip-errors is enabled + if s.skipErrors { + log.Error("ERROR: Failed to process file (skipping due to --skip-errors)", "path", fileToProcess.Path, "error", err) + fmt.Printf("ERROR: Failed to process %s: %v (skipping)\n", fileToProcess.Path, err) + result.FilesSkipped++ + return true, nil + } + return false, fmt.Errorf("processing file %s: %w", fileToProcess.Path, err) + } + return false, nil +} + +// printProcessingProgress prints a periodic progress line during the process phase, +// showing files processed, bytes transferred, throughput, and ETA +func printProcessingProgress(filesProcessed, totalFiles int, bytesProcessed, totalBytes int64, startTime time.Time) { + elapsed := time.Since(startTime) + pct := float64(bytesProcessed) / float64(totalBytes) * 100 + byteRate := float64(bytesProcessed) / elapsed.Seconds() + fileRate := float64(filesProcessed) / elapsed.Seconds() + + // Calculate ETA based on bytes (more accurate than files) + remainingBytes := totalBytes - bytesProcessed + var eta time.Duration + if byteRate > 0 { + eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second + } + + // Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s + fmt.Printf("Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s", + formatCompact(filesProcessed), + formatCompact(totalFiles), + humanize.Bytes(uint64(bytesProcessed)), + humanize.Bytes(uint64(totalBytes)), + pct, + humanize.Bytes(uint64(byteRate)), + fileRate, + elapsed.Round(time.Second)) + if eta > 0 { + fmt.Printf(", ETA: %s", eta.Round(time.Second)) + } + fmt.Println() +} + +// finalizeProcessPhase flushes the packer, writes remaining pending files to the database, +// and handles local blob storage when no remote storage is configured +func (s *Scanner) finalizeProcessPhase(ctx context.Context, result *ScanResult) error { // Final packer flush first - this commits remaining chunks to DB // and handleBlobReady will flush files whose chunks are now committed s.packerMu.Lock()