From eb23e14799b9bcd1a84908fe8b23493cac124a5c Mon Sep 17 00:00:00 2001 From: user Date: Fri, 20 Feb 2026 02:31:56 -0800 Subject: [PATCH 1/2] refactor: break up oversized methods into smaller descriptive helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses #40 (feedback from PR #37). Refactored methods: - createNamedSnapshot (214→~60 lines) → resolveSnapshotPaths, scanAllDirectories, collectUploadStats, finalizeSnapshotMetadata, printSnapshotSummary, getSnapshotBlobSizes, formatUploadSpeed - ListSnapshots (159→~30 lines) → listRemoteSnapshotIDs, reconcileLocalWithRemote, buildSnapshotInfoList, printSnapshotTable - PruneBlobs (170→~40 lines) → collectReferencedBlobs, listUniqueSnapshotIDs, listAllRemoteBlobs, findUnreferencedBlobs, deleteUnreferencedBlobs - RunDeepVerify (182→~50 lines) → loadVerificationData, runVerificationSteps, deepVerifyFailure - RemoteInfo (187→~30 lines) → collectSnapshotMetadata, collectReferencedBlobsFromManifests, populateRemoteInfoResult, scanRemoteBlobStorage, printRemoteInfoTable - handleBlobReady (173→~30 lines) → uploadBlobIfNeeded, makeUploadProgressCallback, recordBlobMetadata, cleanupBlobTempFile - processFileStreaming (146→~50 lines) → updateChunkStats, addChunkToPacker, queueFileForBatchInsert - finalizeCurrentBlob (167→~30 lines) → closeBlobWriter, buildChunkRefs, commitBlobToDatabase, deliverFinishedBlob All tests pass. No behavioral changes. --- internal/blob/packer.go | 221 +++++++++++---------- internal/snapshot/scanner.go | 313 ++++++++++++++---------------- internal/vaultik/info.go | 96 +++++----- internal/vaultik/prune.go | 223 +++++++++++----------- internal/vaultik/snapshot.go | 360 +++++++++++++++++++---------------- internal/vaultik/verify.go | 250 +++++++++++------------- 6 files changed, 724 insertions(+), 739 deletions(-) diff --git a/internal/blob/packer.go b/internal/blob/packer.go index c5284ec..7edf15b 100644 --- a/internal/blob/packer.go +++ b/internal/blob/packer.go @@ -361,101 +361,23 @@ func (p *Packer) finalizeCurrentBlob() error { return nil } - // Close blobgen writer to flush all data - if err := p.currentBlob.writer.Close(); err != nil { - p.cleanupTempFile() - return fmt.Errorf("closing blobgen writer: %w", err) - } - - // Sync file to ensure all data is written - if err := p.currentBlob.tempFile.Sync(); err != nil { - p.cleanupTempFile() - return fmt.Errorf("syncing temp file: %w", err) - } - - // Get the final size (encrypted if applicable) - finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent) + blobHash, finalSize, err := p.closeBlobWriter() if err != nil { - p.cleanupTempFile() - return fmt.Errorf("getting file size: %w", err) + return err } - // Reset to beginning for reading - if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil { - p.cleanupTempFile() - return fmt.Errorf("seeking to start: %w", err) - } + chunkRefs := p.buildChunkRefs() - // Get hash from blobgen writer (of final encrypted data) - finalHash := p.currentBlob.writer.Sum256() - blobHash := hex.EncodeToString(finalHash) - - // Create chunk references with offsets - chunkRefs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks)) - - for _, chunk := range p.currentBlob.chunks { - chunkRefs = append(chunkRefs, &BlobChunkRef{ - ChunkHash: chunk.Hash, - Offset: chunk.Offset, - Length: chunk.Size, - }) - } - - // Get pending chunks (will be inserted to DB and reported to handler) chunksToInsert := p.pendingChunks - p.pendingChunks = nil // Clear pending list + p.pendingChunks = nil - // Insert pending chunks, blob_chunks, and update blob in a single transaction - if p.repos != nil { - blobIDTyped, parseErr := types.ParseBlobID(p.currentBlob.id) - if parseErr != nil { - p.cleanupTempFile() - return fmt.Errorf("parsing blob ID: %w", parseErr) - } - err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error { - // First insert all pending chunks (required for blob_chunks FK) - for _, chunk := range chunksToInsert { - dbChunk := &database.Chunk{ - ChunkHash: types.ChunkHash(chunk.Hash), - Size: chunk.Size, - } - if err := p.repos.Chunks.Create(ctx, tx, dbChunk); err != nil { - return fmt.Errorf("creating chunk: %w", err) - } - } - - // Insert all blob_chunk records in batch - for _, chunk := range p.currentBlob.chunks { - blobChunk := &database.BlobChunk{ - BlobID: blobIDTyped, - ChunkHash: types.ChunkHash(chunk.Hash), - Offset: chunk.Offset, - Length: chunk.Size, - } - if err := p.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil { - return fmt.Errorf("creating blob_chunk: %w", err) - } - } - - // Update blob record with final hash and sizes - return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash, - p.currentBlob.size, finalSize) - }) - if err != nil { - p.cleanupTempFile() - return fmt.Errorf("finalizing blob transaction: %w", err) - } - - log.Debug("Committed blob transaction", - "chunks_inserted", len(chunksToInsert), - "blob_chunks_inserted", len(p.currentBlob.chunks)) + if err := p.commitBlobToDatabase(blobHash, finalSize, chunksToInsert); err != nil { + return err } - // Create finished blob finished := &FinishedBlob{ ID: p.currentBlob.id, Hash: blobHash, - Data: nil, // We don't load data into memory anymore Chunks: chunkRefs, CreatedTS: p.currentBlob.startTime, Uncompressed: p.currentBlob.size, @@ -464,28 +386,105 @@ func (p *Packer) finalizeCurrentBlob() error { compressionRatio := float64(finished.Compressed) / float64(finished.Uncompressed) log.Info("Finalized blob (compressed and encrypted)", - "hash", blobHash, - "chunks", len(chunkRefs), - "uncompressed", finished.Uncompressed, - "compressed", finished.Compressed, + "hash", blobHash, "chunks", len(chunkRefs), + "uncompressed", finished.Uncompressed, "compressed", finished.Compressed, "ratio", fmt.Sprintf("%.2f", compressionRatio), "duration", time.Since(p.currentBlob.startTime)) - // Collect inserted chunk hashes for the scanner to track var insertedChunkHashes []string for _, chunk := range chunksToInsert { insertedChunkHashes = append(insertedChunkHashes, chunk.Hash) } - // Call blob handler if set + return p.deliverFinishedBlob(finished, insertedChunkHashes) +} + +// closeBlobWriter closes the writer, syncs to disk, and returns the blob hash and final size +func (p *Packer) closeBlobWriter() (string, int64, error) { + if err := p.currentBlob.writer.Close(); err != nil { + p.cleanupTempFile() + return "", 0, fmt.Errorf("closing blobgen writer: %w", err) + } + if err := p.currentBlob.tempFile.Sync(); err != nil { + p.cleanupTempFile() + return "", 0, fmt.Errorf("syncing temp file: %w", err) + } + + finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent) + if err != nil { + p.cleanupTempFile() + return "", 0, fmt.Errorf("getting file size: %w", err) + } + if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil { + p.cleanupTempFile() + return "", 0, fmt.Errorf("seeking to start: %w", err) + } + + finalHash := p.currentBlob.writer.Sum256() + return hex.EncodeToString(finalHash), finalSize, nil +} + +// buildChunkRefs creates BlobChunkRef entries from the current blob's chunks +func (p *Packer) buildChunkRefs() []*BlobChunkRef { + refs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks)) + for _, chunk := range p.currentBlob.chunks { + refs = append(refs, &BlobChunkRef{ + ChunkHash: chunk.Hash, Offset: chunk.Offset, Length: chunk.Size, + }) + } + return refs +} + +// commitBlobToDatabase inserts pending chunks, blob_chunks, and updates the blob record +func (p *Packer) commitBlobToDatabase(blobHash string, finalSize int64, chunksToInsert []PendingChunk) error { + if p.repos == nil { + return nil + } + + blobIDTyped, parseErr := types.ParseBlobID(p.currentBlob.id) + if parseErr != nil { + p.cleanupTempFile() + return fmt.Errorf("parsing blob ID: %w", parseErr) + } + + err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error { + for _, chunk := range chunksToInsert { + dbChunk := &database.Chunk{ChunkHash: types.ChunkHash(chunk.Hash), Size: chunk.Size} + if err := p.repos.Chunks.Create(ctx, tx, dbChunk); err != nil { + return fmt.Errorf("creating chunk: %w", err) + } + } + + for _, chunk := range p.currentBlob.chunks { + blobChunk := &database.BlobChunk{ + BlobID: blobIDTyped, ChunkHash: types.ChunkHash(chunk.Hash), + Offset: chunk.Offset, Length: chunk.Size, + } + if err := p.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil { + return fmt.Errorf("creating blob_chunk: %w", err) + } + } + + return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash, p.currentBlob.size, finalSize) + }) + if err != nil { + p.cleanupTempFile() + return fmt.Errorf("finalizing blob transaction: %w", err) + } + + log.Debug("Committed blob transaction", + "chunks_inserted", len(chunksToInsert), "blob_chunks_inserted", len(p.currentBlob.chunks)) + return nil +} + +// deliverFinishedBlob passes the blob to the handler or stores it internally +func (p *Packer) deliverFinishedBlob(finished *FinishedBlob, insertedChunkHashes []string) error { if p.blobHandler != nil { - // Reset file position for handler if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil { p.cleanupTempFile() return fmt.Errorf("seeking for handler: %w", err) } - // Create a blob reader that includes the data stream blobWithReader := &BlobWithReader{ FinishedBlob: finished, Reader: p.currentBlob.tempFile, @@ -497,30 +496,26 @@ func (p *Packer) finalizeCurrentBlob() error { p.cleanupTempFile() return fmt.Errorf("blob handler failed: %w", err) } - // Note: blob handler is responsible for closing/cleaning up temp file - p.currentBlob = nil - } else { - log.Debug("No blob handler callback configured", "blob_hash", blobHash[:8]+"...") - // No handler, need to read data for legacy behavior - if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil { - p.cleanupTempFile() - return fmt.Errorf("seeking to read data: %w", err) - } - - data, err := io.ReadAll(p.currentBlob.tempFile) - if err != nil { - p.cleanupTempFile() - return fmt.Errorf("reading blob data: %w", err) - } - finished.Data = data - - p.finishedBlobs = append(p.finishedBlobs, finished) - - // Cleanup - p.cleanupTempFile() p.currentBlob = nil + return nil } + // No handler - read data for legacy behavior + log.Debug("No blob handler callback configured", "blob_hash", finished.Hash[:8]+"...") + if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil { + p.cleanupTempFile() + return fmt.Errorf("seeking to read data: %w", err) + } + + data, err := io.ReadAll(p.currentBlob.tempFile) + if err != nil { + p.cleanupTempFile() + return fmt.Errorf("reading blob data: %w", err) + } + finished.Data = data + p.finishedBlobs = append(p.finishedBlobs, finished) + p.cleanupTempFile() + p.currentBlob = nil return nil } diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go index ca403b4..74391d1 100644 --- a/internal/snapshot/scanner.go +++ b/internal/snapshot/scanner.go @@ -931,40 +931,99 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { startTime := time.Now().UTC() finishedBlob := blobWithReader.FinishedBlob - // Report upload start and increment blobs created if s.progress != nil { s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed) s.progress.GetStats().BlobsCreated.Add(1) } - // Upload to storage first (without holding any locks) - // Use scan context for cancellation support ctx := s.scanCtx if ctx == nil { ctx = context.Background() } - // Track bytes uploaded for accurate speed calculation + blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash) + blobExists := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime) + + if err := s.recordBlobMetadata(ctx, finishedBlob, blobExists, startTime); err != nil { + s.cleanupBlobTempFile(blobWithReader) + return err + } + + s.cleanupBlobTempFile(blobWithReader) + + // Chunks from this blob are now committed to DB - remove from pending set + s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes) + + // Flush files whose chunks are now all committed + if err := s.flushCompletedPendingFiles(ctx); err != nil { + return fmt.Errorf("flushing completed files: %w", err) + } + + return nil +} + +// uploadBlobIfNeeded uploads the blob to storage if it doesn't already exist, returns whether it existed +func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) bool { + finishedBlob := blobWithReader.FinishedBlob + + // Check if blob already exists (deduplication after restart) + if _, err := s.storage.Stat(ctx, blobPath); err == nil { + log.Info("Blob already exists in storage, skipping upload", + "hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed))) + fmt.Printf("Blob exists: %s (%s, skipped upload)\n", + finishedBlob.Hash[:12]+"...", humanize.Bytes(uint64(finishedBlob.Compressed))) + return true + } + + progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob) + + if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil { + log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err) + return false + } + + uploadDuration := time.Since(startTime) + uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds() + + fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n", + finishedBlob.Hash[:12]+"...", + humanize.Bytes(uint64(finishedBlob.Compressed)), + humanize.Bytes(uint64(uploadSpeedBps)), + uploadDuration.Round(time.Millisecond)) + + log.Info("Successfully uploaded blob to storage", + "path", blobPath, + "size", humanize.Bytes(uint64(finishedBlob.Compressed)), + "duration", uploadDuration, + "speed", humanize.SI(uploadSpeedBps*8, "bps")) + + if s.progress != nil { + s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration) + stats := s.progress.GetStats() + stats.BlobsUploaded.Add(1) + stats.BytesUploaded.Add(finishedBlob.Compressed) + } + + return false +} + +// makeUploadProgressCallback creates a progress callback for blob uploads +func (s *Scanner) makeUploadProgressCallback(ctx context.Context, finishedBlob *blob.FinishedBlob) func(int64) error { lastProgressTime := time.Now() lastProgressBytes := int64(0) - progressCallback := func(uploaded int64) error { - // Calculate instantaneous speed + return func(uploaded int64) error { now := time.Now() elapsed := now.Sub(lastProgressTime).Seconds() - if elapsed > 0.5 { // Update speed every 0.5 seconds + if elapsed > 0.5 { bytesSinceLastUpdate := uploaded - lastProgressBytes speed := float64(bytesSinceLastUpdate) / elapsed - if s.progress != nil { s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed) } - lastProgressTime = now lastProgressBytes = uploaded } - - // Check for cancellation select { case <-ctx.Done(): return ctx.Err() @@ -972,87 +1031,26 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { return nil } } +} - // Create sharded path: blobs/ca/fe/cafebabe... - blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash) - - // Check if blob already exists in remote storage (deduplication after restart) - blobExists := false - if _, err := s.storage.Stat(ctx, blobPath); err == nil { - blobExists = true - log.Info("Blob already exists in storage, skipping upload", - "hash", finishedBlob.Hash, - "size", humanize.Bytes(uint64(finishedBlob.Compressed))) - fmt.Printf("Blob exists: %s (%s, skipped upload)\n", - finishedBlob.Hash[:12]+"...", - humanize.Bytes(uint64(finishedBlob.Compressed))) - } - - if !blobExists { - if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil { - return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err) - } - - uploadDuration := time.Since(startTime) - - // Calculate upload speed - uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds() - - // Print blob stored message - fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n", - finishedBlob.Hash[:12]+"...", - humanize.Bytes(uint64(finishedBlob.Compressed)), - humanize.Bytes(uint64(uploadSpeedBps)), - uploadDuration.Round(time.Millisecond)) - - // Log upload stats - uploadSpeedBits := uploadSpeedBps * 8 // bits per second - log.Info("Successfully uploaded blob to storage", - "path", blobPath, - "size", humanize.Bytes(uint64(finishedBlob.Compressed)), - "duration", uploadDuration, - "speed", humanize.SI(uploadSpeedBits, "bps")) - - // Report upload complete - if s.progress != nil { - s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration) - } - - // Update progress after upload completes - if s.progress != nil { - stats := s.progress.GetStats() - stats.BlobsUploaded.Add(1) - stats.BytesUploaded.Add(finishedBlob.Compressed) - } - } - - // Store metadata in database (after upload is complete) - dbCtx := s.scanCtx - if dbCtx == nil { - dbCtx = context.Background() - } - - // Parse blob ID for typed operations +// recordBlobMetadata stores blob upload metadata in the database +func (s *Scanner) recordBlobMetadata(ctx context.Context, finishedBlob *blob.FinishedBlob, blobExists bool, startTime time.Time) error { finishedBlobID, err := types.ParseBlobID(finishedBlob.ID) if err != nil { return fmt.Errorf("parsing finished blob ID: %w", err) } - // Track upload duration (0 if blob already existed) uploadDuration := time.Since(startTime) - err = s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error { - // Update blob upload timestamp - if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil { + return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error { + if err := s.repos.Blobs.UpdateUploaded(txCtx, tx, finishedBlob.ID); err != nil { return fmt.Errorf("updating blob upload timestamp: %w", err) } - // Add the blob to the snapshot - if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil { + if err := s.repos.Snapshots.AddBlob(txCtx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil { return fmt.Errorf("adding blob to snapshot: %w", err) } - // Record upload metrics (only for actual uploads, not deduplicated blobs) if !blobExists { upload := &database.Upload{ BlobHash: finishedBlob.Hash, @@ -1061,15 +1059,17 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { Size: finishedBlob.Compressed, DurationMs: uploadDuration.Milliseconds(), } - if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil { + if err := s.repos.Uploads.Create(txCtx, tx, upload); err != nil { return fmt.Errorf("recording upload metrics: %w", err) } } return nil }) +} - // Cleanup temp file if needed +// cleanupBlobTempFile closes and removes the blob's temporary file +func (s *Scanner) cleanupBlobTempFile(blobWithReader *blob.BlobWithReader) { if blobWithReader.TempFile != nil { tempName := blobWithReader.TempFile.Name() if err := blobWithReader.TempFile.Close(); err != nil { @@ -1079,77 +1079,41 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { log.Fatal("Failed to remove temp file", "file", tempName, "error", err) } } +} - if err != nil { - return err - } - - // Chunks from this blob are now committed to DB - remove from pending set - log.Debug("handleBlobReady: removing pending chunk hashes") - s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes) - log.Debug("handleBlobReady: removed pending chunk hashes") - - // Flush files whose chunks are now all committed - // This maintains database consistency after each blob - log.Debug("handleBlobReady: calling flushCompletedPendingFiles") - if err := s.flushCompletedPendingFiles(dbCtx); err != nil { - return fmt.Errorf("flushing completed files: %w", err) - } - log.Debug("handleBlobReady: flushCompletedPendingFiles returned") - - log.Debug("handleBlobReady: complete") - return nil +// streamingChunkInfo tracks chunk metadata collected during streaming +type streamingChunkInfo struct { + fileChunk database.FileChunk + offset int64 + size int64 } // processFileStreaming processes a file by streaming chunks directly to the packer func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error { - // Open the file file, err := s.fs.Open(fileToProcess.Path) if err != nil { return fmt.Errorf("opening file: %w", err) } defer func() { _ = file.Close() }() - // We'll collect file chunks for database storage - // but process them for packing as we go - type chunkInfo struct { - fileChunk database.FileChunk - offset int64 - size int64 - } - var chunks []chunkInfo + var chunks []streamingChunkInfo chunkIndex := 0 - // Process chunks in streaming fashion and get full file hash fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error { - // Check for cancellation select { case <-ctx.Done(): return ctx.Err() default: } - log.Debug("Processing content-defined chunk from file", - "file", fileToProcess.Path, - "chunk_index", chunkIndex, - "hash", chunk.Hash, - "size", chunk.Size) - - // Check if chunk already exists (fast in-memory lookup) chunkExists := s.chunkExists(chunk.Hash) - - // Queue new chunks for batch insert when blob finalizes - // This dramatically reduces transaction overhead if !chunkExists { s.packer.AddPendingChunk(chunk.Hash, chunk.Size) - // Add to in-memory cache immediately for fast duplicate detection s.addKnownChunk(chunk.Hash) - // Track as pending until blob finalizes and commits to DB s.addPendingChunkHash(chunk.Hash) } - // Track file chunk association for later storage - chunks = append(chunks, chunkInfo{ + chunks = append(chunks, streamingChunkInfo{ fileChunk: database.FileChunk{ FileID: fileToProcess.File.ID, Idx: chunkIndex, @@ -1159,55 +1123,15 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT size: chunk.Size, }) - // Update stats - if chunkExists { - result.FilesSkipped++ // Track as skipped for now - result.BytesSkipped += chunk.Size - if s.progress != nil { - s.progress.GetStats().BytesSkipped.Add(chunk.Size) - } - } else { - result.ChunksCreated++ - result.BytesScanned += chunk.Size - if s.progress != nil { - s.progress.GetStats().ChunksCreated.Add(1) - s.progress.GetStats().BytesProcessed.Add(chunk.Size) - s.progress.UpdateChunkingActivity() - } - } + s.updateChunkStats(chunkExists, chunk.Size, result) - // Add chunk to packer immediately (streaming) - // This happens outside the database transaction if !chunkExists { - s.packerMu.Lock() - err := s.packer.AddChunk(&blob.ChunkRef{ - Hash: chunk.Hash, - Data: chunk.Data, - }) - if err == blob.ErrBlobSizeLimitExceeded { - // Finalize current blob and retry - if err := s.packer.FinalizeBlob(); err != nil { - s.packerMu.Unlock() - return fmt.Errorf("finalizing blob: %w", err) - } - // Retry adding the chunk - if err := s.packer.AddChunk(&blob.ChunkRef{ - Hash: chunk.Hash, - Data: chunk.Data, - }); err != nil { - s.packerMu.Unlock() - return fmt.Errorf("adding chunk after finalize: %w", err) - } - } else if err != nil { - s.packerMu.Unlock() - return fmt.Errorf("adding chunk to packer: %w", err) + if err := s.addChunkToPacker(chunk); err != nil { + return err } - s.packerMu.Unlock() } - // Clear chunk data from memory immediately after use chunk.Data = nil - chunkIndex++ return nil }) @@ -1217,12 +1141,54 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT } log.Debug("Completed snapshotting file", - "path", fileToProcess.Path, - "file_hash", fileHash, - "chunks", len(chunks)) + "path", fileToProcess.Path, "file_hash", fileHash, "chunks", len(chunks)) - // Build file data for batch insertion - // Update chunk associations with the file ID + s.queueFileForBatchInsert(ctx, fileToProcess, chunks) + return nil +} + +// updateChunkStats updates scan result and progress stats for a processed chunk +func (s *Scanner) updateChunkStats(chunkExists bool, chunkSize int64, result *ScanResult) { + if chunkExists { + result.FilesSkipped++ + result.BytesSkipped += chunkSize + if s.progress != nil { + s.progress.GetStats().BytesSkipped.Add(chunkSize) + } + } else { + result.ChunksCreated++ + result.BytesScanned += chunkSize + if s.progress != nil { + s.progress.GetStats().ChunksCreated.Add(1) + s.progress.GetStats().BytesProcessed.Add(chunkSize) + s.progress.UpdateChunkingActivity() + } + } +} + +// addChunkToPacker adds a chunk to the blob packer, finalizing the current blob if needed +func (s *Scanner) addChunkToPacker(chunk chunker.Chunk) error { + s.packerMu.Lock() + err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data}) + if err == blob.ErrBlobSizeLimitExceeded { + if err := s.packer.FinalizeBlob(); err != nil { + s.packerMu.Unlock() + return fmt.Errorf("finalizing blob: %w", err) + } + if err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data}); err != nil { + s.packerMu.Unlock() + return fmt.Errorf("adding chunk after finalize: %w", err) + } + } else if err != nil { + s.packerMu.Unlock() + return fmt.Errorf("adding chunk to packer: %w", err) + } + s.packerMu.Unlock() + return nil +} + +// queueFileForBatchInsert builds file/chunk associations and queues the file for batch DB insert +func (s *Scanner) queueFileForBatchInsert(ctx context.Context, fileToProcess *FileToProcess, chunks []streamingChunkInfo) { fileChunks := make([]database.FileChunk, len(chunks)) chunkFiles := make([]database.ChunkFile, len(chunks)) for i, ci := range chunks { @@ -1239,14 +1205,11 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT } } - // Queue file for batch insertion - // Files will be flushed when their chunks are committed (after blob finalize) s.addPendingFile(ctx, pendingFileData{ file: fileToProcess.File, fileChunks: fileChunks, chunkFiles: chunkFiles, }) - return nil } // GetProgress returns the progress reporter for this scanner diff --git a/internal/vaultik/info.go b/internal/vaultik/info.go index 53cfc2c..ab3343b 100644 --- a/internal/vaultik/info.go +++ b/internal/vaultik/info.go @@ -151,7 +151,6 @@ type RemoteInfoResult struct { func (v *Vaultik) RemoteInfo(jsonOutput bool) error { result := &RemoteInfoResult{} - // Get storage info storageInfo := v.Storage.Info() result.StorageType = storageInfo.Type result.StorageLocation = storageInfo.Location @@ -161,23 +160,46 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error { v.printfStdout("Type: %s\n", storageInfo.Type) v.printfStdout("Location: %s\n", storageInfo.Location) v.printlnStdout() - } - - // List all snapshot metadata - if !jsonOutput { v.printfStdout("Scanning snapshot metadata...\n") } + snapshotMetadata, snapshotIDs, err := v.collectSnapshotMetadata() + if err != nil { + return err + } + + if !jsonOutput { + v.printfStdout("Downloading %d manifest(s)...\n", len(snapshotIDs)) + } + + referencedBlobs := v.collectReferencedBlobsFromManifests(snapshotIDs, snapshotMetadata) + + v.populateRemoteInfoResult(result, snapshotMetadata, snapshotIDs, referencedBlobs) + + if err := v.scanRemoteBlobStorage(result, referencedBlobs, jsonOutput); err != nil { + return err + } + + if jsonOutput { + enc := json.NewEncoder(v.Stdout) + enc.SetIndent("", " ") + return enc.Encode(result) + } + + v.printRemoteInfoTable(result) + return nil +} + +// collectSnapshotMetadata scans remote metadata and returns per-snapshot info and sorted IDs +func (v *Vaultik) collectSnapshotMetadata() (map[string]*SnapshotMetadataInfo, []string, error) { snapshotMetadata := make(map[string]*SnapshotMetadataInfo) - // Collect metadata files metadataCh := v.Storage.ListStream(v.ctx, "metadata/") for obj := range metadataCh { if obj.Err != nil { - return fmt.Errorf("listing metadata: %w", obj.Err) + return nil, nil, fmt.Errorf("listing metadata: %w", obj.Err) } - // Parse key: metadata// parts := strings.Split(obj.Key, "/") if len(parts) < 3 { continue @@ -185,14 +207,11 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error { snapshotID := parts[1] if _, exists := snapshotMetadata[snapshotID]; !exists { - snapshotMetadata[snapshotID] = &SnapshotMetadataInfo{ - SnapshotID: snapshotID, - } + snapshotMetadata[snapshotID] = &SnapshotMetadataInfo{SnapshotID: snapshotID} } info := snapshotMetadata[snapshotID] filename := parts[2] - if strings.HasPrefix(filename, "manifest") { info.ManifestSize = obj.Size } else if strings.HasPrefix(filename, "db") { @@ -201,19 +220,18 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error { info.TotalSize = info.ManifestSize + info.DatabaseSize } - // Sort snapshots by ID for consistent output var snapshotIDs []string for id := range snapshotMetadata { snapshotIDs = append(snapshotIDs, id) } sort.Strings(snapshotIDs) - // Download and parse all manifests to get referenced blobs - if !jsonOutput { - v.printfStdout("Downloading %d manifest(s)...\n", len(snapshotIDs)) - } + return snapshotMetadata, snapshotIDs, nil +} - referencedBlobs := make(map[string]int64) // hash -> compressed size +// collectReferencedBlobsFromManifests downloads manifests and returns referenced blob hashes with sizes +func (v *Vaultik) collectReferencedBlobsFromManifests(snapshotIDs []string, snapshotMetadata map[string]*SnapshotMetadataInfo) map[string]int64 { + referencedBlobs := make(map[string]int64) for _, snapshotID := range snapshotIDs { manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID) @@ -230,10 +248,8 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error { continue } - // Record blob info from manifest info := snapshotMetadata[snapshotID] info.BlobCount = manifest.BlobCount - var blobsSize int64 for _, blob := range manifest.Blobs { referencedBlobs[blob.Hash] = blob.CompressedSize @@ -242,7 +258,11 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error { info.BlobsSize = blobsSize } - // Build result snapshots + return referencedBlobs +} + +// populateRemoteInfoResult fills in the result's snapshot and referenced blob stats +func (v *Vaultik) populateRemoteInfoResult(result *RemoteInfoResult, snapshotMetadata map[string]*SnapshotMetadataInfo, snapshotIDs []string, referencedBlobs map[string]int64) { var totalMetadataSize int64 for _, id := range snapshotIDs { info := snapshotMetadata[id] @@ -252,26 +272,25 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error { result.TotalMetadataSize = totalMetadataSize result.TotalMetadataCount = len(snapshotIDs) - // Calculate referenced blob stats for _, size := range referencedBlobs { result.ReferencedBlobCount++ result.ReferencedBlobSize += size } +} - // List all blobs on remote +// scanRemoteBlobStorage lists all blobs on remote and computes orphan stats +func (v *Vaultik) scanRemoteBlobStorage(result *RemoteInfoResult, referencedBlobs map[string]int64, jsonOutput bool) error { if !jsonOutput { v.printfStdout("Scanning blobs...\n") } - allBlobs := make(map[string]int64) // hash -> size from storage - blobCh := v.Storage.ListStream(v.ctx, "blobs/") + allBlobs := make(map[string]int64) + for obj := range blobCh { if obj.Err != nil { return fmt.Errorf("listing blobs: %w", obj.Err) } - - // Extract hash from key: blobs/xx/yy/hash parts := strings.Split(obj.Key, "/") if len(parts) < 4 { continue @@ -282,7 +301,6 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error { result.TotalBlobSize += obj.Size } - // Calculate orphaned blobs for hash, size := range allBlobs { if _, referenced := referencedBlobs[hash]; !referenced { result.OrphanedBlobCount++ @@ -290,14 +308,11 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error { } } - // Output results - if jsonOutput { - enc := json.NewEncoder(v.Stdout) - enc.SetIndent("", " ") - return enc.Encode(result) - } + return nil +} - // Human-readable output +// printRemoteInfoTable renders the human-readable remote info output +func (v *Vaultik) printRemoteInfoTable(result *RemoteInfoResult) { v.printfStdout("\n=== Snapshot Metadata ===\n") if len(result.Snapshots) == 0 { v.printfStdout("No snapshots found\n") @@ -320,20 +335,15 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error { v.printfStdout("\n=== Blob Storage ===\n") v.printfStdout("Total blobs on remote: %s (%s)\n", - humanize.Comma(int64(result.TotalBlobCount)), - humanize.Bytes(uint64(result.TotalBlobSize))) + humanize.Comma(int64(result.TotalBlobCount)), humanize.Bytes(uint64(result.TotalBlobSize))) v.printfStdout("Referenced by snapshots: %s (%s)\n", - humanize.Comma(int64(result.ReferencedBlobCount)), - humanize.Bytes(uint64(result.ReferencedBlobSize))) + humanize.Comma(int64(result.ReferencedBlobCount)), humanize.Bytes(uint64(result.ReferencedBlobSize))) v.printfStdout("Orphaned (unreferenced): %s (%s)\n", - humanize.Comma(int64(result.OrphanedBlobCount)), - humanize.Bytes(uint64(result.OrphanedBlobSize))) + humanize.Comma(int64(result.OrphanedBlobCount)), humanize.Bytes(uint64(result.OrphanedBlobSize))) if result.OrphanedBlobCount > 0 { v.printfStdout("\nRun 'vaultik prune --remote' to remove orphaned blobs.\n") } - - return nil } // truncateString truncates a string to maxLen, adding "..." if truncated diff --git a/internal/vaultik/prune.go b/internal/vaultik/prune.go index dff9dd9..2aefc48 100644 --- a/internal/vaultik/prune.go +++ b/internal/vaultik/prune.go @@ -27,95 +27,19 @@ type PruneBlobsResult struct { func (v *Vaultik) PruneBlobs(opts *PruneOptions) error { log.Info("Starting prune operation") - // Get all remote snapshots and their manifests - allBlobsReferenced := make(map[string]bool) - manifestCount := 0 - - // List all snapshots in storage - log.Info("Listing remote snapshots") - objectCh := v.Storage.ListStream(v.ctx, "metadata/") - - var snapshotIDs []string - for object := range objectCh { - if object.Err != nil { - return fmt.Errorf("listing remote snapshots: %w", object.Err) - } - - // Extract snapshot ID from paths like metadata/hostname-20240115-143052Z/ - parts := strings.Split(object.Key, "/") - if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" { - // Check if this is a directory by looking for trailing slash - if strings.HasSuffix(object.Key, "/") || strings.Contains(object.Key, "/manifest.json.zst") { - snapshotID := parts[1] - // Only add unique snapshot IDs - found := false - for _, id := range snapshotIDs { - if id == snapshotID { - found = true - break - } - } - if !found { - snapshotIDs = append(snapshotIDs, snapshotID) - } - } - } + allBlobsReferenced, err := v.collectReferencedBlobs() + if err != nil { + return err } - log.Info("Found manifests in remote storage", "count", len(snapshotIDs)) - - // Download and parse each manifest to get referenced blobs - for _, snapshotID := range snapshotIDs { - log.Debug("Processing manifest", "snapshot_id", snapshotID) - - manifest, err := v.downloadManifest(snapshotID) - if err != nil { - log.Error("Failed to download manifest", "snapshot_id", snapshotID, "error", err) - continue - } - - // Add all blobs from this manifest to our referenced set - for _, blob := range manifest.Blobs { - allBlobsReferenced[blob.Hash] = true - } - manifestCount++ + allBlobs, err := v.listAllRemoteBlobs() + if err != nil { + return err } - log.Info("Processed manifests", "count", manifestCount, "unique_blobs_referenced", len(allBlobsReferenced)) + unreferencedBlobs, totalSize := v.findUnreferencedBlobs(allBlobs, allBlobsReferenced) - // List all blobs in storage - log.Info("Listing all blobs in storage") - allBlobs := make(map[string]int64) // hash -> size - blobObjectCh := v.Storage.ListStream(v.ctx, "blobs/") - - for object := range blobObjectCh { - if object.Err != nil { - return fmt.Errorf("listing blobs: %w", object.Err) - } - - // Extract hash from path like blobs/ab/cd/abcdef123456... - parts := strings.Split(object.Key, "/") - if len(parts) == 4 && parts[0] == "blobs" { - hash := parts[3] - allBlobs[hash] = object.Size - } - } - - log.Info("Found blobs in storage", "count", len(allBlobs)) - - // Find unreferenced blobs - var unreferencedBlobs []string - var totalSize int64 - for hash, size := range allBlobs { - if !allBlobsReferenced[hash] { - unreferencedBlobs = append(unreferencedBlobs, hash) - totalSize += size - } - } - - result := &PruneBlobsResult{ - BlobsFound: len(unreferencedBlobs), - } + result := &PruneBlobsResult{BlobsFound: len(unreferencedBlobs)} if len(unreferencedBlobs) == 0 { log.Info("No unreferenced blobs found") @@ -126,18 +50,15 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error { return nil } - // Show what will be deleted log.Info("Found unreferenced blobs", "count", len(unreferencedBlobs), "total_size", humanize.Bytes(uint64(totalSize))) if !opts.JSON { v.printfStdout("Found %d unreferenced blob(s) totaling %s\n", len(unreferencedBlobs), humanize.Bytes(uint64(totalSize))) } - // Confirm unless --force is used (skip in JSON mode - require --force) if !opts.Force && !opts.JSON { v.printfStdout("\nDelete %d unreferenced blob(s)? [y/N] ", len(unreferencedBlobs)) var confirm string if _, err := v.scanStdin(&confirm); err != nil { - // Treat EOF or error as "no" v.printlnStdout("Cancelled") return nil } @@ -147,10 +68,106 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error { } } - // Delete unreferenced blobs + v.deleteUnreferencedBlobs(unreferencedBlobs, allBlobs, result) + + if opts.JSON { + return v.outputPruneBlobsJSON(result) + } + + v.printfStdout("\nDeleted %d blob(s) totaling %s\n", result.BlobsDeleted, humanize.Bytes(uint64(result.BytesFreed))) + if result.BlobsFailed > 0 { + v.printfStdout("Failed to delete %d blob(s)\n", result.BlobsFailed) + } + + return nil +} + +// collectReferencedBlobs downloads all manifests and returns the set of referenced blob hashes +func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) { + log.Info("Listing remote snapshots") + snapshotIDs := v.listUniqueSnapshotIDs() + log.Info("Found manifests in remote storage", "count", len(snapshotIDs)) + + allBlobsReferenced := make(map[string]bool) + manifestCount := 0 + + for _, snapshotID := range snapshotIDs { + log.Debug("Processing manifest", "snapshot_id", snapshotID) + manifest, err := v.downloadManifest(snapshotID) + if err != nil { + log.Error("Failed to download manifest", "snapshot_id", snapshotID, "error", err) + continue + } + for _, blob := range manifest.Blobs { + allBlobsReferenced[blob.Hash] = true + } + manifestCount++ + } + + log.Info("Processed manifests", "count", manifestCount, "unique_blobs_referenced", len(allBlobsReferenced)) + return allBlobsReferenced, nil +} + +// listUniqueSnapshotIDs returns deduplicated snapshot IDs from remote metadata +func (v *Vaultik) listUniqueSnapshotIDs() []string { + objectCh := v.Storage.ListStream(v.ctx, "metadata/") + seen := make(map[string]bool) + var snapshotIDs []string + + for object := range objectCh { + if object.Err != nil { + continue + } + parts := strings.Split(object.Key, "/") + if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" { + if strings.HasSuffix(object.Key, "/") || strings.Contains(object.Key, "/manifest.json.zst") { + snapshotID := parts[1] + if !seen[snapshotID] { + seen[snapshotID] = true + snapshotIDs = append(snapshotIDs, snapshotID) + } + } + } + } + return snapshotIDs +} + +// listAllRemoteBlobs returns a map of all blob hashes to their sizes in remote storage +func (v *Vaultik) listAllRemoteBlobs() (map[string]int64, error) { + log.Info("Listing all blobs in storage") + allBlobs := make(map[string]int64) + blobObjectCh := v.Storage.ListStream(v.ctx, "blobs/") + + for object := range blobObjectCh { + if object.Err != nil { + return nil, fmt.Errorf("listing blobs: %w", object.Err) + } + parts := strings.Split(object.Key, "/") + if len(parts) == 4 && parts[0] == "blobs" { + allBlobs[parts[3]] = object.Size + } + } + + log.Info("Found blobs in storage", "count", len(allBlobs)) + return allBlobs, nil +} + +// findUnreferencedBlobs returns blob hashes not referenced by any manifest and their total size +func (v *Vaultik) findUnreferencedBlobs(allBlobs map[string]int64, referenced map[string]bool) ([]string, int64) { + var unreferenced []string + var totalSize int64 + for hash, size := range allBlobs { + if !referenced[hash] { + unreferenced = append(unreferenced, hash) + totalSize += size + } + } + return unreferenced, totalSize +} + +// deleteUnreferencedBlobs deletes the given blobs from storage and populates the result +func (v *Vaultik) deleteUnreferencedBlobs(unreferencedBlobs []string, allBlobs map[string]int64, result *PruneBlobsResult) { log.Info("Deleting unreferenced blobs") - deletedCount := 0 - deletedSize := int64(0) for i, hash := range unreferencedBlobs { blobPath := fmt.Sprintf("blobs/%s/%s/%s", hash[:2], hash[2:4], hash) @@ -160,10 +177,9 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error { continue } - deletedCount++ - deletedSize += allBlobs[hash] + result.BlobsDeleted++ + result.BytesFreed += allBlobs[hash] - // Progress update every 100 blobs if (i+1)%100 == 0 || i == len(unreferencedBlobs)-1 { log.Info("Deletion progress", "deleted", i+1, @@ -173,26 +189,13 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error { } } - result.BlobsDeleted = deletedCount - result.BlobsFailed = len(unreferencedBlobs) - deletedCount - result.BytesFreed = deletedSize + result.BlobsFailed = len(unreferencedBlobs) - result.BlobsDeleted log.Info("Prune complete", - "deleted_count", deletedCount, - "deleted_size", humanize.Bytes(uint64(deletedSize)), - "failed", len(unreferencedBlobs)-deletedCount, + "deleted_count", result.BlobsDeleted, + "deleted_size", humanize.Bytes(uint64(result.BytesFreed)), + "failed", result.BlobsFailed, ) - - if opts.JSON { - return v.outputPruneBlobsJSON(result) - } - - v.printfStdout("\nDeleted %d blob(s) totaling %s\n", deletedCount, humanize.Bytes(uint64(deletedSize))) - if deletedCount < len(unreferencedBlobs) { - v.printfStdout("Failed to delete %d blob(s)\n", len(unreferencedBlobs)-deletedCount) - } - - return nil } // outputPruneBlobsJSON outputs the prune result as JSON diff --git a/internal/vaultik/snapshot.go b/internal/vaultik/snapshot.go index e0d93b2..2943e6a 100644 --- a/internal/vaultik/snapshot.go +++ b/internal/vaultik/snapshot.go @@ -111,40 +111,34 @@ func (v *Vaultik) CreateSnapshot(opts *SnapshotCreateOptions) error { return nil } +// snapshotStats tracks aggregate statistics across directory scans +type snapshotStats struct { + totalFiles int + totalBytes int64 + totalChunks int + totalBlobs int + totalBytesSkipped int64 + totalFilesSkipped int + totalFilesDeleted int + totalBytesDeleted int64 + totalBytesUploaded int64 + totalBlobsUploaded int + uploadDuration time.Duration +} + // createNamedSnapshot creates a single named snapshot func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, snapName string, idx, total int) error { snapshotStartTime := time.Now() - snapConfig := v.Config.Snapshots[snapName] - if total > 1 { v.printfStdout("\n=== Snapshot %d/%d: %s ===\n", idx, total, snapName) } - // Resolve source directories to absolute paths - resolvedDirs := make([]string, 0, len(snapConfig.Paths)) - for _, dir := range snapConfig.Paths { - absPath, err := filepath.Abs(dir) - if err != nil { - return fmt.Errorf("failed to resolve absolute path for %s: %w", dir, err) - } - - // Resolve symlinks - resolvedPath, err := filepath.EvalSymlinks(absPath) - if err != nil { - // If the path doesn't exist yet, use the absolute path - if os.IsNotExist(err) { - resolvedPath = absPath - } else { - return fmt.Errorf("failed to resolve symlinks for %s: %w", absPath, err) - } - } - - resolvedDirs = append(resolvedDirs, resolvedPath) + resolvedDirs, err := v.resolveSnapshotPaths(snapName) + if err != nil { + return err } - // Create scanner with progress enabled (unless in cron mode) - // Pass the combined excludes for this snapshot scanner := v.ScannerFactory(snapshot.ScannerParams{ EnableProgress: !opts.Cron, Fs: v.Fs, @@ -152,20 +146,6 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna SkipErrors: opts.SkipErrors, }) - // Statistics tracking - totalFiles := 0 - totalBytes := int64(0) - totalChunks := 0 - totalBlobs := 0 - totalBytesSkipped := int64(0) - totalFilesSkipped := 0 - totalFilesDeleted := 0 - totalBytesDeleted := int64(0) - totalBytesUploaded := int64(0) - totalBlobsUploaded := 0 - uploadDuration := time.Duration(0) - - // Create a new snapshot at the beginning (with snapshot name in ID) snapshotID, err := v.SnapshotManager.CreateSnapshotWithName(v.ctx, hostname, snapName, v.Globals.Version, v.Globals.Commit) if err != nil { return fmt.Errorf("creating snapshot: %w", err) @@ -173,12 +153,56 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna log.Info("Beginning snapshot", "snapshot_id", snapshotID, "name", snapName) v.printfStdout("Beginning snapshot: %s\n", snapshotID) + stats, err := v.scanAllDirectories(scanner, resolvedDirs, snapshotID) + if err != nil { + return err + } + + v.collectUploadStats(scanner, stats) + + if err := v.finalizeSnapshotMetadata(snapshotID, stats); err != nil { + return err + } + + v.printSnapshotSummary(snapshotID, snapshotStartTime, stats) + return nil +} + +// resolveSnapshotPaths resolves source directories to absolute paths with symlink resolution +func (v *Vaultik) resolveSnapshotPaths(snapName string) ([]string, error) { + snapConfig := v.Config.Snapshots[snapName] + resolvedDirs := make([]string, 0, len(snapConfig.Paths)) + + for _, dir := range snapConfig.Paths { + absPath, err := filepath.Abs(dir) + if err != nil { + return nil, fmt.Errorf("failed to resolve absolute path for %s: %w", dir, err) + } + + resolvedPath, err := filepath.EvalSymlinks(absPath) + if err != nil { + if os.IsNotExist(err) { + resolvedPath = absPath + } else { + return nil, fmt.Errorf("failed to resolve symlinks for %s: %w", absPath, err) + } + } + + resolvedDirs = append(resolvedDirs, resolvedPath) + } + + return resolvedDirs, nil +} + +// scanAllDirectories runs the scanner on each resolved directory and accumulates stats +func (v *Vaultik) scanAllDirectories(scanner *snapshot.Scanner, resolvedDirs []string, snapshotID string) (*snapshotStats, error) { + stats := &snapshotStats{} + for i, dir := range resolvedDirs { - // Check if context is cancelled select { case <-v.ctx.Done(): log.Info("Snapshot creation cancelled") - return v.ctx.Err() + return nil, v.ctx.Err() default: } @@ -186,17 +210,17 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna v.printfStdout("Beginning directory scan (%d/%d): %s\n", i+1, len(resolvedDirs), dir) result, err := scanner.Scan(v.ctx, dir, snapshotID) if err != nil { - return fmt.Errorf("failed to scan %s: %w", dir, err) + return nil, fmt.Errorf("failed to scan %s: %w", dir, err) } - totalFiles += result.FilesScanned - totalBytes += result.BytesScanned - totalChunks += result.ChunksCreated - totalBlobs += result.BlobsCreated - totalFilesSkipped += result.FilesSkipped - totalBytesSkipped += result.BytesSkipped - totalFilesDeleted += result.FilesDeleted - totalBytesDeleted += result.BytesDeleted + stats.totalFiles += result.FilesScanned + stats.totalBytes += result.BytesScanned + stats.totalChunks += result.ChunksCreated + stats.totalBlobs += result.BlobsCreated + stats.totalFilesSkipped += result.FilesSkipped + stats.totalBytesSkipped += result.BytesSkipped + stats.totalFilesDeleted += result.FilesDeleted + stats.totalBytesDeleted += result.BytesDeleted log.Info("Directory scan complete", "path", dir, @@ -207,85 +231,79 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna "chunks", result.ChunksCreated, "blobs", result.BlobsCreated, "duration", result.EndTime.Sub(result.StartTime)) - - // Remove per-directory summary - the scanner already prints its own summary } - // Get upload statistics from scanner progress if available + return stats, nil +} + +// collectUploadStats gathers upload statistics from the scanner's progress reporter +func (v *Vaultik) collectUploadStats(scanner *snapshot.Scanner, stats *snapshotStats) { if s := scanner.GetProgress(); s != nil { - stats := s.GetStats() - totalBytesUploaded = stats.BytesUploaded.Load() - totalBlobsUploaded = int(stats.BlobsUploaded.Load()) - uploadDuration = time.Duration(stats.UploadDurationMs.Load()) * time.Millisecond + progressStats := s.GetStats() + stats.totalBytesUploaded = progressStats.BytesUploaded.Load() + stats.totalBlobsUploaded = int(progressStats.BlobsUploaded.Load()) + stats.uploadDuration = time.Duration(progressStats.UploadDurationMs.Load()) * time.Millisecond } +} - // Update snapshot statistics with extended fields +// finalizeSnapshotMetadata updates stats, marks complete, and exports metadata +func (v *Vaultik) finalizeSnapshotMetadata(snapshotID string, stats *snapshotStats) error { extStats := snapshot.ExtendedBackupStats{ BackupStats: snapshot.BackupStats{ - FilesScanned: totalFiles, - BytesScanned: totalBytes, - ChunksCreated: totalChunks, - BlobsCreated: totalBlobs, - BytesUploaded: totalBytesUploaded, + FilesScanned: stats.totalFiles, + BytesScanned: stats.totalBytes, + ChunksCreated: stats.totalChunks, + BlobsCreated: stats.totalBlobs, + BytesUploaded: stats.totalBytesUploaded, }, - BlobUncompressedSize: 0, // Will be set from database query below + BlobUncompressedSize: 0, CompressionLevel: v.Config.CompressionLevel, - UploadDurationMs: uploadDuration.Milliseconds(), + UploadDurationMs: stats.uploadDuration.Milliseconds(), } if err := v.SnapshotManager.UpdateSnapshotStatsExtended(v.ctx, snapshotID, extStats); err != nil { return fmt.Errorf("updating snapshot stats: %w", err) } - // Mark snapshot as complete if err := v.SnapshotManager.CompleteSnapshot(v.ctx, snapshotID); err != nil { return fmt.Errorf("completing snapshot: %w", err) } - // Export snapshot metadata - // Export snapshot metadata without closing the database - // The export function should handle its own database connection if err := v.SnapshotManager.ExportSnapshotMetadata(v.ctx, v.Config.IndexPath, snapshotID); err != nil { return fmt.Errorf("exporting snapshot metadata: %w", err) } - // Calculate final statistics - snapshotDuration := time.Since(snapshotStartTime) - totalFilesChanged := totalFiles - totalFilesSkipped - totalBytesChanged := totalBytes - totalBytesAll := totalBytes + totalBytesSkipped + return nil +} - // Calculate upload speed - var avgUploadSpeed string - if totalBytesUploaded > 0 && uploadDuration > 0 { - bytesPerSec := float64(totalBytesUploaded) / uploadDuration.Seconds() - bitsPerSec := bytesPerSec * 8 - if bitsPerSec >= 1e9 { - avgUploadSpeed = fmt.Sprintf("%.1f Gbit/s", bitsPerSec/1e9) - } else if bitsPerSec >= 1e6 { - avgUploadSpeed = fmt.Sprintf("%.0f Mbit/s", bitsPerSec/1e6) - } else if bitsPerSec >= 1e3 { - avgUploadSpeed = fmt.Sprintf("%.0f Kbit/s", bitsPerSec/1e3) - } else { - avgUploadSpeed = fmt.Sprintf("%.0f bit/s", bitsPerSec) - } - } else { - avgUploadSpeed = "N/A" +// formatUploadSpeed formats bytes uploaded and duration into a human-readable speed string +func formatUploadSpeed(bytesUploaded int64, duration time.Duration) string { + if bytesUploaded <= 0 || duration <= 0 { + return "N/A" } + bytesPerSec := float64(bytesUploaded) / duration.Seconds() + bitsPerSec := bytesPerSec * 8 + switch { + case bitsPerSec >= 1e9: + return fmt.Sprintf("%.1f Gbit/s", bitsPerSec/1e9) + case bitsPerSec >= 1e6: + return fmt.Sprintf("%.0f Mbit/s", bitsPerSec/1e6) + case bitsPerSec >= 1e3: + return fmt.Sprintf("%.0f Kbit/s", bitsPerSec/1e3) + default: + return fmt.Sprintf("%.0f bit/s", bitsPerSec) + } +} + +// printSnapshotSummary prints the comprehensive snapshot completion summary +func (v *Vaultik) printSnapshotSummary(snapshotID string, startTime time.Time, stats *snapshotStats) { + snapshotDuration := time.Since(startTime) + totalFilesChanged := stats.totalFiles - stats.totalFilesSkipped + totalBytesAll := stats.totalBytes + stats.totalBytesSkipped // Get total blob sizes from database - totalBlobSizeCompressed := int64(0) - totalBlobSizeUncompressed := int64(0) - if blobHashes, err := v.Repositories.Snapshots.GetBlobHashes(v.ctx, snapshotID); err == nil { - for _, hash := range blobHashes { - if blob, err := v.Repositories.Blobs.GetByHash(v.ctx, hash); err == nil && blob != nil { - totalBlobSizeCompressed += blob.CompressedSize - totalBlobSizeUncompressed += blob.UncompressedSize - } - } - } + totalBlobSizeCompressed, totalBlobSizeUncompressed := v.getSnapshotBlobSizes(snapshotID) - // Calculate compression ratio var compressionRatio float64 if totalBlobSizeUncompressed > 0 { compressionRatio = float64(totalBlobSizeCompressed) / float64(totalBlobSizeUncompressed) @@ -293,55 +311,95 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna compressionRatio = 1.0 } - // Print comprehensive summary v.printfStdout("=== Snapshot Complete ===\n") v.printfStdout("ID: %s\n", snapshotID) v.printfStdout("Files: %s examined, %s to process, %s unchanged", - formatNumber(totalFiles), + formatNumber(stats.totalFiles), formatNumber(totalFilesChanged), - formatNumber(totalFilesSkipped)) - if totalFilesDeleted > 0 { - v.printfStdout(", %s deleted", formatNumber(totalFilesDeleted)) + formatNumber(stats.totalFilesSkipped)) + if stats.totalFilesDeleted > 0 { + v.printfStdout(", %s deleted", formatNumber(stats.totalFilesDeleted)) } v.printlnStdout() v.printfStdout("Data: %s total (%s to process)", humanize.Bytes(uint64(totalBytesAll)), - humanize.Bytes(uint64(totalBytesChanged))) - if totalBytesDeleted > 0 { - v.printfStdout(", %s deleted", humanize.Bytes(uint64(totalBytesDeleted))) + humanize.Bytes(uint64(stats.totalBytes))) + if stats.totalBytesDeleted > 0 { + v.printfStdout(", %s deleted", humanize.Bytes(uint64(stats.totalBytesDeleted))) } v.printlnStdout() - if totalBlobsUploaded > 0 { + if stats.totalBlobsUploaded > 0 { v.printfStdout("Storage: %s compressed from %s (%.2fx)\n", humanize.Bytes(uint64(totalBlobSizeCompressed)), humanize.Bytes(uint64(totalBlobSizeUncompressed)), compressionRatio) v.printfStdout("Upload: %d blobs, %s in %s (%s)\n", - totalBlobsUploaded, - humanize.Bytes(uint64(totalBytesUploaded)), - formatDuration(uploadDuration), - avgUploadSpeed) + stats.totalBlobsUploaded, + humanize.Bytes(uint64(stats.totalBytesUploaded)), + formatDuration(stats.uploadDuration), + formatUploadSpeed(stats.totalBytesUploaded, stats.uploadDuration)) } v.printfStdout("Duration: %s\n", formatDuration(snapshotDuration)) +} - return nil +// getSnapshotBlobSizes returns total compressed and uncompressed blob sizes for a snapshot +func (v *Vaultik) getSnapshotBlobSizes(snapshotID string) (compressed int64, uncompressed int64) { + blobHashes, err := v.Repositories.Snapshots.GetBlobHashes(v.ctx, snapshotID) + if err != nil { + return 0, 0 + } + for _, hash := range blobHashes { + if blob, err := v.Repositories.Blobs.GetByHash(v.ctx, hash); err == nil && blob != nil { + compressed += blob.CompressedSize + uncompressed += blob.UncompressedSize + } + } + return compressed, uncompressed } // ListSnapshots lists all snapshots func (v *Vaultik) ListSnapshots(jsonOutput bool) error { - // Get all remote snapshots + remoteSnapshots, err := v.listRemoteSnapshotIDs() + if err != nil { + return err + } + + localSnapshotMap, err := v.reconcileLocalWithRemote(remoteSnapshots) + if err != nil { + return err + } + + snapshots, err := v.buildSnapshotInfoList(remoteSnapshots, localSnapshotMap) + if err != nil { + return err + } + + // Sort by timestamp (newest first) + sort.Slice(snapshots, func(i, j int) bool { + return snapshots[i].Timestamp.After(snapshots[j].Timestamp) + }) + + if jsonOutput { + encoder := json.NewEncoder(v.Stdout) + encoder.SetIndent("", " ") + return encoder.Encode(snapshots) + } + + return v.printSnapshotTable(snapshots) +} + +// listRemoteSnapshotIDs returns a set of snapshot IDs found in remote storage +func (v *Vaultik) listRemoteSnapshotIDs() (map[string]bool, error) { remoteSnapshots := make(map[string]bool) objectCh := v.Storage.ListStream(v.ctx, "metadata/") for object := range objectCh { if object.Err != nil { - return fmt.Errorf("listing remote snapshots: %w", object.Err) + return nil, fmt.Errorf("listing remote snapshots: %w", object.Err) } - // Extract snapshot ID from paths like metadata/hostname-20240115-143052Z/ parts := strings.Split(object.Key, "/") if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" { - // Skip macOS resource fork files (._*) and other hidden files if strings.HasPrefix(parts[1], ".") { continue } @@ -349,56 +407,46 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error { } } - // Get all local snapshots + return remoteSnapshots, nil +} + +// reconcileLocalWithRemote removes local snapshots not in remote and returns the surviving local map +func (v *Vaultik) reconcileLocalWithRemote(remoteSnapshots map[string]bool) (map[string]*database.Snapshot, error) { localSnapshots, err := v.Repositories.Snapshots.ListRecent(v.ctx, 10000) if err != nil { - return fmt.Errorf("listing local snapshots: %w", err) + return nil, fmt.Errorf("listing local snapshots: %w", err) } - // Build a map of local snapshots for quick lookup localSnapshotMap := make(map[string]*database.Snapshot) for _, s := range localSnapshots { localSnapshotMap[s.ID.String()] = s } - // Remove local snapshots that don't exist remotely - for _, snapshot := range localSnapshots { - snapshotIDStr := snapshot.ID.String() + for _, snap := range localSnapshots { + snapshotIDStr := snap.ID.String() if !remoteSnapshots[snapshotIDStr] { - log.Info("Removing local snapshot not found in remote", "snapshot_id", snapshot.ID) - - // Delete related records first to avoid foreign key constraints - if err := v.Repositories.Snapshots.DeleteSnapshotFiles(v.ctx, snapshotIDStr); err != nil { - log.Error("Failed to delete snapshot files", "snapshot_id", snapshot.ID, "error", err) - } - if err := v.Repositories.Snapshots.DeleteSnapshotBlobs(v.ctx, snapshotIDStr); err != nil { - log.Error("Failed to delete snapshot blobs", "snapshot_id", snapshot.ID, "error", err) - } - if err := v.Repositories.Snapshots.DeleteSnapshotUploads(v.ctx, snapshotIDStr); err != nil { - log.Error("Failed to delete snapshot uploads", "snapshot_id", snapshot.ID, "error", err) - } - - // Now delete the snapshot itself - if err := v.Repositories.Snapshots.Delete(v.ctx, snapshotIDStr); err != nil { - log.Error("Failed to delete local snapshot", "snapshot_id", snapshot.ID, "error", err) + log.Info("Removing local snapshot not found in remote", "snapshot_id", snap.ID) + if err := v.deleteSnapshotFromLocalDB(snapshotIDStr); err != nil { + log.Error("Failed to delete local snapshot", "snapshot_id", snap.ID, "error", err) } else { - log.Info("Deleted local snapshot not found in remote", "snapshot_id", snapshot.ID) + log.Info("Deleted local snapshot not found in remote", "snapshot_id", snap.ID) delete(localSnapshotMap, snapshotIDStr) } } } - // Build final snapshot list + return localSnapshotMap, nil +} + +// buildSnapshotInfoList constructs SnapshotInfo entries from remote IDs and local data +func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSnapshotMap map[string]*database.Snapshot) ([]SnapshotInfo, error) { snapshots := make([]SnapshotInfo, 0, len(remoteSnapshots)) for snapshotID := range remoteSnapshots { - // Check if we have this snapshot locally if localSnap, exists := localSnapshotMap[snapshotID]; exists && localSnap.CompletedAt != nil { - // Get total compressed size of all blobs referenced by this snapshot totalSize, err := v.Repositories.Snapshots.GetSnapshotTotalCompressedSize(v.ctx, snapshotID) if err != nil { log.Warn("Failed to get total compressed size", "id", snapshotID, "error", err) - // Fall back to stored blob size totalSize = localSnap.BlobSize } @@ -408,17 +456,15 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error { CompressedSize: totalSize, }) } else { - // Remote snapshot not in local DB - fetch manifest to get size timestamp, err := parseSnapshotTimestamp(snapshotID) if err != nil { log.Warn("Failed to parse snapshot timestamp", "id", snapshotID, "error", err) continue } - // Try to download manifest to get size totalSize, err := v.getManifestSize(snapshotID) if err != nil { - return fmt.Errorf("failed to get manifest size for %s: %w", snapshotID, err) + return nil, fmt.Errorf("failed to get manifest size for %s: %w", snapshotID, err) } snapshots = append(snapshots, SnapshotInfo{ @@ -429,22 +475,13 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error { } } - // Sort by timestamp (newest first) - sort.Slice(snapshots, func(i, j int) bool { - return snapshots[i].Timestamp.After(snapshots[j].Timestamp) - }) + return snapshots, nil +} - if jsonOutput { - // JSON output - encoder := json.NewEncoder(v.Stdout) - encoder.SetIndent("", " ") - return encoder.Encode(snapshots) - } - - // Table output +// printSnapshotTable renders the snapshot list as a formatted table +func (v *Vaultik) printSnapshotTable(snapshots []SnapshotInfo) error { w := tabwriter.NewWriter(v.Stdout, 0, 0, 3, ' ', 0) - // Show configured snapshots from config file if _, err := fmt.Fprintln(w, "CONFIGURED SNAPSHOTS:"); err != nil { return err } @@ -465,7 +502,6 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error { return err } - // Show remote snapshots if _, err := fmt.Fprintln(w, "REMOTE SNAPSHOTS:"); err != nil { return err } diff --git a/internal/vaultik/verify.go b/internal/vaultik/verify.go index 55213ef..a4c8f8a 100644 --- a/internal/vaultik/verify.go +++ b/internal/vaultik/verify.go @@ -35,6 +35,19 @@ type VerifyResult struct { ErrorMessage string `json:"error,omitempty"` } +// deepVerifyFailure records a failure in the result and returns it appropriately +func (v *Vaultik) deepVerifyFailure(result *VerifyResult, opts *VerifyOptions, msg string, err error) error { + result.Status = "failed" + result.ErrorMessage = msg + if opts.JSON { + return v.outputVerifyJSON(result) + } + if err != nil { + return err + } + return fmt.Errorf("%s", msg) +} + // RunDeepVerify executes deep verification operation func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error { result := &VerifyResult{ @@ -42,89 +55,20 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error { Mode: "deep", } - // Check for decryption capability if !v.CanDecrypt() { - result.Status = "failed" - result.ErrorMessage = "VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification" - if opts.JSON { - return v.outputVerifyJSON(result) - } - return fmt.Errorf("VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification") + return v.deepVerifyFailure(result, opts, + "VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification", + fmt.Errorf("VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification")) } - log.Info("Starting snapshot verification", - "snapshot_id", snapshotID, - "mode", "deep", - ) - + log.Info("Starting snapshot verification", "snapshot_id", snapshotID, "mode", "deep") if !opts.JSON { v.printfStdout("Deep verification of snapshot: %s\n\n", snapshotID) } - // Step 1: Download manifest - manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID) - log.Info("Downloading manifest", "path", manifestPath) - if !opts.JSON { - v.printfStdout("Downloading manifest...\n") - } - - manifestReader, err := v.Storage.Get(v.ctx, manifestPath) + manifest, tempDB, dbBlobs, err := v.loadVerificationData(snapshotID, opts, result) if err != nil { - result.Status = "failed" - result.ErrorMessage = fmt.Sprintf("failed to download manifest: %v", err) - if opts.JSON { - return v.outputVerifyJSON(result) - } - return fmt.Errorf("failed to download manifest: %w", err) - } - defer func() { _ = manifestReader.Close() }() - - // Decompress manifest - manifest, err := snapshot.DecodeManifest(manifestReader) - if err != nil { - result.Status = "failed" - result.ErrorMessage = fmt.Sprintf("failed to decode manifest: %v", err) - if opts.JSON { - return v.outputVerifyJSON(result) - } - return fmt.Errorf("failed to decode manifest: %w", err) - } - - log.Info("Manifest loaded", - "manifest_blob_count", manifest.BlobCount, - "manifest_total_size", humanize.Bytes(uint64(manifest.TotalCompressedSize)), - ) - if !opts.JSON { - v.printfStdout("Manifest loaded: %d blobs (%s)\n", manifest.BlobCount, humanize.Bytes(uint64(manifest.TotalCompressedSize))) - } - - // Step 2: Download and decrypt database (authoritative source) - dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID) - log.Info("Downloading encrypted database", "path", dbPath) - if !opts.JSON { - v.printfStdout("Downloading and decrypting database...\n") - } - - dbReader, err := v.Storage.Get(v.ctx, dbPath) - if err != nil { - result.Status = "failed" - result.ErrorMessage = fmt.Sprintf("failed to download database: %v", err) - if opts.JSON { - return v.outputVerifyJSON(result) - } - return fmt.Errorf("failed to download database: %w", err) - } - defer func() { _ = dbReader.Close() }() - - // Decrypt and decompress database - tempDB, err := v.decryptAndLoadDatabase(dbReader, v.Config.AgeSecretKey) - if err != nil { - result.Status = "failed" - result.ErrorMessage = fmt.Sprintf("failed to decrypt database: %v", err) - if opts.JSON { - return v.outputVerifyJSON(result) - } - return fmt.Errorf("failed to decrypt database: %w", err) + return err } defer func() { if tempDB != nil { @@ -132,17 +76,6 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error { } }() - // Step 3: Get authoritative blob list from database - dbBlobs, err := v.getBlobsFromDatabase(snapshotID, tempDB.DB) - if err != nil { - result.Status = "failed" - result.ErrorMessage = fmt.Sprintf("failed to get blobs from database: %v", err) - if opts.JSON { - return v.outputVerifyJSON(result) - } - return fmt.Errorf("failed to get blobs from database: %w", err) - } - result.BlobCount = len(dbBlobs) var totalSize int64 for _, blob := range dbBlobs { @@ -150,54 +83,10 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error { } result.TotalSize = totalSize - log.Info("Database loaded", - "db_blob_count", len(dbBlobs), - "db_total_size", humanize.Bytes(uint64(totalSize)), - ) - if !opts.JSON { - v.printfStdout("Database loaded: %d blobs (%s)\n", len(dbBlobs), humanize.Bytes(uint64(totalSize))) - v.printfStdout("Verifying manifest against database...\n") - } - - // Step 4: Verify manifest matches database - if err := v.verifyManifestAgainstDatabase(manifest, dbBlobs); err != nil { - result.Status = "failed" - result.ErrorMessage = err.Error() - if opts.JSON { - return v.outputVerifyJSON(result) - } + if err := v.runVerificationSteps(manifest, dbBlobs, tempDB, opts, result, totalSize); err != nil { return err } - // Step 5: Verify all blobs exist in S3 (using database as source) - if !opts.JSON { - v.printfStdout("Manifest verified.\n") - v.printfStdout("Checking blob existence in remote storage...\n") - } - if err := v.verifyBlobExistenceFromDB(dbBlobs); err != nil { - result.Status = "failed" - result.ErrorMessage = err.Error() - if opts.JSON { - return v.outputVerifyJSON(result) - } - return err - } - - // Step 6: Deep verification - download and verify blob contents - if !opts.JSON { - v.printfStdout("All blobs exist.\n") - v.printfStdout("Downloading and verifying blob contents (%d blobs, %s)...\n", len(dbBlobs), humanize.Bytes(uint64(totalSize))) - } - if err := v.performDeepVerificationFromDB(dbBlobs, tempDB.DB, opts); err != nil { - result.Status = "failed" - result.ErrorMessage = err.Error() - if opts.JSON { - return v.outputVerifyJSON(result) - } - return err - } - - // Success result.Status = "ok" result.Verified = len(dbBlobs) @@ -206,11 +95,7 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error { } log.Info("✓ Verification completed successfully", - "snapshot_id", snapshotID, - "mode", "deep", - "blobs_verified", len(dbBlobs), - ) - + "snapshot_id", snapshotID, "mode", "deep", "blobs_verified", len(dbBlobs)) v.printfStdout("\n✓ Verification completed successfully\n") v.printfStdout(" Snapshot: %s\n", snapshotID) v.printfStdout(" Blobs verified: %d\n", len(dbBlobs)) @@ -219,6 +104,99 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error { return nil } +// loadVerificationData downloads manifest, database, and blob list for verification +func (v *Vaultik) loadVerificationData(snapshotID string, opts *VerifyOptions, result *VerifyResult) (*snapshot.Manifest, *tempDB, []snapshot.BlobInfo, error) { + // Download manifest + if !opts.JSON { + v.printfStdout("Downloading manifest...\n") + } + manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID) + manifestReader, err := v.Storage.Get(v.ctx, manifestPath) + if err != nil { + return nil, nil, nil, v.deepVerifyFailure(result, opts, + fmt.Sprintf("failed to download manifest: %v", err), + fmt.Errorf("failed to download manifest: %w", err)) + } + defer func() { _ = manifestReader.Close() }() + + manifest, err := snapshot.DecodeManifest(manifestReader) + if err != nil { + return nil, nil, nil, v.deepVerifyFailure(result, opts, + fmt.Sprintf("failed to decode manifest: %v", err), + fmt.Errorf("failed to decode manifest: %w", err)) + } + + if !opts.JSON { + v.printfStdout("Manifest loaded: %d blobs (%s)\n", manifest.BlobCount, humanize.Bytes(uint64(manifest.TotalCompressedSize))) + v.printfStdout("Downloading and decrypting database...\n") + } + + // Download and decrypt database + dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID) + dbReader, err := v.Storage.Get(v.ctx, dbPath) + if err != nil { + return nil, nil, nil, v.deepVerifyFailure(result, opts, + fmt.Sprintf("failed to download database: %v", err), + fmt.Errorf("failed to download database: %w", err)) + } + defer func() { _ = dbReader.Close() }() + + tdb, err := v.decryptAndLoadDatabase(dbReader, v.Config.AgeSecretKey) + if err != nil { + return nil, nil, nil, v.deepVerifyFailure(result, opts, + fmt.Sprintf("failed to decrypt database: %v", err), + fmt.Errorf("failed to decrypt database: %w", err)) + } + + dbBlobs, err := v.getBlobsFromDatabase(snapshotID, tdb.DB) + if err != nil { + _ = tdb.Close() + return nil, nil, nil, v.deepVerifyFailure(result, opts, + fmt.Sprintf("failed to get blobs from database: %v", err), + fmt.Errorf("failed to get blobs from database: %w", err)) + } + + if !opts.JSON { + v.printfStdout("Database loaded: %d blobs (%s)\n", len(dbBlobs), humanize.Bytes(uint64(func() int64 { + var s int64 + for _, b := range dbBlobs { + s += b.CompressedSize + } + return s + }()))) + } + + return manifest, tdb, dbBlobs, nil +} + +// runVerificationSteps executes manifest verification, blob existence check, and deep content verification +func (v *Vaultik) runVerificationSteps(manifest *snapshot.Manifest, dbBlobs []snapshot.BlobInfo, tdb *tempDB, opts *VerifyOptions, result *VerifyResult, totalSize int64) error { + if !opts.JSON { + v.printfStdout("Verifying manifest against database...\n") + } + if err := v.verifyManifestAgainstDatabase(manifest, dbBlobs); err != nil { + return v.deepVerifyFailure(result, opts, err.Error(), err) + } + + if !opts.JSON { + v.printfStdout("Manifest verified.\n") + v.printfStdout("Checking blob existence in remote storage...\n") + } + if err := v.verifyBlobExistenceFromDB(dbBlobs); err != nil { + return v.deepVerifyFailure(result, opts, err.Error(), err) + } + + if !opts.JSON { + v.printfStdout("All blobs exist.\n") + v.printfStdout("Downloading and verifying blob contents (%d blobs, %s)...\n", len(dbBlobs), humanize.Bytes(uint64(totalSize))) + } + if err := v.performDeepVerificationFromDB(dbBlobs, tdb.DB, opts); err != nil { + return v.deepVerifyFailure(result, opts, err.Error(), err) + } + + return nil +} + // tempDB wraps sql.DB with cleanup type tempDB struct { *sql.DB -- 2.45.2 From 37780d59de5c9f60b98473c3aae4f5266b97535f Mon Sep 17 00:00:00 2001 From: user Date: Fri, 20 Feb 2026 03:50:29 -0800 Subject: [PATCH 2/2] fix: propagate errors in uploadBlobIfNeeded and listUniqueSnapshotIDs - scanner.go: uploadBlobIfNeeded now returns (bool, error) instead of bool, preventing data loss where blobs were recorded in DB but upload failures were silently swallowed - prune.go: listUniqueSnapshotIDs now returns ([]string, error) instead of []string, preventing incorrect orphan detection when listing errors occur --- internal/snapshot/scanner.go | 14 +++++++++----- internal/vaultik/prune.go | 11 +++++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go index 74391d1..5c8fa88 100644 --- a/internal/snapshot/scanner.go +++ b/internal/snapshot/scanner.go @@ -942,7 +942,11 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { } blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash) - blobExists := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime) + blobExists, err := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime) + if err != nil { + s.cleanupBlobTempFile(blobWithReader) + return fmt.Errorf("uploading blob %s: %w", finishedBlob.Hash, err) + } if err := s.recordBlobMetadata(ctx, finishedBlob, blobExists, startTime); err != nil { s.cleanupBlobTempFile(blobWithReader) @@ -963,7 +967,7 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { } // uploadBlobIfNeeded uploads the blob to storage if it doesn't already exist, returns whether it existed -func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) bool { +func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) (bool, error) { finishedBlob := blobWithReader.FinishedBlob // Check if blob already exists (deduplication after restart) @@ -972,14 +976,14 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW "hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed))) fmt.Printf("Blob exists: %s (%s, skipped upload)\n", finishedBlob.Hash[:12]+"...", humanize.Bytes(uint64(finishedBlob.Compressed))) - return true + return true, nil } progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob) if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil { log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err) - return false + return false, fmt.Errorf("uploading blob to storage: %w", err) } uploadDuration := time.Since(startTime) @@ -1004,7 +1008,7 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW stats.BytesUploaded.Add(finishedBlob.Compressed) } - return false + return false, nil } // makeUploadProgressCallback creates a progress callback for blob uploads diff --git a/internal/vaultik/prune.go b/internal/vaultik/prune.go index 2aefc48..2fb1a35 100644 --- a/internal/vaultik/prune.go +++ b/internal/vaultik/prune.go @@ -85,7 +85,10 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error { // collectReferencedBlobs downloads all manifests and returns the set of referenced blob hashes func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) { log.Info("Listing remote snapshots") - snapshotIDs := v.listUniqueSnapshotIDs() + snapshotIDs, err := v.listUniqueSnapshotIDs() + if err != nil { + return nil, fmt.Errorf("listing snapshot IDs: %w", err) + } log.Info("Found manifests in remote storage", "count", len(snapshotIDs)) allBlobsReferenced := make(map[string]bool) @@ -109,14 +112,14 @@ func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) { } // listUniqueSnapshotIDs returns deduplicated snapshot IDs from remote metadata -func (v *Vaultik) listUniqueSnapshotIDs() []string { +func (v *Vaultik) listUniqueSnapshotIDs() ([]string, error) { objectCh := v.Storage.ListStream(v.ctx, "metadata/") seen := make(map[string]bool) var snapshotIDs []string for object := range objectCh { if object.Err != nil { - continue + return nil, fmt.Errorf("listing metadata objects: %w", object.Err) } parts := strings.Split(object.Key, "/") if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" { @@ -129,7 +132,7 @@ func (v *Vaultik) listUniqueSnapshotIDs() []string { } } } - return snapshotIDs + return snapshotIDs, nil } // listAllRemoteBlobs returns a map of all blob hashes to their sizes in remote storage -- 2.45.2