Cache chunk hashes in memory for faster small file processing
Load all known chunk hashes into an in-memory map at scan start, eliminating per-chunk database queries during file processing. This significantly improves performance when backing up many small files.
This commit is contained in:
parent
24c5e8c5a6
commit
899448e1da
@ -38,6 +38,10 @@ type Scanner struct {
|
|||||||
snapshotID string // Current snapshot being processed
|
snapshotID string // Current snapshot being processed
|
||||||
progress *ProgressReporter
|
progress *ProgressReporter
|
||||||
|
|
||||||
|
// In-memory cache of known chunk hashes for fast existence checks
|
||||||
|
knownChunks map[string]struct{}
|
||||||
|
knownChunksMu sync.RWMutex
|
||||||
|
|
||||||
// Mutex for coordinating blob creation
|
// Mutex for coordinating blob creation
|
||||||
packerMu sync.Mutex // Blocks chunk production during blob creation
|
packerMu sync.Mutex // Blocks chunk production during blob creation
|
||||||
|
|
||||||
@ -133,7 +137,7 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
|||||||
defer s.progress.Stop()
|
defer s.progress.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 0: Load known files from database into memory for fast lookup
|
// Phase 0: Load known files and chunks from database into memory for fast lookup
|
||||||
fmt.Println("Loading known files from database...")
|
fmt.Println("Loading known files from database...")
|
||||||
knownFiles, err := s.loadKnownFiles(ctx, path)
|
knownFiles, err := s.loadKnownFiles(ctx, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -141,6 +145,12 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
|||||||
}
|
}
|
||||||
fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
|
fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
|
||||||
|
|
||||||
|
fmt.Println("Loading known chunks from database...")
|
||||||
|
if err := s.loadKnownChunks(ctx); err != nil {
|
||||||
|
return nil, fmt.Errorf("loading known chunks: %w", err)
|
||||||
|
}
|
||||||
|
fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
|
||||||
|
|
||||||
// Phase 1: Scan directory, collect files to process, and track existing files
|
// Phase 1: Scan directory, collect files to process, and track existing files
|
||||||
// (builds existingFiles map during walk to avoid double traversal)
|
// (builds existingFiles map during walk to avoid double traversal)
|
||||||
log.Info("Phase 1/3: Scanning directory structure")
|
log.Info("Phase 1/3: Scanning directory structure")
|
||||||
@ -243,6 +253,39 @@ func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// loadKnownChunks loads all known chunk hashes from the database into a map for fast lookup
|
||||||
|
// This avoids per-chunk database queries during file processing
|
||||||
|
func (s *Scanner) loadKnownChunks(ctx context.Context) error {
|
||||||
|
chunks, err := s.repos.Chunks.List(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("listing chunks: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.knownChunksMu.Lock()
|
||||||
|
s.knownChunks = make(map[string]struct{}, len(chunks))
|
||||||
|
for _, c := range chunks {
|
||||||
|
s.knownChunks[c.ChunkHash] = struct{}{}
|
||||||
|
}
|
||||||
|
s.knownChunksMu.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// chunkExists checks if a chunk hash exists in the in-memory cache
|
||||||
|
func (s *Scanner) chunkExists(hash string) bool {
|
||||||
|
s.knownChunksMu.RLock()
|
||||||
|
_, exists := s.knownChunks[hash]
|
||||||
|
s.knownChunksMu.RUnlock()
|
||||||
|
return exists
|
||||||
|
}
|
||||||
|
|
||||||
|
// addKnownChunk adds a chunk hash to the in-memory cache
|
||||||
|
func (s *Scanner) addKnownChunk(hash string) {
|
||||||
|
s.knownChunksMu.Lock()
|
||||||
|
s.knownChunks[hash] = struct{}{}
|
||||||
|
s.knownChunksMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// ScanPhaseResult contains the results of the scan phase
|
// ScanPhaseResult contains the results of the scan phase
|
||||||
type ScanPhaseResult struct {
|
type ScanPhaseResult struct {
|
||||||
FilesToProcess []*FileToProcess
|
FilesToProcess []*FileToProcess
|
||||||
@ -733,12 +776,8 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
|||||||
"hash", chunk.Hash,
|
"hash", chunk.Hash,
|
||||||
"size", chunk.Size)
|
"size", chunk.Size)
|
||||||
|
|
||||||
// Check if chunk already exists (outside of transaction)
|
// Check if chunk already exists (fast in-memory lookup)
|
||||||
existing, err := s.repos.Chunks.GetByHash(ctx, chunk.Hash)
|
chunkExists := s.chunkExists(chunk.Hash)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("checking chunk existence: %w", err)
|
|
||||||
}
|
|
||||||
chunkExists := (existing != nil)
|
|
||||||
|
|
||||||
// Store chunk if new
|
// Store chunk if new
|
||||||
if !chunkExists {
|
if !chunkExists {
|
||||||
@ -755,6 +794,8 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("storing chunk: %w", err)
|
return fmt.Errorf("storing chunk: %w", err)
|
||||||
}
|
}
|
||||||
|
// Add to in-memory cache for fast duplicate detection
|
||||||
|
s.addKnownChunk(chunk.Hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track file chunk association for later storage
|
// Track file chunk association for later storage
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user