refactor: break up scanner.go methods into smaller helpers

This commit is contained in:
user
2026-03-17 02:38:55 -07:00
parent e5b94bd917
commit 75c25280c3

View File

@@ -180,18 +180,10 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
}
// Phase 0: Load known files and chunks from database into memory for fast lookup
fmt.Println("Loading known files from database...")
knownFiles, err := s.loadKnownFiles(ctx, path)
knownFiles, err := s.loadDatabaseState(ctx, path)
if err != nil {
return nil, fmt.Errorf("loading known files: %w", err)
return nil, err
}
fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
fmt.Println("Loading known chunks from database...")
if err := s.loadKnownChunks(ctx); err != nil {
return nil, fmt.Errorf("loading known chunks: %w", err)
}
fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
// Phase 1: Scan directory, collect files to process, and track existing files
// (builds existingFiles map during walk to avoid double traversal)
@@ -216,36 +208,8 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
}
}
// Calculate total size to process
var totalSizeToProcess int64
for _, file := range filesToProcess {
totalSizeToProcess += file.FileInfo.Size()
}
// Update progress with total size and file count
if s.progress != nil {
s.progress.SetTotalSize(totalSizeToProcess)
s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
}
log.Info("Phase 1 complete",
"total_files", len(filesToProcess),
"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
"files_skipped", result.FilesSkipped,
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
// Print scan summary
fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)",
formatNumber(result.FilesScanned),
humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)),
formatNumber(len(filesToProcess)),
humanize.Bytes(uint64(totalSizeToProcess)))
if result.FilesDeleted > 0 {
fmt.Printf(", %s deleted (%s)",
formatNumber(result.FilesDeleted),
humanize.Bytes(uint64(result.BytesDeleted)))
}
fmt.Println()
// Summarize scan phase results and update progress
s.summarizeScanPhase(result, filesToProcess)
// Phase 2: Process files and create chunks
if len(filesToProcess) > 0 {
@@ -259,7 +223,66 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
log.Info("Phase 2/3: Skipping (no files need processing, metadata-only snapshot)")
}
// Get final stats from packer
// Finalize result with blob statistics
s.finalizeScanResult(ctx, result)
return result, nil
}
// loadDatabaseState loads known files and chunks from the database into memory for fast lookup
// This avoids per-file and per-chunk database queries during the scan and process phases
func (s *Scanner) loadDatabaseState(ctx context.Context, path string) (map[string]*database.File, error) {
fmt.Println("Loading known files from database...")
knownFiles, err := s.loadKnownFiles(ctx, path)
if err != nil {
return nil, fmt.Errorf("loading known files: %w", err)
}
fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
fmt.Println("Loading known chunks from database...")
if err := s.loadKnownChunks(ctx); err != nil {
return nil, fmt.Errorf("loading known chunks: %w", err)
}
fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
return knownFiles, nil
}
// summarizeScanPhase calculates total size to process, updates progress tracking,
// and prints the scan phase summary with file counts and sizes
func (s *Scanner) summarizeScanPhase(result *ScanResult, filesToProcess []*FileToProcess) {
var totalSizeToProcess int64
for _, file := range filesToProcess {
totalSizeToProcess += file.FileInfo.Size()
}
if s.progress != nil {
s.progress.SetTotalSize(totalSizeToProcess)
s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
}
log.Info("Phase 1 complete",
"total_files", len(filesToProcess),
"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
"files_skipped", result.FilesSkipped,
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)",
formatNumber(result.FilesScanned),
humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)),
formatNumber(len(filesToProcess)),
humanize.Bytes(uint64(totalSizeToProcess)))
if result.FilesDeleted > 0 {
fmt.Printf(", %s deleted (%s)",
formatNumber(result.FilesDeleted),
humanize.Bytes(uint64(result.BytesDeleted)))
}
fmt.Println()
}
// finalizeScanResult populates final blob statistics in the scan result
// by querying the packer and database for blob/upload counts
func (s *Scanner) finalizeScanResult(ctx context.Context, result *ScanResult) {
blobs := s.packer.GetFinishedBlobs()
result.BlobsCreated += len(blobs)
@@ -276,7 +299,6 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
}
result.EndTime = time.Now().UTC()
return result, nil
}
// loadKnownFiles loads all known files from the database into a map for fast lookup
@@ -424,12 +446,38 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
flushStart := time.Now()
log.Debug("flushCompletedPendingFiles: starting")
// Partition pending files into those ready to flush and those still waiting
canFlush, stillPendingCount := s.partitionPendingByChunkStatus()
if len(canFlush) == 0 {
log.Debug("flushCompletedPendingFiles: nothing to flush")
return nil
}
log.Debug("Flushing completed files after blob finalize",
"files_to_flush", len(canFlush),
"files_still_pending", stillPendingCount)
// Collect all data for batch operations
allFiles, allFileIDs, allFileChunks, allChunkFiles := s.collectBatchFlushData(canFlush)
// Execute the batch flush in a single transaction
log.Debug("flushCompletedPendingFiles: starting transaction")
txStart := time.Now()
err := s.executeBatchFileFlush(ctx, allFiles, allFileIDs, allFileChunks, allChunkFiles)
log.Debug("flushCompletedPendingFiles: transaction done", "duration", time.Since(txStart))
log.Debug("flushCompletedPendingFiles: total duration", "duration", time.Since(flushStart))
return err
}
// partitionPendingByChunkStatus separates pending files into those whose chunks
// are all committed to DB (ready to flush) and those still waiting on pending chunks.
// Updates s.pendingFiles to contain only the still-pending files.
func (s *Scanner) partitionPendingByChunkStatus() (canFlush []pendingFileData, stillPendingCount int) {
log.Debug("flushCompletedPendingFiles: acquiring pendingFilesMu lock")
s.pendingFilesMu.Lock()
log.Debug("flushCompletedPendingFiles: acquired lock", "pending_files", len(s.pendingFiles))
// Separate files into complete (can flush) and incomplete (keep pending)
var canFlush []pendingFileData
var stillPending []pendingFileData
log.Debug("flushCompletedPendingFiles: checking which files can flush")
@@ -454,18 +502,15 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
s.pendingFilesMu.Unlock()
log.Debug("flushCompletedPendingFiles: released lock")
if len(canFlush) == 0 {
log.Debug("flushCompletedPendingFiles: nothing to flush")
return nil
}
return canFlush, len(stillPending)
}
log.Debug("Flushing completed files after blob finalize",
"files_to_flush", len(canFlush),
"files_still_pending", len(stillPending))
// Collect all data for batch operations
// collectBatchFlushData aggregates file records, IDs, file-chunk mappings, and chunk-file
// mappings from the given pending file data for efficient batch database operations
func (s *Scanner) collectBatchFlushData(canFlush []pendingFileData) ([]*database.File, []types.FileID, []database.FileChunk, []database.ChunkFile) {
log.Debug("flushCompletedPendingFiles: collecting data for batch ops")
collectStart := time.Now()
var allFileChunks []database.FileChunk
var allChunkFiles []database.ChunkFile
var allFileIDs []types.FileID
@@ -477,16 +522,20 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
allFileIDs = append(allFileIDs, data.file.ID)
allFiles = append(allFiles, data.file)
}
log.Debug("flushCompletedPendingFiles: collected data",
"duration", time.Since(collectStart),
"file_chunks", len(allFileChunks),
"chunk_files", len(allChunkFiles),
"files", len(allFiles))
// Flush the complete files using batch operations
log.Debug("flushCompletedPendingFiles: starting transaction")
txStart := time.Now()
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
return allFiles, allFileIDs, allFileChunks, allChunkFiles
}
// executeBatchFileFlush writes all collected file data to the database in a single transaction,
// including deleting old mappings, creating file records, and adding snapshot associations
func (s *Scanner) executeBatchFileFlush(ctx context.Context, allFiles []*database.File, allFileIDs []types.FileID, allFileChunks []database.FileChunk, allChunkFiles []database.ChunkFile) error {
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
log.Debug("flushCompletedPendingFiles: inside transaction")
// Batch delete old file_chunks and chunk_files
@@ -539,9 +588,6 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
log.Debug("flushCompletedPendingFiles: transaction complete")
return nil
})
log.Debug("flushCompletedPendingFiles: transaction done", "duration", time.Since(txStart))
log.Debug("flushCompletedPendingFiles: total duration", "duration", time.Since(flushStart))
return err
}
// ScanPhaseResult contains the results of the scan phase
@@ -623,62 +669,11 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
mu.Unlock()
// Update result stats
if needsProcessing {
result.BytesScanned += info.Size()
if s.progress != nil {
s.progress.GetStats().BytesScanned.Add(info.Size())
}
} else {
result.FilesSkipped++
result.BytesSkipped += info.Size()
if s.progress != nil {
s.progress.GetStats().FilesSkipped.Add(1)
s.progress.GetStats().BytesSkipped.Add(info.Size())
}
}
result.FilesScanned++
if s.progress != nil {
s.progress.GetStats().FilesScanned.Add(1)
}
s.updateScanEntryStats(result, needsProcessing, info)
// Output periodic status
if time.Since(lastStatusTime) >= statusInterval {
elapsed := time.Since(startTime)
rate := float64(filesScanned) / elapsed.Seconds()
// Build status line - use estimate if available (not first backup)
if estimatedTotal > 0 {
// Show actual scanned vs estimate (may exceed estimate if files were added)
pct := float64(filesScanned) / float64(estimatedTotal) * 100
if pct > 100 {
pct = 100 // Cap at 100% for display
}
remaining := estimatedTotal - filesScanned
if remaining < 0 {
remaining = 0
}
var eta time.Duration
if rate > 0 && remaining > 0 {
eta = time.Duration(float64(remaining)/rate) * time.Second
}
fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
formatNumber(int(filesScanned)),
pct,
formatNumber(changedCount),
rate,
elapsed.Round(time.Second))
if eta > 0 {
fmt.Printf(", ETA %s", eta.Round(time.Second))
}
fmt.Println()
} else {
// First backup - no estimate available
fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
formatNumber(int(filesScanned)),
formatNumber(changedCount),
rate,
elapsed.Round(time.Second))
}
printScanProgressLine(filesScanned, changedCount, estimatedTotal, startTime)
lastStatusTime = time.Now()
}
@@ -695,6 +690,68 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
}, nil
}
// updateScanEntryStats updates the scan result and progress reporter statistics
// for a single scanned file entry based on whether it needs processing
func (s *Scanner) updateScanEntryStats(result *ScanResult, needsProcessing bool, info os.FileInfo) {
if needsProcessing {
result.BytesScanned += info.Size()
if s.progress != nil {
s.progress.GetStats().BytesScanned.Add(info.Size())
}
} else {
result.FilesSkipped++
result.BytesSkipped += info.Size()
if s.progress != nil {
s.progress.GetStats().FilesSkipped.Add(1)
s.progress.GetStats().BytesSkipped.Add(info.Size())
}
}
result.FilesScanned++
if s.progress != nil {
s.progress.GetStats().FilesScanned.Add(1)
}
}
// printScanProgressLine prints a periodic progress line during the scan phase,
// showing files scanned, percentage complete (if estimate available), and ETA
func printScanProgressLine(filesScanned int64, changedCount int, estimatedTotal int64, startTime time.Time) {
elapsed := time.Since(startTime)
rate := float64(filesScanned) / elapsed.Seconds()
if estimatedTotal > 0 {
// Show actual scanned vs estimate (may exceed estimate if files were added)
pct := float64(filesScanned) / float64(estimatedTotal) * 100
if pct > 100 {
pct = 100 // Cap at 100% for display
}
remaining := estimatedTotal - filesScanned
if remaining < 0 {
remaining = 0
}
var eta time.Duration
if rate > 0 && remaining > 0 {
eta = time.Duration(float64(remaining)/rate) * time.Second
}
fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
formatNumber(int(filesScanned)),
pct,
formatNumber(changedCount),
rate,
elapsed.Round(time.Second))
if eta > 0 {
fmt.Printf(", ETA %s", eta.Round(time.Second))
}
fmt.Println()
} else {
// First backup - no estimate available
fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
formatNumber(int(filesScanned)),
formatNumber(changedCount),
rate,
elapsed.Round(time.Second))
}
}
// checkFileInMemory checks if a file needs processing using the in-memory map
// No database access is performed - this is purely CPU/memory work
func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
@@ -830,22 +887,13 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
s.progress.GetStats().CurrentFile.Store(fileToProcess.Path)
}
// Process file in streaming fashion
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
// Handle files that were deleted between scan and process phases
if errors.Is(err, os.ErrNotExist) {
log.Warn("File was deleted during backup, skipping", "path", fileToProcess.Path)
result.FilesSkipped++
continue
}
// Skip file read errors if --skip-errors is enabled
if s.skipErrors {
log.Error("ERROR: Failed to process file (skipping due to --skip-errors)", "path", fileToProcess.Path, "error", err)
fmt.Printf("ERROR: Failed to process %s: %v (skipping)\n", fileToProcess.Path, err)
result.FilesSkipped++
continue
}
return fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
// Process file with error handling for deleted files and skip-errors mode
skipped, err := s.processFileWithErrorHandling(ctx, fileToProcess, result)
if err != nil {
return err
}
if skipped {
continue
}
// Update files processed counter
@@ -858,36 +906,71 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
// Output periodic status
if time.Since(lastStatusTime) >= statusInterval {
elapsed := time.Since(startTime)
pct := float64(bytesProcessed) / float64(totalBytes) * 100
byteRate := float64(bytesProcessed) / elapsed.Seconds()
fileRate := float64(filesProcessed) / elapsed.Seconds()
// Calculate ETA based on bytes (more accurate than files)
remainingBytes := totalBytes - bytesProcessed
var eta time.Duration
if byteRate > 0 {
eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second
}
// Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s
fmt.Printf("Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s",
formatCompact(filesProcessed),
formatCompact(totalFiles),
humanize.Bytes(uint64(bytesProcessed)),
humanize.Bytes(uint64(totalBytes)),
pct,
humanize.Bytes(uint64(byteRate)),
fileRate,
elapsed.Round(time.Second))
if eta > 0 {
fmt.Printf(", ETA: %s", eta.Round(time.Second))
}
fmt.Println()
printProcessingProgress(filesProcessed, totalFiles, bytesProcessed, totalBytes, startTime)
lastStatusTime = time.Now()
}
}
// Finalize: flush packer, pending files, and handle local blobs
return s.finalizeProcessPhase(ctx, result)
}
// processFileWithErrorHandling wraps processFileStreaming with error recovery for
// deleted files and skip-errors mode. Returns (skipped, error).
func (s *Scanner) processFileWithErrorHandling(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) (bool, error) {
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
// Handle files that were deleted between scan and process phases
if errors.Is(err, os.ErrNotExist) {
log.Warn("File was deleted during backup, skipping", "path", fileToProcess.Path)
result.FilesSkipped++
return true, nil
}
// Skip file read errors if --skip-errors is enabled
if s.skipErrors {
log.Error("ERROR: Failed to process file (skipping due to --skip-errors)", "path", fileToProcess.Path, "error", err)
fmt.Printf("ERROR: Failed to process %s: %v (skipping)\n", fileToProcess.Path, err)
result.FilesSkipped++
return true, nil
}
return false, fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
}
return false, nil
}
// printProcessingProgress prints a periodic progress line during the process phase,
// showing files processed, bytes transferred, throughput, and ETA
func printProcessingProgress(filesProcessed, totalFiles int, bytesProcessed, totalBytes int64, startTime time.Time) {
elapsed := time.Since(startTime)
pct := float64(bytesProcessed) / float64(totalBytes) * 100
byteRate := float64(bytesProcessed) / elapsed.Seconds()
fileRate := float64(filesProcessed) / elapsed.Seconds()
// Calculate ETA based on bytes (more accurate than files)
remainingBytes := totalBytes - bytesProcessed
var eta time.Duration
if byteRate > 0 {
eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second
}
// Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s
fmt.Printf("Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s",
formatCompact(filesProcessed),
formatCompact(totalFiles),
humanize.Bytes(uint64(bytesProcessed)),
humanize.Bytes(uint64(totalBytes)),
pct,
humanize.Bytes(uint64(byteRate)),
fileRate,
elapsed.Round(time.Second))
if eta > 0 {
fmt.Printf(", ETA: %s", eta.Round(time.Second))
}
fmt.Println()
}
// finalizeProcessPhase flushes the packer, writes remaining pending files to the database,
// and handles local blob storage when no remote storage is configured
func (s *Scanner) finalizeProcessPhase(ctx context.Context, result *ScanResult) error {
// Final packer flush first - this commits remaining chunks to DB
// and handleBlobReady will flush files whose chunks are now committed
s.packerMu.Lock()