refactor: break up oversized methods into smaller descriptive helpers

Addresses #40 (feedback from PR #37).

Refactored methods:
- createNamedSnapshot (214→~60 lines) → resolveSnapshotPaths, scanAllDirectories,
  collectUploadStats, finalizeSnapshotMetadata, printSnapshotSummary, getSnapshotBlobSizes,
  formatUploadSpeed
- ListSnapshots (159→~30 lines) → listRemoteSnapshotIDs, reconcileLocalWithRemote,
  buildSnapshotInfoList, printSnapshotTable
- PruneBlobs (170→~40 lines) → collectReferencedBlobs, listUniqueSnapshotIDs,
  listAllRemoteBlobs, findUnreferencedBlobs, deleteUnreferencedBlobs
- RunDeepVerify (182→~50 lines) → loadVerificationData, runVerificationSteps,
  deepVerifyFailure
- RemoteInfo (187→~30 lines) → collectSnapshotMetadata,
  collectReferencedBlobsFromManifests, populateRemoteInfoResult,
  scanRemoteBlobStorage, printRemoteInfoTable
- handleBlobReady (173→~30 lines) → uploadBlobIfNeeded, makeUploadProgressCallback,
  recordBlobMetadata, cleanupBlobTempFile
- processFileStreaming (146→~50 lines) → updateChunkStats, addChunkToPacker,
  queueFileForBatchInsert
- finalizeCurrentBlob (167→~30 lines) → closeBlobWriter, buildChunkRefs,
  commitBlobToDatabase, deliverFinishedBlob

All tests pass. No behavioral changes.
This commit is contained in:
user
2026-02-20 02:31:56 -08:00
parent c24e7e6360
commit ca2171cb42
6 changed files with 724 additions and 739 deletions

View File

