Refactor: Create file records only after successful chunking
- Scan phase now only collects files to process, no DB writes
- Unchanged files get snapshot_files associations via batch (no new records)
- New/changed files get records created during processing, after chunking
- Reduces DB writes significantly (only changed files need new records)
- Avoids orphaned file records if a backup is interrupted mid-way
parent 40fff09594
commit 24c5e8c5a6
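
Condensed, the new pipeline looks like this. A minimal, runnable sketch: FileToProcess and ScanPhaseResult mirror the types in the diff below, while scan, associate, and process are simplified stand-ins for scanPhase, batchAddFilesToSnapshot, and processFileStreaming.

    package main

    import (
    	"context"
    	"fmt"
    )

    // Hypothetical, simplified stand-ins for the repo's Scanner types.
    type FileToProcess struct{ Path string }

    type ScanPhaseResult struct {
    	FilesToProcess   []*FileToProcess // new or changed: record created after chunking
    	UnchangedFileIDs []string         // existing records: snapshot association only
    }

    // backup shows the reordered pipeline: scanning writes nothing, unchanged
    // files reuse their existing rows, and a changed file's row appears only
    // once its chunks are safely stored.
    func backup(ctx context.Context, scan func() *ScanPhaseResult,
    	associate func([]string) error, process func(context.Context, *FileToProcess) error) error {
    	res := scan() // Phase 1: collect work only, no DB writes
    	if err := associate(res.UnchangedFileIDs); err != nil { // Phase 1c
    		return fmt.Errorf("associating unchanged files: %w", err)
    	}
    	for _, f := range res.FilesToProcess { // Phase 2+: chunk first, then create the record
    		if err := process(ctx, f); err != nil {
    			return fmt.Errorf("processing %s: %w", f.Path, err)
    		}
    	}
    	return nil
    }

    func main() {
    	scan := func() *ScanPhaseResult {
    		return &ScanPhaseResult{
    			FilesToProcess:   []*FileToProcess{{Path: "a.txt"}},
    			UnchangedFileIDs: []string{"file-123"},
    		}
    	}
    	associate := func(ids []string) error { fmt.Println("associate", ids); return nil }
    	process := func(_ context.Context, f *FileToProcess) error { fmt.Println("chunk+record", f.Path); return nil }
    	if err := backup(context.Background(), scan, associate, process); err != nil {
    		fmt.Println("backup failed:", err)
    	}
    }

An interrupted run can therefore leave at most an incomplete snapshot_files association set, never a file record whose chunks were never written.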
@@ -145,16 +145,25 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
 	// (builds existingFiles map during walk to avoid double traversal)
 	log.Info("Phase 1/3: Scanning directory structure")
 	existingFiles := make(map[string]struct{})
-	filesToProcess, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles)
+	scanResult, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles)
 	if err != nil {
 		return nil, fmt.Errorf("scan phase failed: %w", err)
 	}
+	filesToProcess := scanResult.FilesToProcess
 
 	// Phase 1b: Detect deleted files by comparing DB against scanned files
 	if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil {
 		return nil, fmt.Errorf("detecting deleted files: %w", err)
 	}
 
+	// Phase 1c: Associate unchanged files with this snapshot (no new records needed)
+	if len(scanResult.UnchangedFileIDs) > 0 {
+		fmt.Printf("Associating %s unchanged files with snapshot...\n", formatNumber(len(scanResult.UnchangedFileIDs)))
+		if err := s.batchAddFilesToSnapshot(ctx, scanResult.UnchangedFileIDs); err != nil {
+			return nil, fmt.Errorf("associating unchanged files: %w", err)
+		}
+	}
+
 	// Calculate total size to process
 	var totalSizeToProcess int64
 	for _, file := range filesToProcess {
@@ -234,15 +243,22 @@ func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*
 	return result, nil
 }
 
+// ScanPhaseResult contains the results of the scan phase
+type ScanPhaseResult struct {
+	FilesToProcess   []*FileToProcess
+	UnchangedFileIDs []string // IDs of unchanged files to associate with snapshot
+}
+
 // scanPhase performs the initial directory scan to identify files to process
 // It uses the pre-loaded knownFiles map for fast change detection without DB queries
 // It also populates existingFiles map for deletion detection
-func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) ([]*FileToProcess, error) {
+// Returns files needing processing and IDs of unchanged files for snapshot association
+func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) (*ScanPhaseResult, error) {
 	// Use known file count as estimate for progress (accurate for subsequent backups)
 	estimatedTotal := int64(len(knownFiles))
 
 	var filesToProcess []*FileToProcess
-	var allFiles []*database.File // Collect all files for batch insert
+	var unchangedFileIDs []string // Just IDs - no new records needed
 	var mu sync.Mutex
 
 	// Set up periodic status output
@@ -277,13 +293,16 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
 		file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles)
 
 		mu.Lock()
-		allFiles = append(allFiles, file)
-		if needsProcessing && info.Size() > 0 {
+		if needsProcessing {
+			// New or changed file - will create record after processing
 			filesToProcess = append(filesToProcess, &FileToProcess{
 				Path:     filePath,
 				FileInfo: info,
 				File:     file,
 			})
+		} else if file.ID != "" {
+			// Unchanged file with existing ID - just need snapshot association
+			unchangedFileIDs = append(unchangedFileIDs, file.ID)
 		}
 		filesScanned++
 		changedCount := len(filesToProcess)
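
checkFileInMemory's body sits outside this diff; helpers of that shape typically decide with a map lookup plus a size/mtime comparison, with no DB query per file. Note also the dropped `info.Size() > 0` guard: empty files now take the processing path too, so their records are likewise created only after their (trivial) chunking step. A hedged sketch of such a check, with hypothetical field names that may not match the real database.File:

    package scan

    import (
    	"os"
    	"time"
    )

    // knownFile is a hypothetical stand-in for database.File; the real
    // schema's field names may differ.
    type knownFile struct {
    	ID    string
    	Size  int64
    	MTime time.Time
    }

    // needsProcessing mirrors what a checkFileInMemory-style helper usually
    // does: a pure in-memory lookup, no database round trip.
    func needsProcessing(path string, info os.FileInfo, known map[string]*knownFile) (id string, changed bool) {
    	prev, ok := known[path]
    	if !ok {
    		return "", true // never seen before: must be chunked
    	}
    	if prev.Size != info.Size() || !prev.MTime.Equal(info.ModTime()) {
    		return prev.ID, true // likely changed: re-chunk and rewrite the record
    	}
    	return prev.ID, false // unchanged: snapshot association is enough
    }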
@@ -346,15 +365,10 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
 		return nil, err
 	}
 
-	// Batch insert all files and snapshot associations
-	if len(allFiles) > 0 {
-		fmt.Printf("Writing %s file records to database...\n", formatNumber(len(allFiles)))
-		if err := s.batchInsertFiles(ctx, allFiles); err != nil {
-			return nil, fmt.Errorf("batch inserting files: %w", err)
-		}
-	}
-
-	return filesToProcess, nil
+	return &ScanPhaseResult{
+		FilesToProcess:   filesToProcess,
+		UnchangedFileIDs: unchangedFileIDs,
+	}, nil
 }
 
 // checkFileInMemory checks if a file needs processing using the in-memory map
@@ -406,15 +420,16 @@ func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles ma
 	return file, false
 }
 
-// batchInsertFiles inserts files and snapshot associations in batches
-func (s *Scanner) batchInsertFiles(ctx context.Context, files []*database.File) error {
+// batchAddFilesToSnapshot adds existing file IDs to the snapshot association table
+// This is used for unchanged files that already have records in the database
+func (s *Scanner) batchAddFilesToSnapshot(ctx context.Context, fileIDs []string) error {
 	const batchSize = 1000
 
 	startTime := time.Now()
 	lastStatusTime := time.Now()
 	statusInterval := 5 * time.Second
 
-	for i := 0; i < len(files); i += batchSize {
+	for i := 0; i < len(fileIDs); i += batchSize {
 		// Check context cancellation
 		select {
 		case <-ctx.Done():
@@ -423,17 +438,14 @@ func (s *Scanner) batchInsertFiles(ctx context.Context, files []*database.File)
 		}
 
 		end := i + batchSize
-		if end > len(files) {
-			end = len(files)
+		if end > len(fileIDs) {
+			end = len(fileIDs)
 		}
-		batch := files[i:end]
+		batch := fileIDs[i:end]
 
 		err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
-			for _, file := range batch {
-				if err := s.repos.Files.Create(ctx, tx, file); err != nil {
-					return fmt.Errorf("creating file %s: %w", file.Path, err)
-				}
-				if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, file.ID); err != nil {
+			for _, fileID := range batch {
+				if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, fileID); err != nil {
 					return fmt.Errorf("adding file to snapshot: %w", err)
 				}
 			}
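
The loop keeps batchInsertFiles' write pattern: one transaction per batch of 1,000, with the cancellation check observed between batches rather than mid-transaction, so aborting never leaves a half-committed batch. Assembled as a self-contained sketch; withTx and the snapshot_files columns are assumptions, not this repo's exact API:

    package batching

    import (
    	"context"
    	"database/sql"
    	"fmt"
    )

    // withTx is a stand-in for the repo's s.repos.WithTx helper.
    func withTx(ctx context.Context, db *sql.DB, fn func(*sql.Tx) error) error {
    	tx, err := db.BeginTx(ctx, nil)
    	if err != nil {
    		return err
    	}
    	if err := fn(tx); err != nil {
    		tx.Rollback() // best effort; the fn error is the one that matters
    		return err
    	}
    	return tx.Commit()
    }

    // associateInBatches links file IDs to a snapshot in fixed-size batches,
    // one transaction each, checking for cancellation between batches.
    func associateInBatches(ctx context.Context, db *sql.DB, snapshotID string, fileIDs []string) error {
    	const batchSize = 1000
    	for i := 0; i < len(fileIDs); i += batchSize {
    		select {
    		case <-ctx.Done():
    			return ctx.Err()
    		default:
    		}
    		end := i + batchSize
    		if end > len(fileIDs) {
    			end = len(fileIDs)
    		}
    		err := withTx(ctx, db, func(tx *sql.Tx) error {
    			for _, id := range fileIDs[i:end] {
    				// Assumed schema: the commit message calls this table snapshot_files.
    				if _, err := tx.ExecContext(ctx,
    					`INSERT INTO snapshot_files (snapshot_id, file_id) VALUES (?, ?)`,
    					snapshotID, id); err != nil {
    					return fmt.Errorf("adding file %s to snapshot: %w", id, err)
    				}
    			}
    			return nil
    		})
    		if err != nil {
    			return err
    		}
    	}
    	return nil
    }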
@@ -447,17 +459,17 @@ func (s *Scanner) batchInsertFiles(ctx context.Context, files []*database.File)
 		if time.Since(lastStatusTime) >= statusInterval {
 			elapsed := time.Since(startTime)
 			rate := float64(end) / elapsed.Seconds()
-			pct := float64(end) / float64(len(files)) * 100
-			fmt.Printf("Database write: %s/%s files (%.1f%%), %.0f files/sec\n",
-				formatNumber(end), formatNumber(len(files)), pct, rate)
+			pct := float64(end) / float64(len(fileIDs)) * 100
+			fmt.Printf("Associating files: %s/%s (%.1f%%), %.0f files/sec\n",
+				formatNumber(end), formatNumber(len(fileIDs)), pct, rate)
 			lastStatusTime = time.Now()
 		}
 	}
 
 	elapsed := time.Since(startTime)
-	rate := float64(len(files)) / elapsed.Seconds()
-	fmt.Printf("Database write complete: %s files in %s (%.0f files/sec)\n",
-		formatNumber(len(files)), elapsed.Round(time.Second), rate)
+	rate := float64(len(fileIDs)) / elapsed.Seconds()
+	fmt.Printf("Associated %s unchanged files in %s (%.0f files/sec)\n",
+		formatNumber(len(fileIDs)), elapsed.Round(time.Second), rate)
 
 	return nil
 }
@@ -818,9 +830,16 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
 		"file_hash", fileHash,
 		"chunks", len(chunks))
 
-	// Store file-chunk associations and chunk-file mappings in database
+	// Store file record, chunk associations, and snapshot association in database
+	// This happens AFTER successful chunking to avoid orphaned records on interruption
 	err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
-		// First, delete all existing file_chunks and chunk_files for this file
+		// Create or update the file record
+		// Files.Create uses INSERT OR REPLACE, so it handles both new and changed files
+		if err := s.repos.Files.Create(txCtx, tx, fileToProcess.File); err != nil {
+			return fmt.Errorf("creating file record: %w", err)
+		}
+
+		// Delete any existing file_chunks and chunk_files for this file
+		// This ensures old chunks are no longer associated when file content changes
 		if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
 			return fmt.Errorf("deleting old file chunks: %w", err)
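
The new comment leans on Files.Create using INSERT OR REPLACE. In SQLite, that statement deletes any existing row that would violate a uniqueness constraint and inserts the new one, which is what lets a single code path serve both brand-new files and changed files whose old record is keyed by the same ID or path. The assumed shape, with an illustrative column set:

    // Assumed shape of the statement behind Files.Create; the column set is
    // illustrative, not this repo's actual schema.
    const createFileSQL = `
    INSERT OR REPLACE INTO files (id, path, size, mtime, hash)
    VALUES (?, ?, ?, ?, ?)`

One caveat of INSERT OR REPLACE worth keeping in mind: it rewrites the whole row, so any column omitted from the statement falls back to its default; an INSERT ... ON CONFLICT DO UPDATE upsert preserves untouched columns if partial updates ever matter here.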
@@ -829,6 +848,11 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
 			return fmt.Errorf("deleting old chunk files: %w", err)
 		}
 
+		// Update chunk associations with the file ID (now that we have it)
+		for i := range chunks {
+			chunks[i].fileChunk.FileID = fileToProcess.File.ID
+		}
+
 		for _, ci := range chunks {
 			// Create file-chunk mapping
 			if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil {
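
The FileID backfill exists because chunking runs before the file row is written, so the fileChunk structs are built without a final owner ID; per the comment above, the ID becomes usable only at commit time. Setting it inside the same transaction that creates the row keeps the invariant: the file record, the purge of stale mappings, and all new chunk mappings commit together or not at all. A compact, self-contained sketch of that ordering, with illustrative table and column names:

    package txorder

    import "database/sql"

    // FileChunk is a hypothetical stand-in for the repo's fileChunk struct.
    type FileChunk struct {
    	FileID  string
    	ChunkID string
    	Index   int
    }

    // commitFile writes the file row and its chunk mappings atomically,
    // after chunking has already succeeded.
    func commitFile(tx *sql.Tx, fileID, path string, chunks []FileChunk) error {
    	if _, err := tx.Exec(
    		`INSERT OR REPLACE INTO files (id, path) VALUES (?, ?)`, fileID, path); err != nil {
    		return err
    	}
    	// Drop stale mappings left over from a previous version of the file.
    	if _, err := tx.Exec(`DELETE FROM file_chunks WHERE file_id = ?`, fileID); err != nil {
    		return err
    	}
    	for i := range chunks {
    		chunks[i].FileID = fileID // backfill: the chunks were built before the row existed
    		if _, err := tx.Exec(
    			`INSERT INTO file_chunks (file_id, chunk_id, idx) VALUES (?, ?, ?)`,
    			fileID, chunks[i].ChunkID, chunks[i].Index); err != nil {
    			return err
    		}
    	}
    	return nil
    }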