Batch transactions per blob for improved performance

Previously, each chunk and blob_chunk was inserted in its own
transaction, producing over 560,000 transactions for large backups.
This change batches all database operations per blob:

- Chunks are queued in packer.pendingChunks during file processing
- When a blob finalizes, a single transaction inserts all of its chunks
  and blob_chunks and updates the blob record
- Scanner tracks pending chunk hashes to know which files can be flushed
- Files are flushed when all their chunks are committed to DB
- The database is consistent after each blob finalize

This reduces the transaction count from O(chunks) to O(blobs): for a
614k-file / 44 GB backup, that means ~50-100 transactions instead of
~560k.
Jeffrey Paul 2025-12-23 19:07:26 +07:00
parent f2c120f026
commit 05286bed01
2 changed files with 218 additions and 67 deletions
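
To make the batching pattern concrete, the sketch below shows the shape of
the per-blob transaction: all queued chunk rows, their blob_chunks rows, and
the blob's own update commit together, so the commit cost is paid once per
blob rather than once per chunk. This is a minimal illustration, not code
from this repository: the insertBlobBatch function, the toy schema, and the
sqlite3 driver are all assumptions made for the example.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3" // assumed driver; any database/sql driver works the same way
)

type pendingChunk struct {
	hash string
	size int64
}

// insertBlobBatch writes every queued chunk, its blob_chunks row, and the
// blob's final update in ONE transaction, giving the O(blobs) transaction
// count described in the commit message. (Hypothetical schema and names.)
func insertBlobBatch(ctx context.Context, db *sql.DB, blobID string, chunks []pendingChunk) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // safe no-op once Commit succeeds

	for i, c := range chunks {
		// Chunk rows must exist before blob_chunks rows (FK dependency).
		if _, err := tx.ExecContext(ctx,
			`INSERT INTO chunks (chunk_hash, size) VALUES (?, ?)`, c.hash, c.size); err != nil {
			return fmt.Errorf("creating chunk: %w", err)
		}
		if _, err := tx.ExecContext(ctx,
			`INSERT INTO blob_chunks (blob_id, chunk_hash, chunk_offset) VALUES (?, ?, ?)`,
			blobID, c.hash, i); err != nil {
			return fmt.Errorf("creating blob_chunk: %w", err)
		}
	}
	if _, err := tx.ExecContext(ctx,
		`UPDATE blobs SET finished = 1 WHERE id = ?`, blobID); err != nil {
		return fmt.Errorf("updating blob: %w", err)
	}
	return tx.Commit()
}

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Toy schema standing in for the real chunks/blob_chunks/blobs tables.
	if _, err := db.Exec(`
		CREATE TABLE blobs (id TEXT PRIMARY KEY, finished INTEGER DEFAULT 0);
		CREATE TABLE chunks (chunk_hash TEXT PRIMARY KEY, size INTEGER);
		CREATE TABLE blob_chunks (blob_id TEXT, chunk_hash TEXT, chunk_offset INTEGER);
	`); err != nil {
		log.Fatal(err)
	}
	if _, err := db.Exec(`INSERT INTO blobs (id) VALUES ('blob-1')`); err != nil {
		log.Fatal(err)
	}

	chunks := []pendingChunk{{"aaa", 100}, {"bbb", 200}, {"ccc", 300}}
	if err := insertBlobBatch(context.Background(), db, "blob-1", chunks); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("committed %d chunks and %d blob_chunks in one transaction\n",
		len(chunks), len(chunks))
}

With journaled databases such as SQLite, each transaction commit is
fsync-bound, so collapsing hundreds of thousands of single-row transactions
into one transaction per blob is where the speedup comes from.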


@@ -47,6 +47,12 @@ type PackerConfig struct {
 	Fs afero.Fs // Filesystem for temporary files
 }

+// PendingChunk represents a chunk waiting to be inserted into the database.
+type PendingChunk struct {
+	Hash string
+	Size int64
+}
+
 // Packer accumulates chunks and packs them into blobs.
 // It handles compression, encryption, and coordination with the database
 // to track blob metadata. Packer is thread-safe.
@@ -64,6 +70,9 @@ type Packer struct {
 	// Current blob being packed
 	currentBlob   *blobInProgress
 	finishedBlobs []*FinishedBlob // Only used if no handler provided
+
+	// Pending chunks to be inserted when blob finalizes
+	pendingChunks []PendingChunk
 }

 // blobInProgress represents a blob being assembled
@@ -114,8 +123,9 @@ type BlobChunkRef struct {
 // BlobWithReader wraps a FinishedBlob with its data reader
 type BlobWithReader struct {
 	*FinishedBlob
 	Reader   io.ReadSeeker
 	TempFile afero.File // Optional, only set for disk-based blobs
+	InsertedChunkHashes []string // Chunk hashes that were inserted to DB with this blob
 }

 // NewPacker creates a new blob packer that accumulates chunks into blobs.
@ -152,6 +162,15 @@ func (p *Packer) SetBlobHandler(handler BlobHandler) {
p.blobHandler = handler p.blobHandler = handler
} }
// AddPendingChunk queues a chunk to be inserted into the database when the
// current blob is finalized. This batches chunk inserts to reduce transaction
// overhead. Thread-safe.
func (p *Packer) AddPendingChunk(hash string, size int64) {
p.mu.Lock()
defer p.mu.Unlock()
p.pendingChunks = append(p.pendingChunks, PendingChunk{Hash: hash, Size: size})
}
// AddChunk adds a chunk to the current blob being packed. // AddChunk adds a chunk to the current blob being packed.
// If adding the chunk would exceed MaxBlobSize, returns ErrBlobSizeLimitExceeded. // If adding the chunk would exceed MaxBlobSize, returns ErrBlobSizeLimitExceeded.
// In this case, the caller should finalize the current blob and retry. // In this case, the caller should finalize the current blob and retry.
@@ -314,23 +333,9 @@ func (p *Packer) addChunkToCurrentBlob(chunk *ChunkRef) error {
 	p.currentBlob.chunks = append(p.currentBlob.chunks, chunkInfo)
 	p.currentBlob.chunkSet[chunk.Hash] = true

-	// Store blob-chunk association in database immediately
-	if p.repos != nil {
-		blobChunk := &database.BlobChunk{
-			BlobID:    p.currentBlob.id,
-			ChunkHash: chunk.Hash,
-			Offset:    offset,
-			Length:    chunkSize,
-		}
-		err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
-			return p.repos.BlobChunks.Create(ctx, tx, blobChunk)
-		})
-		if err != nil {
-			log.Error("Failed to store blob-chunk association in database", "error", err,
-				"blob_id", p.currentBlob.id, "chunk_hash", chunk.Hash)
-			// Continue anyway - we can reconstruct this later if needed
-		}
-	}
+	// Note: blob_chunk records are inserted in batch when blob is finalized
+	// to reduce transaction overhead. The chunk info is already stored in
+	// p.currentBlob.chunks for later insertion.

 	// Update total size
 	p.currentBlob.size += chunkSize
@@ -392,16 +397,49 @@ func (p *Packer) finalizeCurrentBlob() error {
 		})
 	}

-	// Update blob record in database with hash and sizes
+	// Get pending chunks (will be inserted to DB and reported to handler)
+	chunksToInsert := p.pendingChunks
+	p.pendingChunks = nil // Clear pending list
+
+	// Insert pending chunks, blob_chunks, and update blob in a single transaction
 	if p.repos != nil {
 		err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
+			// First insert all pending chunks (required for blob_chunks FK)
+			for _, chunk := range chunksToInsert {
+				dbChunk := &database.Chunk{
+					ChunkHash: chunk.Hash,
+					Size:      chunk.Size,
+				}
+				if err := p.repos.Chunks.Create(ctx, tx, dbChunk); err != nil {
+					return fmt.Errorf("creating chunk: %w", err)
+				}
+			}
+
+			// Insert all blob_chunk records in batch
+			for _, chunk := range p.currentBlob.chunks {
+				blobChunk := &database.BlobChunk{
+					BlobID:    p.currentBlob.id,
+					ChunkHash: chunk.Hash,
+					Offset:    chunk.Offset,
+					Length:    chunk.Size,
+				}
+				if err := p.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil {
+					return fmt.Errorf("creating blob_chunk: %w", err)
+				}
+			}
+
+			// Update blob record with final hash and sizes
 			return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash,
 				p.currentBlob.size, finalSize)
 		})
 		if err != nil {
 			p.cleanupTempFile()
-			return fmt.Errorf("updating blob record: %w", err)
+			return fmt.Errorf("finalizing blob transaction: %w", err)
 		}
+
+		log.Debug("Committed blob transaction",
+			"chunks_inserted", len(chunksToInsert),
+			"blob_chunks_inserted", len(p.currentBlob.chunks))
 	}

 	// Create finished blob
@@ -424,6 +462,12 @@ func (p *Packer) finalizeCurrentBlob() error {
 		"ratio", fmt.Sprintf("%.2f", compressionRatio),
 		"duration", time.Since(p.currentBlob.startTime))

+	// Collect inserted chunk hashes for the scanner to track
+	var insertedChunkHashes []string
+	for _, chunk := range chunksToInsert {
+		insertedChunkHashes = append(insertedChunkHashes, chunk.Hash)
+	}
+
 	// Call blob handler if set
 	if p.blobHandler != nil {
 		// Reset file position for handler
@@ -434,9 +478,10 @@ func (p *Packer) finalizeCurrentBlob() error {
 		// Create a blob reader that includes the data stream
 		blobWithReader := &BlobWithReader{
 			FinishedBlob: finished,
 			Reader:       p.currentBlob.tempFile,
 			TempFile:     p.currentBlob.tempFile,
+			InsertedChunkHashes: insertedChunkHashes,
 		}

 		if err := p.blobHandler(blobWithReader); err != nil {


@@ -50,7 +50,13 @@ type Scanner struct {
 	knownChunks   map[string]struct{}
 	knownChunksMu sync.RWMutex

+	// Pending chunk hashes - chunks that have been added to packer but not yet committed to DB
+	// When a blob finalizes, the committed chunks are removed from this set
+	pendingChunkHashes   map[string]struct{}
+	pendingChunkHashesMu sync.Mutex
+
 	// Pending file data buffer for batch insertion
+	// Files are flushed when all their chunks have been committed to DB
 	pendingFiles   []pendingFileData
 	pendingFilesMu sync.Mutex
@@ -61,11 +67,6 @@ type Scanner struct {
 	scanCtx context.Context
 }

-const (
-	// Batch size for file database operations
-	fileBatchSize = 100
-)
-
 // ScannerConfig contains configuration for the scanner
 type ScannerConfig struct {
 	FS afero.Fs
@@ -120,15 +121,16 @@ func NewScanner(cfg ScannerConfig) *Scanner {
 	}

 	return &Scanner{
 		fs:               cfg.FS,
 		chunker:          chunker.NewChunker(cfg.ChunkSize),
 		packer:           packer,
 		repos:            cfg.Repositories,
 		storage:          cfg.Storage,
 		maxBlobSize:      cfg.MaxBlobSize,
 		compressionLevel: cfg.CompressionLevel,
 		ageRecipient:     strings.Join(cfg.AgeRecipients, ","),
 		progress:         progress,
+		pendingChunkHashes: make(map[string]struct{}),
 	}
 }
@@ -303,17 +305,37 @@ func (s *Scanner) addKnownChunk(hash string) {
 	s.knownChunksMu.Unlock()
 }

+// addPendingChunkHash marks a chunk as pending (not yet committed to DB)
+func (s *Scanner) addPendingChunkHash(hash string) {
+	s.pendingChunkHashesMu.Lock()
+	s.pendingChunkHashes[hash] = struct{}{}
+	s.pendingChunkHashesMu.Unlock()
+}
+
+// removePendingChunkHashes removes committed chunk hashes from the pending set
+func (s *Scanner) removePendingChunkHashes(hashes []string) {
+	s.pendingChunkHashesMu.Lock()
+	for _, hash := range hashes {
+		delete(s.pendingChunkHashes, hash)
+	}
+	s.pendingChunkHashesMu.Unlock()
+}
+
+// isChunkPending returns true if the chunk is still pending (not yet committed to DB)
+func (s *Scanner) isChunkPending(hash string) bool {
+	s.pendingChunkHashesMu.Lock()
+	_, pending := s.pendingChunkHashes[hash]
+	s.pendingChunkHashesMu.Unlock()
+	return pending
+}
+
-// addPendingFile adds a file to the pending buffer and flushes if needed
-func (s *Scanner) addPendingFile(ctx context.Context, data pendingFileData) error {
+// addPendingFile adds a file to the pending buffer
+// Files are NOT auto-flushed here - they are flushed when their chunks are committed
+// (in handleBlobReady after blob finalize)
+func (s *Scanner) addPendingFile(_ context.Context, data pendingFileData) {
 	s.pendingFilesMu.Lock()
 	s.pendingFiles = append(s.pendingFiles, data)
-	needsFlush := len(s.pendingFiles) >= fileBatchSize
 	s.pendingFilesMu.Unlock()
-
-	if needsFlush {
-		return s.flushPendingFiles(ctx)
-	}
-	return nil
 }

 // flushPendingFiles writes all pending files to the database in a single transaction
@@ -370,6 +392,80 @@ func (s *Scanner) flushAllPending(ctx context.Context) error {
 	return s.flushPendingFiles(ctx)
 }

+// flushCompletedPendingFiles flushes only files whose chunks are all committed to DB
+// Files with pending chunks are kept in the queue for later flushing
+func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
+	s.pendingFilesMu.Lock()
+
+	// Separate files into complete (can flush) and incomplete (keep pending)
+	var canFlush []pendingFileData
+	var stillPending []pendingFileData
+	for _, data := range s.pendingFiles {
+		allChunksCommitted := true
+		for _, fc := range data.fileChunks {
+			if s.isChunkPending(fc.ChunkHash) {
+				allChunksCommitted = false
+				break
+			}
+		}
+		if allChunksCommitted {
+			canFlush = append(canFlush, data)
+		} else {
+			stillPending = append(stillPending, data)
+		}
+	}
+	s.pendingFiles = stillPending
+	s.pendingFilesMu.Unlock()
+
+	if len(canFlush) == 0 {
+		return nil
+	}
+
+	log.Debug("Flushing completed files after blob finalize",
+		"files_to_flush", len(canFlush),
+		"files_still_pending", len(stillPending))
+
+	// Flush the complete files
+	return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
+		for _, data := range canFlush {
+			// Create or update the file record
+			if err := s.repos.Files.Create(txCtx, tx, data.file); err != nil {
+				return fmt.Errorf("creating file record: %w", err)
+			}
+
+			// Delete any existing file_chunks and chunk_files for this file
+			if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
+				return fmt.Errorf("deleting old file chunks: %w", err)
+			}
+			if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
+				return fmt.Errorf("deleting old chunk files: %w", err)
+			}
+
+			// Create file-chunk mappings
+			for i := range data.fileChunks {
+				if err := s.repos.FileChunks.Create(txCtx, tx, &data.fileChunks[i]); err != nil {
+					return fmt.Errorf("creating file chunk: %w", err)
+				}
+			}
+
+			// Create chunk-file mappings
+			for i := range data.chunkFiles {
+				if err := s.repos.ChunkFiles.Create(txCtx, tx, &data.chunkFiles[i]); err != nil {
+					return fmt.Errorf("creating chunk file: %w", err)
+				}
+			}
+
+			// Add file to snapshot
+			if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID); err != nil {
+				return fmt.Errorf("adding file to snapshot: %w", err)
+			}
+		}
+		return nil
+	})
+}
+
 // ScanPhaseResult contains the results of the scan phase
 type ScanPhaseResult struct {
 	FilesToProcess []*FileToProcess
@@ -677,12 +773,8 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
 		}
 	}

-	// Flush any remaining pending chunks and files to database
-	if err := s.flushAllPending(ctx); err != nil {
-		return fmt.Errorf("flushing pending database operations: %w", err)
-	}
-
-	// Final flush (outside any transaction)
+	// Final packer flush first - this commits remaining chunks to DB
+	// and handleBlobReady will flush files whose chunks are now committed
 	s.packerMu.Lock()
 	if err := s.packer.Flush(); err != nil {
 		s.packerMu.Unlock()
@@ -690,6 +782,12 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
 	}
 	s.packerMu.Unlock()

+	// Flush any remaining pending files (e.g., files with only pre-existing chunks
+	// that didn't trigger a blob finalize)
+	if err := s.flushAllPending(ctx); err != nil {
+		return fmt.Errorf("flushing remaining pending files: %w", err)
+	}
+
 	// If no storage configured, store any remaining blobs locally
 	if s.storage == nil {
 		blobs := s.packer.GetFinishedBlobs()
@@ -836,7 +934,20 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
 		}
 	}

-	return err
+	if err != nil {
+		return err
+	}
+
+	// Chunks from this blob are now committed to DB - remove from pending set
+	s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)
+
+	// Flush files whose chunks are now all committed
+	// This maintains database consistency after each blob
+	if err := s.flushCompletedPendingFiles(dbCtx); err != nil {
+		return fmt.Errorf("flushing completed files: %w", err)
+	}
+
+	return nil
 }

 // processFileStreaming processes a file by streaming chunks directly to the packer
@@ -876,21 +987,14 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
 		// Check if chunk already exists (fast in-memory lookup)
 		chunkExists := s.chunkExists(chunk.Hash)

-		// Store chunk in database if new (must happen before packer.AddChunk
-		// because packer creates blob_chunk entries that reference chunks)
+		// Queue new chunks for batch insert when blob finalizes
+		// This dramatically reduces transaction overhead
 		if !chunkExists {
-			err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
-				dbChunk := &database.Chunk{
-					ChunkHash: chunk.Hash,
-					Size:      chunk.Size,
-				}
-				return s.repos.Chunks.Create(txCtx, tx, dbChunk)
-			})
-			if err != nil {
-				return fmt.Errorf("creating chunk: %w", err)
-			}
-			// Add to in-memory cache for fast duplicate detection
+			s.packer.AddPendingChunk(chunk.Hash, chunk.Size)
+			// Add to in-memory cache immediately for fast duplicate detection
 			s.addKnownChunk(chunk.Hash)
+			// Track as pending until blob finalizes and commits to DB
+			s.addPendingChunkHash(chunk.Hash)
 		}

 		// Track file chunk association for later storage
@@ -985,11 +1089,13 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
 	}

 	// Queue file for batch insertion
-	return s.addPendingFile(ctx, pendingFileData{
+	// Files will be flushed when their chunks are committed (after blob finalize)
+	s.addPendingFile(ctx, pendingFileData{
 		file:       fileToProcess.File,
 		fileChunks: fileChunks,
 		chunkFiles: chunkFiles,
 	})
+	return nil
 }

 // GetProgress returns the progress reporter for this scanner