fix: propagate errors in uploadBlobIfNeeded and listUniqueSnapshotIDs

- scanner.go: uploadBlobIfNeeded now returns (bool, error) instead of bool,
  preventing data loss where blobs were recorded in the DB even though the
  upload failed, because upload errors were silently swallowed
- prune.go: listUniqueSnapshotIDs now returns ([]string, error) instead of
  []string, preventing incorrect orphan detection when errors occur while
  listing remote metadata
This commit is contained in:
user 2026-02-20 03:50:29 -08:00
parent eb23e14799
commit 37780d59de
2 changed files with 16 additions and 9 deletions

View File

@ -942,7 +942,11 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
} }
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash) blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
blobExists := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime) blobExists, err := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime)
if err != nil {
s.cleanupBlobTempFile(blobWithReader)
return fmt.Errorf("uploading blob %s: %w", finishedBlob.Hash, err)
}
if err := s.recordBlobMetadata(ctx, finishedBlob, blobExists, startTime); err != nil { if err := s.recordBlobMetadata(ctx, finishedBlob, blobExists, startTime); err != nil {
s.cleanupBlobTempFile(blobWithReader) s.cleanupBlobTempFile(blobWithReader)
@ -963,7 +967,7 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
} }
// uploadBlobIfNeeded uploads the blob to storage if it doesn't already exist, returns whether it existed // uploadBlobIfNeeded uploads the blob to storage if it doesn't already exist, returns whether it existed
func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) bool { func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) (bool, error) {
finishedBlob := blobWithReader.FinishedBlob finishedBlob := blobWithReader.FinishedBlob
// Check if blob already exists (deduplication after restart) // Check if blob already exists (deduplication after restart)
@ -972,14 +976,14 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW
"hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed))) "hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed)))
fmt.Printf("Blob exists: %s (%s, skipped upload)\n", fmt.Printf("Blob exists: %s (%s, skipped upload)\n",
finishedBlob.Hash[:12]+"...", humanize.Bytes(uint64(finishedBlob.Compressed))) finishedBlob.Hash[:12]+"...", humanize.Bytes(uint64(finishedBlob.Compressed)))
return true return true, nil
} }
progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob) progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob)
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil { if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err) log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err)
return false return false, fmt.Errorf("uploading blob to storage: %w", err)
} }
uploadDuration := time.Since(startTime) uploadDuration := time.Since(startTime)
@ -1004,7 +1008,7 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW
stats.BytesUploaded.Add(finishedBlob.Compressed) stats.BytesUploaded.Add(finishedBlob.Compressed)
} }
return false return false, nil
} }
// makeUploadProgressCallback creates a progress callback for blob uploads // makeUploadProgressCallback creates a progress callback for blob uploads

View File

@ -85,7 +85,10 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
// collectReferencedBlobs downloads all manifests and returns the set of referenced blob hashes // collectReferencedBlobs downloads all manifests and returns the set of referenced blob hashes
func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) { func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) {
log.Info("Listing remote snapshots") log.Info("Listing remote snapshots")
snapshotIDs := v.listUniqueSnapshotIDs() snapshotIDs, err := v.listUniqueSnapshotIDs()
if err != nil {
return nil, fmt.Errorf("listing snapshot IDs: %w", err)
}
log.Info("Found manifests in remote storage", "count", len(snapshotIDs)) log.Info("Found manifests in remote storage", "count", len(snapshotIDs))
allBlobsReferenced := make(map[string]bool) allBlobsReferenced := make(map[string]bool)
@ -109,14 +112,14 @@ func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) {
} }
// listUniqueSnapshotIDs returns deduplicated snapshot IDs from remote metadata // listUniqueSnapshotIDs returns deduplicated snapshot IDs from remote metadata
func (v *Vaultik) listUniqueSnapshotIDs() []string { func (v *Vaultik) listUniqueSnapshotIDs() ([]string, error) {
objectCh := v.Storage.ListStream(v.ctx, "metadata/") objectCh := v.Storage.ListStream(v.ctx, "metadata/")
seen := make(map[string]bool) seen := make(map[string]bool)
var snapshotIDs []string var snapshotIDs []string
for object := range objectCh { for object := range objectCh {
if object.Err != nil { if object.Err != nil {
continue return nil, fmt.Errorf("listing metadata objects: %w", object.Err)
} }
parts := strings.Split(object.Key, "/") parts := strings.Split(object.Key, "/")
if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" { if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" {
@ -129,7 +132,7 @@ func (v *Vaultik) listUniqueSnapshotIDs() []string {
} }
} }
} }
return snapshotIDs return snapshotIDs, nil
} }
// listAllRemoteBlobs returns a map of all blob hashes to their sizes in remote storage // listAllRemoteBlobs returns a map of all blob hashes to their sizes in remote storage