From 05286bed01b406b8341254b2edd810a206e9af4d Mon Sep 17 00:00:00 2001 From: sneak Date: Tue, 23 Dec 2025 19:07:26 +0700 Subject: [PATCH] Batch transactions per blob for improved performance Previously, each chunk and blob_chunk was inserted in a separate transaction, leading to ~560k+ transactions for large backups. This change batches all database operations per blob: - Chunks are queued in packer.pendingChunks during file processing - When blob finalizes, one transaction inserts all chunks, blob_chunks, and updates the blob record - Scanner tracks pending chunk hashes to know which files can be flushed - Files are flushed when all their chunks are committed to DB - Database is consistent after each blob finalize This reduces transaction count from O(chunks) to O(blobs), which for a 614k file / 44GB backup means ~50-100 transactions instead of ~560k. --- internal/blob/packer.go | 93 ++++++++++++----- internal/snapshot/scanner.go | 192 +++++++++++++++++++++++++++-------- 2 files changed, 218 insertions(+), 67 deletions(-) diff --git a/internal/blob/packer.go b/internal/blob/packer.go index b874210..1a09e0f 100644 --- a/internal/blob/packer.go +++ b/internal/blob/packer.go @@ -47,6 +47,12 @@ type PackerConfig struct { Fs afero.Fs // Filesystem for temporary files } +// PendingChunk represents a chunk waiting to be inserted into the database. +type PendingChunk struct { + Hash string + Size int64 +} + // Packer accumulates chunks and packs them into blobs. // It handles compression, encryption, and coordination with the database // to track blob metadata. Packer is thread-safe. @@ -64,6 +70,9 @@ type Packer struct { // Current blob being packed currentBlob *blobInProgress finishedBlobs []*FinishedBlob // Only used if no handler provided + + // Pending chunks to be inserted when blob finalizes + pendingChunks []PendingChunk } // blobInProgress represents a blob being assembled @@ -114,8 +123,9 @@ type BlobChunkRef struct { // BlobWithReader wraps a FinishedBlob with its data reader type BlobWithReader struct { *FinishedBlob - Reader io.ReadSeeker - TempFile afero.File // Optional, only set for disk-based blobs + Reader io.ReadSeeker + TempFile afero.File // Optional, only set for disk-based blobs + InsertedChunkHashes []string // Chunk hashes that were inserted to DB with this blob } // NewPacker creates a new blob packer that accumulates chunks into blobs. @@ -152,6 +162,15 @@ func (p *Packer) SetBlobHandler(handler BlobHandler) { p.blobHandler = handler } +// AddPendingChunk queues a chunk to be inserted into the database when the +// current blob is finalized. This batches chunk inserts to reduce transaction +// overhead. Thread-safe. +func (p *Packer) AddPendingChunk(hash string, size int64) { + p.mu.Lock() + defer p.mu.Unlock() + p.pendingChunks = append(p.pendingChunks, PendingChunk{Hash: hash, Size: size}) +} + // AddChunk adds a chunk to the current blob being packed. // If adding the chunk would exceed MaxBlobSize, returns ErrBlobSizeLimitExceeded. // In this case, the caller should finalize the current blob and retry. 
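For orientation, a minimal sketch (not part of the patch) of the producer-side call pattern this API implies. AddPendingChunk, AddChunk, and ErrBlobSizeLimitExceeded are taken from the diff; the module path, the FinalizeBlob method, and the Size field on ChunkRef are assumptions standing in for whatever the packer actually exposes:

    package example

    import (
        "errors"

        "example.com/backup/internal/blob" // module path assumed
    )

    // packOneChunk drives one chunk through the packer under the new
    // batching scheme. known is the caller's in-memory dedup set.
    func packOneChunk(pkr *blob.Packer, known map[string]struct{}, c *blob.ChunkRef) error {
        if _, seen := known[c.Hash]; !seen {
            // Queue the chunk row for the batched insert; it is committed
            // in the same transaction that finalizes the enclosing blob.
            pkr.AddPendingChunk(c.Hash, c.Size) // Size field assumed
            known[c.Hash] = struct{}{}
        }
        err := pkr.AddChunk(c)
        if errors.Is(err, blob.ErrBlobSizeLimitExceeded) {
            // Current blob is full: finalize it (one transaction inserts
            // all pending chunks and blob_chunks, then updates the blob
            // row), and retry the chunk against a fresh blob.
            if ferr := pkr.FinalizeBlob(); ferr != nil { // hypothetical name
                return ferr
            }
            err = pkr.AddChunk(c)
        }
        return err
    }

The ordering property that matters here is that AddPendingChunk runs before AddChunk, so a chunk's row is always queued no later than the finalize of whichever blob carries its data, keeping the blob_chunks foreign key satisfiable.
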
@@ -314,23 +333,9 @@ func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error { p.currentBlob.chunks = append(p.currentBlob.chunks, chunkInfo) p.currentBlob.chunkSet[chunk.Hash] = true - // Store blob-chunk association in database immediately - if p.repos != nil { - blobChunk := &database.BlobChunk{ - BlobID: p.currentBlob.id, - ChunkHash: chunk.Hash, - Offset: offset, - Length: chunkSize, - } - err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error { - return p.repos.BlobChunks.Create(ctx, tx, blobChunk) - }) - if err != nil { - log.Error("Failed to store blob-chunk association in database", "error", err, - "blob_id", p.currentBlob.id, "chunk_hash", chunk.Hash) - // Continue anyway - we can reconstruct this later if needed - } - } + // Note: blob_chunk records are inserted in batch when blob is finalized + // to reduce transaction overhead. The chunk info is already stored in + // p.currentBlob.chunks for later insertion. // Update total size p.currentBlob.size += chunkSize @@ -392,16 +397,49 @@ func (p *Packer) finalizeCurrentBlob() error { }) } - // Update blob record in database with hash and sizes + // Get pending chunks (will be inserted to DB and reported to handler) + chunksToInsert := p.pendingChunks + p.pendingChunks = nil // Clear pending list + + // Insert pending chunks, blob_chunks, and update blob in a single transaction if p.repos != nil { err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error { + // First insert all pending chunks (required for blob_chunks FK) + for _, chunk := range chunksToInsert { + dbChunk := &database.Chunk{ + ChunkHash: chunk.Hash, + Size: chunk.Size, + } + if err := p.repos.Chunks.Create(ctx, tx, dbChunk); err != nil { + return fmt.Errorf("creating chunk: %w", err) + } + } + + // Insert all blob_chunk records in batch + for _, chunk := range p.currentBlob.chunks { + blobChunk := &database.BlobChunk{ + BlobID: p.currentBlob.id, + ChunkHash: chunk.Hash, + Offset: chunk.Offset, + Length: chunk.Size, + } + if err := p.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil { + return fmt.Errorf("creating blob_chunk: %w", err) + } + } + + // Update blob record with final hash and sizes return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash, p.currentBlob.size, finalSize) }) if err != nil { p.cleanupTempFile() - return fmt.Errorf("updating blob record: %w", err) + return fmt.Errorf("finalizing blob transaction: %w", err) } + + log.Debug("Committed blob transaction", + "chunks_inserted", len(chunksToInsert), + "blob_chunks_inserted", len(p.currentBlob.chunks)) } // Create finished blob @@ -424,6 +462,12 @@ func (p *Packer) finalizeCurrentBlob() error { "ratio", fmt.Sprintf("%.2f", compressionRatio), "duration", time.Since(p.currentBlob.startTime)) + // Collect inserted chunk hashes for the scanner to track + var insertedChunkHashes []string + for _, chunk := range chunksToInsert { + insertedChunkHashes = append(insertedChunkHashes, chunk.Hash) + } + // Call blob handler if set if p.blobHandler != nil { // Reset file position for handler @@ -434,9 +478,10 @@ func (p *Packer) finalizeCurrentBlob() error { // Create a blob reader that includes the data stream blobWithReader := &BlobWithReader{ - FinishedBlob: finished, - Reader: p.currentBlob.tempFile, - TempFile: p.currentBlob.tempFile, + FinishedBlob: finished, + Reader: p.currentBlob.tempFile, + TempFile: p.currentBlob.tempFile, + InsertedChunkHashes: insertedChunkHashes, } if err := p.blobHandler(blobWithReader); err 
!= nil { diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go index 0507f5e..242abf1 100644 --- a/internal/snapshot/scanner.go +++ b/internal/snapshot/scanner.go @@ -50,7 +50,13 @@ type Scanner struct { knownChunks map[string]struct{} knownChunksMu sync.RWMutex + // Pending chunk hashes - chunks that have been added to packer but not yet committed to DB + // When a blob finalizes, the committed chunks are removed from this set + pendingChunkHashes map[string]struct{} + pendingChunkHashesMu sync.Mutex + // Pending file data buffer for batch insertion + // Files are flushed when all their chunks have been committed to DB pendingFiles []pendingFileData pendingFilesMu sync.Mutex @@ -61,11 +67,6 @@ type Scanner struct { scanCtx context.Context } -const ( - // Batch size for file database operations - fileBatchSize = 100 -) - // ScannerConfig contains configuration for the scanner type ScannerConfig struct { FS afero.Fs @@ -120,15 +121,16 @@ func NewScanner(cfg ScannerConfig) *Scanner { } return &Scanner{ - fs: cfg.FS, - chunker: chunker.NewChunker(cfg.ChunkSize), - packer: packer, - repos: cfg.Repositories, - storage: cfg.Storage, - maxBlobSize: cfg.MaxBlobSize, - compressionLevel: cfg.CompressionLevel, - ageRecipient: strings.Join(cfg.AgeRecipients, ","), - progress: progress, + fs: cfg.FS, + chunker: chunker.NewChunker(cfg.ChunkSize), + packer: packer, + repos: cfg.Repositories, + storage: cfg.Storage, + maxBlobSize: cfg.MaxBlobSize, + compressionLevel: cfg.CompressionLevel, + ageRecipient: strings.Join(cfg.AgeRecipients, ","), + progress: progress, + pendingChunkHashes: make(map[string]struct{}), } } @@ -303,17 +305,37 @@ func (s *Scanner) addKnownChunk(hash string) { s.knownChunksMu.Unlock() } -// addPendingFile adds a file to the pending buffer and flushes if needed -func (s *Scanner) addPendingFile(ctx context.Context, data pendingFileData) error { +// addPendingChunkHash marks a chunk as pending (not yet committed to DB) +func (s *Scanner) addPendingChunkHash(hash string) { + s.pendingChunkHashesMu.Lock() + s.pendingChunkHashes[hash] = struct{}{} + s.pendingChunkHashesMu.Unlock() +} + +// removePendingChunkHashes removes committed chunk hashes from the pending set +func (s *Scanner) removePendingChunkHashes(hashes []string) { + s.pendingChunkHashesMu.Lock() + for _, hash := range hashes { + delete(s.pendingChunkHashes, hash) + } + s.pendingChunkHashesMu.Unlock() +} + +// isChunkPending returns true if the chunk is still pending (not yet committed to DB) +func (s *Scanner) isChunkPending(hash string) bool { + s.pendingChunkHashesMu.Lock() + _, pending := s.pendingChunkHashes[hash] + s.pendingChunkHashesMu.Unlock() + return pending +} + +// addPendingFile adds a file to the pending buffer +// Files are NOT auto-flushed here - they are flushed when their chunks are committed +// (in handleBlobReady after blob finalize) +func (s *Scanner) addPendingFile(_ context.Context, data pendingFileData) { s.pendingFilesMu.Lock() s.pendingFiles = append(s.pendingFiles, data) - needsFlush := len(s.pendingFiles) >= fileBatchSize s.pendingFilesMu.Unlock() - - if needsFlush { - return s.flushPendingFiles(ctx) - } - return nil } // flushPendingFiles writes all pending files to the database in a single transaction @@ -370,6 +392,80 @@ func (s *Scanner) flushAllPending(ctx context.Context) error { return s.flushPendingFiles(ctx) } +// flushCompletedPendingFiles flushes only files whose chunks are all committed to DB +// Files with pending chunks are kept in the queue for later flushing 
+func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error { + s.pendingFilesMu.Lock() + + // Separate files into complete (can flush) and incomplete (keep pending) + var canFlush []pendingFileData + var stillPending []pendingFileData + + for _, data := range s.pendingFiles { + allChunksCommitted := true + for _, fc := range data.fileChunks { + if s.isChunkPending(fc.ChunkHash) { + allChunksCommitted = false + break + } + } + if allChunksCommitted { + canFlush = append(canFlush, data) + } else { + stillPending = append(stillPending, data) + } + } + + s.pendingFiles = stillPending + s.pendingFilesMu.Unlock() + + if len(canFlush) == 0 { + return nil + } + + log.Debug("Flushing completed files after blob finalize", + "files_to_flush", len(canFlush), + "files_still_pending", len(stillPending)) + + // Flush the complete files + return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { + for _, data := range canFlush { + // Create or update the file record + if err := s.repos.Files.Create(txCtx, tx, data.file); err != nil { + return fmt.Errorf("creating file record: %w", err) + } + + // Delete any existing file_chunks and chunk_files for this file + if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID); err != nil { + return fmt.Errorf("deleting old file chunks: %w", err) + } + if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID); err != nil { + return fmt.Errorf("deleting old chunk files: %w", err) + } + + // Create file-chunk mappings + for i := range data.fileChunks { + if err := s.repos.FileChunks.Create(txCtx, tx, &data.fileChunks[i]); err != nil { + return fmt.Errorf("creating file chunk: %w", err) + } + } + + // Create chunk-file mappings + for i := range data.chunkFiles { + if err := s.repos.ChunkFiles.Create(txCtx, tx, &data.chunkFiles[i]); err != nil { + return fmt.Errorf("creating chunk file: %w", err) + } + } + + // Add file to snapshot + if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID); err != nil { + return fmt.Errorf("adding file to snapshot: %w", err) + } + } + return nil + }) +} + // ScanPhaseResult contains the results of the scan phase type ScanPhaseResult struct { FilesToProcess []*FileToProcess @@ -677,12 +773,8 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc } } - // Flush any remaining pending chunks and files to database - if err := s.flushAllPending(ctx); err != nil { - return fmt.Errorf("flushing pending database operations: %w", err) - } - - // Final flush (outside any transaction) + // Final packer flush first - this commits remaining chunks to DB + // and handleBlobReady will flush files whose chunks are now committed s.packerMu.Lock() if err := s.packer.Flush(); err != nil { s.packerMu.Unlock() @@ -690,6 +782,12 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc } s.packerMu.Unlock() + // Flush any remaining pending files (e.g., files with only pre-existing chunks + // that didn't trigger a blob finalize) + if err := s.flushAllPending(ctx); err != nil { + return fmt.Errorf("flushing remaining pending files: %w", err) + } + // If no storage configured, store any remaining blobs locally if s.storage == nil { blobs := s.packer.GetFinishedBlobs() @@ -836,7 +934,20 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { } } - return err + if err != nil { + return err + } + + // Chunks from this blob are now committed to DB - remove from pending set + 
s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes) + + // Flush files whose chunks are now all committed + // This maintains database consistency after each blob + if err := s.flushCompletedPendingFiles(dbCtx); err != nil { + return fmt.Errorf("flushing completed files: %w", err) + } + + return nil } // processFileStreaming processes a file by streaming chunks directly to the packer @@ -876,21 +987,14 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT // Check if chunk already exists (fast in-memory lookup) chunkExists := s.chunkExists(chunk.Hash) - // Store chunk in database if new (must happen before packer.AddChunk - // because packer creates blob_chunk entries that reference chunks) + // Queue new chunks for batch insert when blob finalizes + // This dramatically reduces transaction overhead if !chunkExists { - err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { - dbChunk := &database.Chunk{ - ChunkHash: chunk.Hash, - Size: chunk.Size, - } - return s.repos.Chunks.Create(txCtx, tx, dbChunk) - }) - if err != nil { - return fmt.Errorf("creating chunk: %w", err) - } - // Add to in-memory cache for fast duplicate detection + s.packer.AddPendingChunk(chunk.Hash, chunk.Size) + // Add to in-memory cache immediately for fast duplicate detection s.addKnownChunk(chunk.Hash) + // Track as pending until blob finalizes and commits to DB + s.addPendingChunkHash(chunk.Hash) } // Track file chunk association for later storage @@ -985,11 +1089,13 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT } // Queue file for batch insertion - return s.addPendingFile(ctx, pendingFileData{ + // Files will be flushed when their chunks are committed (after blob finalize) + s.addPendingFile(ctx, pendingFileData{ file: fileToProcess.File, fileChunks: fileChunks, chunkFiles: chunkFiles, }) + return nil } // GetProgress returns the progress reporter for this scanner
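
As a sanity check on the ordering the patch establishes, here is a small self-contained model (illustrative only, not project code) of the pending-chunk bookkeeping: a hash enters the pending set when the chunk is queued, leaves it when a blob's transaction commits, and a file may be flushed only once none of its chunks remain pending, which is exactly the file_chunks -> chunks foreign-key invariant:

    package main

    import "fmt"

    // pendingSet models Scanner.pendingChunkHashes; the mutex is omitted
    // for brevity (the real code guards it with pendingChunkHashesMu).
    type pendingSet map[string]struct{}

    func (p pendingSet) add(h string) { p[h] = struct{}{} }

    // commit mirrors removePendingChunkHashes after a blob finalizes.
    func (p pendingSet) commit(hashes []string) {
        for _, h := range hashes {
            delete(p, h)
        }
    }

    // fileFlushable mirrors the all-chunks-committed check in
    // flushCompletedPendingFiles.
    func (p pendingSet) fileFlushable(chunkHashes []string) bool {
        for _, h := range chunkHashes {
            if _, pending := p[h]; pending {
                return false
            }
        }
        return true
    }

    func main() {
        pending := pendingSet{}
        file := []string{"c1", "c2"} // hashes referenced by one file

        pending.add("c1")
        pending.add("c2")
        fmt.Println(pending.fileFlushable(file)) // false: rows not yet in DB

        // One blob finalize commits c1 and c2, then handleBlobReady
        // removes them from the pending set ...
        pending.commit([]string{"c1", "c2"})

        // ... after which flushCompletedPendingFiles may safely write the
        // file, file_chunks, and chunk_files rows.
        fmt.Println(pending.fileFlushable(file)) // true
    }

Files whose chunks were all pre-existing never enter this path, which is why processPhase still calls flushAllPending once after the final packer Flush.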