refactor: break up oversized methods into smaller descriptive helpers
Addresses #40 (feedback from PR #37). Refactored methods:

- createNamedSnapshot (214→~60 lines) → resolveSnapshotPaths, scanAllDirectories, collectUploadStats, finalizeSnapshotMetadata, printSnapshotSummary, getSnapshotBlobSizes, formatUploadSpeed
- ListSnapshots (159→~30 lines) → listRemoteSnapshotIDs, reconcileLocalWithRemote, buildSnapshotInfoList, printSnapshotTable
- PruneBlobs (170→~40 lines) → collectReferencedBlobs, listUniqueSnapshotIDs, listAllRemoteBlobs, findUnreferencedBlobs, deleteUnreferencedBlobs
- RunDeepVerify (182→~50 lines) → loadVerificationData, runVerificationSteps, deepVerifyFailure
- RemoteInfo (187→~30 lines) → collectSnapshotMetadata, collectReferencedBlobsFromManifests, populateRemoteInfoResult, scanRemoteBlobStorage, printRemoteInfoTable
- handleBlobReady (173→~30 lines) → uploadBlobIfNeeded, makeUploadProgressCallback, recordBlobMetadata, cleanupBlobTempFile
- processFileStreaming (146→~50 lines) → updateChunkStats, addChunkToPacker, queueFileForBatchInsert
- finalizeCurrentBlob (167→~30 lines) → closeBlobWriter, buildChunkRefs, commitBlobToDatabase, deliverFinishedBlob

