Optimize scan phase: in-memory change detection and batched DB writes

Performance improvements:
- Load all known files from DB into memory at startup
- Check file changes against in-memory map (no per-file DB queries)
- Batch database writes in groups of 1000 files per transaction
- Scan phase now only counts regular files, not directories

This should improve scan speed from ~600 files/sec to potentially
10,000+ files/sec by eliminating per-file database round trips.
This commit is contained in:
Jeffrey Paul 2025-12-19 12:08:47 +07:00
parent badc0c07e0
commit c3725e745e
3 changed files with 187 additions and 258 deletions

View File

@ -194,8 +194,8 @@ func TestMultipleFileChanges(t *testing.T) {
// First scan // First scan
result1, err := scanner.Scan(ctx, "/", snapshotID1) result1, err := scanner.Scan(ctx, "/", snapshotID1)
require.NoError(t, err) require.NoError(t, err)
// 4 files because root directory is also counted // Only regular files are counted, not directories
assert.Equal(t, 4, result1.FilesScanned) assert.Equal(t, 3, result1.FilesScanned)
// Modify two files // Modify two files
time.Sleep(10 * time.Millisecond) // Ensure mtime changes time.Sleep(10 * time.Millisecond) // Ensure mtime changes
@ -221,9 +221,8 @@ func TestMultipleFileChanges(t *testing.T) {
result2, err := scanner.Scan(ctx, "/", snapshotID2) result2, err := scanner.Scan(ctx, "/", snapshotID2)
require.NoError(t, err) require.NoError(t, err)
// The scanner might examine more items than just our files (includes directories, etc) // Only regular files are counted, not directories
// We should verify that at least our expected files were scanned assert.Equal(t, 3, result2.FilesScanned)
assert.GreaterOrEqual(t, result2.FilesScanned, 4, "Should scan at least 4 files (3 files + root dir)")
// Verify each file has exactly one set of chunks // Verify each file has exactly one set of chunks
for path := range files { for path := range files {

View File

@ -142,14 +142,22 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
} }
fmt.Printf("Found %s files\n", formatNumber(len(existingFiles))) fmt.Printf("Found %s files\n", formatNumber(len(existingFiles)))
// Phase 0b: Check for deleted files by comparing DB against enumerated set (no filesystem access) // Phase 0b: Load known files from database into memory for fast lookup
if err := s.detectDeletedFiles(ctx, path, existingFiles, result); err != nil { fmt.Println("Loading known files from database...")
knownFiles, err := s.loadKnownFiles(ctx, path)
if err != nil {
return nil, fmt.Errorf("loading known files: %w", err)
}
fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
// Phase 0c: Check for deleted files by comparing DB against enumerated set (no filesystem access)
if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil {
return nil, fmt.Errorf("detecting deleted files: %w", err) return nil, fmt.Errorf("detecting deleted files: %w", err)
} }
// Phase 1: Scan directory and collect files to process // Phase 1: Scan directory and collect files to process
log.Info("Phase 1/3: Scanning directory structure") log.Info("Phase 1/3: Scanning directory structure")
filesToProcess, err := s.scanPhase(ctx, path, result, existingFiles) filesToProcess, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles)
if err != nil { if err != nil {
return nil, fmt.Errorf("scan phase failed: %w", err) return nil, fmt.Errorf("scan phase failed: %w", err)
} }
@ -277,11 +285,29 @@ func (s *Scanner) enumerateFiles(ctx context.Context, path string) (map[string]s
return files, nil return files, nil
} }
// loadKnownFiles loads all known files from the database into a map for fast lookup
// This avoids per-file database queries during the scan phase
func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*database.File, error) {
files, err := s.repos.Files.ListByPrefix(ctx, path)
if err != nil {
return nil, fmt.Errorf("listing files by prefix: %w", err)
}
result := make(map[string]*database.File, len(files))
for _, f := range files {
result[f.Path] = f
}
return result, nil
}
// scanPhase performs the initial directory scan to identify files to process // scanPhase performs the initial directory scan to identify files to process
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}) ([]*FileToProcess, error) { // It uses the pre-loaded knownFiles map for fast change detection without DB queries
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) ([]*FileToProcess, error) {
totalFiles := int64(len(existingFiles)) totalFiles := int64(len(existingFiles))
var filesToProcess []*FileToProcess var filesToProcess []*FileToProcess
var allFiles []*database.File // Collect all files for batch insert
var mu sync.Mutex var mu sync.Mutex
// Set up periodic status output // Set up periodic status output
@ -291,10 +317,9 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
var filesScanned int64 var filesScanned int64
log.Debug("Starting directory walk", "path", path) log.Debug("Starting directory walk", "path", path)
err := afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error { err := afero.Walk(s.fs, path, func(filePath string, info os.FileInfo, err error) error {
log.Debug("Scanning filesystem entry", "path", path)
if err != nil { if err != nil {
log.Debug("Error accessing filesystem entry", "path", path, "error", err) log.Debug("Error accessing filesystem entry", "path", filePath, "error", err)
return err return err
} }
@ -305,38 +330,38 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
default: default:
} }
// Check file and update metadata // Skip non-regular files for processing (but still count them)
file, needsProcessing, err := s.checkFileAndUpdateMetadata(ctx, path, info, result) if !info.Mode().IsRegular() {
if err != nil { return nil
// Don't log context cancellation as an error
if err == context.Canceled {
return err
}
return fmt.Errorf("failed to check %s: %w", path, err)
} }
// If file needs processing, add to list // Check file against in-memory map (no DB query!)
if needsProcessing && info.Mode().IsRegular() && info.Size() > 0 { file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles)
mu.Lock() mu.Lock()
allFiles = append(allFiles, file)
if needsProcessing && info.Size() > 0 {
filesToProcess = append(filesToProcess, &FileToProcess{ filesToProcess = append(filesToProcess, &FileToProcess{
Path: path, Path: filePath,
FileInfo: info, FileInfo: info,
File: file, File: file,
}) })
mu.Unlock()
} }
// Update scan statistics
if info.Mode().IsRegular() {
filesScanned++ filesScanned++
}
// Output periodic status
if time.Since(lastStatusTime) >= statusInterval {
mu.Lock()
changedCount := len(filesToProcess) changedCount := len(filesToProcess)
mu.Unlock() mu.Unlock()
// Update result stats
if needsProcessing {
result.BytesScanned += info.Size()
} else {
result.FilesSkipped++
result.BytesSkipped += info.Size()
}
result.FilesScanned++
// Output periodic status
if time.Since(lastStatusTime) >= statusInterval {
elapsed := time.Since(startTime) elapsed := time.Since(startTime)
rate := float64(filesScanned) / elapsed.Seconds() rate := float64(filesScanned) / elapsed.Seconds()
@ -376,9 +401,122 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
return nil, err return nil, err
} }
// Batch insert all files and snapshot associations
if len(allFiles) > 0 {
fmt.Printf("Writing %s file records to database...\n", formatNumber(len(allFiles)))
if err := s.batchInsertFiles(ctx, allFiles); err != nil {
return nil, fmt.Errorf("batch inserting files: %w", err)
}
}
return filesToProcess, nil return filesToProcess, nil
} }
// checkFileInMemory checks if a file needs processing using the in-memory map
// No database access is performed - this is purely CPU/memory work
func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
// Get file stats
stat, ok := info.Sys().(interface {
Uid() uint32
Gid() uint32
})
var uid, gid uint32
if ok {
uid = stat.Uid()
gid = stat.Gid()
}
// Create file record
file := &database.File{
Path: path,
MTime: info.ModTime(),
CTime: info.ModTime(), // afero doesn't provide ctime
Size: info.Size(),
Mode: uint32(info.Mode()),
UID: uid,
GID: gid,
}
// Check against in-memory map
existingFile, exists := knownFiles[path]
if !exists {
// New file
return file, true
}
// Reuse existing ID
file.ID = existingFile.ID
// Check if file has changed
if existingFile.Size != file.Size ||
existingFile.MTime.Unix() != file.MTime.Unix() ||
existingFile.Mode != file.Mode ||
existingFile.UID != file.UID ||
existingFile.GID != file.GID {
return file, true
}
// File unchanged
return file, false
}
// batchInsertFiles inserts files and snapshot associations in batches
func (s *Scanner) batchInsertFiles(ctx context.Context, files []*database.File) error {
const batchSize = 1000
startTime := time.Now()
lastStatusTime := time.Now()
statusInterval := 5 * time.Second
for i := 0; i < len(files); i += batchSize {
// Check context cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
end := i + batchSize
if end > len(files) {
end = len(files)
}
batch := files[i:end]
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
for _, file := range batch {
if err := s.repos.Files.Create(ctx, tx, file); err != nil {
return fmt.Errorf("creating file %s: %w", file.Path, err)
}
if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, file.ID); err != nil {
return fmt.Errorf("adding file to snapshot: %w", err)
}
}
return nil
})
if err != nil {
return err
}
// Periodic status
if time.Since(lastStatusTime) >= statusInterval {
elapsed := time.Since(startTime)
rate := float64(end) / elapsed.Seconds()
pct := float64(end) / float64(len(files)) * 100
fmt.Printf("Database write: %s/%s files (%.1f%%), %.0f files/sec\n",
formatNumber(end), formatNumber(len(files)), pct, rate)
lastStatusTime = time.Now()
}
}
elapsed := time.Since(startTime)
rate := float64(len(files)) / elapsed.Seconds()
fmt.Printf("Database write complete: %s files in %s (%.0f files/sec)\n",
formatNumber(len(files)), elapsed.Round(time.Second), rate)
return nil
}
// processPhase processes the files that need backing up // processPhase processes the files that need backing up
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error { func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
// Set up periodic status output // Set up periodic status output
@ -452,205 +590,6 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
return nil return nil
} }
// checkFileAndUpdateMetadata checks if a file needs processing and updates metadata
func (s *Scanner) checkFileAndUpdateMetadata(ctx context.Context, path string, info os.FileInfo, result *ScanResult) (*database.File, bool, error) {
// Check context cancellation
select {
case <-ctx.Done():
return nil, false, ctx.Err()
default:
}
// Process file without holding a long transaction
return s.checkFile(ctx, path, info, result)
}
// checkFile checks if a file needs processing and updates metadata
func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo, result *ScanResult) (*database.File, bool, error) {
// Get file stats
stat, ok := info.Sys().(interface {
Uid() uint32
Gid() uint32
})
var uid, gid uint32
if ok {
uid = stat.Uid()
gid = stat.Gid()
}
// Check if it's a symlink
var linkTarget string
if info.Mode()&os.ModeSymlink != 0 {
// Read the symlink target
if linker, ok := s.fs.(afero.LinkReader); ok {
linkTarget, _ = linker.ReadlinkIfPossible(path)
}
}
// Create file record
file := &database.File{
Path: path,
MTime: info.ModTime(),
CTime: info.ModTime(), // afero doesn't provide ctime
Size: info.Size(),
Mode: uint32(info.Mode()),
UID: uid,
GID: gid,
LinkTarget: linkTarget,
}
// Check if file has changed since last backup (no transaction needed for read)
log.Debug("Querying database for existing file record", "path", path)
existingFile, err := s.repos.Files.GetByPath(ctx, path)
if err != nil {
return nil, false, fmt.Errorf("checking existing file: %w", err)
}
fileChanged := existingFile == nil || s.hasFileChanged(existingFile, file)
// Update file metadata and add to snapshot in a single transaction
log.Debug("Updating file record in database and adding to snapshot", "path", path, "changed", fileChanged, "snapshot", s.snapshotID)
err = s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
// First create/update the file
if err := s.repos.Files.Create(ctx, tx, file); err != nil {
return fmt.Errorf("creating file: %w", err)
}
// Then add it to the snapshot using the file ID
if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, file.ID); err != nil {
return fmt.Errorf("adding file to snapshot: %w", err)
}
return nil
})
if err != nil {
return nil, false, err
}
log.Debug("File record added to snapshot association", "path", path)
result.FilesScanned++
// Update progress
if s.progress != nil {
stats := s.progress.GetStats()
stats.FilesScanned.Add(1)
stats.CurrentFile.Store(path)
}
// Track skipped files
if info.Mode().IsRegular() && info.Size() > 0 && !fileChanged {
result.FilesSkipped++
result.BytesSkipped += info.Size()
if s.progress != nil {
stats := s.progress.GetStats()
stats.FilesSkipped.Add(1)
stats.BytesSkipped.Add(info.Size())
}
// File hasn't changed, but we still need to associate existing chunks with this snapshot
log.Debug("File content unchanged, reusing existing chunks and blobs", "path", path)
if err := s.associateExistingChunks(ctx, path); err != nil {
return nil, false, fmt.Errorf("associating existing chunks: %w", err)
}
log.Debug("Existing chunks and blobs associated with snapshot", "path", path)
} else {
// File changed or is not a regular file
result.BytesScanned += info.Size()
if s.progress != nil {
s.progress.GetStats().BytesScanned.Add(info.Size())
}
}
return file, fileChanged, nil
}
// hasFileChanged determines if a file has changed since last backup
func (s *Scanner) hasFileChanged(existingFile, newFile *database.File) bool {
// Check if any metadata has changed
if existingFile.Size != newFile.Size {
return true
}
if existingFile.MTime.Unix() != newFile.MTime.Unix() {
return true
}
if existingFile.Mode != newFile.Mode {
return true
}
if existingFile.UID != newFile.UID {
return true
}
if existingFile.GID != newFile.GID {
return true
}
if existingFile.LinkTarget != newFile.LinkTarget {
return true
}
return false
}
// associateExistingChunks links existing chunks from an unchanged file to the current snapshot
func (s *Scanner) associateExistingChunks(ctx context.Context, path string) error {
log.Debug("associateExistingChunks start", "path", path)
// Get existing file chunks (no transaction needed for read)
log.Debug("Querying database for file's chunk associations", "path", path)
fileChunks, err := s.repos.FileChunks.GetByFile(ctx, path)
if err != nil {
return fmt.Errorf("getting existing file chunks: %w", err)
}
log.Debug("Retrieved file chunk associations from database", "path", path, "count", len(fileChunks))
// Collect unique blob IDs that need to be added to snapshot
blobsToAdd := make(map[string]string) // blob ID -> blob hash
for i, fc := range fileChunks {
log.Debug("Looking up blob containing chunk", "path", path, "chunk_index", i, "chunk_hash", fc.ChunkHash)
// Find which blob contains this chunk (no transaction needed for read)
log.Debug("Querying database for blob containing chunk", "chunk_hash", fc.ChunkHash)
blobChunk, err := s.repos.BlobChunks.GetByChunkHash(ctx, fc.ChunkHash)
if err != nil {
return fmt.Errorf("finding blob for chunk %s: %w", fc.ChunkHash, err)
}
if blobChunk == nil {
log.Warn("Chunk record exists in database but not associated with any blob", "chunk", fc.ChunkHash, "file", path)
continue
}
log.Debug("Found blob record containing chunk", "chunk_hash", fc.ChunkHash, "blob_id", blobChunk.BlobID)
// Track blob ID for later processing
if _, exists := blobsToAdd[blobChunk.BlobID]; !exists {
blobsToAdd[blobChunk.BlobID] = "" // We'll get the hash later
}
}
// Now get blob hashes outside of transaction operations
for blobID := range blobsToAdd {
blob, err := s.repos.Blobs.GetByID(ctx, blobID)
if err != nil {
return fmt.Errorf("getting blob %s: %w", blobID, err)
}
if blob == nil {
log.Warn("Blob record missing from database", "blob_id", blobID)
delete(blobsToAdd, blobID)
continue
}
blobsToAdd[blobID] = blob.Hash
}
// Add blobs to snapshot using short transactions
for blobID, blobHash := range blobsToAdd {
log.Debug("Adding blob reference to snapshot association", "blob_id", blobID, "blob_hash", blobHash, "snapshot", s.snapshotID)
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobID, blobHash)
})
if err != nil {
return fmt.Errorf("adding existing blob to snapshot: %w", err)
}
log.Debug("Created snapshot-blob association in database", "blob_id", blobID)
}
log.Debug("associateExistingChunks complete", "path", path, "blobs_processed", len(blobsToAdd))
return nil
}
// handleBlobReady is called by the packer when a blob is finalized // handleBlobReady is called by the packer when a blob is finalized
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
startTime := time.Now().UTC() startTime := time.Now().UTC()
@ -947,24 +886,16 @@ func (s *Scanner) GetProgress() *ProgressReporter {
return s.progress return s.progress
} }
// detectDeletedFiles finds files that existed in previous snapshots but no longer exist // detectDeletedFilesFromMap finds files that existed in previous snapshots but no longer exist
// Uses the pre-enumerated existingFiles set to avoid additional filesystem access // Uses pre-loaded maps to avoid any filesystem or database access
func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, existingFiles map[string]struct{}, result *ScanResult) error { func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[string]*database.File, existingFiles map[string]struct{}, result *ScanResult) error {
// Get all files with this path prefix from the database
knownFiles, err := s.repos.Files.ListByPrefix(ctx, path)
if err != nil {
return fmt.Errorf("listing files by prefix: %w", err)
}
if len(knownFiles) == 0 { if len(knownFiles) == 0 {
return nil return nil
} }
fmt.Printf("Checking %s known files for deletions...\n", formatNumber(len(knownFiles)))
// Check each known file against the enumerated set (no filesystem access needed) // Check each known file against the enumerated set (no filesystem access needed)
for _, file := range knownFiles { for path, file := range knownFiles {
// Check context cancellation // Check context cancellation periodically
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
@ -972,11 +903,11 @@ func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, existingF
} }
// Check if the file exists in our enumerated set // Check if the file exists in our enumerated set
if _, exists := existingFiles[file.Path]; !exists { if _, exists := existingFiles[path]; !exists {
// File has been deleted // File has been deleted
result.FilesDeleted++ result.FilesDeleted++
result.BytesDeleted += file.Size result.BytesDeleted += file.Size
log.Debug("Detected deleted file", "path", file.Path, "size", file.Size) log.Debug("Detected deleted file", "path", path, "size", file.Size)
} }
} }