@@ -361,101 +361,23 @@ func (p *Packer) finalizeCurrentBlob() error {
return nil
}
// Close blobgen writer to flush all data
if err := p.currentBlob.writer.Close(); err != nil {
p.cleanupTempFile()
return fmt.Errorf("closing blobgen writer: %w", err)
}
// Sync file to ensure all data is written
if err := p.currentBlob.tempFile.Sync(); err != nil {
p.cleanupTempFile()
return fmt.Errorf("syncing temp file: %w", err)
}
// Get the final size (encrypted if applicable)
finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent)
blobHash, finalSize, err := p.closeBlobWriter()
if err != nil {
p.cleanupTempFile()
return fmt.Errorf("getting file size: %w", err)
return err
}
// Reset to beginning for reading
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
p.cleanupTempFile()
return fmt.Errorf("seeking to start: %w", err)
}
chunkRefs := p.buildChunkRefs()
// Get hash from blobgen writer (of final encrypted data)
finalHash := p.currentBlob.writer.Sum256()
blobHash := hex.EncodeToString(finalHash)
// Create chunk references with offsets
chunkRefs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks))
for _, chunk := range p.currentBlob.chunks {
chunkRefs = append(chunkRefs, &BlobChunkRef{
ChunkHash: chunk.Hash,
Offset: chunk.Offset,
Length: chunk.Size,
})
}
// Get pending chunks (will be inserted to DB and reported to handler)
chunksToInsert := p.pendingChunks
p.pendingChunks = nil // Clear pending list
p.pendingChunks = nil
// Insert pending chunks, blob_chunks, and update blob in a single transaction
if p.repos != nil {
blobIDTyped, parseErr := types.ParseBlobID(p.currentBlob.id)
if parseErr != nil {
p.cleanupTempFile()
return fmt.Errorf("parsing blob ID: %w", parseErr)
}
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
// First insert all pending chunks (required for blob_chunks FK)
for _, chunk := range chunksToInsert {
dbChunk := &database.Chunk{
ChunkHash: types.ChunkHash(chunk.Hash),
Size: chunk.Size,
}
if err := p.repos.Chunks.Create(ctx, tx, dbChunk); err != nil {
return fmt.Errorf("creating chunk: %w", err)
}
}
// Insert all blob_chunk records in batch
for _, chunk := range p.currentBlob.chunks {
blobChunk := &database.BlobChunk{
BlobID: blobIDTyped,
ChunkHash: types.ChunkHash(chunk.Hash),
Offset: chunk.Offset,
Length: chunk.Size,
}
if err := p.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil {
return fmt.Errorf("creating blob_chunk: %w", err)
}
}
// Update blob record with final hash and sizes
return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash,
p.currentBlob.size, finalSize)
})
if err != nil {
p.cleanupTempFile()
return fmt.Errorf("finalizing blob transaction: %w", err)
}
log.Debug("Committed blob transaction",
"chunks_inserted", len(chunksToInsert),
"blob_chunks_inserted", len(p.currentBlob.chunks))
if err := p.commitBlobToDatabase(blobHash, finalSize, chunksToInsert); err != nil {
return err
}
// Create finished blob
finished := &FinishedBlob{
ID: p.currentBlob.id,
Hash: blobHash,
Data: nil, // We don't load data into memory anymore
Chunks: chunkRefs,
CreatedTS: p.currentBlob.startTime,
Uncompressed: p.currentBlob.size,
@@ -464,28 +386,105 @@ func (p *Packer) finalizeCurrentBlob() error {
compressionRatio := float64(finished.Compressed) / float64(finished.Uncompressed)
log.Info("Finalized blob (compressed and encrypted)",
"hash", blobHash,
"chunks", len(chunkRefs),
"uncompressed", finished.Uncompressed,
"compressed", finished.Compressed,
"hash", blobHash, "chunks", len(chunkRefs),
"uncompressed", finished.Uncompressed, "compressed", finished.Compressed,
"ratio", fmt.Sprintf("%.2f", compressionRatio),
"duration", time.Since(p.currentBlob.startTime))
// Collect inserted chunk hashes for the scanner to track
var insertedChunkHashes []string
for _, chunk := range chunksToInsert {
insertedChunkHashes = append(insertedChunkHashes, chunk.Hash)
}
// Call blob handler if set
return p.deliverFinishedBlob(finished, insertedChunkHashes)
}
// closeBlobWriter closes the writer, syncs to disk, and returns the blob hash and final size
func (p *Packer) closeBlobWriter() (string, int64, error) {
if err := p.currentBlob.writer.Close(); err != nil {
p.cleanupTempFile()
return "", 0, fmt.Errorf("closing blobgen writer: %w", err)
}
if err := p.currentBlob.tempFile.Sync(); err != nil {
p.cleanupTempFile()
return "", 0, fmt.Errorf("syncing temp file: %w", err)
}
finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent)
if err != nil {
p.cleanupTempFile()
return "", 0, fmt.Errorf("getting file size: %w", err)
}
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
p.cleanupTempFile()
return "", 0, fmt.Errorf("seeking to start: %w", err)
}
finalHash := p.currentBlob.writer.Sum256()
return hex.EncodeToString(finalHash), finalSize, nil
}
// buildChunkRefs creates BlobChunkRef entries from the current blob's chunks
func (p *Packer) buildChunkRefs() []*BlobChunkRef {
refs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks))
for _, chunk := range p.currentBlob.chunks {
refs = append(refs, &BlobChunkRef{
ChunkHash: chunk.Hash, Offset: chunk.Offset, Length: chunk.Size,
})
}
return refs
}
// commitBlobToDatabase inserts pending chunks, blob_chunks, and updates the blob record
func (p *Packer) commitBlobToDatabase(blobHash string, finalSize int64, chunksToInsert []PendingChunk) error {
if p.repos == nil {
return nil
}
blobIDTyped, parseErr := types.ParseBlobID(p.currentBlob.id)
if parseErr != nil {
p.cleanupTempFile()
return fmt.Errorf("parsing blob ID: %w", parseErr)
}
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
for _, chunk := range chunksToInsert {
dbChunk := &database.Chunk{ChunkHash: types.ChunkHash(chunk.Hash), Size: chunk.Size}
if err := p.repos.Chunks.Create(ctx, tx, dbChunk); err != nil {
return fmt.Errorf("creating chunk: %w", err)
}
}
for _, chunk := range p.currentBlob.chunks {
blobChunk := &database.BlobChunk{
BlobID: blobIDTyped, ChunkHash: types.ChunkHash(chunk.Hash),
Offset: chunk.Offset, Length: chunk.Size,
}
if err := p.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil {
return fmt.Errorf("creating blob_chunk: %w", err)
}
}
return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash, p.currentBlob.size, finalSize)
})
if err != nil {
p.cleanupTempFile()
return fmt.Errorf("finalizing blob transaction: %w", err)
}
log.Debug("Committed blob transaction",
"chunks_inserted", len(chunksToInsert), "blob_chunks_inserted", len(p.currentBlob.chunks))
return nil
}
// deliverFinishedBlob passes the blob to the handler or stores it internally
func (p *Packer) deliverFinishedBlob(finished *FinishedBlob, insertedChunkHashes []string) error {
if p.blobHandler != nil {
// Reset file position for handler
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
p.cleanupTempFile()
return fmt.Errorf("seeking for handler: %w", err)
}
// Create a blob reader that includes the data stream
blobWithReader := &BlobWithReader{
FinishedBlob: finished,
Reader: p.currentBlob.tempFile,
@@ -497,30 +496,26 @@ func (p *Packer) finalizeCurrentBlob() error {
p.cleanupTempFile()
return fmt.Errorf("blob handler failed: %w", err)
}
// Note: blob handler is responsible for closing/cleaning up temp file
p.currentBlob = nil
} else {
log.Debug("No blob handler callback configured", "blob_hash", blobHash[:8]+"...")
// No handler, need to read data for legacy behavior
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
p.cleanupTempFile()
return fmt.Errorf("seeking to read data: %w", err)
}
data, err := io.ReadAll(p.currentBlob.tempFile)
if err != nil {
p.cleanupTempFile()
return fmt.Errorf("reading blob data: %w", err)
}
finished.Data = data
p.finishedBlobs = append(p.finishedBlobs, finished)
// Cleanup
p.cleanupTempFile()
p.currentBlob = nil
return nil
}
// No handler - read data for legacy behavior
log.Debug("No blob handler callback configured", "blob_hash", finished.Hash[:8]+"...")
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
p.cleanupTempFile()
return fmt.Errorf("seeking to read data: %w", err)
}
data, err := io.ReadAll(p.currentBlob.tempFile)
if err != nil {
p.cleanupTempFile()
return fmt.Errorf("reading blob data: %w", err)
}
finished.Data = data
p.finishedBlobs = append(p.finishedBlobs, finished)
p.cleanupTempFile()
p.currentBlob = nil
return nil
}