diff --git a/internal/snapshot/file_change_test.go b/internal/snapshot/file_change_test.go
index 57f3aee..2a7bf7f 100644
--- a/internal/snapshot/file_change_test.go
+++ b/internal/snapshot/file_change_test.go
@@ -194,8 +194,8 @@ func TestMultipleFileChanges(t *testing.T) {
 	// First scan
 	result1, err := scanner.Scan(ctx, "/", snapshotID1)
 	require.NoError(t, err)
-	// 4 files because root directory is also counted
-	assert.Equal(t, 4, result1.FilesScanned)
+	// Only regular files are counted, not directories
+	assert.Equal(t, 3, result1.FilesScanned)

 	// Modify two files
 	time.Sleep(10 * time.Millisecond) // Ensure mtime changes
@@ -221,9 +221,8 @@ func TestMultipleFileChanges(t *testing.T) {
 	result2, err := scanner.Scan(ctx, "/", snapshotID2)
 	require.NoError(t, err)

-	// The scanner might examine more items than just our files (includes directories, etc)
-	// We should verify that at least our expected files were scanned
-	assert.GreaterOrEqual(t, result2.FilesScanned, 4, "Should scan at least 4 files (3 files + root dir)")
+	// Only regular files are counted, not directories
+	assert.Equal(t, 3, result2.FilesScanned)

 	// Verify each file has exactly one set of chunks
 	for path := range files {
diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go
index 007d2a2..a9d9916 100644
--- a/internal/snapshot/scanner.go
+++ b/internal/snapshot/scanner.go
@@ -142,14 +142,22 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
 	}
 	fmt.Printf("Found %s files\n", formatNumber(len(existingFiles)))

-	// Phase 0b: Check for deleted files by comparing DB against enumerated set (no filesystem access)
-	if err := s.detectDeletedFiles(ctx, path, existingFiles, result); err != nil {
+	// Phase 0b: Load known files from database into memory for fast lookup
+	fmt.Println("Loading known files from database...")
+	knownFiles, err := s.loadKnownFiles(ctx, path)
+	if err != nil {
+		return nil, fmt.Errorf("loading known files: %w", err)
+	}
+	fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
+
+	// Phase 0c: Check for deleted files by comparing DB against enumerated set (no filesystem access)
+	if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil {
 		return nil, fmt.Errorf("detecting deleted files: %w", err)
 	}

 	// Phase 1: Scan directory and collect files to process
 	log.Info("Phase 1/3: Scanning directory structure")
-	filesToProcess, err := s.scanPhase(ctx, path, result, existingFiles)
+	filesToProcess, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles)
 	if err != nil {
 		return nil, fmt.Errorf("scan phase failed: %w", err)
 	}
@@ -277,11 +285,29 @@ func (s *Scanner) enumerateFiles(ctx context.Context, path string) (map[string]s
 	return files, nil
 }

+// loadKnownFiles loads all known files from the database into a map for fast lookup
+// This avoids per-file database queries during the scan phase
+func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*database.File, error) {
+	files, err := s.repos.Files.ListByPrefix(ctx, path)
+	if err != nil {
+		return nil, fmt.Errorf("listing files by prefix: %w", err)
+	}
+
+	result := make(map[string]*database.File, len(files))
+	for _, f := range files {
+		result[f.Path] = f
+	}
+
+	return result, nil
+}
+
 // scanPhase performs the initial directory scan to identify files to process
-func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}) ([]*FileToProcess, error) {
+// It uses the pre-loaded knownFiles map for fast change detection without DB queries
+func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) ([]*FileToProcess, error) {
 	totalFiles := int64(len(existingFiles))

 	var filesToProcess []*FileToProcess
+	var allFiles []*database.File // Collect all files for batch insert
 	var mu sync.Mutex

 	// Set up periodic status output
@@ -291,10 +317,9 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
 	var filesScanned int64

 	log.Debug("Starting directory walk", "path", path)
-	err := afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error {
-		log.Debug("Scanning filesystem entry", "path", path)
+	err := afero.Walk(s.fs, path, func(filePath string, info os.FileInfo, err error) error {
 		if err != nil {
-			log.Debug("Error accessing filesystem entry", "path", path, "error", err)
+			log.Debug("Error accessing filesystem entry", "path", filePath, "error", err)
 			return err
 		}

@@ -305,38 +330,38 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
 		default:
 		}

-		// Check file and update metadata
-		file, needsProcessing, err := s.checkFileAndUpdateMetadata(ctx, path, info, result)
-		if err != nil {
-			// Don't log context cancellation as an error
-			if err == context.Canceled {
-				return err
-			}
-			return fmt.Errorf("failed to check %s: %w", path, err)
+		// Skip non-regular files; they are not counted or recorded
+		if !info.Mode().IsRegular() {
+			return nil
 		}

-		// If file needs processing, add to list
-		if needsProcessing && info.Mode().IsRegular() && info.Size() > 0 {
-			mu.Lock()
+		// Check file against in-memory map (no DB query!)
+		file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles)
+
+		mu.Lock()
+		allFiles = append(allFiles, file)
+		if needsProcessing && info.Size() > 0 {
 			filesToProcess = append(filesToProcess, &FileToProcess{
-				Path:     path,
+				Path:     filePath,
 				FileInfo: info,
 				File:     file,
 			})
-			mu.Unlock()
 		}
+		filesScanned++
+		changedCount := len(filesToProcess)
+		mu.Unlock()

-		// Update scan statistics
-		if info.Mode().IsRegular() {
-			filesScanned++
+		// Update result stats
+		if needsProcessing {
+			result.BytesScanned += info.Size()
+		} else {
+			result.FilesSkipped++
+			result.BytesSkipped += info.Size()
 		}
+		result.FilesScanned++

 		// Output periodic status
 		if time.Since(lastStatusTime) >= statusInterval {
-			mu.Lock()
-			changedCount := len(filesToProcess)
-			mu.Unlock()
-
 			elapsed := time.Since(startTime)
 			rate := float64(filesScanned) / elapsed.Seconds()

@@ -376,9 +401,122 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
 		return nil, err
 	}

+	// Batch insert all files and snapshot associations
+	if len(allFiles) > 0 {
+		fmt.Printf("Writing %s file records to database...\n", formatNumber(len(allFiles)))
+		if err := s.batchInsertFiles(ctx, allFiles); err != nil {
+			return nil, fmt.Errorf("batch inserting files: %w", err)
+		}
+	}
+
 	return filesToProcess, nil
 }

+// checkFileInMemory checks if a file needs processing using the in-memory map
+// No database access is performed - this is purely CPU/memory work
+func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
+	// Get file stats
+	stat, ok := info.Sys().(interface {
+		Uid() uint32
+		Gid() uint32
+	})
+
+	var uid, gid uint32
+	if ok {
+		uid = stat.Uid()
+		gid = stat.Gid()
+	}
+
+	// Create file record
+	file := &database.File{
+		Path:  path,
+		MTime: info.ModTime(),
+		CTime: info.ModTime(), // afero doesn't provide ctime
+		Size:  info.Size(),
+		Mode:  uint32(info.Mode()),
+		UID:   uid,
+		GID:   gid,
+	}
+
+	// Check against in-memory map
+	existingFile, exists := knownFiles[path]
+	if !exists {
+		// New file
+		return file, true
+	}
+
+	// Reuse existing ID
+	file.ID = existingFile.ID
+
+	// Check if file has changed
+	if existingFile.Size != file.Size ||
+		existingFile.MTime.Unix() != file.MTime.Unix() ||
+		existingFile.Mode != file.Mode ||
+		existingFile.UID != file.UID ||
+		existingFile.GID != file.GID {
+		return file, true
+	}
+
+	// File unchanged
+	return file, false
+}
+
+// batchInsertFiles inserts files and snapshot associations in batches
+func (s *Scanner) batchInsertFiles(ctx context.Context, files []*database.File) error {
+	const batchSize = 1000
+
+	startTime := time.Now()
+	lastStatusTime := time.Now()
+	statusInterval := 5 * time.Second
+
+	for i := 0; i < len(files); i += batchSize {
+		// Check context cancellation
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
+		end := i + batchSize
+		if end > len(files) {
+			end = len(files)
+		}
+		batch := files[i:end]
+
+		err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
+			for _, file := range batch {
+				if err := s.repos.Files.Create(ctx, tx, file); err != nil {
+					return fmt.Errorf("creating file %s: %w", file.Path, err)
+				}
+				if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, file.ID); err != nil {
+					return fmt.Errorf("adding file to snapshot: %w", err)
+				}
+			}
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+
+		// Periodic status
+		if time.Since(lastStatusTime) >= statusInterval {
+			elapsed := time.Since(startTime)
+			rate := float64(end) / elapsed.Seconds()
+			pct := float64(end) / float64(len(files)) * 100
+			fmt.Printf("Database write: %s/%s files (%.1f%%), %.0f files/sec\n",
+				formatNumber(end), formatNumber(len(files)), pct, rate)
+			lastStatusTime = time.Now()
+		}
+	}
+
+	elapsed := time.Since(startTime)
+	rate := float64(len(files)) / elapsed.Seconds()
+	fmt.Printf("Database write complete: %s files in %s (%.0f files/sec)\n",
+		formatNumber(len(files)), elapsed.Round(time.Second), rate)
+
+	return nil
+}
+
 // processPhase processes the files that need backing up
 func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
 	// Set up periodic status output
@@ -452,205 +590,6 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProc
 	return nil
 }

-// checkFileAndUpdateMetadata checks if a file needs processing and updates metadata
-func (s *Scanner) checkFileAndUpdateMetadata(ctx context.Context, path string, info os.FileInfo, result *ScanResult) (*database.File, bool, error) {
-	// Check context cancellation
-	select {
-	case <-ctx.Done():
-		return nil, false, ctx.Err()
-	default:
-	}
-
-	// Process file without holding a long transaction
-	return s.checkFile(ctx, path, info, result)
-}
-
-// checkFile checks if a file needs processing and updates metadata
-func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo, result *ScanResult) (*database.File, bool, error) {
-	// Get file stats
-	stat, ok := info.Sys().(interface {
-		Uid() uint32
-		Gid() uint32
-	})
-
-	var uid, gid uint32
-	if ok {
-		uid = stat.Uid()
-		gid = stat.Gid()
-	}
-
-	// Check if it's a symlink
-	var linkTarget string
-	if info.Mode()&os.ModeSymlink != 0 {
-		// Read the symlink target
-		if linker, ok := s.fs.(afero.LinkReader); ok {
-			linkTarget, _ = linker.ReadlinkIfPossible(path)
-		}
-	}
-
-	// Create file record
-	file := &database.File{
-		Path:       path,
-		MTime:      info.ModTime(),
-		CTime:      info.ModTime(), // afero doesn't provide ctime
-		Size:       info.Size(),
-		Mode:       uint32(info.Mode()),
-		UID:        uid,
-		GID:        gid,
-		LinkTarget: linkTarget,
-	}
-
-	// Check if file has changed since last backup (no transaction needed for read)
-	log.Debug("Querying database for existing file record", "path", path)
-	existingFile, err := s.repos.Files.GetByPath(ctx, path)
-	if err != nil {
-		return nil, false, fmt.Errorf("checking existing file: %w", err)
-	}
-
-	fileChanged := existingFile == nil || s.hasFileChanged(existingFile, file)
-
-	// Update file metadata and add to snapshot in a single transaction
-	log.Debug("Updating file record in database and adding to snapshot", "path", path, "changed", fileChanged, "snapshot", s.snapshotID)
-	err = s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
-		// First create/update the file
-		if err := s.repos.Files.Create(ctx, tx, file); err != nil {
-			return fmt.Errorf("creating file: %w", err)
-		}
-		// Then add it to the snapshot using the file ID
-		if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, file.ID); err != nil {
-			return fmt.Errorf("adding file to snapshot: %w", err)
-		}
-		return nil
-	})
-	if err != nil {
-		return nil, false, err
-	}
-	log.Debug("File record added to snapshot association", "path", path)
-
-	result.FilesScanned++
-
-	// Update progress
-	if s.progress != nil {
-		stats := s.progress.GetStats()
-		stats.FilesScanned.Add(1)
-		stats.CurrentFile.Store(path)
-	}
-
-	// Track skipped files
-	if info.Mode().IsRegular() && info.Size() > 0 && !fileChanged {
-		result.FilesSkipped++
-		result.BytesSkipped += info.Size()
-		if s.progress != nil {
-			stats := s.progress.GetStats()
-			stats.FilesSkipped.Add(1)
-			stats.BytesSkipped.Add(info.Size())
-		}
-		// File hasn't changed, but we still need to associate existing chunks with this snapshot
-		log.Debug("File content unchanged, reusing existing chunks and blobs", "path", path)
-		if err := s.associateExistingChunks(ctx, path); err != nil {
-			return nil, false, fmt.Errorf("associating existing chunks: %w", err)
-		}
-		log.Debug("Existing chunks and blobs associated with snapshot", "path", path)
-	} else {
-		// File changed or is not a regular file
-		result.BytesScanned += info.Size()
-		if s.progress != nil {
-			s.progress.GetStats().BytesScanned.Add(info.Size())
-		}
-	}
-
-	return file, fileChanged, nil
-}
-
-// hasFileChanged determines if a file has changed since last backup
-func (s *Scanner) hasFileChanged(existingFile, newFile *database.File) bool {
-	// Check if any metadata has changed
-	if existingFile.Size != newFile.Size {
-		return true
-	}
-	if existingFile.MTime.Unix() != newFile.MTime.Unix() {
-		return true
-	}
-	if existingFile.Mode != newFile.Mode {
-		return true
-	}
-	if existingFile.UID != newFile.UID {
-		return true
-	}
-	if existingFile.GID != newFile.GID {
-		return true
-	}
-	if existingFile.LinkTarget != newFile.LinkTarget {
-		return true
-	}
-	return false
-}
-
-// associateExistingChunks links existing chunks from an unchanged file to the current snapshot
-func (s *Scanner) associateExistingChunks(ctx context.Context, path string) error {
-	log.Debug("associateExistingChunks start", "path", path)
-
-	// Get existing file chunks (no transaction needed for read)
-	log.Debug("Querying database for file's chunk associations", "path", path)
-	fileChunks, err := s.repos.FileChunks.GetByFile(ctx, path)
-	if err != nil {
-		return fmt.Errorf("getting existing file chunks: %w", err)
-	}
-	log.Debug("Retrieved file chunk associations from database", "path", path, "count", len(fileChunks))
-
-	// Collect unique blob IDs that need to be added to snapshot
-	blobsToAdd := make(map[string]string) // blob ID -> blob hash
-	for i, fc := range fileChunks {
-		log.Debug("Looking up blob containing chunk", "path", path, "chunk_index", i, "chunk_hash", fc.ChunkHash)
-
-		// Find which blob contains this chunk (no transaction needed for read)
-		log.Debug("Querying database for blob containing chunk", "chunk_hash", fc.ChunkHash)
-		blobChunk, err := s.repos.BlobChunks.GetByChunkHash(ctx, fc.ChunkHash)
-		if err != nil {
-			return fmt.Errorf("finding blob for chunk %s: %w", fc.ChunkHash, err)
-		}
-		if blobChunk == nil {
-			log.Warn("Chunk record exists in database but not associated with any blob", "chunk", fc.ChunkHash, "file", path)
-			continue
-		}
-		log.Debug("Found blob record containing chunk", "chunk_hash", fc.ChunkHash, "blob_id", blobChunk.BlobID)
-
-		// Track blob ID for later processing
-		if _, exists := blobsToAdd[blobChunk.BlobID]; !exists {
-			blobsToAdd[blobChunk.BlobID] = "" // We'll get the hash later
-		}
-	}
-
-	// Now get blob hashes outside of transaction operations
-	for blobID := range blobsToAdd {
-		blob, err := s.repos.Blobs.GetByID(ctx, blobID)
-		if err != nil {
-			return fmt.Errorf("getting blob %s: %w", blobID, err)
-		}
-		if blob == nil {
-			log.Warn("Blob record missing from database", "blob_id", blobID)
-			delete(blobsToAdd, blobID)
-			continue
-		}
-		blobsToAdd[blobID] = blob.Hash
-	}
-
-	// Add blobs to snapshot using short transactions
-	for blobID, blobHash := range blobsToAdd {
-		log.Debug("Adding blob reference to snapshot association", "blob_id", blobID, "blob_hash", blobHash, "snapshot", s.snapshotID)
-		err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
-			return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobID, blobHash)
-		})
-		if err != nil {
-			return fmt.Errorf("adding existing blob to snapshot: %w", err)
-		}
-		log.Debug("Created snapshot-blob association in database", "blob_id", blobID)
-	}
-
-	log.Debug("associateExistingChunks complete", "path", path, "blobs_processed", len(blobsToAdd))
-	return nil
-}
-
 // handleBlobReady is called by the packer when a blob is finalized
 func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
 	startTime := time.Now().UTC()
@@ -947,24 +886,16 @@ func (s *Scanner) GetProgress() *ProgressReporter {
 	return s.progress
 }

-// detectDeletedFiles finds files that existed in previous snapshots but no longer exist
-// Uses the pre-enumerated existingFiles set to avoid additional filesystem access
-func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, existingFiles map[string]struct{}, result *ScanResult) error {
-	// Get all files with this path prefix from the database
-	knownFiles, err := s.repos.Files.ListByPrefix(ctx, path)
-	if err != nil {
-		return fmt.Errorf("listing files by prefix: %w", err)
-	}
-
+// detectDeletedFilesFromMap finds files that existed in previous snapshots but no longer exist
+// Uses pre-loaded maps to avoid any filesystem or database access
+func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[string]*database.File, existingFiles map[string]struct{}, result *ScanResult) error {
 	if len(knownFiles) == 0 {
 		return nil
 	}

 	fmt.Printf("Checking %s known files for deletions...\n", formatNumber(len(knownFiles)))
-
-	// Check each known file against the enumerated set (no filesystem access needed)
-	for _, file := range knownFiles {
-		// Check context cancellation
+	for path, file := range knownFiles {
+		// Check context cancellation on each iteration
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
@@ -972,11 +903,11 @@ func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, existingF
 		}

 		// Check if the file exists in our enumerated set
-		if _, exists := existingFiles[file.Path]; !exists {
+		if _, exists := existingFiles[path]; !exists {
 			// File has been deleted
 			result.FilesDeleted++
 			result.BytesDeleted += file.Size
-			log.Debug("Detected deleted file", "path", file.Path, "size", file.Size)
+			log.Debug("Detected deleted file", "path", path, "size", file.Size)
 		}
 	}

diff --git a/internal/snapshot/scanner_test.go b/internal/snapshot/scanner_test.go
index be6e0aa..19dbc20 100644
--- a/internal/snapshot/scanner_test.go
+++ b/internal/snapshot/scanner_test.go
@@ -99,26 +99,25 @@ func TestScannerSimpleDirectory(t *testing.T) {
 		t.Fatalf("scan failed: %v", err)
 	}

-	// Verify results
-	// We now scan 6 files + 3 directories (source, subdir, subdir2) = 9 entries
-	if result.FilesScanned != 9 {
-		t.Errorf("expected 9 entries scanned, got %d", result.FilesScanned)
+	// Verify results - we only scan regular files, not directories
+	if result.FilesScanned != 6 {
+		t.Errorf("expected 6 files scanned, got %d", result.FilesScanned)
 	}

-	// Directories have their own sizes, so the total will be more than just file content
+	// Total bytes should be the sum of all file contents
 	if result.BytesScanned < 97 { // At minimum we have 97 bytes of file content
 		t.Errorf("expected at least 97 bytes scanned, got %d", result.BytesScanned)
 	}

-	// Verify files in database
+	// Verify files in database - only regular files are stored
 	files, err := repos.Files.ListByPrefix(ctx, "/source")
 	if err != nil {
 		t.Fatalf("failed to list files: %v", err)
 	}

-	// We should have 6 files + 3 directories = 9 entries
-	if len(files) != 9 {
-		t.Errorf("expected 9 entries in database, got %d", len(files))
+	// We should have 6 files (directories are not stored)
+	if len(files) != 6 {
+		t.Errorf("expected 6 files in database, got %d", len(files))
 	}

 	// Verify specific file
@@ -235,9 +234,9 @@ func TestScannerLargeFile(t *testing.T) {
 		t.Fatalf("scan failed: %v", err)
 	}

-	// We scan 1 file + 1 directory = 2 entries
-	if result.FilesScanned != 2 {
-		t.Errorf("expected 2 entries scanned, got %d", result.FilesScanned)
+	// We scan only regular files, not directories
+	if result.FilesScanned != 1 {
+		t.Errorf("expected 1 file scanned, got %d", result.FilesScanned)
 	}

 	// The file size should be at least 1MB
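
Reviewer note: the sketch below is a minimal, self-contained illustration of the pattern this diff adopts: load the previously recorded file set into a map once, classify every regular file during a single walk with pure map lookups, and defer the database writes into fixed-size batches. `knownFile` and `flushBatch` are hypothetical stand-ins for `database.File` and the `WithTx` + `Files.Create` + `Snapshots.AddFileByID` loop; nothing here is the real scanner API.

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// knownFile is a hypothetical stand-in for database.File: just enough
// metadata for the same change test the diff performs (size, mtime, mode).
type knownFile struct {
	size  int64
	mtime int64 // unix seconds, mirroring the MTime.Unix() comparison
	mode  uint32
}

const batchSize = 1000 // matches batchInsertFiles in the diff

// flushBatch is a hypothetical stand-in for the transactional write loop;
// here it only reports what would be written in one transaction.
func flushBatch(batch []string) {
	fmt.Printf("would write %d records in one transaction\n", len(batch))
}

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: sketch <dir>")
		os.Exit(1)
	}
	root := os.Args[1]

	// Phase 0: everything previously recorded, loaded once into memory
	// (the diff fills this from Files.ListByPrefix).
	known := map[string]knownFile{}

	var all []string // every regular file seen; the diff's allFiles slice
	changed, skipped := 0, 0

	err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		// Non-regular entries (directories, symlinks) are skipped and
		// not counted, matching the updated tests.
		if !d.Type().IsRegular() {
			return nil
		}
		info, err := d.Info()
		if err != nil {
			return err
		}
		// Pure map lookup: no per-file database round trip.
		prev, seen := known[path]
		if seen && prev.size == info.Size() &&
			prev.mtime == info.ModTime().Unix() &&
			prev.mode == uint32(info.Mode()) {
			skipped++
		} else {
			changed++ // would be queued for chunking
		}
		// Every regular file is still recorded for this snapshot.
		all = append(all, path)
		return nil
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, "walk:", err)
		os.Exit(1)
	}

	// Deferred, batched writes instead of one transaction per file.
	for i := 0; i < len(all); i += batchSize {
		end := i + batchSize
		if end > len(all) {
			end = len(all)
		}
		flushBatch(all[i:end])
	}
	fmt.Printf("changed=%d skipped=%d total=%d\n", changed, skipped, len(all))
}
```

The trade-off is deliberate: holding one record per file in memory costs RAM proportional to the size of the tree, but it removes two database round trips per file from the walk, and the 1000-row batches keep each transaction short.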