View File

@ -99,26 +99,25 @@ func TestScannerSimpleDirectory(t *testing.T) {
t.Fatalf("scan failed: %v", err) t.Fatalf("scan failed: %v", err)
} }
// Verify results // Verify results - we only scan regular files, not directories
// We now scan 6 files + 3 directories (source, subdir, subdir2) = 9 entries if result.FilesScanned != 6 {
if result.FilesScanned != 9 { t.Errorf("expected 6 files scanned, got %d", result.FilesScanned)
t.Errorf("expected 9 entries scanned, got %d", result.FilesScanned)
} }
// Directories have their own sizes, so the total will be more than just file content // Total bytes should be the sum of all file contents
if result.BytesScanned < 97 { // At minimum we have 97 bytes of file content if result.BytesScanned < 97 { // At minimum we have 97 bytes of file content
t.Errorf("expected at least 97 bytes scanned, got %d", result.BytesScanned) t.Errorf("expected at least 97 bytes scanned, got %d", result.BytesScanned)
} }
// Verify files in database // Verify files in database - only regular files are stored
files, err := repos.Files.ListByPrefix(ctx, "/source") files, err := repos.Files.ListByPrefix(ctx, "/source")
if err != nil { if err != nil {
t.Fatalf("failed to list files: %v", err) t.Fatalf("failed to list files: %v", err)
} }
// We should have 6 files + 3 directories = 9 entries // We should have 6 files (directories are not stored)
if len(files) != 9 { if len(files) != 6 {
t.Errorf("expected 9 entries in database, got %d", len(files)) t.Errorf("expected 6 files in database, got %d", len(files))
} }
// Verify specific file // Verify specific file
@ -235,9 +234,9 @@ func TestScannerLargeFile(t *testing.T) {
t.Fatalf("scan failed: %v", err) t.Fatalf("scan failed: %v", err)
} }
// We scan 1 file + 1 directory = 2 entries // We scan only regular files, not directories
if result.FilesScanned != 2 { if result.FilesScanned != 1 {
t.Errorf("expected 2 entries scanned, got %d", result.FilesScanned) t.Errorf("expected 1 file scanned, got %d", result.FilesScanned)
} }
// The file size should be at least 1MB // The file size should be at least 1MB