diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go
index ba7b109..7022c7e 100644
--- a/internal/snapshot/scanner.go
+++ b/internal/snapshot/scanner.go
@@ -38,6 +38,10 @@ type Scanner struct {
 	snapshotID string // Current snapshot being processed
 	progress   *ProgressReporter
 
+	// In-memory cache of known chunk hashes for fast existence checks
+	knownChunks   map[string]struct{}
+	knownChunksMu sync.RWMutex
+
 	// Mutex for coordinating blob creation
 	packerMu sync.Mutex // Blocks chunk production during blob creation
 
@@ -133,7 +137,7 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
 		defer s.progress.Stop()
 	}
 
-	// Phase 0: Load known files from database into memory for fast lookup
+	// Phase 0: Load known files and chunks from database into memory for fast lookup
 	fmt.Println("Loading known files from database...")
 	knownFiles, err := s.loadKnownFiles(ctx, path)
 	if err != nil {
@@ -141,6 +145,12 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
 	}
 	fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
 
+	fmt.Println("Loading known chunks from database...")
+	if err := s.loadKnownChunks(ctx); err != nil {
+		return nil, fmt.Errorf("loading known chunks: %w", err)
+	}
+	fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
+
 	// Phase 1: Scan directory, collect files to process, and track existing files
 	// (builds existingFiles map during walk to avoid double traversal)
 	log.Info("Phase 1/3: Scanning directory structure")
@@ -243,6 +253,45 @@ func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*
 	return result, nil
 }
 
+// loadKnownChunks loads all known chunk hashes from the database into an
+// in-memory set, avoiding a per-chunk database query during file processing.
+func (s *Scanner) loadKnownChunks(ctx context.Context) error {
+	chunks, err := s.repos.Chunks.List(ctx)
+	if err != nil {
+		return fmt.Errorf("listing chunks: %w", err)
+	}
+
+	s.knownChunksMu.Lock()
+	s.knownChunks = make(map[string]struct{}, len(chunks))
+	for _, c := range chunks {
+		s.knownChunks[c.ChunkHash] = struct{}{}
+	}
+	s.knownChunksMu.Unlock()
+
+	return nil
+}
+
+// markChunkKnown records hash in the in-memory cache and reports whether it
+// was already present. The check and the insert happen under a single lock
+// so that concurrent workers cannot both observe the same chunk as new.
+func (s *Scanner) markChunkKnown(hash string) bool {
+	s.knownChunksMu.Lock()
+	defer s.knownChunksMu.Unlock()
+	if _, exists := s.knownChunks[hash]; exists {
+		return true
+	}
+	s.knownChunks[hash] = struct{}{}
+	return false
+}
+
+// forgetChunk removes hash from the cache; used to roll back markChunkKnown
+// when the chunk could not be persisted.
+func (s *Scanner) forgetChunk(hash string) {
+	s.knownChunksMu.Lock()
+	delete(s.knownChunks, hash)
+	s.knownChunksMu.Unlock()
+}
+
 // ScanPhaseResult contains the results of the scan phase
 type ScanPhaseResult struct {
 	FilesToProcess []*FileToProcess
@@ -733,12 +782,9 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
 			"hash", chunk.Hash,
 			"size", chunk.Size)
 
-		// Check if chunk already exists (outside of transaction)
-		existing, err := s.repos.Chunks.GetByHash(ctx, chunk.Hash)
-		if err != nil {
-			return fmt.Errorf("checking chunk existence: %w", err)
-		}
-		chunkExists := (existing != nil)
+		// Check-and-mark in one atomic step so that two concurrent
+		// workers cannot both decide to store the same chunk.
+		chunkExists := s.markChunkKnown(chunk.Hash)
 
 		// Store chunk if new
 		if !chunkExists {
@@ -755,6 +801,9 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
 			if err != nil {
+				// Roll back the cache entry so a later occurrence of
+				// this chunk can retry the store.
+				s.forgetChunk(chunk.Hash)
 				return fmt.Errorf("storing chunk: %w", err)
 			}
 		}
 
 		// Track file chunk association for later storage