diff --git a/internal/snapshot/scanner.go b/internal/snapshot/scanner.go index 74391d1..5c8fa88 100644 --- a/internal/snapshot/scanner.go +++ b/internal/snapshot/scanner.go @@ -942,7 +942,11 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { } blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash) - blobExists := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime) + blobExists, err := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime) + if err != nil { + s.cleanupBlobTempFile(blobWithReader) + return fmt.Errorf("uploading blob %s: %w", finishedBlob.Hash, err) + } if err := s.recordBlobMetadata(ctx, finishedBlob, blobExists, startTime); err != nil { s.cleanupBlobTempFile(blobWithReader) @@ -963,7 +967,7 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error { } // uploadBlobIfNeeded uploads the blob to storage if it doesn't already exist, returns whether it existed -func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) bool { +func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) (bool, error) { finishedBlob := blobWithReader.FinishedBlob // Check if blob already exists (deduplication after restart) @@ -972,14 +976,14 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW "hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed))) fmt.Printf("Blob exists: %s (%s, skipped upload)\n", finishedBlob.Hash[:12]+"...", humanize.Bytes(uint64(finishedBlob.Compressed))) - return true + return true, nil } progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob) if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil { log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err) - return false + return false, fmt.Errorf("uploading blob to storage: %w", err) } uploadDuration := time.Since(startTime) @@ -1004,7 +1008,7 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW stats.BytesUploaded.Add(finishedBlob.Compressed) } - return false + return false, nil } // makeUploadProgressCallback creates a progress callback for blob uploads diff --git a/internal/vaultik/prune.go b/internal/vaultik/prune.go index 2aefc48..2fb1a35 100644 --- a/internal/vaultik/prune.go +++ b/internal/vaultik/prune.go @@ -85,7 +85,10 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error { // collectReferencedBlobs downloads all manifests and returns the set of referenced blob hashes func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) { log.Info("Listing remote snapshots") - snapshotIDs := v.listUniqueSnapshotIDs() + snapshotIDs, err := v.listUniqueSnapshotIDs() + if err != nil { + return nil, fmt.Errorf("listing snapshot IDs: %w", err) + } log.Info("Found manifests in remote storage", "count", len(snapshotIDs)) allBlobsReferenced := make(map[string]bool) @@ -109,14 +112,14 @@ func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) { } // listUniqueSnapshotIDs returns deduplicated snapshot IDs from remote metadata -func (v *Vaultik) listUniqueSnapshotIDs() []string { +func (v *Vaultik) listUniqueSnapshotIDs() ([]string, error) { objectCh := v.Storage.ListStream(v.ctx, "metadata/") seen := make(map[string]bool) var snapshotIDs []string for object := range objectCh { if object.Err != nil { - continue + return nil, fmt.Errorf("listing metadata objects: %w", object.Err) } parts := strings.Split(object.Key, "/") if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" { @@ -129,7 +132,7 @@ func (v *Vaultik) listUniqueSnapshotIDs() []string { } } } - return snapshotIDs + return snapshotIDs, nil } // listAllRemoteBlobs returns a map of all blob hashes to their sizes in remote storage