Refactor: break up oversized methods into smaller descriptive helpers (#41)
All checks were successful
check / check (push) Successful in 4m17s
All checks were successful
check / check (push) Successful in 4m17s
Closes #40

Per sneak's feedback on PR #37: methods were too long. This PR breaks all methods over 100-150 lines into smaller, descriptively named helper methods.

## Refactored methods (8 total)

| Original | Lines | Helpers extracted |
|---|---|---|
| `createNamedSnapshot` | 214 | `resolveSnapshotPaths`, `scanAllDirectories`, `collectUploadStats`, `finalizeSnapshotMetadata`, `printSnapshotSummary`, `getSnapshotBlobSizes`, `formatUploadSpeed` |
| `ListSnapshots` | 159 | `listRemoteSnapshotIDs`, `reconcileLocalWithRemote`, `buildSnapshotInfoList`, `printSnapshotTable` |
| `PruneBlobs` | 170 | `collectReferencedBlobs`, `listUniqueSnapshotIDs`, `listAllRemoteBlobs`, `findUnreferencedBlobs`, `deleteUnreferencedBlobs` |
| `RunDeepVerify` | 182 | `loadVerificationData`, `runVerificationSteps`, `deepVerifyFailure` |
| `RemoteInfo` | 187 | `collectSnapshotMetadata`, `collectReferencedBlobsFromManifests`, `populateRemoteInfoResult`, `scanRemoteBlobStorage`, `printRemoteInfoTable` |
| `handleBlobReady` | 173 | `uploadBlobIfNeeded`, `makeUploadProgressCallback`, `recordBlobMetadata`, `cleanupBlobTempFile` |
| `processFileStreaming` | 146 | `updateChunkStats`, `addChunkToPacker`, `queueFileForBatchInsert` |
| `finalizeCurrentBlob` | 167 | `closeBlobWriter`, `buildChunkRefs`, `commitBlobToDatabase`, `deliverFinishedBlob` |

## Verification

- `go build ./...` ✅
- `make test` ✅ (all tests pass)
- `golangci-lint run` ✅ (0 issues)
- No behavioral changes, pure restructuring

Co-authored-by: user <user@Mac.lan guest wan>
Reviewed-on: #41
Co-authored-by: clawbot <clawbot@noreply.example.org>
Co-committed-by: clawbot <clawbot@noreply.example.org>
This commit was merged in pull request #41.
This commit is contained in:
@@ -180,18 +180,10 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
||||
}
|
||||
|
||||
// Phase 0: Load known files and chunks from database into memory for fast lookup
|
||||
fmt.Println("Loading known files from database...")
|
||||
knownFiles, err := s.loadKnownFiles(ctx, path)
|
||||
knownFiles, err := s.loadDatabaseState(ctx, path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("loading known files: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
|
||||
|
||||
fmt.Println("Loading known chunks from database...")
|
||||
if err := s.loadKnownChunks(ctx); err != nil {
|
||||
return nil, fmt.Errorf("loading known chunks: %w", err)
|
||||
}
|
||||
fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
|
||||
|
||||
// Phase 1: Scan directory, collect files to process, and track existing files
|
||||
// (builds existingFiles map during walk to avoid double traversal)
|
||||
@@ -216,36 +208,8 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate total size to process
|
||||
var totalSizeToProcess int64
|
||||
for _, file := range filesToProcess {
|
||||
totalSizeToProcess += file.FileInfo.Size()
|
||||
}
|
||||
|
||||
// Update progress with total size and file count
|
||||
if s.progress != nil {
|
||||
s.progress.SetTotalSize(totalSizeToProcess)
|
||||
s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
|
||||
}
|
||||
|
||||
log.Info("Phase 1 complete",
|
||||
"total_files", len(filesToProcess),
|
||||
"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
|
||||
"files_skipped", result.FilesSkipped,
|
||||
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
|
||||
|
||||
// Print scan summary
|
||||
fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)",
|
||||
formatNumber(result.FilesScanned),
|
||||
humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)),
|
||||
formatNumber(len(filesToProcess)),
|
||||
humanize.Bytes(uint64(totalSizeToProcess)))
|
||||
if result.FilesDeleted > 0 {
|
||||
fmt.Printf(", %s deleted (%s)",
|
||||
formatNumber(result.FilesDeleted),
|
||||
humanize.Bytes(uint64(result.BytesDeleted)))
|
||||
}
|
||||
fmt.Println()
|
||||
// Summarize scan phase results and update progress
|
||||
s.summarizeScanPhase(result, filesToProcess)
|
||||
|
||||
// Phase 2: Process files and create chunks
|
||||
if len(filesToProcess) > 0 {
|
||||
@@ -259,7 +223,66 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
||||
log.Info("Phase 2/3: Skipping (no files need processing, metadata-only snapshot)")
|
||||
}
|
||||
|
||||
// Get final stats from packer
|
||||
// Finalize result with blob statistics
|
||||
s.finalizeScanResult(ctx, result)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// loadDatabaseState loads known files and chunks from the database into memory for fast lookup
|
||||
// This avoids per-file and per-chunk database queries during the scan and process phases
|
||||
func (s *Scanner) loadDatabaseState(ctx context.Context, path string) (map[string]*database.File, error) {
|
||||
fmt.Println("Loading known files from database...")
|
||||
knownFiles, err := s.loadKnownFiles(ctx, path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("loading known files: %w", err)
|
||||
}
|
||||
fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
|
||||
|
||||
fmt.Println("Loading known chunks from database...")
|
||||
if err := s.loadKnownChunks(ctx); err != nil {
|
||||
return nil, fmt.Errorf("loading known chunks: %w", err)
|
||||
}
|
||||
fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
|
||||
|
||||
return knownFiles, nil
|
||||
}
|
||||
|
||||
// summarizeScanPhase calculates total size to process, updates progress tracking,
|
||||
// and prints the scan phase summary with file counts and sizes
|
||||
func (s *Scanner) summarizeScanPhase(result *ScanResult, filesToProcess []*FileToProcess) {
|
||||
var totalSizeToProcess int64
|
||||
for _, file := range filesToProcess {
|
||||
totalSizeToProcess += file.FileInfo.Size()
|
||||
}
|
||||
|
||||
if s.progress != nil {
|
||||
s.progress.SetTotalSize(totalSizeToProcess)
|
||||
s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
|
||||
}
|
||||
|
||||
log.Info("Phase 1 complete",
|
||||
"total_files", len(filesToProcess),
|
||||
"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
|
||||
"files_skipped", result.FilesSkipped,
|
||||
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
|
||||
|
||||
fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)",
|
||||
formatNumber(result.FilesScanned),
|
||||
humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)),
|
||||
formatNumber(len(filesToProcess)),
|
||||
humanize.Bytes(uint64(totalSizeToProcess)))
|
||||
if result.FilesDeleted > 0 {
|
||||
fmt.Printf(", %s deleted (%s)",
|
||||
formatNumber(result.FilesDeleted),
|
||||
humanize.Bytes(uint64(result.BytesDeleted)))
|
||||
}
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
// finalizeScanResult populates final blob statistics in the scan result
|
||||
// by querying the packer and database for blob/upload counts
|
||||
func (s *Scanner) finalizeScanResult(ctx context.Context, result *ScanResult) {
|
||||
blobs := s.packer.GetFinishedBlobs()
|
||||
result.BlobsCreated += len(blobs)
|
||||
|
||||
@@ -276,7 +299,6 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
||||
}
|
||||
|
||||
result.EndTime = time.Now().UTC()
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// loadKnownFiles loads all known files from the database into a map for fast lookup
|
||||
@@ -424,12 +446,38 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
|
||||
flushStart := time.Now()
|
||||
log.Debug("flushCompletedPendingFiles: starting")
|
||||
|
||||
// Partition pending files into those ready to flush and those still waiting
|
||||
canFlush, stillPendingCount := s.partitionPendingByChunkStatus()
|
||||
|
||||
if len(canFlush) == 0 {
|
||||
log.Debug("flushCompletedPendingFiles: nothing to flush")
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debug("Flushing completed files after blob finalize",
|
||||
"files_to_flush", len(canFlush),
|
||||
"files_still_pending", stillPendingCount)
|
||||
|
||||
// Collect all data for batch operations
|
||||
allFiles, allFileIDs, allFileChunks, allChunkFiles := s.collectBatchFlushData(canFlush)
|
||||
|
||||
// Execute the batch flush in a single transaction
|
||||
log.Debug("flushCompletedPendingFiles: starting transaction")
|
||||
txStart := time.Now()
|
||||
err := s.executeBatchFileFlush(ctx, allFiles, allFileIDs, allFileChunks, allChunkFiles)
|
||||
log.Debug("flushCompletedPendingFiles: transaction done", "duration", time.Since(txStart))
|
||||
log.Debug("flushCompletedPendingFiles: total duration", "duration", time.Since(flushStart))
|
||||
return err
|
||||
}
|
||||
|
||||
// partitionPendingByChunkStatus separates pending files into those whose chunks
|
||||
// are all committed to DB (ready to flush) and those still waiting on pending chunks.
|
||||
// Updates s.pendingFiles to contain only the still-pending files.
|
||||
func (s *Scanner) partitionPendingByChunkStatus() (canFlush []pendingFileData, stillPendingCount int) {
|
||||
log.Debug("flushCompletedPendingFiles: acquiring pendingFilesMu lock")
|
||||
s.pendingFilesMu.Lock()
|
||||
log.Debug("flushCompletedPendingFiles: acquired lock", "pending_files", len(s.pendingFiles))
|
||||
|
||||
// Separate files into complete (can flush) and incomplete (keep pending)
|
||||
var canFlush []pendingFileData
|
||||
var stillPending []pendingFileData
|
||||
|
||||
log.Debug("flushCompletedPendingFiles: checking which files can flush")
|
||||
@@ -454,18 +502,15 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
|
||||
s.pendingFilesMu.Unlock()
|
||||
log.Debug("flushCompletedPendingFiles: released lock")
|
||||
|
||||
if len(canFlush) == 0 {
|
||||
log.Debug("flushCompletedPendingFiles: nothing to flush")
|
||||
return nil
|
||||
}
|
||||
return canFlush, len(stillPending)
|
||||
}
|
||||
|
||||
log.Debug("Flushing completed files after blob finalize",
|
||||
"files_to_flush", len(canFlush),
|
||||
"files_still_pending", len(stillPending))
|
||||
|
||||
// Collect all data for batch operations
|
||||
// collectBatchFlushData aggregates file records, IDs, file-chunk mappings, and chunk-file
|
||||
// mappings from the given pending file data for efficient batch database operations
|
||||
func (s *Scanner) collectBatchFlushData(canFlush []pendingFileData) ([]*database.File, []types.FileID, []database.FileChunk, []database.ChunkFile) {
|
||||
log.Debug("flushCompletedPendingFiles: collecting data for batch ops")
|
||||
collectStart := time.Now()
|
||||
|
||||
var allFileChunks []database.FileChunk
|
||||
var allChunkFiles []database.ChunkFile
|
||||
var allFileIDs []types.FileID
|
||||
@@ -477,16 +522,20 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
|
||||
allFileIDs = append(allFileIDs, data.file.ID)
|
||||
allFiles = append(allFiles, data.file)
|
||||
}
|
||||
|
||||
log.Debug("flushCompletedPendingFiles: collected data",
|
||||
"duration", time.Since(collectStart),
|
||||
"file_chunks", len(allFileChunks),
|
||||
"chunk_files", len(allChunkFiles),
|
||||
"files", len(allFiles))
|
||||
|
||||
// Flush the complete files using batch operations
|
||||
log.Debug("flushCompletedPendingFiles: starting transaction")
|
||||
txStart := time.Now()
|
||||
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
return allFiles, allFileIDs, allFileChunks, allChunkFiles
|
||||
}
|
||||
|
||||
// executeBatchFileFlush writes all collected file data to the database in a single transaction,
|
||||
// including deleting old mappings, creating file records, and adding snapshot associations
|
||||
func (s *Scanner) executeBatchFileFlush(ctx context.Context, allFiles []*database.File, allFileIDs []types.FileID, allFileChunks []database.FileChunk, allChunkFiles []database.ChunkFile) error {
|
||||
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
log.Debug("flushCompletedPendingFiles: inside transaction")
|
||||
|
||||
// Batch delete old file_chunks and chunk_files
|
||||
@@ -539,9 +588,6 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
|
||||
log.Debug("flushCompletedPendingFiles: transaction complete")
|
||||
return nil
|
||||
})
|
||||
log.Debug("flushCompletedPendingFiles: transaction done", "duration", time.Since(txStart))
|
||||
log.Debug("flushCompletedPendingFiles: total duration", "duration", time.Since(flushStart))
|
||||
return err
|
||||
}
|
||||
|
||||
// ScanPhaseResult contains the results of the scan phase
|
||||
@@ -623,62 +669,11 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
||||
mu.Unlock()
|
||||
|
||||
// Update result stats
|
||||
if needsProcessing {
|
||||
result.BytesScanned += info.Size()
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesScanned.Add(info.Size())
|
||||
}
|
||||
} else {
|
||||
result.FilesSkipped++
|
||||
result.BytesSkipped += info.Size()
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().FilesSkipped.Add(1)
|
||||
s.progress.GetStats().BytesSkipped.Add(info.Size())
|
||||
}
|
||||
}
|
||||
result.FilesScanned++
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().FilesScanned.Add(1)
|
||||
}
|
||||
s.updateScanEntryStats(result, needsProcessing, info)
|
||||
|
||||
// Output periodic status
|
||||
if time.Since(lastStatusTime) >= statusInterval {
|
||||
elapsed := time.Since(startTime)
|
||||
rate := float64(filesScanned) / elapsed.Seconds()
|
||||
|
||||
// Build status line - use estimate if available (not first backup)
|
||||
if estimatedTotal > 0 {
|
||||
// Show actual scanned vs estimate (may exceed estimate if files were added)
|
||||
pct := float64(filesScanned) / float64(estimatedTotal) * 100
|
||||
if pct > 100 {
|
||||
pct = 100 // Cap at 100% for display
|
||||
}
|
||||
remaining := estimatedTotal - filesScanned
|
||||
if remaining < 0 {
|
||||
remaining = 0
|
||||
}
|
||||
var eta time.Duration
|
||||
if rate > 0 && remaining > 0 {
|
||||
eta = time.Duration(float64(remaining)/rate) * time.Second
|
||||
}
|
||||
fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
|
||||
formatNumber(int(filesScanned)),
|
||||
pct,
|
||||
formatNumber(changedCount),
|
||||
rate,
|
||||
elapsed.Round(time.Second))
|
||||
if eta > 0 {
|
||||
fmt.Printf(", ETA %s", eta.Round(time.Second))
|
||||
}
|
||||
fmt.Println()
|
||||
} else {
|
||||
// First backup - no estimate available
|
||||
fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
|
||||
formatNumber(int(filesScanned)),
|
||||
formatNumber(changedCount),
|
||||
rate,
|
||||
elapsed.Round(time.Second))
|
||||
}
|
||||
printScanProgressLine(filesScanned, changedCount, estimatedTotal, startTime)
|
||||
lastStatusTime = time.Now()
|
||||
}
|
||||
|
||||
@@ -695,6 +690,68 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
||||
}, nil
|
||||
}
|
||||
|
||||
// updateScanEntryStats updates the scan result and progress reporter statistics
|
||||
// for a single scanned file entry based on whether it needs processing
|
||||
func (s *Scanner) updateScanEntryStats(result *ScanResult, needsProcessing bool, info os.FileInfo) {
|
||||
if needsProcessing {
|
||||
result.BytesScanned += info.Size()
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesScanned.Add(info.Size())
|
||||
}
|
||||
} else {
|
||||
result.FilesSkipped++
|
||||
result.BytesSkipped += info.Size()
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().FilesSkipped.Add(1)
|
||||
s.progress.GetStats().BytesSkipped.Add(info.Size())
|
||||
}
|
||||
}
|
||||
result.FilesScanned++
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().FilesScanned.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
// printScanProgressLine prints a periodic progress line during the scan phase,
|
||||
// showing files scanned, percentage complete (if estimate available), and ETA
|
||||
func printScanProgressLine(filesScanned int64, changedCount int, estimatedTotal int64, startTime time.Time) {
|
||||
elapsed := time.Since(startTime)
|
||||
rate := float64(filesScanned) / elapsed.Seconds()
|
||||
|
||||
if estimatedTotal > 0 {
|
||||
// Show actual scanned vs estimate (may exceed estimate if files were added)
|
||||
pct := float64(filesScanned) / float64(estimatedTotal) * 100
|
||||
if pct > 100 {
|
||||
pct = 100 // Cap at 100% for display
|
||||
}
|
||||
remaining := estimatedTotal - filesScanned
|
||||
if remaining < 0 {
|
||||
remaining = 0
|
||||
}
|
||||
var eta time.Duration
|
||||
if rate > 0 && remaining > 0 {
|
||||
eta = time.Duration(float64(remaining)/rate) * time.Second
|
||||
}
|
||||
fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
|
||||
formatNumber(int(filesScanned)),
|
||||
pct,
|
||||
formatNumber(changedCount),
|
||||
rate,
|
||||
elapsed.Round(time.Second))
|
||||
if eta > 0 {
|
||||
fmt.Printf(", ETA %s", eta.Round(time.Second))
|
||||
}
|
||||
fmt.Println()
|
||||
} else {
|
||||
// First backup - no estimate available
|
||||
fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
|
||||
formatNumber(int(filesScanned)),
|
||||
formatNumber(changedCount),
|
||||
rate,
|
||||
elapsed.Round(time.Second))
|
||||
}
|
||||
}
|
||||
|
||||
// checkFileInMemory checks if a file needs processing using the in-memory map
|
||||
// No database access is performed - this is purely CPU/memory work
|
||||
func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
|
||||
@@ -830,22 +887,13 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
|
||||
s.progress.GetStats().CurrentFile.Store(fileToProcess.Path)
|
||||
}
|
||||
|
||||
// Process file in streaming fashion
|
||||
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
|
||||
// Handle files that were deleted between scan and process phases
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
log.Warn("File was deleted during backup, skipping", "path", fileToProcess.Path)
|
||||
result.FilesSkipped++
|
||||
continue
|
||||
}
|
||||
// Skip file read errors if --skip-errors is enabled
|
||||
if s.skipErrors {
|
||||
log.Error("ERROR: Failed to process file (skipping due to --skip-errors)", "path", fileToProcess.Path, "error", err)
|
||||
fmt.Printf("ERROR: Failed to process %s: %v (skipping)\n", fileToProcess.Path, err)
|
||||
result.FilesSkipped++
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
|
||||
// Process file with error handling for deleted files and skip-errors mode
|
||||
skipped, err := s.processFileWithErrorHandling(ctx, fileToProcess, result)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if skipped {
|
||||
continue
|
||||
}
|
||||
|
||||
// Update files processed counter
|
||||
@@ -858,36 +906,71 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
|
||||
|
||||
// Output periodic status
|
||||
if time.Since(lastStatusTime) >= statusInterval {
|
||||
elapsed := time.Since(startTime)
|
||||
pct := float64(bytesProcessed) / float64(totalBytes) * 100
|
||||
byteRate := float64(bytesProcessed) / elapsed.Seconds()
|
||||
fileRate := float64(filesProcessed) / elapsed.Seconds()
|
||||
|
||||
// Calculate ETA based on bytes (more accurate than files)
|
||||
remainingBytes := totalBytes - bytesProcessed
|
||||
var eta time.Duration
|
||||
if byteRate > 0 {
|
||||
eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second
|
||||
}
|
||||
|
||||
// Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s
|
||||
fmt.Printf("Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s",
|
||||
formatCompact(filesProcessed),
|
||||
formatCompact(totalFiles),
|
||||
humanize.Bytes(uint64(bytesProcessed)),
|
||||
humanize.Bytes(uint64(totalBytes)),
|
||||
pct,
|
||||
humanize.Bytes(uint64(byteRate)),
|
||||
fileRate,
|
||||
elapsed.Round(time.Second))
|
||||
if eta > 0 {
|
||||
fmt.Printf(", ETA: %s", eta.Round(time.Second))
|
||||
}
|
||||
fmt.Println()
|
||||
printProcessingProgress(filesProcessed, totalFiles, bytesProcessed, totalBytes, startTime)
|
||||
lastStatusTime = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
// Finalize: flush packer, pending files, and handle local blobs
|
||||
return s.finalizeProcessPhase(ctx, result)
|
||||
}
|
||||
|
||||
// processFileWithErrorHandling wraps processFileStreaming with error recovery for
|
||||
// deleted files and skip-errors mode. Returns (skipped, error).
|
||||
func (s *Scanner) processFileWithErrorHandling(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) (bool, error) {
|
||||
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
|
||||
// Handle files that were deleted between scan and process phases
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
log.Warn("File was deleted during backup, skipping", "path", fileToProcess.Path)
|
||||
result.FilesSkipped++
|
||||
return true, nil
|
||||
}
|
||||
// Skip file read errors if --skip-errors is enabled
|
||||
if s.skipErrors {
|
||||
log.Error("ERROR: Failed to process file (skipping due to --skip-errors)", "path", fileToProcess.Path, "error", err)
|
||||
fmt.Printf("ERROR: Failed to process %s: %v (skipping)\n", fileToProcess.Path, err)
|
||||
result.FilesSkipped++
|
||||
return true, nil
|
||||
}
|
||||
return false, fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// printProcessingProgress prints a periodic progress line during the process phase,
|
||||
// showing files processed, bytes transferred, throughput, and ETA
|
||||
func printProcessingProgress(filesProcessed, totalFiles int, bytesProcessed, totalBytes int64, startTime time.Time) {
|
||||
elapsed := time.Since(startTime)
|
||||
pct := float64(bytesProcessed) / float64(totalBytes) * 100
|
||||
byteRate := float64(bytesProcessed) / elapsed.Seconds()
|
||||
fileRate := float64(filesProcessed) / elapsed.Seconds()
|
||||
|
||||
// Calculate ETA based on bytes (more accurate than files)
|
||||
remainingBytes := totalBytes - bytesProcessed
|
||||
var eta time.Duration
|
||||
if byteRate > 0 {
|
||||
eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second
|
||||
}
|
||||
|
||||
// Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s
|
||||
fmt.Printf("Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s",
|
||||
formatCompact(filesProcessed),
|
||||
formatCompact(totalFiles),
|
||||
humanize.Bytes(uint64(bytesProcessed)),
|
||||
humanize.Bytes(uint64(totalBytes)),
|
||||
pct,
|
||||
humanize.Bytes(uint64(byteRate)),
|
||||
fileRate,
|
||||
elapsed.Round(time.Second))
|
||||
if eta > 0 {
|
||||
fmt.Printf(", ETA: %s", eta.Round(time.Second))
|
||||
}
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
// finalizeProcessPhase flushes the packer, writes remaining pending files to the database,
|
||||
// and handles local blob storage when no remote storage is configured
|
||||
func (s *Scanner) finalizeProcessPhase(ctx context.Context, result *ScanResult) error {
|
||||
// Final packer flush first - this commits remaining chunks to DB
|
||||
// and handleBlobReady will flush files whose chunks are now committed
|
||||
s.packerMu.Lock()
|
||||
@@ -931,40 +1014,103 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
startTime := time.Now().UTC()
|
||||
finishedBlob := blobWithReader.FinishedBlob
|
||||
|
||||
// Report upload start and increment blobs created
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
|
||||
s.progress.GetStats().BlobsCreated.Add(1)
|
||||
}
|
||||
|
||||
// Upload to storage first (without holding any locks)
|
||||
// Use scan context for cancellation support
|
||||
ctx := s.scanCtx
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
// Track bytes uploaded for accurate speed calculation
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
|
||||
blobExists, err := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime)
|
||||
if err != nil {
|
||||
s.cleanupBlobTempFile(blobWithReader)
|
||||
return fmt.Errorf("uploading blob %s: %w", finishedBlob.Hash, err)
|
||||
}
|
||||
|
||||
if err := s.recordBlobMetadata(ctx, finishedBlob, blobExists, startTime); err != nil {
|
||||
s.cleanupBlobTempFile(blobWithReader)
|
||||
return err
|
||||
}
|
||||
|
||||
s.cleanupBlobTempFile(blobWithReader)
|
||||
|
||||
// Chunks from this blob are now committed to DB - remove from pending set
|
||||
s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)
|
||||
|
||||
// Flush files whose chunks are now all committed
|
||||
if err := s.flushCompletedPendingFiles(ctx); err != nil {
|
||||
return fmt.Errorf("flushing completed files: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadBlobIfNeeded uploads the blob to storage if it doesn't already exist, returns whether it existed
|
||||
func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) (bool, error) {
|
||||
finishedBlob := blobWithReader.FinishedBlob
|
||||
|
||||
// Check if blob already exists (deduplication after restart)
|
||||
if _, err := s.storage.Stat(ctx, blobPath); err == nil {
|
||||
log.Info("Blob already exists in storage, skipping upload",
|
||||
"hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
fmt.Printf("Blob exists: %s (%s, skipped upload)\n",
|
||||
finishedBlob.Hash[:12]+"...", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob)
|
||||
|
||||
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||
log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err)
|
||||
return false, fmt.Errorf("uploading blob to storage: %w", err)
|
||||
}
|
||||
|
||||
uploadDuration := time.Since(startTime)
|
||||
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
|
||||
|
||||
fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
humanize.Bytes(uint64(uploadSpeedBps)),
|
||||
uploadDuration.Round(time.Millisecond))
|
||||
|
||||
log.Info("Successfully uploaded blob to storage",
|
||||
"path", blobPath,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
"duration", uploadDuration,
|
||||
"speed", humanize.SI(uploadSpeedBps*8, "bps"))
|
||||
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
|
||||
stats := s.progress.GetStats()
|
||||
stats.BlobsUploaded.Add(1)
|
||||
stats.BytesUploaded.Add(finishedBlob.Compressed)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// makeUploadProgressCallback creates a progress callback for blob uploads
|
||||
func (s *Scanner) makeUploadProgressCallback(ctx context.Context, finishedBlob *blob.FinishedBlob) func(int64) error {
|
||||
lastProgressTime := time.Now()
|
||||
lastProgressBytes := int64(0)
|
||||
|
||||
progressCallback := func(uploaded int64) error {
|
||||
// Calculate instantaneous speed
|
||||
return func(uploaded int64) error {
|
||||
now := time.Now()
|
||||
elapsed := now.Sub(lastProgressTime).Seconds()
|
||||
if elapsed > 0.5 { // Update speed every 0.5 seconds
|
||||
if elapsed > 0.5 {
|
||||
bytesSinceLastUpdate := uploaded - lastProgressBytes
|
||||
speed := float64(bytesSinceLastUpdate) / elapsed
|
||||
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed)
|
||||
}
|
||||
|
||||
lastProgressTime = now
|
||||
lastProgressBytes = uploaded
|
||||
}
|
||||
|
||||
// Check for cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
@@ -972,87 +1118,26 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create sharded path: blobs/ca/fe/cafebabe...
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
|
||||
|
||||
// Check if blob already exists in remote storage (deduplication after restart)
|
||||
blobExists := false
|
||||
if _, err := s.storage.Stat(ctx, blobPath); err == nil {
|
||||
blobExists = true
|
||||
log.Info("Blob already exists in storage, skipping upload",
|
||||
"hash", finishedBlob.Hash,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
fmt.Printf("Blob exists: %s (%s, skipped upload)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
}
|
||||
|
||||
if !blobExists {
|
||||
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||
return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err)
|
||||
}
|
||||
|
||||
uploadDuration := time.Since(startTime)
|
||||
|
||||
// Calculate upload speed
|
||||
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
|
||||
|
||||
// Print blob stored message
|
||||
fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
humanize.Bytes(uint64(uploadSpeedBps)),
|
||||
uploadDuration.Round(time.Millisecond))
|
||||
|
||||
// Log upload stats
|
||||
uploadSpeedBits := uploadSpeedBps * 8 // bits per second
|
||||
log.Info("Successfully uploaded blob to storage",
|
||||
"path", blobPath,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
"duration", uploadDuration,
|
||||
"speed", humanize.SI(uploadSpeedBits, "bps"))
|
||||
|
||||
// Report upload complete
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
|
||||
}
|
||||
|
||||
// Update progress after upload completes
|
||||
if s.progress != nil {
|
||||
stats := s.progress.GetStats()
|
||||
stats.BlobsUploaded.Add(1)
|
||||
stats.BytesUploaded.Add(finishedBlob.Compressed)
|
||||
}
|
||||
}
|
||||
|
||||
// Store metadata in database (after upload is complete)
|
||||
dbCtx := s.scanCtx
|
||||
if dbCtx == nil {
|
||||
dbCtx = context.Background()
|
||||
}
|
||||
|
||||
// Parse blob ID for typed operations
|
||||
// recordBlobMetadata stores blob upload metadata in the database
|
||||
func (s *Scanner) recordBlobMetadata(ctx context.Context, finishedBlob *blob.FinishedBlob, blobExists bool, startTime time.Time) error {
|
||||
finishedBlobID, err := types.ParseBlobID(finishedBlob.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing finished blob ID: %w", err)
|
||||
}
|
||||
|
||||
// Track upload duration (0 if blob already existed)
|
||||
uploadDuration := time.Since(startTime)
|
||||
|
||||
err = s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error {
|
||||
// Update blob upload timestamp
|
||||
if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil {
|
||||
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
if err := s.repos.Blobs.UpdateUploaded(txCtx, tx, finishedBlob.ID); err != nil {
|
||||
return fmt.Errorf("updating blob upload timestamp: %w", err)
|
||||
}
|
||||
|
||||
// Add the blob to the snapshot
|
||||
if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil {
|
||||
if err := s.repos.Snapshots.AddBlob(txCtx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil {
|
||||
return fmt.Errorf("adding blob to snapshot: %w", err)
|
||||
}
|
||||
|
||||
// Record upload metrics (only for actual uploads, not deduplicated blobs)
|
||||
if !blobExists {
|
||||
upload := &database.Upload{
|
||||
BlobHash: finishedBlob.Hash,
|
||||
@@ -1061,15 +1146,17 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
Size: finishedBlob.Compressed,
|
||||
DurationMs: uploadDuration.Milliseconds(),
|
||||
}
|
||||
if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
|
||||
if err := s.repos.Uploads.Create(txCtx, tx, upload); err != nil {
|
||||
return fmt.Errorf("recording upload metrics: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Cleanup temp file if needed
|
||||
// cleanupBlobTempFile closes and removes the blob's temporary file
|
||||
func (s *Scanner) cleanupBlobTempFile(blobWithReader *blob.BlobWithReader) {
|
||||
if blobWithReader.TempFile != nil {
|
||||
tempName := blobWithReader.TempFile.Name()
|
||||
if err := blobWithReader.TempFile.Close(); err != nil {
|
||||
@@ -1079,77 +1166,41 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Chunks from this blob are now committed to DB - remove from pending set
|
||||
log.Debug("handleBlobReady: removing pending chunk hashes")
|
||||
s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)
|
||||
log.Debug("handleBlobReady: removed pending chunk hashes")
|
||||
|
||||
// Flush files whose chunks are now all committed
|
||||
// This maintains database consistency after each blob
|
||||
log.Debug("handleBlobReady: calling flushCompletedPendingFiles")
|
||||
if err := s.flushCompletedPendingFiles(dbCtx); err != nil {
|
||||
return fmt.Errorf("flushing completed files: %w", err)
|
||||
}
|
||||
log.Debug("handleBlobReady: flushCompletedPendingFiles returned")
|
||||
|
||||
log.Debug("handleBlobReady: complete")
|
||||
return nil
|
||||
// streamingChunkInfo tracks chunk metadata collected during streaming
|
||||
type streamingChunkInfo struct {
|
||||
fileChunk database.FileChunk
|
||||
offset int64
|
||||
size int64
|
||||
}
|
||||
|
||||
// processFileStreaming processes a file by streaming chunks directly to the packer
|
||||
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
|
||||
// Open the file
|
||||
file, err := s.fs.Open(fileToProcess.Path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening file: %w", err)
|
||||
}
|
||||
defer func() { _ = file.Close() }()
|
||||
|
||||
// We'll collect file chunks for database storage
|
||||
// but process them for packing as we go
|
||||
type chunkInfo struct {
|
||||
fileChunk database.FileChunk
|
||||
offset int64
|
||||
size int64
|
||||
}
|
||||
var chunks []chunkInfo
|
||||
var chunks []streamingChunkInfo
|
||||
chunkIndex := 0
|
||||
|
||||
// Process chunks in streaming fashion and get full file hash
|
||||
fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
|
||||
// Check for cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
log.Debug("Processing content-defined chunk from file",
|
||||
"file", fileToProcess.Path,
|
||||
"chunk_index", chunkIndex,
|
||||
"hash", chunk.Hash,
|
||||
"size", chunk.Size)
|
||||
|
||||
// Check if chunk already exists (fast in-memory lookup)
|
||||
chunkExists := s.chunkExists(chunk.Hash)
|
||||
|
||||
// Queue new chunks for batch insert when blob finalizes
|
||||
// This dramatically reduces transaction overhead
|
||||
if !chunkExists {
|
||||
s.packer.AddPendingChunk(chunk.Hash, chunk.Size)
|
||||
// Add to in-memory cache immediately for fast duplicate detection
|
||||
s.addKnownChunk(chunk.Hash)
|
||||
// Track as pending until blob finalizes and commits to DB
|
||||
s.addPendingChunkHash(chunk.Hash)
|
||||
}
|
||||
|
||||
// Track file chunk association for later storage
|
||||
chunks = append(chunks, chunkInfo{
|
||||
chunks = append(chunks, streamingChunkInfo{
|
||||
fileChunk: database.FileChunk{
|
||||
FileID: fileToProcess.File.ID,
|
||||
Idx: chunkIndex,
|
||||
@@ -1159,55 +1210,15 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
||||
size: chunk.Size,
|
||||
})
|
||||
|
||||
// Update stats
|
||||
if chunkExists {
|
||||
result.FilesSkipped++ // Track as skipped for now
|
||||
result.BytesSkipped += chunk.Size
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesSkipped.Add(chunk.Size)
|
||||
}
|
||||
} else {
|
||||
result.ChunksCreated++
|
||||
result.BytesScanned += chunk.Size
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().ChunksCreated.Add(1)
|
||||
s.progress.GetStats().BytesProcessed.Add(chunk.Size)
|
||||
s.progress.UpdateChunkingActivity()
|
||||
}
|
||||
}
|
||||
s.updateChunkStats(chunkExists, chunk.Size, result)
|
||||
|
||||
// Add chunk to packer immediately (streaming)
|
||||
// This happens outside the database transaction
|
||||
if !chunkExists {
|
||||
s.packerMu.Lock()
|
||||
err := s.packer.AddChunk(&blob.ChunkRef{
|
||||
Hash: chunk.Hash,
|
||||
Data: chunk.Data,
|
||||
})
|
||||
if err == blob.ErrBlobSizeLimitExceeded {
|
||||
// Finalize current blob and retry
|
||||
if err := s.packer.FinalizeBlob(); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("finalizing blob: %w", err)
|
||||
}
|
||||
// Retry adding the chunk
|
||||
if err := s.packer.AddChunk(&blob.ChunkRef{
|
||||
Hash: chunk.Hash,
|
||||
Data: chunk.Data,
|
||||
}); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk after finalize: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk to packer: %w", err)
|
||||
if err := s.addChunkToPacker(chunk); err != nil {
|
||||
return err
|
||||
}
|
||||
s.packerMu.Unlock()
|
||||
}
|
||||
|
||||
// Clear chunk data from memory immediately after use
|
||||
chunk.Data = nil
|
||||
|
||||
chunkIndex++
|
||||
return nil
|
||||
})
|
||||
@@ -1217,12 +1228,54 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
||||
}
|
||||
|
||||
log.Debug("Completed snapshotting file",
|
||||
"path", fileToProcess.Path,
|
||||
"file_hash", fileHash,
|
||||
"chunks", len(chunks))
|
||||
"path", fileToProcess.Path, "file_hash", fileHash, "chunks", len(chunks))
|
||||
|
||||
// Build file data for batch insertion
|
||||
// Update chunk associations with the file ID
|
||||
s.queueFileForBatchInsert(ctx, fileToProcess, chunks)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateChunkStats updates scan result and progress stats for a processed chunk
|
||||
func (s *Scanner) updateChunkStats(chunkExists bool, chunkSize int64, result *ScanResult) {
|
||||
if chunkExists {
|
||||
result.FilesSkipped++
|
||||
result.BytesSkipped += chunkSize
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesSkipped.Add(chunkSize)
|
||||
}
|
||||
} else {
|
||||
result.ChunksCreated++
|
||||
result.BytesScanned += chunkSize
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().ChunksCreated.Add(1)
|
||||
s.progress.GetStats().BytesProcessed.Add(chunkSize)
|
||||
s.progress.UpdateChunkingActivity()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addChunkToPacker adds a chunk to the blob packer, finalizing the current blob if needed
|
||||
func (s *Scanner) addChunkToPacker(chunk chunker.Chunk) error {
|
||||
s.packerMu.Lock()
|
||||
err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data})
|
||||
if err == blob.ErrBlobSizeLimitExceeded {
|
||||
if err := s.packer.FinalizeBlob(); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("finalizing blob: %w", err)
|
||||
}
|
||||
if err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data}); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk after finalize: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk to packer: %w", err)
|
||||
}
|
||||
s.packerMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// queueFileForBatchInsert builds file/chunk associations and queues the file for batch DB insert
|
||||
func (s *Scanner) queueFileForBatchInsert(ctx context.Context, fileToProcess *FileToProcess, chunks []streamingChunkInfo) {
|
||||
fileChunks := make([]database.FileChunk, len(chunks))
|
||||
chunkFiles := make([]database.ChunkFile, len(chunks))
|
||||
for i, ci := range chunks {
|
||||
@@ -1239,14 +1292,11 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
||||
}
|
||||
}
|
||||
|
||||
// Queue file for batch insertion
|
||||
// Files will be flushed when their chunks are committed (after blob finalize)
|
||||
s.addPendingFile(ctx, pendingFileData{
|
||||
file: fileToProcess.File,
|
||||
fileChunks: fileChunks,
|
||||
chunkFiles: chunkFiles,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetProgress returns the progress reporter for this scanner
|
||||
|
||||
@@ -227,12 +227,39 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
}
|
||||
}()
|
||||
|
||||
// Steps 1-5: Copy, clean, vacuum, compress, and read the database
|
||||
finalData, tempDBPath, err := sm.prepareExportDB(ctx, dbPath, snapshotID, tempDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Step 6: Generate blob manifest (before closing temp DB)
|
||||
blobManifest, err := sm.generateBlobManifest(ctx, tempDBPath, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("generating blob manifest: %w", err)
|
||||
}
|
||||
|
||||
// Step 7: Upload to S3 in snapshot subdirectory
|
||||
if err := sm.uploadSnapshotArtifacts(ctx, snapshotID, finalData, blobManifest); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Uploaded snapshot metadata",
|
||||
"snapshot_id", snapshotID,
|
||||
"db_size", len(finalData),
|
||||
"manifest_size", len(blobManifest))
|
||||
return nil
|
||||
}
|
||||
|
||||
// prepareExportDB copies, cleans, vacuums, and compresses the snapshot database for export.
|
||||
// Returns the compressed data and the path to the temporary database (needed for manifest generation).
|
||||
func (sm *SnapshotManager) prepareExportDB(ctx context.Context, dbPath, snapshotID, tempDir string) ([]byte, string, error) {
|
||||
// Step 1: Copy database to temp file
|
||||
// The main database should be closed at this point
|
||||
tempDBPath := filepath.Join(tempDir, "snapshot.db")
|
||||
log.Debug("Copying database to temporary location", "source", dbPath, "destination", tempDBPath)
|
||||
if err := sm.copyFile(dbPath, tempDBPath); err != nil {
|
||||
return fmt.Errorf("copying database: %w", err)
|
||||
return nil, "", fmt.Errorf("copying database: %w", err)
|
||||
}
|
||||
log.Debug("Database copy complete", "size", sm.getFileSize(tempDBPath))
|
||||
|
||||
@@ -240,7 +267,7 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
log.Debug("Cleaning temporary database", "snapshot_id", snapshotID)
|
||||
stats, err := sm.cleanSnapshotDB(ctx, tempDBPath, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cleaning snapshot database: %w", err)
|
||||
return nil, "", fmt.Errorf("cleaning snapshot database: %w", err)
|
||||
}
|
||||
log.Info("Temporary database cleanup complete",
|
||||
"db_path", tempDBPath,
|
||||
@@ -255,14 +282,14 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
// Step 3: VACUUM the database to remove deleted data and compact
|
||||
// This is critical for security - ensures no stale/deleted data is uploaded
|
||||
if err := sm.vacuumDatabase(tempDBPath); err != nil {
|
||||
return fmt.Errorf("vacuuming database: %w", err)
|
||||
return nil, "", fmt.Errorf("vacuuming database: %w", err)
|
||||
}
|
||||
log.Debug("Database vacuumed", "size", humanize.Bytes(uint64(sm.getFileSize(tempDBPath))))
|
||||
|
||||
// Step 4: Compress and encrypt the binary database file
|
||||
compressedPath := filepath.Join(tempDir, "db.zst.age")
|
||||
if err := sm.compressFile(tempDBPath, compressedPath); err != nil {
|
||||
return fmt.Errorf("compressing database: %w", err)
|
||||
return nil, "", fmt.Errorf("compressing database: %w", err)
|
||||
}
|
||||
log.Debug("Compression complete",
|
||||
"original_size", humanize.Bytes(uint64(sm.getFileSize(tempDBPath))),
|
||||
@@ -271,49 +298,43 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
// Step 5: Read compressed and encrypted data for upload
|
||||
finalData, err := afero.ReadFile(sm.fs, compressedPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reading compressed dump: %w", err)
|
||||
return nil, "", fmt.Errorf("reading compressed dump: %w", err)
|
||||
}
|
||||
|
||||
// Step 6: Generate blob manifest (before closing temp DB)
|
||||
blobManifest, err := sm.generateBlobManifest(ctx, tempDBPath, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("generating blob manifest: %w", err)
|
||||
}
|
||||
return finalData, tempDBPath, nil
|
||||
}
|
||||
|
||||
// Step 7: Upload to S3 in snapshot subdirectory
|
||||
// uploadSnapshotArtifacts uploads the database backup and blob manifest to S3
|
||||
func (sm *SnapshotManager) uploadSnapshotArtifacts(ctx context.Context, snapshotID string, dbData, manifestData []byte) error {
|
||||
// Upload database backup (compressed and encrypted)
|
||||
dbKey := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
|
||||
|
||||
dbUploadStart := time.Now()
|
||||
if err := sm.storage.Put(ctx, dbKey, bytes.NewReader(finalData)); err != nil {
|
||||
if err := sm.storage.Put(ctx, dbKey, bytes.NewReader(dbData)); err != nil {
|
||||
return fmt.Errorf("uploading snapshot database: %w", err)
|
||||
}
|
||||
dbUploadDuration := time.Since(dbUploadStart)
|
||||
dbUploadSpeed := float64(len(finalData)) * 8 / dbUploadDuration.Seconds() // bits per second
|
||||
dbUploadSpeed := float64(len(dbData)) * 8 / dbUploadDuration.Seconds() // bits per second
|
||||
log.Info("Uploaded snapshot database",
|
||||
"path", dbKey,
|
||||
"size", humanize.Bytes(uint64(len(finalData))),
|
||||
"size", humanize.Bytes(uint64(len(dbData))),
|
||||
"duration", dbUploadDuration,
|
||||
"speed", humanize.SI(dbUploadSpeed, "bps"))
|
||||
|
||||
// Upload blob manifest (compressed only, not encrypted)
|
||||
manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
manifestUploadStart := time.Now()
|
||||
if err := sm.storage.Put(ctx, manifestKey, bytes.NewReader(blobManifest)); err != nil {
|
||||
if err := sm.storage.Put(ctx, manifestKey, bytes.NewReader(manifestData)); err != nil {
|
||||
return fmt.Errorf("uploading blob manifest: %w", err)
|
||||
}
|
||||
manifestUploadDuration := time.Since(manifestUploadStart)
|
||||
manifestUploadSpeed := float64(len(blobManifest)) * 8 / manifestUploadDuration.Seconds() // bits per second
|
||||
manifestUploadSpeed := float64(len(manifestData)) * 8 / manifestUploadDuration.Seconds() // bits per second
|
||||
log.Info("Uploaded blob manifest",
|
||||
"path", manifestKey,
|
||||
"size", humanize.Bytes(uint64(len(blobManifest))),
|
||||
"size", humanize.Bytes(uint64(len(manifestData))),
|
||||
"duration", manifestUploadDuration,
|
||||
"speed", humanize.SI(manifestUploadSpeed, "bps"))
|
||||
|
||||
log.Info("Uploaded snapshot metadata",
|
||||
"snapshot_id", snapshotID,
|
||||
"db_size", len(finalData),
|
||||
"manifest_size", len(blobManifest))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user