All tests pass. No behavioral changes.
This commit is contained in:
// handleBlobReady uploads a finished blob to storage, records its metadata,
// removes the blob's chunks from the pending set, and flushes any files whose
// chunks are now fully committed. It is the post-packing pipeline step for a
// single blob.
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
	startTime := time.Now().UTC()
	finishedBlob := blobWithReader.FinishedBlob

	// Report upload start and increment blobs created
	if s.progress != nil {
		s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
		s.progress.GetStats().BlobsCreated.Add(1)
	}

	// Upload to storage first (without holding any locks)
	// Use scan context for cancellation support
	ctx := s.scanCtx
	if ctx == nil {
		ctx = context.Background()
	}

	// Sharded storage path derived from the blob hash: blobs/ca/fe/cafebabe...
	blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
	// NOTE(review): uploadBlobIfNeeded returns false both when it uploaded the
	// blob AND when the upload failed (failure is only logged there), so the
	// metadata below is recorded either way — confirm this is intentional.
	blobExists := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime)

	if err := s.recordBlobMetadata(ctx, finishedBlob, blobExists, startTime); err != nil {
		// Still release the temp file before surfacing the DB error.
		s.cleanupBlobTempFile(blobWithReader)
		return err
	}

	s.cleanupBlobTempFile(blobWithReader)

	// Chunks from this blob are now committed to DB - remove from pending set
	s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)

	// Flush files whose chunks are now all committed
	if err := s.flushCompletedPendingFiles(ctx); err != nil {
		return fmt.Errorf("flushing completed files: %w", err)
	}

	return nil
}
|
||||
|
||||
// uploadBlobIfNeeded uploads the blob to storage if it doesn't already exist,
// and returns whether it already existed.
//
// NOTE(review): an upload failure is only logged here and reported as false
// ("did not exist"), so the caller proceeds to record metadata as though the
// blob were stored. The pre-refactor code returned the upload error to the
// caller instead — confirm the swallow is intentional.
func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) bool {
	finishedBlob := blobWithReader.FinishedBlob

	// Check if blob already exists (deduplication after restart)
	if _, err := s.storage.Stat(ctx, blobPath); err == nil {
		log.Info("Blob already exists in storage, skipping upload",
			"hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed)))
		fmt.Printf("Blob exists: %s (%s, skipped upload)\n",
			finishedBlob.Hash[:12]+"...", humanize.Bytes(uint64(finishedBlob.Compressed)))
		return true
	}

	// Callback periodically reports instantaneous upload speed and honors ctx
	// cancellation.
	progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob)

	if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
		log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err)
		return false
	}

	// Average speed over the whole upload (bytes/sec).
	uploadDuration := time.Since(startTime)
	uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()

	fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
		finishedBlob.Hash[:12]+"...",
		humanize.Bytes(uint64(finishedBlob.Compressed)),
		humanize.Bytes(uint64(uploadSpeedBps)),
		uploadDuration.Round(time.Millisecond))

	log.Info("Successfully uploaded blob to storage",
		"path", blobPath,
		"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
		"duration", uploadDuration,
		"speed", humanize.SI(uploadSpeedBps*8, "bps")) // *8: bytes/sec → bits/sec

	// Report completion and bump upload counters.
	if s.progress != nil {
		s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
		stats := s.progress.GetStats()
		stats.BlobsUploaded.Add(1)
		stats.BytesUploaded.Add(finishedBlob.Compressed)
	}

	return false
}
|
||||
|
||||
// makeUploadProgressCallback creates a progress callback for blob uploads
|
||||
func (s *Scanner) makeUploadProgressCallback(ctx context.Context, finishedBlob *blob.FinishedBlob) func(int64) error {
|
||||
lastProgressTime := time.Now()
|
||||
lastProgressBytes := int64(0)
|
||||
|
||||
progressCallback := func(uploaded int64) error {
|
||||
// Calculate instantaneous speed
|
||||
return func(uploaded int64) error {
|
||||
now := time.Now()
|
||||
elapsed := now.Sub(lastProgressTime).Seconds()
|
||||
if elapsed > 0.5 { // Update speed every 0.5 seconds
|
||||
if elapsed > 0.5 {
|
||||
bytesSinceLastUpdate := uploaded - lastProgressBytes
|
||||
speed := float64(bytesSinceLastUpdate) / elapsed
|
||||
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed)
|
||||
}
|
||||
|
||||
lastProgressTime = now
|
||||
lastProgressBytes = uploaded
|
||||
}
|
||||
|
||||
// Check for cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
@@ -972,87 +1031,26 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create sharded path: blobs/ca/fe/cafebabe...
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
|
||||
|
||||
// Check if blob already exists in remote storage (deduplication after restart)
|
||||
blobExists := false
|
||||
if _, err := s.storage.Stat(ctx, blobPath); err == nil {
|
||||
blobExists = true
|
||||
log.Info("Blob already exists in storage, skipping upload",
|
||||
"hash", finishedBlob.Hash,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
fmt.Printf("Blob exists: %s (%s, skipped upload)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
}
|
||||
|
||||
if !blobExists {
|
||||
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||
return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err)
|
||||
}
|
||||
|
||||
uploadDuration := time.Since(startTime)
|
||||
|
||||
// Calculate upload speed
|
||||
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
|
||||
|
||||
// Print blob stored message
|
||||
fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
humanize.Bytes(uint64(uploadSpeedBps)),
|
||||
uploadDuration.Round(time.Millisecond))
|
||||
|
||||
// Log upload stats
|
||||
uploadSpeedBits := uploadSpeedBps * 8 // bits per second
|
||||
log.Info("Successfully uploaded blob to storage",
|
||||
"path", blobPath,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
"duration", uploadDuration,
|
||||
"speed", humanize.SI(uploadSpeedBits, "bps"))
|
||||
|
||||
// Report upload complete
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
|
||||
}
|
||||
|
||||
// Update progress after upload completes
|
||||
if s.progress != nil {
|
||||
stats := s.progress.GetStats()
|
||||
stats.BlobsUploaded.Add(1)
|
||||
stats.BytesUploaded.Add(finishedBlob.Compressed)
|
||||
}
|
||||
}
|
||||
|
||||
// Store metadata in database (after upload is complete)
|
||||
dbCtx := s.scanCtx
|
||||
if dbCtx == nil {
|
||||
dbCtx = context.Background()
|
||||
}
|
||||
|
||||
// Parse blob ID for typed operations
|
||||
// recordBlobMetadata stores blob upload metadata in the database
|
||||
func (s *Scanner) recordBlobMetadata(ctx context.Context, finishedBlob *blob.FinishedBlob, blobExists bool, startTime time.Time) error {
|
||||
finishedBlobID, err := types.ParseBlobID(finishedBlob.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing finished blob ID: %w", err)
|
||||
}
|
||||
|
||||
// Track upload duration (0 if blob already existed)
|
||||
uploadDuration := time.Since(startTime)
|
||||
|
||||
err = s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error {
|
||||
// Update blob upload timestamp
|
||||
if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil {
|
||||
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
if err := s.repos.Blobs.UpdateUploaded(txCtx, tx, finishedBlob.ID); err != nil {
|
||||
return fmt.Errorf("updating blob upload timestamp: %w", err)
|
||||
}
|
||||
|
||||
// Add the blob to the snapshot
|
||||
if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil {
|
||||
if err := s.repos.Snapshots.AddBlob(txCtx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil {
|
||||
return fmt.Errorf("adding blob to snapshot: %w", err)
|
||||
}
|
||||
|
||||
// Record upload metrics (only for actual uploads, not deduplicated blobs)
|
||||
if !blobExists {
|
||||
upload := &database.Upload{
|
||||
BlobHash: finishedBlob.Hash,
|
||||
@@ -1061,15 +1059,17 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
Size: finishedBlob.Compressed,
|
||||
DurationMs: uploadDuration.Milliseconds(),
|
||||
}
|
||||
if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
|
||||
if err := s.repos.Uploads.Create(txCtx, tx, upload); err != nil {
|
||||
return fmt.Errorf("recording upload metrics: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Cleanup temp file if needed
|
||||
// cleanupBlobTempFile closes and removes the blob's temporary file
|
||||
func (s *Scanner) cleanupBlobTempFile(blobWithReader *blob.BlobWithReader) {
|
||||
if blobWithReader.TempFile != nil {
|
||||
tempName := blobWithReader.TempFile.Name()
|
||||
if err := blobWithReader.TempFile.Close(); err != nil {
|
||||
@@ -1079,77 +1079,41 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Chunks from this blob are now committed to DB - remove from pending set
|
||||
log.Debug("handleBlobReady: removing pending chunk hashes")
|
||||
s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)
|
||||
log.Debug("handleBlobReady: removed pending chunk hashes")
|
||||
|
||||
// Flush files whose chunks are now all committed
|
||||
// This maintains database consistency after each blob
|
||||
log.Debug("handleBlobReady: calling flushCompletedPendingFiles")
|
||||
if err := s.flushCompletedPendingFiles(dbCtx); err != nil {
|
||||
return fmt.Errorf("flushing completed files: %w", err)
|
||||
}
|
||||
log.Debug("handleBlobReady: flushCompletedPendingFiles returned")
|
||||
|
||||
log.Debug("handleBlobReady: complete")
|
||||
return nil
|
||||
// streamingChunkInfo tracks chunk metadata collected during streaming
type streamingChunkInfo struct {
	fileChunk database.FileChunk // file↔chunk association row, queued for batch DB insert
	offset    int64              // chunk offset — presumably within the source file; confirm at call site
	size      int64              // chunk length in bytes
}
|
||||
|
||||
// processFileStreaming processes a file by streaming chunks directly to the packer
|
||||
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
|
||||
// Open the file
|
||||
file, err := s.fs.Open(fileToProcess.Path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening file: %w", err)
|
||||
}
|
||||
defer func() { _ = file.Close() }()
|
||||
|
||||
// We'll collect file chunks for database storage
|
||||
// but process them for packing as we go
|
||||
type chunkInfo struct {
|
||||
fileChunk database.FileChunk
|
||||
offset int64
|
||||
size int64
|
||||
}
|
||||
var chunks []chunkInfo
|
||||
var chunks []streamingChunkInfo
|
||||
chunkIndex := 0
|
||||
|
||||
// Process chunks in streaming fashion and get full file hash
|
||||
fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
|
||||
// Check for cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
log.Debug("Processing content-defined chunk from file",
|
||||
"file", fileToProcess.Path,
|
||||
"chunk_index", chunkIndex,
|
||||
"hash", chunk.Hash,
|
||||
"size", chunk.Size)
|
||||
|
||||
// Check if chunk already exists (fast in-memory lookup)
|
||||
chunkExists := s.chunkExists(chunk.Hash)
|
||||
|
||||
// Queue new chunks for batch insert when blob finalizes
|
||||
// This dramatically reduces transaction overhead
|
||||
if !chunkExists {
|
||||
s.packer.AddPendingChunk(chunk.Hash, chunk.Size)
|
||||
// Add to in-memory cache immediately for fast duplicate detection
|
||||
s.addKnownChunk(chunk.Hash)
|
||||
// Track as pending until blob finalizes and commits to DB
|
||||
s.addPendingChunkHash(chunk.Hash)
|
||||
}
|
||||
|
||||
// Track file chunk association for later storage
|
||||
chunks = append(chunks, chunkInfo{
|
||||
chunks = append(chunks, streamingChunkInfo{
|
||||
fileChunk: database.FileChunk{
|
||||
FileID: fileToProcess.File.ID,
|
||||
Idx: chunkIndex,
|
||||
@@ -1159,55 +1123,15 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
||||
size: chunk.Size,
|
||||
})
|
||||
|
||||
// Update stats
|
||||
if chunkExists {
|
||||
result.FilesSkipped++ // Track as skipped for now
|
||||
result.BytesSkipped += chunk.Size
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesSkipped.Add(chunk.Size)
|
||||
}
|
||||
} else {
|
||||
result.ChunksCreated++
|
||||
result.BytesScanned += chunk.Size
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().ChunksCreated.Add(1)
|
||||
s.progress.GetStats().BytesProcessed.Add(chunk.Size)
|
||||
s.progress.UpdateChunkingActivity()
|
||||
}
|
||||
}
|
||||
s.updateChunkStats(chunkExists, chunk.Size, result)
|
||||
|
||||
// Add chunk to packer immediately (streaming)
|
||||
// This happens outside the database transaction
|
||||
if !chunkExists {
|
||||
s.packerMu.Lock()
|
||||
err := s.packer.AddChunk(&blob.ChunkRef{
|
||||
Hash: chunk.Hash,
|
||||
Data: chunk.Data,
|
||||
})
|
||||
if err == blob.ErrBlobSizeLimitExceeded {
|
||||
// Finalize current blob and retry
|
||||
if err := s.packer.FinalizeBlob(); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("finalizing blob: %w", err)
|
||||
}
|
||||
// Retry adding the chunk
|
||||
if err := s.packer.AddChunk(&blob.ChunkRef{
|
||||
Hash: chunk.Hash,
|
||||
Data: chunk.Data,
|
||||
}); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk after finalize: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk to packer: %w", err)
|
||||
if err := s.addChunkToPacker(chunk); err != nil {
|
||||
return err
|
||||
}
|
||||
s.packerMu.Unlock()
|
||||
}
|
||||
|
||||
// Clear chunk data from memory immediately after use
|
||||
chunk.Data = nil
|
||||
|
||||
chunkIndex++
|
||||
return nil
|
||||
})
|
||||
@@ -1217,12 +1141,54 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
||||
}
|
||||
|
||||
log.Debug("Completed snapshotting file",
|
||||
"path", fileToProcess.Path,
|
||||
"file_hash", fileHash,
|
||||
"chunks", len(chunks))
|
||||
"path", fileToProcess.Path, "file_hash", fileHash, "chunks", len(chunks))
|
||||
|
||||
// Build file data for batch insertion
|
||||
// Update chunk associations with the file ID
|
||||
s.queueFileForBatchInsert(ctx, fileToProcess, chunks)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateChunkStats updates scan result and progress stats for a processed chunk
|
||||
func (s *Scanner) updateChunkStats(chunkExists bool, chunkSize int64, result *ScanResult) {
|
||||
if chunkExists {
|
||||
result.FilesSkipped++
|
||||
result.BytesSkipped += chunkSize
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesSkipped.Add(chunkSize)
|
||||
}
|
||||
} else {
|
||||
result.ChunksCreated++
|
||||
result.BytesScanned += chunkSize
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().ChunksCreated.Add(1)
|
||||
s.progress.GetStats().BytesProcessed.Add(chunkSize)
|
||||
s.progress.UpdateChunkingActivity()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addChunkToPacker adds a chunk to the blob packer, finalizing the current blob if needed
|
||||
func (s *Scanner) addChunkToPacker(chunk chunker.Chunk) error {
|
||||
s.packerMu.Lock()
|
||||
err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data})
|
||||
if err == blob.ErrBlobSizeLimitExceeded {
|
||||
if err := s.packer.FinalizeBlob(); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("finalizing blob: %w", err)
|
||||
}
|
||||
if err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data}); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk after finalize: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk to packer: %w", err)
|
||||
}
|
||||
s.packerMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// queueFileForBatchInsert builds file/chunk associations and queues the file for batch DB insert
|
||||
func (s *Scanner) queueFileForBatchInsert(ctx context.Context, fileToProcess *FileToProcess, chunks []streamingChunkInfo) {
|
||||
fileChunks := make([]database.FileChunk, len(chunks))
|
||||
chunkFiles := make([]database.ChunkFile, len(chunks))
|
||||
for i, ci := range chunks {
|
||||
@@ -1239,14 +1205,11 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
||||
}
|
||||
}
|
||||
|
||||
// Queue file for batch insertion
|
||||
// Files will be flushed when their chunks are committed (after blob finalize)
|
||||
s.addPendingFile(ctx, pendingFileData{
|
||||
file: fileToProcess.File,
|
||||
fileChunks: fileChunks,
|
||||
chunkFiles: chunkFiles,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetProgress returns the progress reporter for this scanner
|
||||
|
||||
Reference in New Issue
Block a user