package snapshot

import (
	"context"
	"database/sql"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	"git.eeqj.de/sneak/vaultik/internal/blob"
	"git.eeqj.de/sneak/vaultik/internal/chunker"
	"git.eeqj.de/sneak/vaultik/internal/database"
	"git.eeqj.de/sneak/vaultik/internal/log"
	"git.eeqj.de/sneak/vaultik/internal/storage"

	"github.com/dustin/go-humanize"
	"github.com/spf13/afero"
)

// FileToProcess holds information about a file that needs processing
type FileToProcess struct {
	Path     string
	FileInfo os.FileInfo
	File     *database.File
}

// Scanner scans directories and populates the database with file and chunk information
type Scanner struct {
	fs               afero.Fs
	chunker          *chunker.Chunker
	packer           *blob.Packer
	repos            *database.Repositories
	storage          storage.Storer
	maxBlobSize      int64
	compressionLevel int
	ageRecipient     string
	snapshotID       string // Current snapshot being processed
	progress         *ProgressReporter

	// In-memory cache of known chunk hashes for fast existence checks
	knownChunks   map[string]struct{}
	knownChunksMu sync.RWMutex

	// Mutex for coordinating blob creation
	packerMu sync.Mutex // Blocks chunk production during blob creation

	// Context for cancellation
	scanCtx context.Context
}

// ScannerConfig contains configuration for the scanner
type ScannerConfig struct {
	FS               afero.Fs
	ChunkSize        int64
	Repositories     *database.Repositories
	Storage          storage.Storer
	MaxBlobSize      int64
	CompressionLevel int
	AgeRecipients    []string // Optional, empty means no encryption
	EnableProgress   bool     // Enable progress reporting
}

// ScanResult contains the results of a scan operation
type ScanResult struct {
	FilesScanned  int
	FilesSkipped  int
	FilesDeleted  int
	BytesScanned  int64
	BytesSkipped  int64
	BytesDeleted  int64
	ChunksCreated int
	BlobsCreated  int
	StartTime     time.Time
	EndTime       time.Time
}

// NewScanner creates a new scanner instance
func NewScanner(cfg ScannerConfig) *Scanner {
	// Create encryptor (required for blob packing)
	if len(cfg.AgeRecipients) == 0 {
		log.Error("No age recipients configured - encryption is required")
		return nil
	}

	// Create blob packer with encryption
	packerCfg := blob.PackerConfig{
		MaxBlobSize:      cfg.MaxBlobSize,
		CompressionLevel: cfg.CompressionLevel,
		Recipients:       cfg.AgeRecipients,
		Repositories:     cfg.Repositories,
		Fs:               cfg.FS,
	}
	packer, err := blob.NewPacker(packerCfg)
	if err != nil {
		log.Error("Failed to create packer", "error", err)
		return nil
	}

	var progress *ProgressReporter
	if cfg.EnableProgress {
		progress = NewProgressReporter()
	}

	return &Scanner{
		fs:               cfg.FS,
		chunker:          chunker.NewChunker(cfg.ChunkSize),
		packer:           packer,
		repos:            cfg.Repositories,
		storage:          cfg.Storage,
		maxBlobSize:      cfg.MaxBlobSize,
		compressionLevel: cfg.CompressionLevel,
		ageRecipient:     strings.Join(cfg.AgeRecipients, ","),
		progress:         progress,
	}
}

// Scan scans a directory and populates the database
func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*ScanResult, error) {
	s.snapshotID = snapshotID
	s.scanCtx = ctx
	result := &ScanResult{
		StartTime: time.Now().UTC(),
	}

	// Set blob handler for concurrent upload
	if s.storage != nil {
		log.Debug("Setting blob handler for storage uploads")
		s.packer.SetBlobHandler(s.handleBlobReady)
	} else {
		log.Debug("No storage configured, blobs will not be uploaded")
	}

	// Start progress reporting if enabled
	if s.progress != nil {
		s.progress.Start()
		defer s.progress.Stop()
	}
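	// Overview of the phases that follow (summary comment added for
	// orientation; the numbered comments below carry the detail): preload
	// known files and chunks into memory (Phase 0), walk the tree to
	// classify files as new, changed, unchanged, or deleted (Phase 1/1b/1c),
	// then chunk, pack, encrypt, and upload the changed files (Phase 2).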
	// Phase 0: Load known files and chunks from database into memory for fast lookup
	fmt.Println("Loading known files from database...")
	knownFiles, err := s.loadKnownFiles(ctx, path)
	if err != nil {
		return nil, fmt.Errorf("loading known files: %w", err)
	}
	fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))

	fmt.Println("Loading known chunks from database...")
	if err := s.loadKnownChunks(ctx); err != nil {
		return nil, fmt.Errorf("loading known chunks: %w", err)
	}
	fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))

	// Phase 1: Scan directory, collect files to process, and track existing files
	// (builds existingFiles map during walk to avoid double traversal)
	log.Info("Phase 1/3: Scanning directory structure")
	existingFiles := make(map[string]struct{})
	scanResult, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles)
	if err != nil {
		return nil, fmt.Errorf("scan phase failed: %w", err)
	}
	filesToProcess := scanResult.FilesToProcess

	// Phase 1b: Detect deleted files by comparing DB against scanned files
	if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil {
		return nil, fmt.Errorf("detecting deleted files: %w", err)
	}

	// Phase 1c: Associate unchanged files with this snapshot (no new records needed)
	if len(scanResult.UnchangedFileIDs) > 0 {
		fmt.Printf("Associating %s unchanged files with snapshot...\n", formatNumber(len(scanResult.UnchangedFileIDs)))
		if err := s.batchAddFilesToSnapshot(ctx, scanResult.UnchangedFileIDs); err != nil {
			return nil, fmt.Errorf("associating unchanged files: %w", err)
		}
	}

	// Calculate total size to process
	var totalSizeToProcess int64
	for _, file := range filesToProcess {
		totalSizeToProcess += file.FileInfo.Size()
	}

	// Update progress with total size and file count
	if s.progress != nil {
		s.progress.SetTotalSize(totalSizeToProcess)
		s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
	}

	log.Info("Phase 1 complete",
		"total_files", len(filesToProcess),
		"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
		"files_skipped", result.FilesSkipped,
		"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))

	// Print scan summary
	fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)",
		formatNumber(result.FilesScanned),
		humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)),
		formatNumber(len(filesToProcess)),
		humanize.Bytes(uint64(totalSizeToProcess)))
	if result.FilesDeleted > 0 {
		fmt.Printf(", %s deleted (%s)", formatNumber(result.FilesDeleted), humanize.Bytes(uint64(result.BytesDeleted)))
	}
	fmt.Println()

	// Phase 2: Process files and create chunks
	if len(filesToProcess) > 0 {
		fmt.Printf("Processing %s files...\n", formatNumber(len(filesToProcess)))
		log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)")
		if err := s.processPhase(ctx, filesToProcess, result); err != nil {
			return nil, fmt.Errorf("process phase failed: %w", err)
		}
	} else {
		fmt.Printf("No files need processing. Creating metadata-only snapshot.\n")
		log.Info("Phase 2/3: Skipping (no files need processing, metadata-only snapshot)")
	}

	// Get final stats from packer
	blobs := s.packer.GetFinishedBlobs()
	result.BlobsCreated += len(blobs)

	// Query database for actual blob count created during this snapshot
	// The database is authoritative, especially for concurrent blob uploads
	// We count uploads rather than all snapshot_blobs to get only NEW blobs
	if s.snapshotID != "" {
		uploadCount, err := s.repos.Uploads.GetCountBySnapshot(ctx, s.snapshotID)
		if err != nil {
			log.Warn("Failed to query upload count from database", "error", err)
		} else {
			result.BlobsCreated = int(uploadCount)
		}
	}

	result.EndTime = time.Now().UTC()
	return result, nil
}

// loadKnownFiles loads all known files from the database into a map for fast lookup
// This avoids per-file database queries during the scan phase
func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*database.File, error) {
	files, err := s.repos.Files.ListByPrefix(ctx, path)
	if err != nil {
		return nil, fmt.Errorf("listing files by prefix: %w", err)
	}
	result := make(map[string]*database.File, len(files))
	for _, f := range files {
		result[f.Path] = f
	}
	return result, nil
}

// loadKnownChunks loads all known chunk hashes from the database into a map for fast lookup
// This avoids per-chunk database queries during file processing
func (s *Scanner) loadKnownChunks(ctx context.Context) error {
	chunks, err := s.repos.Chunks.List(ctx)
	if err != nil {
		return fmt.Errorf("listing chunks: %w", err)
	}
	s.knownChunksMu.Lock()
	s.knownChunks = make(map[string]struct{}, len(chunks))
	for _, c := range chunks {
		s.knownChunks[c.ChunkHash] = struct{}{}
	}
	s.knownChunksMu.Unlock()
	return nil
}

// chunkExists checks if a chunk hash exists in the in-memory cache
func (s *Scanner) chunkExists(hash string) bool {
	s.knownChunksMu.RLock()
	_, exists := s.knownChunks[hash]
	s.knownChunksMu.RUnlock()
	return exists
}

// addKnownChunk adds a chunk hash to the in-memory cache
func (s *Scanner) addKnownChunk(hash string) {
	s.knownChunksMu.Lock()
	s.knownChunks[hash] = struct{}{}
	s.knownChunksMu.Unlock()
}

// ScanPhaseResult contains the results of the scan phase
type ScanPhaseResult struct {
	FilesToProcess   []*FileToProcess
	UnchangedFileIDs []string // IDs of unchanged files to associate with snapshot
}

// scanPhase performs the initial directory scan to identify files to process
// It uses the pre-loaded knownFiles map for fast change detection without DB queries
// It also populates existingFiles map for deletion detection
// Returns files needing processing and IDs of unchanged files for snapshot association
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) (*ScanPhaseResult, error) {
	// Use known file count as estimate for progress (accurate for subsequent backups)
	estimatedTotal := int64(len(knownFiles))

	var filesToProcess []*FileToProcess
	var unchangedFileIDs []string // Just IDs - no new records needed
	var mu sync.Mutex

	// Set up periodic status output
	startTime := time.Now()
	lastStatusTime := time.Now()
	statusInterval := 15 * time.Second
	var filesScanned int64

	log.Debug("Starting directory walk", "path", path)

	err := afero.Walk(s.fs, path, func(filePath string, info os.FileInfo, err error) error {
		if err != nil {
			log.Debug("Error accessing filesystem entry", "path", filePath, "error", err)
			return err
		}

		// Check context cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		// Skip non-regular files for processing (but still count them)
		if !info.Mode().IsRegular() {
			return nil
		}

		// Track this file as existing (for deletion detection)
		existingFiles[filePath] = struct{}{}

		// Check file against in-memory map (no DB query!)
		file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles)

		mu.Lock()
		if needsProcessing {
			// New or changed file - will create record after processing
			filesToProcess = append(filesToProcess, &FileToProcess{
				Path:     filePath,
				FileInfo: info,
				File:     file,
			})
		} else if file.ID != "" {
			// Unchanged file with existing ID - just need snapshot association
			unchangedFileIDs = append(unchangedFileIDs, file.ID)
		}
		filesScanned++
		changedCount := len(filesToProcess)
		mu.Unlock()

		// Update result stats
		if needsProcessing {
			result.BytesScanned += info.Size()
		} else {
			result.FilesSkipped++
			result.BytesSkipped += info.Size()
		}
		result.FilesScanned++

		// Output periodic status
		if time.Since(lastStatusTime) >= statusInterval {
			elapsed := time.Since(startTime)
			rate := float64(filesScanned) / elapsed.Seconds()

			// Build status line - use estimate if available (not first backup)
			if estimatedTotal > 0 {
				// Show actual scanned vs estimate (may exceed estimate if files were added)
				pct := float64(filesScanned) / float64(estimatedTotal) * 100
				if pct > 100 {
					pct = 100 // Cap at 100% for display
				}
				remaining := estimatedTotal - filesScanned
				if remaining < 0 {
					remaining = 0
				}
				var eta time.Duration
				if rate > 0 && remaining > 0 {
					eta = time.Duration(float64(remaining)/rate) * time.Second
				}
				fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
					formatNumber(int(filesScanned)), pct, formatNumber(changedCount), rate, elapsed.Round(time.Second))
				if eta > 0 {
					fmt.Printf(", ETA %s", eta.Round(time.Second))
				}
				fmt.Println()
			} else {
				// First backup - no estimate available
				fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
					formatNumber(int(filesScanned)), formatNumber(changedCount), rate, elapsed.Round(time.Second))
			}
			lastStatusTime = time.Now()
		}

		return nil
	})
	if err != nil {
		return nil, err
	}

	return &ScanPhaseResult{
		FilesToProcess:   filesToProcess,
		UnchangedFileIDs: unchangedFileIDs,
	}, nil
}

// checkFileInMemory checks if a file needs processing using the in-memory map
// No database access is performed - this is purely CPU/memory work
func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
	// Get file stats
	stat, ok := info.Sys().(interface {
		Uid() uint32
		Gid() uint32
	})
	var uid, gid uint32
	if ok {
		uid = stat.Uid()
		gid = stat.Gid()
	}

	// Create file record
	file := &database.File{
		Path:  path,
		MTime: info.ModTime(),
		CTime: info.ModTime(), // afero doesn't provide ctime
		Size:  info.Size(),
		Mode:  uint32(info.Mode()),
		UID:   uid,
		GID:   gid,
	}

	// Check against in-memory map
	existingFile, exists := knownFiles[path]
	if !exists {
		// New file
		return file, true
	}

	// Reuse existing ID
	file.ID = existingFile.ID

	// Check if file has changed
	if existingFile.Size != file.Size ||
		existingFile.MTime.Unix() != file.MTime.Unix() ||
		existingFile.Mode != file.Mode ||
		existingFile.UID != file.UID ||
		existingFile.GID != file.GID {
		return file, true
	}

	// File unchanged
	return file, false
}
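// Note on change detection (descriptive comment): a file is re-processed when
// any of size, mtime (compared at second granularity), mode, UID, or GID
// differs from the stored record. File content is not hashed at this stage,
// so a modification that preserves all of those attributes is not detected
// here.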
// batchAddFilesToSnapshot adds existing file IDs to the snapshot association table
// This is used for unchanged files that already have records in the database
func (s *Scanner) batchAddFilesToSnapshot(ctx context.Context, fileIDs []string) error {
	const batchSize = 1000
	startTime := time.Now()
	lastStatusTime := time.Now()
	statusInterval := 5 * time.Second

	for i := 0; i < len(fileIDs); i += batchSize {
		// Check context cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		end := i + batchSize
		if end > len(fileIDs) {
			end = len(fileIDs)
		}
		batch := fileIDs[i:end]

		err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
			for _, fileID := range batch {
				if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, fileID); err != nil {
					return fmt.Errorf("adding file to snapshot: %w", err)
				}
			}
			return nil
		})
		if err != nil {
			return err
		}

		// Periodic status
		if time.Since(lastStatusTime) >= statusInterval {
			elapsed := time.Since(startTime)
			rate := float64(end) / elapsed.Seconds()
			pct := float64(end) / float64(len(fileIDs)) * 100
			fmt.Printf("Associating files: %s/%s (%.1f%%), %.0f files/sec\n",
				formatNumber(end), formatNumber(len(fileIDs)), pct, rate)
			lastStatusTime = time.Now()
		}
	}

	elapsed := time.Since(startTime)
	rate := float64(len(fileIDs)) / elapsed.Seconds()
	fmt.Printf("Associated %s unchanged files in %s (%.0f files/sec)\n",
		formatNumber(len(fileIDs)), elapsed.Round(time.Second), rate)

	return nil
}

// processPhase processes the files that need backing up
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
	// Calculate total bytes to process
	var totalBytes int64
	for _, f := range filesToProcess {
		totalBytes += f.FileInfo.Size()
	}

	// Set up periodic status output
	lastStatusTime := time.Now()
	statusInterval := 15 * time.Second
	startTime := time.Now()
	filesProcessed := 0
	var bytesProcessed int64
	totalFiles := len(filesToProcess)

	// Process each file
	for _, fileToProcess := range filesToProcess {
		// Update progress
		if s.progress != nil {
			s.progress.GetStats().CurrentFile.Store(fileToProcess.Path)
		}

		// Process file in streaming fashion
		if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
			return fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
		}

		// Update files processed counter
		if s.progress != nil {
			s.progress.GetStats().FilesProcessed.Add(1)
		}
		filesProcessed++
		bytesProcessed += fileToProcess.FileInfo.Size()

		// Output periodic status
		if time.Since(lastStatusTime) >= statusInterval {
			elapsed := time.Since(startTime)
			pct := float64(bytesProcessed) / float64(totalBytes) * 100
			byteRate := float64(bytesProcessed) / elapsed.Seconds()
			fileRate := float64(filesProcessed) / elapsed.Seconds()

			// Calculate ETA based on bytes (more accurate than files)
			remainingBytes := totalBytes - bytesProcessed
			var eta time.Duration
			if byteRate > 0 {
				eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second
			}

			// Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s
			fmt.Printf("Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s",
				formatCompact(filesProcessed), formatCompact(totalFiles),
				humanize.Bytes(uint64(bytesProcessed)), humanize.Bytes(uint64(totalBytes)), pct,
				humanize.Bytes(uint64(byteRate)), fileRate, elapsed.Round(time.Second))
			if eta > 0 {
				fmt.Printf(", ETA: %s", eta.Round(time.Second))
			}
			fmt.Println()
			lastStatusTime = time.Now()
		}
	}

	// Final flush (outside any transaction)
	s.packerMu.Lock()
	if err := s.packer.Flush(); err != nil {
		s.packerMu.Unlock()
		return fmt.Errorf("flushing packer: %w", err)
	}
	s.packerMu.Unlock()

	// If no storage configured, store any remaining blobs locally
	if s.storage == nil {
		blobs := s.packer.GetFinishedBlobs()
		for _, b := range blobs {
			// Blob metadata is already stored incrementally during packing
			// Just add the blob to the snapshot
			err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
				return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, b.ID, b.Hash)
			})
			if err != nil {
				return fmt.Errorf("storing blob metadata: %w", err)
			}
		}
		result.BlobsCreated += len(blobs)
	}

	return nil
}

// handleBlobReady is called by the packer when a blob is finalized
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
	startTime := time.Now().UTC()
	finishedBlob := blobWithReader.FinishedBlob

	// Report upload start
	if s.progress != nil {
		s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
	}

	// Upload to storage first (without holding any locks)
	// Use scan context for cancellation support
	ctx := s.scanCtx
	if ctx == nil {
		ctx = context.Background()
	}

	// Track bytes uploaded for accurate speed calculation
	lastProgressTime := time.Now()
	lastProgressBytes := int64(0)

	progressCallback := func(uploaded int64) error {
		// Calculate instantaneous speed
		now := time.Now()
		elapsed := now.Sub(lastProgressTime).Seconds()
		if elapsed > 0.5 { // Update speed every 0.5 seconds
			bytesSinceLastUpdate := uploaded - lastProgressBytes
			speed := float64(bytesSinceLastUpdate) / elapsed
			if s.progress != nil {
				s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed)
			}
			lastProgressTime = now
			lastProgressBytes = uploaded
		}

		// Check for cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			return nil
		}
	}

	// Create sharded path: blobs/ca/fe/cafebabe...
	blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)

	if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
		return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err)
	}

	uploadDuration := time.Since(startTime)

	// Calculate upload speed
	uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()

	// Print blob stored message
	fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
		finishedBlob.Hash[:12]+"...",
		humanize.Bytes(uint64(finishedBlob.Compressed)),
		humanize.Bytes(uint64(uploadSpeedBps)),
		uploadDuration.Round(time.Millisecond))

	// Log upload stats
	uploadSpeedBits := uploadSpeedBps * 8 // bits per second
	log.Info("Successfully uploaded blob to storage",
		"path", blobPath,
		"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
		"duration", uploadDuration,
		"speed", humanize.SI(uploadSpeedBits, "bps"))

	// Report upload complete
	if s.progress != nil {
		s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
	}

	// Update progress
	if s.progress != nil {
		stats := s.progress.GetStats()
		stats.BlobsUploaded.Add(1)
		stats.BytesUploaded.Add(finishedBlob.Compressed)
		stats.BlobsCreated.Add(1)
	}

	// Store metadata in database (after upload is complete)
	dbCtx := s.scanCtx
	if dbCtx == nil {
		dbCtx = context.Background()
	}

	err := s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error {
		// Update blob upload timestamp
		if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil {
			return fmt.Errorf("updating blob upload timestamp: %w", err)
		}

		// Add the blob to the snapshot
		if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, finishedBlob.ID, finishedBlob.Hash); err != nil {
			return fmt.Errorf("adding blob to snapshot: %w", err)
		}
		// Record upload metrics
		upload := &database.Upload{
			BlobHash:   finishedBlob.Hash,
			SnapshotID: s.snapshotID,
			UploadedAt: startTime,
			Size:       finishedBlob.Compressed,
			DurationMs: uploadDuration.Milliseconds(),
		}
		if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
			return fmt.Errorf("recording upload metrics: %w", err)
		}

		return nil
	})

	// Cleanup temp file if needed
	if blobWithReader.TempFile != nil {
		tempName := blobWithReader.TempFile.Name()
		if err := blobWithReader.TempFile.Close(); err != nil {
			log.Fatal("Failed to close temp file", "file", tempName, "error", err)
		}
		if err := s.fs.Remove(tempName); err != nil {
			log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
		}
	}

	return err
}

// processFileStreaming processes a file by streaming chunks directly to the packer
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
	// Open the file
	file, err := s.fs.Open(fileToProcess.Path)
	if err != nil {
		return fmt.Errorf("opening file: %w", err)
	}
	defer func() { _ = file.Close() }()

	// We'll collect file chunks for database storage
	// but process them for packing as we go
	type chunkInfo struct {
		fileChunk database.FileChunk
		offset    int64
		size      int64
	}
	var chunks []chunkInfo
	chunkIndex := 0

	// Process chunks in streaming fashion and get full file hash
	fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
		// Check for cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		log.Debug("Processing content-defined chunk from file",
			"file", fileToProcess.Path,
			"chunk_index", chunkIndex,
			"hash", chunk.Hash,
			"size", chunk.Size)

		// Check if chunk already exists (fast in-memory lookup)
		chunkExists := s.chunkExists(chunk.Hash)

		// Store chunk if new
		if !chunkExists {
			err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
				dbChunk := &database.Chunk{
					ChunkHash: chunk.Hash,
					Size:      chunk.Size,
				}
				if err := s.repos.Chunks.Create(txCtx, tx, dbChunk); err != nil {
					return fmt.Errorf("creating chunk: %w", err)
				}
				return nil
			})
			if err != nil {
				return fmt.Errorf("storing chunk: %w", err)
			}
			// Add to in-memory cache for fast duplicate detection
			s.addKnownChunk(chunk.Hash)
		}

		// Track file chunk association for later storage
		chunks = append(chunks, chunkInfo{
			fileChunk: database.FileChunk{
				FileID:    fileToProcess.File.ID,
				Idx:       chunkIndex,
				ChunkHash: chunk.Hash,
			},
			offset: chunk.Offset,
			size:   chunk.Size,
		})

		// Update stats
		if chunkExists {
			result.FilesSkipped++ // Track as skipped for now
			result.BytesSkipped += chunk.Size
			if s.progress != nil {
				s.progress.GetStats().BytesSkipped.Add(chunk.Size)
			}
		} else {
			result.ChunksCreated++
			result.BytesScanned += chunk.Size
			if s.progress != nil {
				s.progress.GetStats().ChunksCreated.Add(1)
				s.progress.GetStats().BytesProcessed.Add(chunk.Size)
				s.progress.UpdateChunkingActivity()
			}
		}

		// Add chunk to packer immediately (streaming)
		// This happens outside the database transaction
		if !chunkExists {
			s.packerMu.Lock()
			err := s.packer.AddChunk(&blob.ChunkRef{
				Hash: chunk.Hash,
				Data: chunk.Data,
			})
			if err == blob.ErrBlobSizeLimitExceeded {
				// Finalize current blob and retry
				if err := s.packer.FinalizeBlob(); err != nil {
					s.packerMu.Unlock()
					return fmt.Errorf("finalizing blob: %w", err)
				}
				// Retry adding the chunk
				if err := s.packer.AddChunk(&blob.ChunkRef{
					Hash: chunk.Hash,
					Data: chunk.Data,
				}); err != nil {
					s.packerMu.Unlock()
					return fmt.Errorf("adding chunk after finalize: %w", err)
				}
			} else if err != nil {
				s.packerMu.Unlock()
				return fmt.Errorf("adding chunk to packer: %w", err)
			}
			s.packerMu.Unlock()
		}
		// Clear chunk data from memory immediately after use
		chunk.Data = nil

		chunkIndex++
		return nil
	})
	if err != nil {
		return fmt.Errorf("chunking file: %w", err)
	}

	log.Debug("Completed snapshotting file",
		"path", fileToProcess.Path,
		"file_hash", fileHash,
		"chunks", len(chunks))

	// Store file record, chunk associations, and snapshot association in database
	// This happens AFTER successful chunking to avoid orphaned records on interruption
	err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
		// Create or update the file record
		// Files.Create uses INSERT OR REPLACE, so it handles both new and changed files
		if err := s.repos.Files.Create(txCtx, tx, fileToProcess.File); err != nil {
			return fmt.Errorf("creating file record: %w", err)
		}

		// Delete any existing file_chunks and chunk_files for this file
		// This ensures old chunks are no longer associated when file content changes
		if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
			return fmt.Errorf("deleting old file chunks: %w", err)
		}
		if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
			return fmt.Errorf("deleting old chunk files: %w", err)
		}

		// Update chunk associations with the file ID (now that we have it)
		for i := range chunks {
			chunks[i].fileChunk.FileID = fileToProcess.File.ID
		}

		for _, ci := range chunks {
			// Create file-chunk mapping
			if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil {
				return fmt.Errorf("creating file chunk: %w", err)
			}

			// Create chunk-file mapping
			chunkFile := &database.ChunkFile{
				ChunkHash:  ci.fileChunk.ChunkHash,
				FileID:     fileToProcess.File.ID,
				FileOffset: ci.offset,
				Length:     ci.size,
			}
			if err := s.repos.ChunkFiles.Create(txCtx, tx, chunkFile); err != nil {
				return fmt.Errorf("creating chunk file: %w", err)
			}
		}

		// Add file to snapshot
		if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, fileToProcess.File.ID); err != nil {
			return fmt.Errorf("adding file to snapshot: %w", err)
		}

		return nil
	})

	return err
}

// GetProgress returns the progress reporter for this scanner
func (s *Scanner) GetProgress() *ProgressReporter {
	return s.progress
}

// detectDeletedFilesFromMap finds files that existed in previous snapshots but no longer exist
// Uses pre-loaded maps to avoid any filesystem or database access
func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[string]*database.File, existingFiles map[string]struct{}, result *ScanResult) error {
	if len(knownFiles) == 0 {
		return nil
	}

	// Check each known file against the enumerated set (no filesystem access needed)
	for path, file := range knownFiles {
		// Check context cancellation periodically
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		// Check if the file exists in our enumerated set
		if _, exists := existingFiles[path]; !exists {
			// File has been deleted
			result.FilesDeleted++
			result.BytesDeleted += file.Size
			log.Debug("Detected deleted file", "path", path, "size", file.Size)
		}
	}

	if result.FilesDeleted > 0 {
		fmt.Printf("Found %s deleted files\n", formatNumber(result.FilesDeleted))
	}

	return nil
}

// formatNumber formats a number with comma separators
func formatNumber(n int) string {
	if n < 1000 {
		return fmt.Sprintf("%d", n)
	}
	return humanize.Comma(int64(n))
}

// formatCompact formats a number compactly with k/M suffixes (e.g., 5.7k, 1.2M)
func formatCompact(n int) string {
	if n < 1000 {
		return fmt.Sprintf("%d", n)
	}
	if n < 10000 {
		return fmt.Sprintf("%.1fk", float64(n)/1000)
	}
	if n < 1000000 {
		return fmt.Sprintf("%.0fk", float64(n)/1000)
	}
	return fmt.Sprintf("%.1fM", float64(n)/1000000)
}
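// Example usage (sketch only). The wiring below assumes a configured
// *database.Repositories, a storage.Storer, an age recipient string, and a
// ctx/snapshotID already in scope; the variable names (repos, store) and the
// literal sizes are illustrative assumptions, not part of this package.
//
//	scanner := NewScanner(ScannerConfig{
//		FS:               afero.NewOsFs(),
//		ChunkSize:        1 << 20, // assumed chunk size passed to the chunker
//		Repositories:     repos,
//		Storage:          store,
//		MaxBlobSize:      1 << 30, // assumed blob size cap
//		CompressionLevel: 3,
//		AgeRecipients:    []string{"age1..."},
//		EnableProgress:   true,
//	})
//	if scanner == nil {
//		// NewScanner returns nil when no recipients are configured or the
//		// packer cannot be created
//		return
//	}
//	result, err := scanner.Scan(ctx, "/data", snapshotID)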