Compare commits
2 Commits
main
...
refactor/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37780d59de | ||
|
|
eb23e14799 |
@ -361,101 +361,23 @@ func (p *Packer) finalizeCurrentBlob() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close blobgen writer to flush all data
|
||||
if err := p.currentBlob.writer.Close(); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("closing blobgen writer: %w", err)
|
||||
}
|
||||
|
||||
// Sync file to ensure all data is written
|
||||
if err := p.currentBlob.tempFile.Sync(); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("syncing temp file: %w", err)
|
||||
}
|
||||
|
||||
// Get the final size (encrypted if applicable)
|
||||
finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent)
|
||||
blobHash, finalSize, err := p.closeBlobWriter()
|
||||
if err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("getting file size: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Reset to beginning for reading
|
||||
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("seeking to start: %w", err)
|
||||
}
|
||||
chunkRefs := p.buildChunkRefs()
|
||||
|
||||
// Get hash from blobgen writer (of final encrypted data)
|
||||
finalHash := p.currentBlob.writer.Sum256()
|
||||
blobHash := hex.EncodeToString(finalHash)
|
||||
|
||||
// Create chunk references with offsets
|
||||
chunkRefs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks))
|
||||
|
||||
for _, chunk := range p.currentBlob.chunks {
|
||||
chunkRefs = append(chunkRefs, &BlobChunkRef{
|
||||
ChunkHash: chunk.Hash,
|
||||
Offset: chunk.Offset,
|
||||
Length: chunk.Size,
|
||||
})
|
||||
}
|
||||
|
||||
// Get pending chunks (will be inserted to DB and reported to handler)
|
||||
chunksToInsert := p.pendingChunks
|
||||
p.pendingChunks = nil // Clear pending list
|
||||
p.pendingChunks = nil
|
||||
|
||||
// Insert pending chunks, blob_chunks, and update blob in a single transaction
|
||||
if p.repos != nil {
|
||||
blobIDTyped, parseErr := types.ParseBlobID(p.currentBlob.id)
|
||||
if parseErr != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("parsing blob ID: %w", parseErr)
|
||||
}
|
||||
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
|
||||
// First insert all pending chunks (required for blob_chunks FK)
|
||||
for _, chunk := range chunksToInsert {
|
||||
dbChunk := &database.Chunk{
|
||||
ChunkHash: types.ChunkHash(chunk.Hash),
|
||||
Size: chunk.Size,
|
||||
}
|
||||
if err := p.repos.Chunks.Create(ctx, tx, dbChunk); err != nil {
|
||||
return fmt.Errorf("creating chunk: %w", err)
|
||||
}
|
||||
if err := p.commitBlobToDatabase(blobHash, finalSize, chunksToInsert); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Insert all blob_chunk records in batch
|
||||
for _, chunk := range p.currentBlob.chunks {
|
||||
blobChunk := &database.BlobChunk{
|
||||
BlobID: blobIDTyped,
|
||||
ChunkHash: types.ChunkHash(chunk.Hash),
|
||||
Offset: chunk.Offset,
|
||||
Length: chunk.Size,
|
||||
}
|
||||
if err := p.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil {
|
||||
return fmt.Errorf("creating blob_chunk: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Update blob record with final hash and sizes
|
||||
return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash,
|
||||
p.currentBlob.size, finalSize)
|
||||
})
|
||||
if err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("finalizing blob transaction: %w", err)
|
||||
}
|
||||
|
||||
log.Debug("Committed blob transaction",
|
||||
"chunks_inserted", len(chunksToInsert),
|
||||
"blob_chunks_inserted", len(p.currentBlob.chunks))
|
||||
}
|
||||
|
||||
// Create finished blob
|
||||
finished := &FinishedBlob{
|
||||
ID: p.currentBlob.id,
|
||||
Hash: blobHash,
|
||||
Data: nil, // We don't load data into memory anymore
|
||||
Chunks: chunkRefs,
|
||||
CreatedTS: p.currentBlob.startTime,
|
||||
Uncompressed: p.currentBlob.size,
|
||||
@ -464,28 +386,105 @@ func (p *Packer) finalizeCurrentBlob() error {
|
||||
|
||||
compressionRatio := float64(finished.Compressed) / float64(finished.Uncompressed)
|
||||
log.Info("Finalized blob (compressed and encrypted)",
|
||||
"hash", blobHash,
|
||||
"chunks", len(chunkRefs),
|
||||
"uncompressed", finished.Uncompressed,
|
||||
"compressed", finished.Compressed,
|
||||
"hash", blobHash, "chunks", len(chunkRefs),
|
||||
"uncompressed", finished.Uncompressed, "compressed", finished.Compressed,
|
||||
"ratio", fmt.Sprintf("%.2f", compressionRatio),
|
||||
"duration", time.Since(p.currentBlob.startTime))
|
||||
|
||||
// Collect inserted chunk hashes for the scanner to track
|
||||
var insertedChunkHashes []string
|
||||
for _, chunk := range chunksToInsert {
|
||||
insertedChunkHashes = append(insertedChunkHashes, chunk.Hash)
|
||||
}
|
||||
|
||||
// Call blob handler if set
|
||||
return p.deliverFinishedBlob(finished, insertedChunkHashes)
|
||||
}
|
||||
|
||||
// closeBlobWriter closes the writer, syncs to disk, and returns the blob hash and final size
|
||||
func (p *Packer) closeBlobWriter() (string, int64, error) {
|
||||
if err := p.currentBlob.writer.Close(); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return "", 0, fmt.Errorf("closing blobgen writer: %w", err)
|
||||
}
|
||||
if err := p.currentBlob.tempFile.Sync(); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return "", 0, fmt.Errorf("syncing temp file: %w", err)
|
||||
}
|
||||
|
||||
finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent)
|
||||
if err != nil {
|
||||
p.cleanupTempFile()
|
||||
return "", 0, fmt.Errorf("getting file size: %w", err)
|
||||
}
|
||||
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return "", 0, fmt.Errorf("seeking to start: %w", err)
|
||||
}
|
||||
|
||||
finalHash := p.currentBlob.writer.Sum256()
|
||||
return hex.EncodeToString(finalHash), finalSize, nil
|
||||
}
|
||||
|
||||
// buildChunkRefs creates BlobChunkRef entries from the current blob's chunks
|
||||
func (p *Packer) buildChunkRefs() []*BlobChunkRef {
|
||||
refs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks))
|
||||
for _, chunk := range p.currentBlob.chunks {
|
||||
refs = append(refs, &BlobChunkRef{
|
||||
ChunkHash: chunk.Hash, Offset: chunk.Offset, Length: chunk.Size,
|
||||
})
|
||||
}
|
||||
return refs
|
||||
}
|
||||
|
||||
// commitBlobToDatabase inserts pending chunks, blob_chunks, and updates the blob record
|
||||
func (p *Packer) commitBlobToDatabase(blobHash string, finalSize int64, chunksToInsert []PendingChunk) error {
|
||||
if p.repos == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
blobIDTyped, parseErr := types.ParseBlobID(p.currentBlob.id)
|
||||
if parseErr != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("parsing blob ID: %w", parseErr)
|
||||
}
|
||||
|
||||
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
|
||||
for _, chunk := range chunksToInsert {
|
||||
dbChunk := &database.Chunk{ChunkHash: types.ChunkHash(chunk.Hash), Size: chunk.Size}
|
||||
if err := p.repos.Chunks.Create(ctx, tx, dbChunk); err != nil {
|
||||
return fmt.Errorf("creating chunk: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, chunk := range p.currentBlob.chunks {
|
||||
blobChunk := &database.BlobChunk{
|
||||
BlobID: blobIDTyped, ChunkHash: types.ChunkHash(chunk.Hash),
|
||||
Offset: chunk.Offset, Length: chunk.Size,
|
||||
}
|
||||
if err := p.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil {
|
||||
return fmt.Errorf("creating blob_chunk: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash, p.currentBlob.size, finalSize)
|
||||
})
|
||||
if err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("finalizing blob transaction: %w", err)
|
||||
}
|
||||
|
||||
log.Debug("Committed blob transaction",
|
||||
"chunks_inserted", len(chunksToInsert), "blob_chunks_inserted", len(p.currentBlob.chunks))
|
||||
return nil
|
||||
}
|
||||
|
||||
// deliverFinishedBlob passes the blob to the handler or stores it internally
|
||||
func (p *Packer) deliverFinishedBlob(finished *FinishedBlob, insertedChunkHashes []string) error {
|
||||
if p.blobHandler != nil {
|
||||
// Reset file position for handler
|
||||
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("seeking for handler: %w", err)
|
||||
}
|
||||
|
||||
// Create a blob reader that includes the data stream
|
||||
blobWithReader := &BlobWithReader{
|
||||
FinishedBlob: finished,
|
||||
Reader: p.currentBlob.tempFile,
|
||||
@ -497,11 +496,12 @@ func (p *Packer) finalizeCurrentBlob() error {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("blob handler failed: %w", err)
|
||||
}
|
||||
// Note: blob handler is responsible for closing/cleaning up temp file
|
||||
p.currentBlob = nil
|
||||
} else {
|
||||
log.Debug("No blob handler callback configured", "blob_hash", blobHash[:8]+"...")
|
||||
// No handler, need to read data for legacy behavior
|
||||
return nil
|
||||
}
|
||||
|
||||
// No handler - read data for legacy behavior
|
||||
log.Debug("No blob handler callback configured", "blob_hash", finished.Hash[:8]+"...")
|
||||
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("seeking to read data: %w", err)
|
||||
@ -513,14 +513,9 @@ func (p *Packer) finalizeCurrentBlob() error {
|
||||
return fmt.Errorf("reading blob data: %w", err)
|
||||
}
|
||||
finished.Data = data
|
||||
|
||||
p.finishedBlobs = append(p.finishedBlobs, finished)
|
||||
|
||||
// Cleanup
|
||||
p.cleanupTempFile()
|
||||
p.currentBlob = nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -931,40 +931,103 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
startTime := time.Now().UTC()
|
||||
finishedBlob := blobWithReader.FinishedBlob
|
||||
|
||||
// Report upload start and increment blobs created
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
|
||||
s.progress.GetStats().BlobsCreated.Add(1)
|
||||
}
|
||||
|
||||
// Upload to storage first (without holding any locks)
|
||||
// Use scan context for cancellation support
|
||||
ctx := s.scanCtx
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
// Track bytes uploaded for accurate speed calculation
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
|
||||
blobExists, err := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime)
|
||||
if err != nil {
|
||||
s.cleanupBlobTempFile(blobWithReader)
|
||||
return fmt.Errorf("uploading blob %s: %w", finishedBlob.Hash, err)
|
||||
}
|
||||
|
||||
if err := s.recordBlobMetadata(ctx, finishedBlob, blobExists, startTime); err != nil {
|
||||
s.cleanupBlobTempFile(blobWithReader)
|
||||
return err
|
||||
}
|
||||
|
||||
s.cleanupBlobTempFile(blobWithReader)
|
||||
|
||||
// Chunks from this blob are now committed to DB - remove from pending set
|
||||
s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)
|
||||
|
||||
// Flush files whose chunks are now all committed
|
||||
if err := s.flushCompletedPendingFiles(ctx); err != nil {
|
||||
return fmt.Errorf("flushing completed files: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadBlobIfNeeded uploads the blob to storage if it doesn't already exist, returns whether it existed
|
||||
func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) (bool, error) {
|
||||
finishedBlob := blobWithReader.FinishedBlob
|
||||
|
||||
// Check if blob already exists (deduplication after restart)
|
||||
if _, err := s.storage.Stat(ctx, blobPath); err == nil {
|
||||
log.Info("Blob already exists in storage, skipping upload",
|
||||
"hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
fmt.Printf("Blob exists: %s (%s, skipped upload)\n",
|
||||
finishedBlob.Hash[:12]+"...", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob)
|
||||
|
||||
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||
log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err)
|
||||
return false, fmt.Errorf("uploading blob to storage: %w", err)
|
||||
}
|
||||
|
||||
uploadDuration := time.Since(startTime)
|
||||
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
|
||||
|
||||
fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
humanize.Bytes(uint64(uploadSpeedBps)),
|
||||
uploadDuration.Round(time.Millisecond))
|
||||
|
||||
log.Info("Successfully uploaded blob to storage",
|
||||
"path", blobPath,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
"duration", uploadDuration,
|
||||
"speed", humanize.SI(uploadSpeedBps*8, "bps"))
|
||||
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
|
||||
stats := s.progress.GetStats()
|
||||
stats.BlobsUploaded.Add(1)
|
||||
stats.BytesUploaded.Add(finishedBlob.Compressed)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// makeUploadProgressCallback creates a progress callback for blob uploads
|
||||
func (s *Scanner) makeUploadProgressCallback(ctx context.Context, finishedBlob *blob.FinishedBlob) func(int64) error {
|
||||
lastProgressTime := time.Now()
|
||||
lastProgressBytes := int64(0)
|
||||
|
||||
progressCallback := func(uploaded int64) error {
|
||||
// Calculate instantaneous speed
|
||||
return func(uploaded int64) error {
|
||||
now := time.Now()
|
||||
elapsed := now.Sub(lastProgressTime).Seconds()
|
||||
if elapsed > 0.5 { // Update speed every 0.5 seconds
|
||||
if elapsed > 0.5 {
|
||||
bytesSinceLastUpdate := uploaded - lastProgressBytes
|
||||
speed := float64(bytesSinceLastUpdate) / elapsed
|
||||
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed)
|
||||
}
|
||||
|
||||
lastProgressTime = now
|
||||
lastProgressBytes = uploaded
|
||||
}
|
||||
|
||||
// Check for cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
@ -972,87 +1035,26 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create sharded path: blobs/ca/fe/cafebabe...
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
|
||||
|
||||
// Check if blob already exists in remote storage (deduplication after restart)
|
||||
blobExists := false
|
||||
if _, err := s.storage.Stat(ctx, blobPath); err == nil {
|
||||
blobExists = true
|
||||
log.Info("Blob already exists in storage, skipping upload",
|
||||
"hash", finishedBlob.Hash,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
fmt.Printf("Blob exists: %s (%s, skipped upload)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
}
|
||||
|
||||
if !blobExists {
|
||||
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||
return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err)
|
||||
}
|
||||
|
||||
uploadDuration := time.Since(startTime)
|
||||
|
||||
// Calculate upload speed
|
||||
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
|
||||
|
||||
// Print blob stored message
|
||||
fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
humanize.Bytes(uint64(uploadSpeedBps)),
|
||||
uploadDuration.Round(time.Millisecond))
|
||||
|
||||
// Log upload stats
|
||||
uploadSpeedBits := uploadSpeedBps * 8 // bits per second
|
||||
log.Info("Successfully uploaded blob to storage",
|
||||
"path", blobPath,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
"duration", uploadDuration,
|
||||
"speed", humanize.SI(uploadSpeedBits, "bps"))
|
||||
|
||||
// Report upload complete
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
|
||||
}
|
||||
|
||||
// Update progress after upload completes
|
||||
if s.progress != nil {
|
||||
stats := s.progress.GetStats()
|
||||
stats.BlobsUploaded.Add(1)
|
||||
stats.BytesUploaded.Add(finishedBlob.Compressed)
|
||||
}
|
||||
}
|
||||
|
||||
// Store metadata in database (after upload is complete)
|
||||
dbCtx := s.scanCtx
|
||||
if dbCtx == nil {
|
||||
dbCtx = context.Background()
|
||||
}
|
||||
|
||||
// Parse blob ID for typed operations
|
||||
// recordBlobMetadata stores blob upload metadata in the database
|
||||
func (s *Scanner) recordBlobMetadata(ctx context.Context, finishedBlob *blob.FinishedBlob, blobExists bool, startTime time.Time) error {
|
||||
finishedBlobID, err := types.ParseBlobID(finishedBlob.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing finished blob ID: %w", err)
|
||||
}
|
||||
|
||||
// Track upload duration (0 if blob already existed)
|
||||
uploadDuration := time.Since(startTime)
|
||||
|
||||
err = s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error {
|
||||
// Update blob upload timestamp
|
||||
if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil {
|
||||
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
if err := s.repos.Blobs.UpdateUploaded(txCtx, tx, finishedBlob.ID); err != nil {
|
||||
return fmt.Errorf("updating blob upload timestamp: %w", err)
|
||||
}
|
||||
|
||||
// Add the blob to the snapshot
|
||||
if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil {
|
||||
if err := s.repos.Snapshots.AddBlob(txCtx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil {
|
||||
return fmt.Errorf("adding blob to snapshot: %w", err)
|
||||
}
|
||||
|
||||
// Record upload metrics (only for actual uploads, not deduplicated blobs)
|
||||
if !blobExists {
|
||||
upload := &database.Upload{
|
||||
BlobHash: finishedBlob.Hash,
|
||||
@ -1061,15 +1063,17 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
Size: finishedBlob.Compressed,
|
||||
DurationMs: uploadDuration.Milliseconds(),
|
||||
}
|
||||
if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
|
||||
if err := s.repos.Uploads.Create(txCtx, tx, upload); err != nil {
|
||||
return fmt.Errorf("recording upload metrics: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Cleanup temp file if needed
|
||||
// cleanupBlobTempFile closes and removes the blob's temporary file
|
||||
func (s *Scanner) cleanupBlobTempFile(blobWithReader *blob.BlobWithReader) {
|
||||
if blobWithReader.TempFile != nil {
|
||||
tempName := blobWithReader.TempFile.Name()
|
||||
if err := blobWithReader.TempFile.Close(); err != nil {
|
||||
@ -1079,77 +1083,41 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Chunks from this blob are now committed to DB - remove from pending set
|
||||
log.Debug("handleBlobReady: removing pending chunk hashes")
|
||||
s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)
|
||||
log.Debug("handleBlobReady: removed pending chunk hashes")
|
||||
|
||||
// Flush files whose chunks are now all committed
|
||||
// This maintains database consistency after each blob
|
||||
log.Debug("handleBlobReady: calling flushCompletedPendingFiles")
|
||||
if err := s.flushCompletedPendingFiles(dbCtx); err != nil {
|
||||
return fmt.Errorf("flushing completed files: %w", err)
|
||||
}
|
||||
log.Debug("handleBlobReady: flushCompletedPendingFiles returned")
|
||||
|
||||
log.Debug("handleBlobReady: complete")
|
||||
return nil
|
||||
// streamingChunkInfo tracks chunk metadata collected during streaming
|
||||
type streamingChunkInfo struct {
|
||||
fileChunk database.FileChunk
|
||||
offset int64
|
||||
size int64
|
||||
}
|
||||
|
||||
// processFileStreaming processes a file by streaming chunks directly to the packer
|
||||
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
|
||||
// Open the file
|
||||
file, err := s.fs.Open(fileToProcess.Path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening file: %w", err)
|
||||
}
|
||||
defer func() { _ = file.Close() }()
|
||||
|
||||
// We'll collect file chunks for database storage
|
||||
// but process them for packing as we go
|
||||
type chunkInfo struct {
|
||||
fileChunk database.FileChunk
|
||||
offset int64
|
||||
size int64
|
||||
}
|
||||
var chunks []chunkInfo
|
||||
var chunks []streamingChunkInfo
|
||||
chunkIndex := 0
|
||||
|
||||
// Process chunks in streaming fashion and get full file hash
|
||||
fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
|
||||
// Check for cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
log.Debug("Processing content-defined chunk from file",
|
||||
"file", fileToProcess.Path,
|
||||
"chunk_index", chunkIndex,
|
||||
"hash", chunk.Hash,
|
||||
"size", chunk.Size)
|
||||
|
||||
// Check if chunk already exists (fast in-memory lookup)
|
||||
chunkExists := s.chunkExists(chunk.Hash)
|
||||
|
||||
// Queue new chunks for batch insert when blob finalizes
|
||||
// This dramatically reduces transaction overhead
|
||||
if !chunkExists {
|
||||
s.packer.AddPendingChunk(chunk.Hash, chunk.Size)
|
||||
// Add to in-memory cache immediately for fast duplicate detection
|
||||
s.addKnownChunk(chunk.Hash)
|
||||
// Track as pending until blob finalizes and commits to DB
|
||||
s.addPendingChunkHash(chunk.Hash)
|
||||
}
|
||||
|
||||
// Track file chunk association for later storage
|
||||
chunks = append(chunks, chunkInfo{
|
||||
chunks = append(chunks, streamingChunkInfo{
|
||||
fileChunk: database.FileChunk{
|
||||
FileID: fileToProcess.File.ID,
|
||||
Idx: chunkIndex,
|
||||
@ -1159,55 +1127,15 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
||||
size: chunk.Size,
|
||||
})
|
||||
|
||||
// Update stats
|
||||
if chunkExists {
|
||||
result.FilesSkipped++ // Track as skipped for now
|
||||
result.BytesSkipped += chunk.Size
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesSkipped.Add(chunk.Size)
|
||||
}
|
||||
} else {
|
||||
result.ChunksCreated++
|
||||
result.BytesScanned += chunk.Size
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().ChunksCreated.Add(1)
|
||||
s.progress.GetStats().BytesProcessed.Add(chunk.Size)
|
||||
s.progress.UpdateChunkingActivity()
|
||||
}
|
||||
}
|
||||
s.updateChunkStats(chunkExists, chunk.Size, result)
|
||||
|
||||
// Add chunk to packer immediately (streaming)
|
||||
// This happens outside the database transaction
|
||||
if !chunkExists {
|
||||
s.packerMu.Lock()
|
||||
err := s.packer.AddChunk(&blob.ChunkRef{
|
||||
Hash: chunk.Hash,
|
||||
Data: chunk.Data,
|
||||
})
|
||||
if err == blob.ErrBlobSizeLimitExceeded {
|
||||
// Finalize current blob and retry
|
||||
if err := s.packer.FinalizeBlob(); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("finalizing blob: %w", err)
|
||||
if err := s.addChunkToPacker(chunk); err != nil {
|
||||
return err
|
||||
}
|
||||
// Retry adding the chunk
|
||||
if err := s.packer.AddChunk(&blob.ChunkRef{
|
||||
Hash: chunk.Hash,
|
||||
Data: chunk.Data,
|
||||
}); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk after finalize: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk to packer: %w", err)
|
||||
}
|
||||
s.packerMu.Unlock()
|
||||
}
|
||||
|
||||
// Clear chunk data from memory immediately after use
|
||||
chunk.Data = nil
|
||||
|
||||
chunkIndex++
|
||||
return nil
|
||||
})
|
||||
@ -1217,12 +1145,54 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
||||
}
|
||||
|
||||
log.Debug("Completed snapshotting file",
|
||||
"path", fileToProcess.Path,
|
||||
"file_hash", fileHash,
|
||||
"chunks", len(chunks))
|
||||
"path", fileToProcess.Path, "file_hash", fileHash, "chunks", len(chunks))
|
||||
|
||||
// Build file data for batch insertion
|
||||
// Update chunk associations with the file ID
|
||||
s.queueFileForBatchInsert(ctx, fileToProcess, chunks)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateChunkStats updates scan result and progress stats for a processed chunk
|
||||
func (s *Scanner) updateChunkStats(chunkExists bool, chunkSize int64, result *ScanResult) {
|
||||
if chunkExists {
|
||||
result.FilesSkipped++
|
||||
result.BytesSkipped += chunkSize
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesSkipped.Add(chunkSize)
|
||||
}
|
||||
} else {
|
||||
result.ChunksCreated++
|
||||
result.BytesScanned += chunkSize
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().ChunksCreated.Add(1)
|
||||
s.progress.GetStats().BytesProcessed.Add(chunkSize)
|
||||
s.progress.UpdateChunkingActivity()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addChunkToPacker adds a chunk to the blob packer, finalizing the current blob if needed
|
||||
func (s *Scanner) addChunkToPacker(chunk chunker.Chunk) error {
|
||||
s.packerMu.Lock()
|
||||
err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data})
|
||||
if err == blob.ErrBlobSizeLimitExceeded {
|
||||
if err := s.packer.FinalizeBlob(); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("finalizing blob: %w", err)
|
||||
}
|
||||
if err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data}); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk after finalize: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk to packer: %w", err)
|
||||
}
|
||||
s.packerMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// queueFileForBatchInsert builds file/chunk associations and queues the file for batch DB insert
|
||||
func (s *Scanner) queueFileForBatchInsert(ctx context.Context, fileToProcess *FileToProcess, chunks []streamingChunkInfo) {
|
||||
fileChunks := make([]database.FileChunk, len(chunks))
|
||||
chunkFiles := make([]database.ChunkFile, len(chunks))
|
||||
for i, ci := range chunks {
|
||||
@ -1239,14 +1209,11 @@ func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileT
|
||||
}
|
||||
}
|
||||
|
||||
// Queue file for batch insertion
|
||||
// Files will be flushed when their chunks are committed (after blob finalize)
|
||||
s.addPendingFile(ctx, pendingFileData{
|
||||
file: fileToProcess.File,
|
||||
fileChunks: fileChunks,
|
||||
chunkFiles: chunkFiles,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetProgress returns the progress reporter for this scanner
|
||||
|
||||
@ -151,7 +151,6 @@ type RemoteInfoResult struct {
|
||||
func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
result := &RemoteInfoResult{}
|
||||
|
||||
// Get storage info
|
||||
storageInfo := v.Storage.Info()
|
||||
result.StorageType = storageInfo.Type
|
||||
result.StorageLocation = storageInfo.Location
|
||||
@ -161,23 +160,46 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
v.printfStdout("Type: %s\n", storageInfo.Type)
|
||||
v.printfStdout("Location: %s\n", storageInfo.Location)
|
||||
v.printlnStdout()
|
||||
}
|
||||
|
||||
// List all snapshot metadata
|
||||
if !jsonOutput {
|
||||
v.printfStdout("Scanning snapshot metadata...\n")
|
||||
}
|
||||
|
||||
snapshotMetadata, snapshotIDs, err := v.collectSnapshotMetadata()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !jsonOutput {
|
||||
v.printfStdout("Downloading %d manifest(s)...\n", len(snapshotIDs))
|
||||
}
|
||||
|
||||
referencedBlobs := v.collectReferencedBlobsFromManifests(snapshotIDs, snapshotMetadata)
|
||||
|
||||
v.populateRemoteInfoResult(result, snapshotMetadata, snapshotIDs, referencedBlobs)
|
||||
|
||||
if err := v.scanRemoteBlobStorage(result, referencedBlobs, jsonOutput); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if jsonOutput {
|
||||
enc := json.NewEncoder(v.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(result)
|
||||
}
|
||||
|
||||
v.printRemoteInfoTable(result)
|
||||
return nil
|
||||
}
|
||||
|
||||
// collectSnapshotMetadata scans remote metadata and returns per-snapshot info and sorted IDs
|
||||
func (v *Vaultik) collectSnapshotMetadata() (map[string]*SnapshotMetadataInfo, []string, error) {
|
||||
snapshotMetadata := make(map[string]*SnapshotMetadataInfo)
|
||||
|
||||
// Collect metadata files
|
||||
metadataCh := v.Storage.ListStream(v.ctx, "metadata/")
|
||||
for obj := range metadataCh {
|
||||
if obj.Err != nil {
|
||||
return fmt.Errorf("listing metadata: %w", obj.Err)
|
||||
return nil, nil, fmt.Errorf("listing metadata: %w", obj.Err)
|
||||
}
|
||||
|
||||
// Parse key: metadata/<snapshot-id>/<filename>
|
||||
parts := strings.Split(obj.Key, "/")
|
||||
if len(parts) < 3 {
|
||||
continue
|
||||
@ -185,14 +207,11 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
snapshotID := parts[1]
|
||||
|
||||
if _, exists := snapshotMetadata[snapshotID]; !exists {
|
||||
snapshotMetadata[snapshotID] = &SnapshotMetadataInfo{
|
||||
SnapshotID: snapshotID,
|
||||
}
|
||||
snapshotMetadata[snapshotID] = &SnapshotMetadataInfo{SnapshotID: snapshotID}
|
||||
}
|
||||
|
||||
info := snapshotMetadata[snapshotID]
|
||||
filename := parts[2]
|
||||
|
||||
if strings.HasPrefix(filename, "manifest") {
|
||||
info.ManifestSize = obj.Size
|
||||
} else if strings.HasPrefix(filename, "db") {
|
||||
@ -201,19 +220,18 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
info.TotalSize = info.ManifestSize + info.DatabaseSize
|
||||
}
|
||||
|
||||
// Sort snapshots by ID for consistent output
|
||||
var snapshotIDs []string
|
||||
for id := range snapshotMetadata {
|
||||
snapshotIDs = append(snapshotIDs, id)
|
||||
}
|
||||
sort.Strings(snapshotIDs)
|
||||
|
||||
// Download and parse all manifests to get referenced blobs
|
||||
if !jsonOutput {
|
||||
v.printfStdout("Downloading %d manifest(s)...\n", len(snapshotIDs))
|
||||
}
|
||||
return snapshotMetadata, snapshotIDs, nil
|
||||
}
|
||||
|
||||
referencedBlobs := make(map[string]int64) // hash -> compressed size
|
||||
// collectReferencedBlobsFromManifests downloads manifests and returns referenced blob hashes with sizes
|
||||
func (v *Vaultik) collectReferencedBlobsFromManifests(snapshotIDs []string, snapshotMetadata map[string]*SnapshotMetadataInfo) map[string]int64 {
|
||||
referencedBlobs := make(map[string]int64)
|
||||
|
||||
for _, snapshotID := range snapshotIDs {
|
||||
manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
@ -230,10 +248,8 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
continue
|
||||
}
|
||||
|
||||
// Record blob info from manifest
|
||||
info := snapshotMetadata[snapshotID]
|
||||
info.BlobCount = manifest.BlobCount
|
||||
|
||||
var blobsSize int64
|
||||
for _, blob := range manifest.Blobs {
|
||||
referencedBlobs[blob.Hash] = blob.CompressedSize
|
||||
@ -242,7 +258,11 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
info.BlobsSize = blobsSize
|
||||
}
|
||||
|
||||
// Build result snapshots
|
||||
return referencedBlobs
|
||||
}
|
||||
|
||||
// populateRemoteInfoResult fills in the result's snapshot and referenced blob stats
|
||||
func (v *Vaultik) populateRemoteInfoResult(result *RemoteInfoResult, snapshotMetadata map[string]*SnapshotMetadataInfo, snapshotIDs []string, referencedBlobs map[string]int64) {
|
||||
var totalMetadataSize int64
|
||||
for _, id := range snapshotIDs {
|
||||
info := snapshotMetadata[id]
|
||||
@ -252,26 +272,25 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
result.TotalMetadataSize = totalMetadataSize
|
||||
result.TotalMetadataCount = len(snapshotIDs)
|
||||
|
||||
// Calculate referenced blob stats
|
||||
for _, size := range referencedBlobs {
|
||||
result.ReferencedBlobCount++
|
||||
result.ReferencedBlobSize += size
|
||||
}
|
||||
}
|
||||
|
||||
// List all blobs on remote
|
||||
// scanRemoteBlobStorage lists all blobs on remote and computes orphan stats
|
||||
func (v *Vaultik) scanRemoteBlobStorage(result *RemoteInfoResult, referencedBlobs map[string]int64, jsonOutput bool) error {
|
||||
if !jsonOutput {
|
||||
v.printfStdout("Scanning blobs...\n")
|
||||
}
|
||||
|
||||
allBlobs := make(map[string]int64) // hash -> size from storage
|
||||
|
||||
blobCh := v.Storage.ListStream(v.ctx, "blobs/")
|
||||
allBlobs := make(map[string]int64)
|
||||
|
||||
for obj := range blobCh {
|
||||
if obj.Err != nil {
|
||||
return fmt.Errorf("listing blobs: %w", obj.Err)
|
||||
}
|
||||
|
||||
// Extract hash from key: blobs/xx/yy/hash
|
||||
parts := strings.Split(obj.Key, "/")
|
||||
if len(parts) < 4 {
|
||||
continue
|
||||
@ -282,7 +301,6 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
result.TotalBlobSize += obj.Size
|
||||
}
|
||||
|
||||
// Calculate orphaned blobs
|
||||
for hash, size := range allBlobs {
|
||||
if _, referenced := referencedBlobs[hash]; !referenced {
|
||||
result.OrphanedBlobCount++
|
||||
@ -290,14 +308,11 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Output results
|
||||
if jsonOutput {
|
||||
enc := json.NewEncoder(v.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(result)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Human-readable output
|
||||
// printRemoteInfoTable renders the human-readable remote info output
|
||||
func (v *Vaultik) printRemoteInfoTable(result *RemoteInfoResult) {
|
||||
v.printfStdout("\n=== Snapshot Metadata ===\n")
|
||||
if len(result.Snapshots) == 0 {
|
||||
v.printfStdout("No snapshots found\n")
|
||||
@ -320,20 +335,15 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
|
||||
v.printfStdout("\n=== Blob Storage ===\n")
|
||||
v.printfStdout("Total blobs on remote: %s (%s)\n",
|
||||
humanize.Comma(int64(result.TotalBlobCount)),
|
||||
humanize.Bytes(uint64(result.TotalBlobSize)))
|
||||
humanize.Comma(int64(result.TotalBlobCount)), humanize.Bytes(uint64(result.TotalBlobSize)))
|
||||
v.printfStdout("Referenced by snapshots: %s (%s)\n",
|
||||
humanize.Comma(int64(result.ReferencedBlobCount)),
|
||||
humanize.Bytes(uint64(result.ReferencedBlobSize)))
|
||||
humanize.Comma(int64(result.ReferencedBlobCount)), humanize.Bytes(uint64(result.ReferencedBlobSize)))
|
||||
v.printfStdout("Orphaned (unreferenced): %s (%s)\n",
|
||||
humanize.Comma(int64(result.OrphanedBlobCount)),
|
||||
humanize.Bytes(uint64(result.OrphanedBlobSize)))
|
||||
humanize.Comma(int64(result.OrphanedBlobCount)), humanize.Bytes(uint64(result.OrphanedBlobSize)))
|
||||
|
||||
if result.OrphanedBlobCount > 0 {
|
||||
v.printfStdout("\nRun 'vaultik prune --remote' to remove orphaned blobs.\n")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// truncateString truncates a string to maxLen, adding "..." if truncated
|
||||
|
||||
@ -27,95 +27,19 @@ type PruneBlobsResult struct {
|
||||
func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
log.Info("Starting prune operation")
|
||||
|
||||
// Get all remote snapshots and their manifests
|
||||
allBlobsReferenced := make(map[string]bool)
|
||||
manifestCount := 0
|
||||
|
||||
// List all snapshots in storage
|
||||
log.Info("Listing remote snapshots")
|
||||
objectCh := v.Storage.ListStream(v.ctx, "metadata/")
|
||||
|
||||
var snapshotIDs []string
|
||||
for object := range objectCh {
|
||||
if object.Err != nil {
|
||||
return fmt.Errorf("listing remote snapshots: %w", object.Err)
|
||||
}
|
||||
|
||||
// Extract snapshot ID from paths like metadata/hostname-20240115-143052Z/
|
||||
parts := strings.Split(object.Key, "/")
|
||||
if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" {
|
||||
// Check if this is a directory by looking for trailing slash
|
||||
if strings.HasSuffix(object.Key, "/") || strings.Contains(object.Key, "/manifest.json.zst") {
|
||||
snapshotID := parts[1]
|
||||
// Only add unique snapshot IDs
|
||||
found := false
|
||||
for _, id := range snapshotIDs {
|
||||
if id == snapshotID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
snapshotIDs = append(snapshotIDs, snapshotID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Found manifests in remote storage", "count", len(snapshotIDs))
|
||||
|
||||
// Download and parse each manifest to get referenced blobs
|
||||
for _, snapshotID := range snapshotIDs {
|
||||
log.Debug("Processing manifest", "snapshot_id", snapshotID)
|
||||
|
||||
manifest, err := v.downloadManifest(snapshotID)
|
||||
allBlobsReferenced, err := v.collectReferencedBlobs()
|
||||
if err != nil {
|
||||
log.Error("Failed to download manifest", "snapshot_id", snapshotID, "error", err)
|
||||
continue
|
||||
return err
|
||||
}
|
||||
|
||||
// Add all blobs from this manifest to our referenced set
|
||||
for _, blob := range manifest.Blobs {
|
||||
allBlobsReferenced[blob.Hash] = true
|
||||
}
|
||||
manifestCount++
|
||||
allBlobs, err := v.listAllRemoteBlobs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Processed manifests", "count", manifestCount, "unique_blobs_referenced", len(allBlobsReferenced))
|
||||
unreferencedBlobs, totalSize := v.findUnreferencedBlobs(allBlobs, allBlobsReferenced)
|
||||
|
||||
// List all blobs in storage
|
||||
log.Info("Listing all blobs in storage")
|
||||
allBlobs := make(map[string]int64) // hash -> size
|
||||
blobObjectCh := v.Storage.ListStream(v.ctx, "blobs/")
|
||||
|
||||
for object := range blobObjectCh {
|
||||
if object.Err != nil {
|
||||
return fmt.Errorf("listing blobs: %w", object.Err)
|
||||
}
|
||||
|
||||
// Extract hash from path like blobs/ab/cd/abcdef123456...
|
||||
parts := strings.Split(object.Key, "/")
|
||||
if len(parts) == 4 && parts[0] == "blobs" {
|
||||
hash := parts[3]
|
||||
allBlobs[hash] = object.Size
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Found blobs in storage", "count", len(allBlobs))
|
||||
|
||||
// Find unreferenced blobs
|
||||
var unreferencedBlobs []string
|
||||
var totalSize int64
|
||||
for hash, size := range allBlobs {
|
||||
if !allBlobsReferenced[hash] {
|
||||
unreferencedBlobs = append(unreferencedBlobs, hash)
|
||||
totalSize += size
|
||||
}
|
||||
}
|
||||
|
||||
result := &PruneBlobsResult{
|
||||
BlobsFound: len(unreferencedBlobs),
|
||||
}
|
||||
result := &PruneBlobsResult{BlobsFound: len(unreferencedBlobs)}
|
||||
|
||||
if len(unreferencedBlobs) == 0 {
|
||||
log.Info("No unreferenced blobs found")
|
||||
@ -126,18 +50,15 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Show what will be deleted
|
||||
log.Info("Found unreferenced blobs", "count", len(unreferencedBlobs), "total_size", humanize.Bytes(uint64(totalSize)))
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Found %d unreferenced blob(s) totaling %s\n", len(unreferencedBlobs), humanize.Bytes(uint64(totalSize)))
|
||||
}
|
||||
|
||||
// Confirm unless --force is used (skip in JSON mode - require --force)
|
||||
if !opts.Force && !opts.JSON {
|
||||
v.printfStdout("\nDelete %d unreferenced blob(s)? [y/N] ", len(unreferencedBlobs))
|
||||
var confirm string
|
||||
if _, err := v.scanStdin(&confirm); err != nil {
|
||||
// Treat EOF or error as "no"
|
||||
v.printlnStdout("Cancelled")
|
||||
return nil
|
||||
}
|
||||
@ -147,10 +68,109 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Delete unreferenced blobs
|
||||
v.deleteUnreferencedBlobs(unreferencedBlobs, allBlobs, result)
|
||||
|
||||
if opts.JSON {
|
||||
return v.outputPruneBlobsJSON(result)
|
||||
}
|
||||
|
||||
v.printfStdout("\nDeleted %d blob(s) totaling %s\n", result.BlobsDeleted, humanize.Bytes(uint64(result.BytesFreed)))
|
||||
if result.BlobsFailed > 0 {
|
||||
v.printfStdout("Failed to delete %d blob(s)\n", result.BlobsFailed)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// collectReferencedBlobs downloads all manifests and returns the set of referenced blob hashes
|
||||
func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) {
|
||||
log.Info("Listing remote snapshots")
|
||||
snapshotIDs, err := v.listUniqueSnapshotIDs()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listing snapshot IDs: %w", err)
|
||||
}
|
||||
log.Info("Found manifests in remote storage", "count", len(snapshotIDs))
|
||||
|
||||
allBlobsReferenced := make(map[string]bool)
|
||||
manifestCount := 0
|
||||
|
||||
for _, snapshotID := range snapshotIDs {
|
||||
log.Debug("Processing manifest", "snapshot_id", snapshotID)
|
||||
manifest, err := v.downloadManifest(snapshotID)
|
||||
if err != nil {
|
||||
log.Error("Failed to download manifest", "snapshot_id", snapshotID, "error", err)
|
||||
continue
|
||||
}
|
||||
for _, blob := range manifest.Blobs {
|
||||
allBlobsReferenced[blob.Hash] = true
|
||||
}
|
||||
manifestCount++
|
||||
}
|
||||
|
||||
log.Info("Processed manifests", "count", manifestCount, "unique_blobs_referenced", len(allBlobsReferenced))
|
||||
return allBlobsReferenced, nil
|
||||
}
|
||||
|
||||
// listUniqueSnapshotIDs returns deduplicated snapshot IDs from remote metadata
|
||||
func (v *Vaultik) listUniqueSnapshotIDs() ([]string, error) {
|
||||
objectCh := v.Storage.ListStream(v.ctx, "metadata/")
|
||||
seen := make(map[string]bool)
|
||||
var snapshotIDs []string
|
||||
|
||||
for object := range objectCh {
|
||||
if object.Err != nil {
|
||||
return nil, fmt.Errorf("listing metadata objects: %w", object.Err)
|
||||
}
|
||||
parts := strings.Split(object.Key, "/")
|
||||
if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" {
|
||||
if strings.HasSuffix(object.Key, "/") || strings.Contains(object.Key, "/manifest.json.zst") {
|
||||
snapshotID := parts[1]
|
||||
if !seen[snapshotID] {
|
||||
seen[snapshotID] = true
|
||||
snapshotIDs = append(snapshotIDs, snapshotID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return snapshotIDs, nil
|
||||
}
|
||||
|
||||
// listAllRemoteBlobs returns a map of all blob hashes to their sizes in remote storage
|
||||
func (v *Vaultik) listAllRemoteBlobs() (map[string]int64, error) {
|
||||
log.Info("Listing all blobs in storage")
|
||||
allBlobs := make(map[string]int64)
|
||||
blobObjectCh := v.Storage.ListStream(v.ctx, "blobs/")
|
||||
|
||||
for object := range blobObjectCh {
|
||||
if object.Err != nil {
|
||||
return nil, fmt.Errorf("listing blobs: %w", object.Err)
|
||||
}
|
||||
parts := strings.Split(object.Key, "/")
|
||||
if len(parts) == 4 && parts[0] == "blobs" {
|
||||
allBlobs[parts[3]] = object.Size
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Found blobs in storage", "count", len(allBlobs))
|
||||
return allBlobs, nil
|
||||
}
|
||||
|
||||
// findUnreferencedBlobs returns blob hashes not referenced by any manifest and their total size
|
||||
func (v *Vaultik) findUnreferencedBlobs(allBlobs map[string]int64, referenced map[string]bool) ([]string, int64) {
|
||||
var unreferenced []string
|
||||
var totalSize int64
|
||||
for hash, size := range allBlobs {
|
||||
if !referenced[hash] {
|
||||
unreferenced = append(unreferenced, hash)
|
||||
totalSize += size
|
||||
}
|
||||
}
|
||||
return unreferenced, totalSize
|
||||
}
|
||||
|
||||
// deleteUnreferencedBlobs deletes the given blobs from storage and populates the result
|
||||
func (v *Vaultik) deleteUnreferencedBlobs(unreferencedBlobs []string, allBlobs map[string]int64, result *PruneBlobsResult) {
|
||||
log.Info("Deleting unreferenced blobs")
|
||||
deletedCount := 0
|
||||
deletedSize := int64(0)
|
||||
|
||||
for i, hash := range unreferencedBlobs {
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", hash[:2], hash[2:4], hash)
|
||||
@ -160,10 +180,9 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
continue
|
||||
}
|
||||
|
||||
deletedCount++
|
||||
deletedSize += allBlobs[hash]
|
||||
result.BlobsDeleted++
|
||||
result.BytesFreed += allBlobs[hash]
|
||||
|
||||
// Progress update every 100 blobs
|
||||
if (i+1)%100 == 0 || i == len(unreferencedBlobs)-1 {
|
||||
log.Info("Deletion progress",
|
||||
"deleted", i+1,
|
||||
@ -173,26 +192,13 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
}
|
||||
}
|
||||
|
||||
result.BlobsDeleted = deletedCount
|
||||
result.BlobsFailed = len(unreferencedBlobs) - deletedCount
|
||||
result.BytesFreed = deletedSize
|
||||
result.BlobsFailed = len(unreferencedBlobs) - result.BlobsDeleted
|
||||
|
||||
log.Info("Prune complete",
|
||||
"deleted_count", deletedCount,
|
||||
"deleted_size", humanize.Bytes(uint64(deletedSize)),
|
||||
"failed", len(unreferencedBlobs)-deletedCount,
|
||||
"deleted_count", result.BlobsDeleted,
|
||||
"deleted_size", humanize.Bytes(uint64(result.BytesFreed)),
|
||||
"failed", result.BlobsFailed,
|
||||
)
|
||||
|
||||
if opts.JSON {
|
||||
return v.outputPruneBlobsJSON(result)
|
||||
}
|
||||
|
||||
v.printfStdout("\nDeleted %d blob(s) totaling %s\n", deletedCount, humanize.Bytes(uint64(deletedSize)))
|
||||
if deletedCount < len(unreferencedBlobs) {
|
||||
v.printfStdout("Failed to delete %d blob(s)\n", len(unreferencedBlobs)-deletedCount)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// outputPruneBlobsJSON outputs the prune result as JSON
|
||||
|
||||
@ -111,40 +111,34 @@ func (v *Vaultik) CreateSnapshot(opts *SnapshotCreateOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// snapshotStats tracks aggregate statistics across directory scans
|
||||
type snapshotStats struct {
|
||||
totalFiles int
|
||||
totalBytes int64
|
||||
totalChunks int
|
||||
totalBlobs int
|
||||
totalBytesSkipped int64
|
||||
totalFilesSkipped int
|
||||
totalFilesDeleted int
|
||||
totalBytesDeleted int64
|
||||
totalBytesUploaded int64
|
||||
totalBlobsUploaded int
|
||||
uploadDuration time.Duration
|
||||
}
|
||||
|
||||
// createNamedSnapshot creates a single named snapshot
|
||||
func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, snapName string, idx, total int) error {
|
||||
snapshotStartTime := time.Now()
|
||||
|
||||
snapConfig := v.Config.Snapshots[snapName]
|
||||
|
||||
if total > 1 {
|
||||
v.printfStdout("\n=== Snapshot %d/%d: %s ===\n", idx, total, snapName)
|
||||
}
|
||||
|
||||
// Resolve source directories to absolute paths
|
||||
resolvedDirs := make([]string, 0, len(snapConfig.Paths))
|
||||
for _, dir := range snapConfig.Paths {
|
||||
absPath, err := filepath.Abs(dir)
|
||||
resolvedDirs, err := v.resolveSnapshotPaths(snapName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to resolve absolute path for %s: %w", dir, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Resolve symlinks
|
||||
resolvedPath, err := filepath.EvalSymlinks(absPath)
|
||||
if err != nil {
|
||||
// If the path doesn't exist yet, use the absolute path
|
||||
if os.IsNotExist(err) {
|
||||
resolvedPath = absPath
|
||||
} else {
|
||||
return fmt.Errorf("failed to resolve symlinks for %s: %w", absPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
resolvedDirs = append(resolvedDirs, resolvedPath)
|
||||
}
|
||||
|
||||
// Create scanner with progress enabled (unless in cron mode)
|
||||
// Pass the combined excludes for this snapshot
|
||||
scanner := v.ScannerFactory(snapshot.ScannerParams{
|
||||
EnableProgress: !opts.Cron,
|
||||
Fs: v.Fs,
|
||||
@ -152,20 +146,6 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna
|
||||
SkipErrors: opts.SkipErrors,
|
||||
})
|
||||
|
||||
// Statistics tracking
|
||||
totalFiles := 0
|
||||
totalBytes := int64(0)
|
||||
totalChunks := 0
|
||||
totalBlobs := 0
|
||||
totalBytesSkipped := int64(0)
|
||||
totalFilesSkipped := 0
|
||||
totalFilesDeleted := 0
|
||||
totalBytesDeleted := int64(0)
|
||||
totalBytesUploaded := int64(0)
|
||||
totalBlobsUploaded := 0
|
||||
uploadDuration := time.Duration(0)
|
||||
|
||||
// Create a new snapshot at the beginning (with snapshot name in ID)
|
||||
snapshotID, err := v.SnapshotManager.CreateSnapshotWithName(v.ctx, hostname, snapName, v.Globals.Version, v.Globals.Commit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating snapshot: %w", err)
|
||||
@ -173,12 +153,56 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna
|
||||
log.Info("Beginning snapshot", "snapshot_id", snapshotID, "name", snapName)
|
||||
v.printfStdout("Beginning snapshot: %s\n", snapshotID)
|
||||
|
||||
stats, err := v.scanAllDirectories(scanner, resolvedDirs, snapshotID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
v.collectUploadStats(scanner, stats)
|
||||
|
||||
if err := v.finalizeSnapshotMetadata(snapshotID, stats); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
v.printSnapshotSummary(snapshotID, snapshotStartTime, stats)
|
||||
return nil
|
||||
}
|
||||
|
||||
// resolveSnapshotPaths resolves source directories to absolute paths with symlink resolution
|
||||
func (v *Vaultik) resolveSnapshotPaths(snapName string) ([]string, error) {
|
||||
snapConfig := v.Config.Snapshots[snapName]
|
||||
resolvedDirs := make([]string, 0, len(snapConfig.Paths))
|
||||
|
||||
for _, dir := range snapConfig.Paths {
|
||||
absPath, err := filepath.Abs(dir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to resolve absolute path for %s: %w", dir, err)
|
||||
}
|
||||
|
||||
resolvedPath, err := filepath.EvalSymlinks(absPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
resolvedPath = absPath
|
||||
} else {
|
||||
return nil, fmt.Errorf("failed to resolve symlinks for %s: %w", absPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
resolvedDirs = append(resolvedDirs, resolvedPath)
|
||||
}
|
||||
|
||||
return resolvedDirs, nil
|
||||
}
|
||||
|
||||
// scanAllDirectories runs the scanner on each resolved directory and accumulates stats
|
||||
func (v *Vaultik) scanAllDirectories(scanner *snapshot.Scanner, resolvedDirs []string, snapshotID string) (*snapshotStats, error) {
|
||||
stats := &snapshotStats{}
|
||||
|
||||
for i, dir := range resolvedDirs {
|
||||
// Check if context is cancelled
|
||||
select {
|
||||
case <-v.ctx.Done():
|
||||
log.Info("Snapshot creation cancelled")
|
||||
return v.ctx.Err()
|
||||
return nil, v.ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
@ -186,17 +210,17 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna
|
||||
v.printfStdout("Beginning directory scan (%d/%d): %s\n", i+1, len(resolvedDirs), dir)
|
||||
result, err := scanner.Scan(v.ctx, dir, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to scan %s: %w", dir, err)
|
||||
return nil, fmt.Errorf("failed to scan %s: %w", dir, err)
|
||||
}
|
||||
|
||||
totalFiles += result.FilesScanned
|
||||
totalBytes += result.BytesScanned
|
||||
totalChunks += result.ChunksCreated
|
||||
totalBlobs += result.BlobsCreated
|
||||
totalFilesSkipped += result.FilesSkipped
|
||||
totalBytesSkipped += result.BytesSkipped
|
||||
totalFilesDeleted += result.FilesDeleted
|
||||
totalBytesDeleted += result.BytesDeleted
|
||||
stats.totalFiles += result.FilesScanned
|
||||
stats.totalBytes += result.BytesScanned
|
||||
stats.totalChunks += result.ChunksCreated
|
||||
stats.totalBlobs += result.BlobsCreated
|
||||
stats.totalFilesSkipped += result.FilesSkipped
|
||||
stats.totalBytesSkipped += result.BytesSkipped
|
||||
stats.totalFilesDeleted += result.FilesDeleted
|
||||
stats.totalBytesDeleted += result.BytesDeleted
|
||||
|
||||
log.Info("Directory scan complete",
|
||||
"path", dir,
|
||||
@ -207,85 +231,79 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna
|
||||
"chunks", result.ChunksCreated,
|
||||
"blobs", result.BlobsCreated,
|
||||
"duration", result.EndTime.Sub(result.StartTime))
|
||||
|
||||
// Remove per-directory summary - the scanner already prints its own summary
|
||||
}
|
||||
|
||||
// Get upload statistics from scanner progress if available
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// collectUploadStats gathers upload statistics from the scanner's progress reporter
|
||||
func (v *Vaultik) collectUploadStats(scanner *snapshot.Scanner, stats *snapshotStats) {
|
||||
if s := scanner.GetProgress(); s != nil {
|
||||
stats := s.GetStats()
|
||||
totalBytesUploaded = stats.BytesUploaded.Load()
|
||||
totalBlobsUploaded = int(stats.BlobsUploaded.Load())
|
||||
uploadDuration = time.Duration(stats.UploadDurationMs.Load()) * time.Millisecond
|
||||
progressStats := s.GetStats()
|
||||
stats.totalBytesUploaded = progressStats.BytesUploaded.Load()
|
||||
stats.totalBlobsUploaded = int(progressStats.BlobsUploaded.Load())
|
||||
stats.uploadDuration = time.Duration(progressStats.UploadDurationMs.Load()) * time.Millisecond
|
||||
}
|
||||
}
|
||||
|
||||
// Update snapshot statistics with extended fields
|
||||
// finalizeSnapshotMetadata updates stats, marks complete, and exports metadata
|
||||
func (v *Vaultik) finalizeSnapshotMetadata(snapshotID string, stats *snapshotStats) error {
|
||||
extStats := snapshot.ExtendedBackupStats{
|
||||
BackupStats: snapshot.BackupStats{
|
||||
FilesScanned: totalFiles,
|
||||
BytesScanned: totalBytes,
|
||||
ChunksCreated: totalChunks,
|
||||
BlobsCreated: totalBlobs,
|
||||
BytesUploaded: totalBytesUploaded,
|
||||
FilesScanned: stats.totalFiles,
|
||||
BytesScanned: stats.totalBytes,
|
||||
ChunksCreated: stats.totalChunks,
|
||||
BlobsCreated: stats.totalBlobs,
|
||||
BytesUploaded: stats.totalBytesUploaded,
|
||||
},
|
||||
BlobUncompressedSize: 0, // Will be set from database query below
|
||||
BlobUncompressedSize: 0,
|
||||
CompressionLevel: v.Config.CompressionLevel,
|
||||
UploadDurationMs: uploadDuration.Milliseconds(),
|
||||
UploadDurationMs: stats.uploadDuration.Milliseconds(),
|
||||
}
|
||||
|
||||
if err := v.SnapshotManager.UpdateSnapshotStatsExtended(v.ctx, snapshotID, extStats); err != nil {
|
||||
return fmt.Errorf("updating snapshot stats: %w", err)
|
||||
}
|
||||
|
||||
// Mark snapshot as complete
|
||||
if err := v.SnapshotManager.CompleteSnapshot(v.ctx, snapshotID); err != nil {
|
||||
return fmt.Errorf("completing snapshot: %w", err)
|
||||
}
|
||||
|
||||
// Export snapshot metadata
|
||||
// Export snapshot metadata without closing the database
|
||||
// The export function should handle its own database connection
|
||||
if err := v.SnapshotManager.ExportSnapshotMetadata(v.ctx, v.Config.IndexPath, snapshotID); err != nil {
|
||||
return fmt.Errorf("exporting snapshot metadata: %w", err)
|
||||
}
|
||||
|
||||
// Calculate final statistics
|
||||
snapshotDuration := time.Since(snapshotStartTime)
|
||||
totalFilesChanged := totalFiles - totalFilesSkipped
|
||||
totalBytesChanged := totalBytes
|
||||
totalBytesAll := totalBytes + totalBytesSkipped
|
||||
return nil
|
||||
}
|
||||
|
||||
// Calculate upload speed
|
||||
var avgUploadSpeed string
|
||||
if totalBytesUploaded > 0 && uploadDuration > 0 {
|
||||
bytesPerSec := float64(totalBytesUploaded) / uploadDuration.Seconds()
|
||||
// formatUploadSpeed formats bytes uploaded and duration into a human-readable speed string
|
||||
func formatUploadSpeed(bytesUploaded int64, duration time.Duration) string {
|
||||
if bytesUploaded <= 0 || duration <= 0 {
|
||||
return "N/A"
|
||||
}
|
||||
bytesPerSec := float64(bytesUploaded) / duration.Seconds()
|
||||
bitsPerSec := bytesPerSec * 8
|
||||
if bitsPerSec >= 1e9 {
|
||||
avgUploadSpeed = fmt.Sprintf("%.1f Gbit/s", bitsPerSec/1e9)
|
||||
} else if bitsPerSec >= 1e6 {
|
||||
avgUploadSpeed = fmt.Sprintf("%.0f Mbit/s", bitsPerSec/1e6)
|
||||
} else if bitsPerSec >= 1e3 {
|
||||
avgUploadSpeed = fmt.Sprintf("%.0f Kbit/s", bitsPerSec/1e3)
|
||||
} else {
|
||||
avgUploadSpeed = fmt.Sprintf("%.0f bit/s", bitsPerSec)
|
||||
}
|
||||
} else {
|
||||
avgUploadSpeed = "N/A"
|
||||
switch {
|
||||
case bitsPerSec >= 1e9:
|
||||
return fmt.Sprintf("%.1f Gbit/s", bitsPerSec/1e9)
|
||||
case bitsPerSec >= 1e6:
|
||||
return fmt.Sprintf("%.0f Mbit/s", bitsPerSec/1e6)
|
||||
case bitsPerSec >= 1e3:
|
||||
return fmt.Sprintf("%.0f Kbit/s", bitsPerSec/1e3)
|
||||
default:
|
||||
return fmt.Sprintf("%.0f bit/s", bitsPerSec)
|
||||
}
|
||||
}
|
||||
|
||||
// printSnapshotSummary prints the comprehensive snapshot completion summary
|
||||
func (v *Vaultik) printSnapshotSummary(snapshotID string, startTime time.Time, stats *snapshotStats) {
|
||||
snapshotDuration := time.Since(startTime)
|
||||
totalFilesChanged := stats.totalFiles - stats.totalFilesSkipped
|
||||
totalBytesAll := stats.totalBytes + stats.totalBytesSkipped
|
||||
|
||||
// Get total blob sizes from database
|
||||
totalBlobSizeCompressed := int64(0)
|
||||
totalBlobSizeUncompressed := int64(0)
|
||||
if blobHashes, err := v.Repositories.Snapshots.GetBlobHashes(v.ctx, snapshotID); err == nil {
|
||||
for _, hash := range blobHashes {
|
||||
if blob, err := v.Repositories.Blobs.GetByHash(v.ctx, hash); err == nil && blob != nil {
|
||||
totalBlobSizeCompressed += blob.CompressedSize
|
||||
totalBlobSizeUncompressed += blob.UncompressedSize
|
||||
}
|
||||
}
|
||||
}
|
||||
totalBlobSizeCompressed, totalBlobSizeUncompressed := v.getSnapshotBlobSizes(snapshotID)
|
||||
|
||||
// Calculate compression ratio
|
||||
var compressionRatio float64
|
||||
if totalBlobSizeUncompressed > 0 {
|
||||
compressionRatio = float64(totalBlobSizeCompressed) / float64(totalBlobSizeUncompressed)
|
||||
@ -293,55 +311,95 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna
|
||||
compressionRatio = 1.0
|
||||
}
|
||||
|
||||
// Print comprehensive summary
|
||||
v.printfStdout("=== Snapshot Complete ===\n")
|
||||
v.printfStdout("ID: %s\n", snapshotID)
|
||||
v.printfStdout("Files: %s examined, %s to process, %s unchanged",
|
||||
formatNumber(totalFiles),
|
||||
formatNumber(stats.totalFiles),
|
||||
formatNumber(totalFilesChanged),
|
||||
formatNumber(totalFilesSkipped))
|
||||
if totalFilesDeleted > 0 {
|
||||
v.printfStdout(", %s deleted", formatNumber(totalFilesDeleted))
|
||||
formatNumber(stats.totalFilesSkipped))
|
||||
if stats.totalFilesDeleted > 0 {
|
||||
v.printfStdout(", %s deleted", formatNumber(stats.totalFilesDeleted))
|
||||
}
|
||||
v.printlnStdout()
|
||||
v.printfStdout("Data: %s total (%s to process)",
|
||||
humanize.Bytes(uint64(totalBytesAll)),
|
||||
humanize.Bytes(uint64(totalBytesChanged)))
|
||||
if totalBytesDeleted > 0 {
|
||||
v.printfStdout(", %s deleted", humanize.Bytes(uint64(totalBytesDeleted)))
|
||||
humanize.Bytes(uint64(stats.totalBytes)))
|
||||
if stats.totalBytesDeleted > 0 {
|
||||
v.printfStdout(", %s deleted", humanize.Bytes(uint64(stats.totalBytesDeleted)))
|
||||
}
|
||||
v.printlnStdout()
|
||||
if totalBlobsUploaded > 0 {
|
||||
if stats.totalBlobsUploaded > 0 {
|
||||
v.printfStdout("Storage: %s compressed from %s (%.2fx)\n",
|
||||
humanize.Bytes(uint64(totalBlobSizeCompressed)),
|
||||
humanize.Bytes(uint64(totalBlobSizeUncompressed)),
|
||||
compressionRatio)
|
||||
v.printfStdout("Upload: %d blobs, %s in %s (%s)\n",
|
||||
totalBlobsUploaded,
|
||||
humanize.Bytes(uint64(totalBytesUploaded)),
|
||||
formatDuration(uploadDuration),
|
||||
avgUploadSpeed)
|
||||
stats.totalBlobsUploaded,
|
||||
humanize.Bytes(uint64(stats.totalBytesUploaded)),
|
||||
formatDuration(stats.uploadDuration),
|
||||
formatUploadSpeed(stats.totalBytesUploaded, stats.uploadDuration))
|
||||
}
|
||||
v.printfStdout("Duration: %s\n", formatDuration(snapshotDuration))
|
||||
}
|
||||
|
||||
return nil
|
||||
// getSnapshotBlobSizes returns total compressed and uncompressed blob sizes for a snapshot
|
||||
func (v *Vaultik) getSnapshotBlobSizes(snapshotID string) (compressed int64, uncompressed int64) {
|
||||
blobHashes, err := v.Repositories.Snapshots.GetBlobHashes(v.ctx, snapshotID)
|
||||
if err != nil {
|
||||
return 0, 0
|
||||
}
|
||||
for _, hash := range blobHashes {
|
||||
if blob, err := v.Repositories.Blobs.GetByHash(v.ctx, hash); err == nil && blob != nil {
|
||||
compressed += blob.CompressedSize
|
||||
uncompressed += blob.UncompressedSize
|
||||
}
|
||||
}
|
||||
return compressed, uncompressed
|
||||
}
|
||||
|
||||
// ListSnapshots lists all snapshots
|
||||
func (v *Vaultik) ListSnapshots(jsonOutput bool) error {
|
||||
// Get all remote snapshots
|
||||
remoteSnapshots, err := v.listRemoteSnapshotIDs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
localSnapshotMap, err := v.reconcileLocalWithRemote(remoteSnapshots)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
snapshots, err := v.buildSnapshotInfoList(remoteSnapshots, localSnapshotMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Sort by timestamp (newest first)
|
||||
sort.Slice(snapshots, func(i, j int) bool {
|
||||
return snapshots[i].Timestamp.After(snapshots[j].Timestamp)
|
||||
})
|
||||
|
||||
if jsonOutput {
|
||||
encoder := json.NewEncoder(v.Stdout)
|
||||
encoder.SetIndent("", " ")
|
||||
return encoder.Encode(snapshots)
|
||||
}
|
||||
|
||||
return v.printSnapshotTable(snapshots)
|
||||
}
|
||||
|
||||
// listRemoteSnapshotIDs returns a set of snapshot IDs found in remote storage
|
||||
func (v *Vaultik) listRemoteSnapshotIDs() (map[string]bool, error) {
|
||||
remoteSnapshots := make(map[string]bool)
|
||||
objectCh := v.Storage.ListStream(v.ctx, "metadata/")
|
||||
|
||||
for object := range objectCh {
|
||||
if object.Err != nil {
|
||||
return fmt.Errorf("listing remote snapshots: %w", object.Err)
|
||||
return nil, fmt.Errorf("listing remote snapshots: %w", object.Err)
|
||||
}
|
||||
|
||||
// Extract snapshot ID from paths like metadata/hostname-20240115-143052Z/
|
||||
parts := strings.Split(object.Key, "/")
|
||||
if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" {
|
||||
// Skip macOS resource fork files (._*) and other hidden files
|
||||
if strings.HasPrefix(parts[1], ".") {
|
||||
continue
|
||||
}
|
||||
@ -349,56 +407,46 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Get all local snapshots
|
||||
return remoteSnapshots, nil
|
||||
}
|
||||
|
||||
// reconcileLocalWithRemote removes local snapshots not in remote and returns the surviving local map
|
||||
func (v *Vaultik) reconcileLocalWithRemote(remoteSnapshots map[string]bool) (map[string]*database.Snapshot, error) {
|
||||
localSnapshots, err := v.Repositories.Snapshots.ListRecent(v.ctx, 10000)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing local snapshots: %w", err)
|
||||
return nil, fmt.Errorf("listing local snapshots: %w", err)
|
||||
}
|
||||
|
||||
// Build a map of local snapshots for quick lookup
|
||||
localSnapshotMap := make(map[string]*database.Snapshot)
|
||||
for _, s := range localSnapshots {
|
||||
localSnapshotMap[s.ID.String()] = s
|
||||
}
|
||||
|
||||
// Remove local snapshots that don't exist remotely
|
||||
for _, snapshot := range localSnapshots {
|
||||
snapshotIDStr := snapshot.ID.String()
|
||||
for _, snap := range localSnapshots {
|
||||
snapshotIDStr := snap.ID.String()
|
||||
if !remoteSnapshots[snapshotIDStr] {
|
||||
log.Info("Removing local snapshot not found in remote", "snapshot_id", snapshot.ID)
|
||||
|
||||
// Delete related records first to avoid foreign key constraints
|
||||
if err := v.Repositories.Snapshots.DeleteSnapshotFiles(v.ctx, snapshotIDStr); err != nil {
|
||||
log.Error("Failed to delete snapshot files", "snapshot_id", snapshot.ID, "error", err)
|
||||
}
|
||||
if err := v.Repositories.Snapshots.DeleteSnapshotBlobs(v.ctx, snapshotIDStr); err != nil {
|
||||
log.Error("Failed to delete snapshot blobs", "snapshot_id", snapshot.ID, "error", err)
|
||||
}
|
||||
if err := v.Repositories.Snapshots.DeleteSnapshotUploads(v.ctx, snapshotIDStr); err != nil {
|
||||
log.Error("Failed to delete snapshot uploads", "snapshot_id", snapshot.ID, "error", err)
|
||||
}
|
||||
|
||||
// Now delete the snapshot itself
|
||||
if err := v.Repositories.Snapshots.Delete(v.ctx, snapshotIDStr); err != nil {
|
||||
log.Error("Failed to delete local snapshot", "snapshot_id", snapshot.ID, "error", err)
|
||||
log.Info("Removing local snapshot not found in remote", "snapshot_id", snap.ID)
|
||||
if err := v.deleteSnapshotFromLocalDB(snapshotIDStr); err != nil {
|
||||
log.Error("Failed to delete local snapshot", "snapshot_id", snap.ID, "error", err)
|
||||
} else {
|
||||
log.Info("Deleted local snapshot not found in remote", "snapshot_id", snapshot.ID)
|
||||
log.Info("Deleted local snapshot not found in remote", "snapshot_id", snap.ID)
|
||||
delete(localSnapshotMap, snapshotIDStr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build final snapshot list
|
||||
return localSnapshotMap, nil
|
||||
}
|
||||
|
||||
// buildSnapshotInfoList constructs SnapshotInfo entries from remote IDs and local data
|
||||
func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSnapshotMap map[string]*database.Snapshot) ([]SnapshotInfo, error) {
|
||||
snapshots := make([]SnapshotInfo, 0, len(remoteSnapshots))
|
||||
|
||||
for snapshotID := range remoteSnapshots {
|
||||
// Check if we have this snapshot locally
|
||||
if localSnap, exists := localSnapshotMap[snapshotID]; exists && localSnap.CompletedAt != nil {
|
||||
// Get total compressed size of all blobs referenced by this snapshot
|
||||
totalSize, err := v.Repositories.Snapshots.GetSnapshotTotalCompressedSize(v.ctx, snapshotID)
|
||||
if err != nil {
|
||||
log.Warn("Failed to get total compressed size", "id", snapshotID, "error", err)
|
||||
// Fall back to stored blob size
|
||||
totalSize = localSnap.BlobSize
|
||||
}
|
||||
|
||||
@ -408,17 +456,15 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error {
|
||||
CompressedSize: totalSize,
|
||||
})
|
||||
} else {
|
||||
// Remote snapshot not in local DB - fetch manifest to get size
|
||||
timestamp, err := parseSnapshotTimestamp(snapshotID)
|
||||
if err != nil {
|
||||
log.Warn("Failed to parse snapshot timestamp", "id", snapshotID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Try to download manifest to get size
|
||||
totalSize, err := v.getManifestSize(snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get manifest size for %s: %w", snapshotID, err)
|
||||
return nil, fmt.Errorf("failed to get manifest size for %s: %w", snapshotID, err)
|
||||
}
|
||||
|
||||
snapshots = append(snapshots, SnapshotInfo{
|
||||
@ -429,22 +475,13 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by timestamp (newest first)
|
||||
sort.Slice(snapshots, func(i, j int) bool {
|
||||
return snapshots[i].Timestamp.After(snapshots[j].Timestamp)
|
||||
})
|
||||
return snapshots, nil
|
||||
}
|
||||
|
||||
if jsonOutput {
|
||||
// JSON output
|
||||
encoder := json.NewEncoder(v.Stdout)
|
||||
encoder.SetIndent("", " ")
|
||||
return encoder.Encode(snapshots)
|
||||
}
|
||||
|
||||
// Table output
|
||||
// printSnapshotTable renders the snapshot list as a formatted table
|
||||
func (v *Vaultik) printSnapshotTable(snapshots []SnapshotInfo) error {
|
||||
w := tabwriter.NewWriter(v.Stdout, 0, 0, 3, ' ', 0)
|
||||
|
||||
// Show configured snapshots from config file
|
||||
if _, err := fmt.Fprintln(w, "CONFIGURED SNAPSHOTS:"); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -465,7 +502,6 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Show remote snapshots
|
||||
if _, err := fmt.Fprintln(w, "REMOTE SNAPSHOTS:"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -35,6 +35,19 @@ type VerifyResult struct {
|
||||
ErrorMessage string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// deepVerifyFailure records a failure in the result and returns it appropriately
|
||||
func (v *Vaultik) deepVerifyFailure(result *VerifyResult, opts *VerifyOptions, msg string, err error) error {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = msg
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("%s", msg)
|
||||
}
|
||||
|
||||
// RunDeepVerify executes deep verification operation
|
||||
func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
result := &VerifyResult{
|
||||
@ -42,89 +55,20 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
Mode: "deep",
|
||||
}
|
||||
|
||||
// Check for decryption capability
|
||||
if !v.CanDecrypt() {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = "VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification"
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification")
|
||||
return v.deepVerifyFailure(result, opts,
|
||||
"VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification",
|
||||
fmt.Errorf("VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification"))
|
||||
}
|
||||
|
||||
log.Info("Starting snapshot verification",
|
||||
"snapshot_id", snapshotID,
|
||||
"mode", "deep",
|
||||
)
|
||||
|
||||
log.Info("Starting snapshot verification", "snapshot_id", snapshotID, "mode", "deep")
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Deep verification of snapshot: %s\n\n", snapshotID)
|
||||
}
|
||||
|
||||
// Step 1: Download manifest
|
||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
log.Info("Downloading manifest", "path", manifestPath)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Downloading manifest...\n")
|
||||
}
|
||||
|
||||
manifestReader, err := v.Storage.Get(v.ctx, manifestPath)
|
||||
manifest, tempDB, dbBlobs, err := v.loadVerificationData(snapshotID, opts, result)
|
||||
if err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = fmt.Sprintf("failed to download manifest: %v", err)
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("failed to download manifest: %w", err)
|
||||
}
|
||||
defer func() { _ = manifestReader.Close() }()
|
||||
|
||||
// Decompress manifest
|
||||
manifest, err := snapshot.DecodeManifest(manifestReader)
|
||||
if err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = fmt.Sprintf("failed to decode manifest: %v", err)
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("failed to decode manifest: %w", err)
|
||||
}
|
||||
|
||||
log.Info("Manifest loaded",
|
||||
"manifest_blob_count", manifest.BlobCount,
|
||||
"manifest_total_size", humanize.Bytes(uint64(manifest.TotalCompressedSize)),
|
||||
)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Manifest loaded: %d blobs (%s)\n", manifest.BlobCount, humanize.Bytes(uint64(manifest.TotalCompressedSize)))
|
||||
}
|
||||
|
||||
// Step 2: Download and decrypt database (authoritative source)
|
||||
dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
|
||||
log.Info("Downloading encrypted database", "path", dbPath)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Downloading and decrypting database...\n")
|
||||
}
|
||||
|
||||
dbReader, err := v.Storage.Get(v.ctx, dbPath)
|
||||
if err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = fmt.Sprintf("failed to download database: %v", err)
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("failed to download database: %w", err)
|
||||
}
|
||||
defer func() { _ = dbReader.Close() }()
|
||||
|
||||
// Decrypt and decompress database
|
||||
tempDB, err := v.decryptAndLoadDatabase(dbReader, v.Config.AgeSecretKey)
|
||||
if err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = fmt.Sprintf("failed to decrypt database: %v", err)
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("failed to decrypt database: %w", err)
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if tempDB != nil {
|
||||
@ -132,17 +76,6 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
}
|
||||
}()
|
||||
|
||||
// Step 3: Get authoritative blob list from database
|
||||
dbBlobs, err := v.getBlobsFromDatabase(snapshotID, tempDB.DB)
|
||||
if err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = fmt.Sprintf("failed to get blobs from database: %v", err)
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("failed to get blobs from database: %w", err)
|
||||
}
|
||||
|
||||
result.BlobCount = len(dbBlobs)
|
||||
var totalSize int64
|
||||
for _, blob := range dbBlobs {
|
||||
@ -150,54 +83,10 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
}
|
||||
result.TotalSize = totalSize
|
||||
|
||||
log.Info("Database loaded",
|
||||
"db_blob_count", len(dbBlobs),
|
||||
"db_total_size", humanize.Bytes(uint64(totalSize)),
|
||||
)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Database loaded: %d blobs (%s)\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
|
||||
v.printfStdout("Verifying manifest against database...\n")
|
||||
}
|
||||
|
||||
// Step 4: Verify manifest matches database
|
||||
if err := v.verifyManifestAgainstDatabase(manifest, dbBlobs); err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = err.Error()
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
if err := v.runVerificationSteps(manifest, dbBlobs, tempDB, opts, result, totalSize); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Step 5: Verify all blobs exist in S3 (using database as source)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Manifest verified.\n")
|
||||
v.printfStdout("Checking blob existence in remote storage...\n")
|
||||
}
|
||||
if err := v.verifyBlobExistenceFromDB(dbBlobs); err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = err.Error()
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Step 6: Deep verification - download and verify blob contents
|
||||
if !opts.JSON {
|
||||
v.printfStdout("All blobs exist.\n")
|
||||
v.printfStdout("Downloading and verifying blob contents (%d blobs, %s)...\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
|
||||
}
|
||||
if err := v.performDeepVerificationFromDB(dbBlobs, tempDB.DB, opts); err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = err.Error()
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Success
|
||||
result.Status = "ok"
|
||||
result.Verified = len(dbBlobs)
|
||||
|
||||
@ -206,11 +95,7 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
}
|
||||
|
||||
log.Info("✓ Verification completed successfully",
|
||||
"snapshot_id", snapshotID,
|
||||
"mode", "deep",
|
||||
"blobs_verified", len(dbBlobs),
|
||||
)
|
||||
|
||||
"snapshot_id", snapshotID, "mode", "deep", "blobs_verified", len(dbBlobs))
|
||||
v.printfStdout("\n✓ Verification completed successfully\n")
|
||||
v.printfStdout(" Snapshot: %s\n", snapshotID)
|
||||
v.printfStdout(" Blobs verified: %d\n", len(dbBlobs))
|
||||
@ -219,6 +104,99 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadVerificationData downloads manifest, database, and blob list for verification
|
||||
func (v *Vaultik) loadVerificationData(snapshotID string, opts *VerifyOptions, result *VerifyResult) (*snapshot.Manifest, *tempDB, []snapshot.BlobInfo, error) {
|
||||
// Download manifest
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Downloading manifest...\n")
|
||||
}
|
||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
manifestReader, err := v.Storage.Get(v.ctx, manifestPath)
|
||||
if err != nil {
|
||||
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
||||
fmt.Sprintf("failed to download manifest: %v", err),
|
||||
fmt.Errorf("failed to download manifest: %w", err))
|
||||
}
|
||||
defer func() { _ = manifestReader.Close() }()
|
||||
|
||||
manifest, err := snapshot.DecodeManifest(manifestReader)
|
||||
if err != nil {
|
||||
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
||||
fmt.Sprintf("failed to decode manifest: %v", err),
|
||||
fmt.Errorf("failed to decode manifest: %w", err))
|
||||
}
|
||||
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Manifest loaded: %d blobs (%s)\n", manifest.BlobCount, humanize.Bytes(uint64(manifest.TotalCompressedSize)))
|
||||
v.printfStdout("Downloading and decrypting database...\n")
|
||||
}
|
||||
|
||||
// Download and decrypt database
|
||||
dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
|
||||
dbReader, err := v.Storage.Get(v.ctx, dbPath)
|
||||
if err != nil {
|
||||
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
||||
fmt.Sprintf("failed to download database: %v", err),
|
||||
fmt.Errorf("failed to download database: %w", err))
|
||||
}
|
||||
defer func() { _ = dbReader.Close() }()
|
||||
|
||||
tdb, err := v.decryptAndLoadDatabase(dbReader, v.Config.AgeSecretKey)
|
||||
if err != nil {
|
||||
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
||||
fmt.Sprintf("failed to decrypt database: %v", err),
|
||||
fmt.Errorf("failed to decrypt database: %w", err))
|
||||
}
|
||||
|
||||
dbBlobs, err := v.getBlobsFromDatabase(snapshotID, tdb.DB)
|
||||
if err != nil {
|
||||
_ = tdb.Close()
|
||||
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
||||
fmt.Sprintf("failed to get blobs from database: %v", err),
|
||||
fmt.Errorf("failed to get blobs from database: %w", err))
|
||||
}
|
||||
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Database loaded: %d blobs (%s)\n", len(dbBlobs), humanize.Bytes(uint64(func() int64 {
|
||||
var s int64
|
||||
for _, b := range dbBlobs {
|
||||
s += b.CompressedSize
|
||||
}
|
||||
return s
|
||||
}())))
|
||||
}
|
||||
|
||||
return manifest, tdb, dbBlobs, nil
|
||||
}
|
||||
|
||||
// runVerificationSteps executes manifest verification, blob existence check, and deep content verification
|
||||
func (v *Vaultik) runVerificationSteps(manifest *snapshot.Manifest, dbBlobs []snapshot.BlobInfo, tdb *tempDB, opts *VerifyOptions, result *VerifyResult, totalSize int64) error {
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Verifying manifest against database...\n")
|
||||
}
|
||||
if err := v.verifyManifestAgainstDatabase(manifest, dbBlobs); err != nil {
|
||||
return v.deepVerifyFailure(result, opts, err.Error(), err)
|
||||
}
|
||||
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Manifest verified.\n")
|
||||
v.printfStdout("Checking blob existence in remote storage...\n")
|
||||
}
|
||||
if err := v.verifyBlobExistenceFromDB(dbBlobs); err != nil {
|
||||
return v.deepVerifyFailure(result, opts, err.Error(), err)
|
||||
}
|
||||
|
||||
if !opts.JSON {
|
||||
v.printfStdout("All blobs exist.\n")
|
||||
v.printfStdout("Downloading and verifying blob contents (%d blobs, %s)...\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
|
||||
}
|
||||
if err := v.performDeepVerificationFromDB(dbBlobs, tdb.DB, opts); err != nil {
|
||||
return v.deepVerifyFailure(result, opts, err.Error(), err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// tempDB wraps sql.DB with cleanup
|
||||
type tempDB struct {
|
||||
*sql.DB
|
||||
|
||||
Loading…
Reference in New Issue
Block a user