Two related changes, both addressing leakage and brittleness around
the public bytes the destination store sees.
First, every remote storage path that previously embedded a human
snapshot ID (e.g. metadata/heraklion_berlin.sneak.fs.photos.2026.
catalog_2026-06-24T07:00:15Z/...) now uses the hashed remote key:
RemoteSnapshotKey(id) = hex(SHA256(SHA256("vaultik|" + id)))
Applied at:
* uploadSnapshotArtifacts (snapshot create write path)
* the manifest.json.zst snapshot_id field — manifest is
unencrypted, so the human ID would otherwise be readable to
anyone with bucket-list permission
* cleanupIncompleteSnapshots metadata-existence probe
* snapshot restore / verify (downloadSnapshotDB,
loadVerificationData)
* downloadManifestByKey, deleteRemoteSnapshotByKey
* CleanupLocalSnapshots reconciliation
* the locally-driven removal paths (RemoveSnapshot,
RemoveAllSnapshots, confirmAndExecutePurge)
The local index database keeps human IDs everywhere — the hash is a
boundary translation, not a rename. A directory listing of the
backup destination now looks like
"metadata/<64-hex>/{db.zst.age,manifest.json.zst}" with no host,
snapshot-name, or timestamp information visible.
Second, snapshot list no longer fails just because remote storage is
unreachable, and only consults the remote when the local machine can
plausibly decrypt:
* Listing is always driven by the local index database — that's
what holds the human IDs, timestamps, and per-snapshot stats
that the table actually shows.
* If no age secret key is configured, we skip remote listing
entirely (the box is treated as a write-only backup machine —
there's no value showing it remote-only keys it could never
restore).
* If a key IS configured, we try the remote listing; failures
(volume unmounted, permission denied, network error) downgrade
to a warning instead of aborting the command.
* When the remote listing succeeds, we cross-reference by hashing
each local human ID and diffing against the returned key set.
Local-only snapshots get the existing "stale local record"
cleanup hint; remote-only keys are surfaced as a single
"NOTE: N remote snapshot(s) found in backup destination store
but not in local database" line.
FileStorer construction also no longer does an eager mkdir — the
basePath is recorded and the directory is created lazily on first
write. A missing or unmounted destination during `snapshot list`
should NOT block the command, and now it doesn't.
RemoveAllSnapshots is rewritten to drive deletion from the local
index instead of from a remote listing, hashing each local ID to
find the corresponding remote key. Orphan remote keys (no matching
local snapshot) are handled separately and only deleted when
--remote is set. Existing tests are updated to hash storage paths
through the new RemoteSnapshotKey helper.
The hash format is a hard pre-1.0 break: existing remote snapshots
written under the human-ID path scheme are no longer readable; they
need to be either re-uploaded under the new scheme or manually
renamed. There is no fallback path, in keeping with the project policy of
"no migrations pre-1.0."
599 lines
18 KiB
Go
599 lines
18 KiB
Go
package vaultik
|
|
|
|
import (
|
|
"crypto/sha256"
|
|
"database/sql"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"hash"
|
|
"io"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/klauspost/compress/zstd"
|
|
_ "modernc.org/sqlite"
|
|
"sneak.berlin/go/vaultik/internal/log"
|
|
"sneak.berlin/go/vaultik/internal/snapshot"
|
|
)
|
|
|
|
// VerifyOptions carries the caller-selected switches for a verify run.
type VerifyOptions struct {
	// Deep selects deep verification. NOTE(review): nothing in this part of
	// the file reads it; presumably the command dispatcher does - confirm.
	//
	// JSON suppresses the human-readable progress lines on stdout; results
	// and failures are reported through outputVerifyJSON instead.
	Deep, JSON bool
}
|
|
|
|
// VerifyResult is the outcome record of one snapshot verification. Field
// order and JSON tags define the machine-readable output format.
type VerifyResult struct {
	// SnapshotID is the human-readable snapshot ID that was verified.
	SnapshotID string `json:"snapshot_id"`
	// Status is "ok" or "failed".
	Status string `json:"status"`
	// Mode is "shallow" or "deep".
	Mode string `json:"mode"`
	// BlobCount is the number of blobs the snapshot database lists.
	BlobCount int `json:"blob_count"`
	// TotalSize is the sum of the compressed sizes of those blobs, in bytes.
	TotalSize int64 `json:"total_size"`
	// Verified is the number of blobs that passed verification.
	Verified int `json:"verified"`
	// Missing and MissingSize are not written by the deep-verify code in this
	// section; presumably the shallow path fills them in - confirm.
	Missing     int   `json:"missing"`
	MissingSize int64 `json:"missing_size,omitempty"`
	// ErrorMessage holds the failure description when Status is "failed".
	ErrorMessage string `json:"error,omitempty"`
}
|
|
|
|
// deepVerifyFailure records a failure in the result and returns it appropriately
|
|
func (v *Vaultik) deepVerifyFailure(result *VerifyResult, opts *VerifyOptions, msg string, err error) error {
|
|
result.Status = "failed"
|
|
result.ErrorMessage = msg
|
|
if opts.JSON {
|
|
return v.outputVerifyJSON(result)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return fmt.Errorf("%s", msg)
|
|
}
|
|
|
|
// RunDeepVerify executes deep verification operation
|
|
func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
|
result := &VerifyResult{
|
|
SnapshotID: snapshotID,
|
|
Mode: "deep",
|
|
}
|
|
|
|
if !v.CanDecrypt() {
|
|
msg := "VAULTIK_AGE_SECRET_KEY not set; required for deep verification"
|
|
return v.deepVerifyFailure(result, opts, msg, fmt.Errorf("%s", msg))
|
|
}
|
|
|
|
log.Info("Starting snapshot verification", "snapshot_id", snapshotID, "mode", "deep")
|
|
if !opts.JSON {
|
|
v.printfStdout("Deep verification of snapshot: %s\n\n", snapshotID)
|
|
}
|
|
|
|
manifest, tempDB, dbBlobs, err := v.loadVerificationData(snapshotID, opts, result)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if tempDB != nil {
|
|
_ = tempDB.Close()
|
|
}
|
|
}()
|
|
|
|
result.BlobCount = len(dbBlobs)
|
|
var totalSize int64
|
|
for _, blob := range dbBlobs {
|
|
totalSize += blob.CompressedSize
|
|
}
|
|
result.TotalSize = totalSize
|
|
|
|
if err := v.runVerificationSteps(manifest, dbBlobs, tempDB, opts, result, totalSize); err != nil {
|
|
return err
|
|
}
|
|
|
|
result.Status = "ok"
|
|
result.Verified = len(dbBlobs)
|
|
|
|
if opts.JSON {
|
|
return v.outputVerifyJSON(result)
|
|
}
|
|
|
|
log.Info("✓ Verification completed successfully",
|
|
"snapshot_id", snapshotID, "mode", "deep", "blobs_verified", len(dbBlobs))
|
|
v.printfStdout("\n✓ Verification completed successfully\n")
|
|
v.printfStdout(" Snapshot: %s\n", snapshotID)
|
|
v.printfStdout(" Blobs verified: %d\n", len(dbBlobs))
|
|
v.printfStdout(" Total size: %s\n", humanize.Bytes(uint64(totalSize)))
|
|
|
|
return nil
|
|
}
|
|
|
|
// loadVerificationData downloads manifest, database, and blob list for verification
|
|
func (v *Vaultik) loadVerificationData(snapshotID string, opts *VerifyOptions, result *VerifyResult) (*snapshot.Manifest, *tempDB, []snapshot.BlobInfo, error) {
|
|
// All remote paths use the hashed key derived from the human ID.
|
|
remoteKey := snapshot.RemoteSnapshotKey(snapshotID)
|
|
|
|
// Download manifest
|
|
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", remoteKey)
|
|
log.Info("Downloading manifest", "path", manifestPath)
|
|
if !opts.JSON {
|
|
v.printfStdout("Downloading manifest...\n")
|
|
}
|
|
manifestReader, err := v.Storage.Get(v.ctx, manifestPath)
|
|
if err != nil {
|
|
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
|
fmt.Sprintf("failed to download manifest: %v", err),
|
|
fmt.Errorf("failed to download manifest: %w", err))
|
|
}
|
|
defer func() { _ = manifestReader.Close() }()
|
|
|
|
manifest, err := snapshot.DecodeManifest(manifestReader)
|
|
if err != nil {
|
|
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
|
fmt.Sprintf("failed to decode manifest: %v", err),
|
|
fmt.Errorf("failed to decode manifest: %w", err))
|
|
}
|
|
|
|
log.Info("Manifest loaded",
|
|
"manifest_blob_count", manifest.BlobCount,
|
|
"manifest_total_size", humanize.Bytes(uint64(manifest.TotalCompressedSize)))
|
|
if !opts.JSON {
|
|
v.printfStdout("Manifest loaded: %d blobs (%s)\n", manifest.BlobCount, humanize.Bytes(uint64(manifest.TotalCompressedSize)))
|
|
v.printfStdout("Downloading and decrypting database...\n")
|
|
}
|
|
|
|
// Download and decrypt database
|
|
dbPath := fmt.Sprintf("metadata/%s/db.zst.age", remoteKey)
|
|
log.Info("Downloading encrypted database", "path", dbPath)
|
|
dbReader, err := v.Storage.Get(v.ctx, dbPath)
|
|
if err != nil {
|
|
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
|
fmt.Sprintf("failed to download database: %v", err),
|
|
fmt.Errorf("failed to download database: %w", err))
|
|
}
|
|
defer func() { _ = dbReader.Close() }()
|
|
|
|
tdb, err := v.decryptAndLoadDatabase(dbReader, v.Config.AgeSecretKey)
|
|
if err != nil {
|
|
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
|
fmt.Sprintf("failed to decrypt database: %v", err),
|
|
fmt.Errorf("failed to decrypt database: %w", err))
|
|
}
|
|
|
|
dbBlobs, err := v.getBlobsFromDatabase(snapshotID, tdb.DB)
|
|
if err != nil {
|
|
_ = tdb.Close()
|
|
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
|
fmt.Sprintf("failed to get blobs from database: %v", err),
|
|
fmt.Errorf("failed to get blobs from database: %w", err))
|
|
}
|
|
|
|
var dbTotalSize int64
|
|
for _, b := range dbBlobs {
|
|
dbTotalSize += b.CompressedSize
|
|
}
|
|
|
|
log.Info("Database loaded",
|
|
"db_blob_count", len(dbBlobs),
|
|
"db_total_size", humanize.Bytes(uint64(dbTotalSize)))
|
|
if !opts.JSON {
|
|
v.printfStdout("Database loaded: %d blobs (%s)\n", len(dbBlobs), humanize.Bytes(uint64(dbTotalSize)))
|
|
}
|
|
|
|
return manifest, tdb, dbBlobs, nil
|
|
}
|
|
|
|
// runVerificationSteps executes manifest verification, blob existence check, and deep content verification
|
|
func (v *Vaultik) runVerificationSteps(manifest *snapshot.Manifest, dbBlobs []snapshot.BlobInfo, tdb *tempDB, opts *VerifyOptions, result *VerifyResult, totalSize int64) error {
|
|
if !opts.JSON {
|
|
v.printfStdout("Verifying manifest against database...\n")
|
|
}
|
|
if err := v.verifyManifestAgainstDatabase(manifest, dbBlobs); err != nil {
|
|
return v.deepVerifyFailure(result, opts, err.Error(), err)
|
|
}
|
|
|
|
if !opts.JSON {
|
|
v.printfStdout("Manifest verified.\n")
|
|
v.printfStdout("Checking blob existence in remote storage...\n")
|
|
}
|
|
if err := v.verifyBlobExistenceFromDB(dbBlobs); err != nil {
|
|
return v.deepVerifyFailure(result, opts, err.Error(), err)
|
|
}
|
|
|
|
if !opts.JSON {
|
|
v.printfStdout("All blobs exist.\n")
|
|
v.printfStdout("Downloading and verifying blob contents (%d blobs, %s)...\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
|
|
}
|
|
if err := v.performDeepVerificationFromDB(dbBlobs, tdb.DB, opts); err != nil {
|
|
return v.deepVerifyFailure(result, opts, err.Error(), err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// tempDB is a *sql.DB backed by a temporary file on disk; tempPath is the
// location of that file so it can be deleted when the handle is closed.
type tempDB struct {
	*sql.DB
	tempPath string
}

// Close closes the underlying database and then removes the backing file.
// Only the database close error is reported; removal is best-effort.
func (t *tempDB) Close() error {
	closeErr := t.DB.Close()
	_ = os.Remove(t.tempPath) // best-effort cleanup; failure is ignored
	return closeErr
}
|
|
|
|
// decryptAndLoadDatabase decrypts and loads the binary SQLite database from the encrypted stream
|
|
func (v *Vaultik) decryptAndLoadDatabase(reader io.ReadCloser, secretKey string) (*tempDB, error) {
|
|
// Get decryptor
|
|
decryptor, err := v.GetDecryptor()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get decryptor: %w", err)
|
|
}
|
|
|
|
// Decrypt the stream
|
|
decryptedReader, err := decryptor.DecryptStream(reader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decrypt database: %w", err)
|
|
}
|
|
|
|
// Decompress the binary database
|
|
decompressor, err := zstd.NewReader(decryptedReader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create decompressor: %w", err)
|
|
}
|
|
defer decompressor.Close()
|
|
|
|
// Create temporary file for the database
|
|
tempFile, err := os.CreateTemp("", "vaultik-verify-*.db")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create temp file: %w", err)
|
|
}
|
|
tempPath := tempFile.Name()
|
|
|
|
// Stream decompress directly to file
|
|
log.Info("Decompressing database...")
|
|
written, err := io.Copy(tempFile, decompressor)
|
|
if err != nil {
|
|
_ = tempFile.Close()
|
|
_ = os.Remove(tempPath)
|
|
return nil, fmt.Errorf("failed to decompress database: %w", err)
|
|
}
|
|
_ = tempFile.Close()
|
|
|
|
log.Info("Database decompressed", "size", humanize.Bytes(uint64(written)))
|
|
|
|
// Open the database
|
|
db, err := sql.Open("sqlite", tempPath)
|
|
if err != nil {
|
|
_ = os.Remove(tempPath)
|
|
return nil, fmt.Errorf("failed to open database: %w", err)
|
|
}
|
|
|
|
return &tempDB{
|
|
DB: db,
|
|
tempPath: tempPath,
|
|
}, nil
|
|
}
|
|
|
|
// verifyBlob downloads and verifies a single blob
|
|
func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
|
|
// Download blob using shared fetch method
|
|
reader, _, err := v.FetchBlob(v.ctx, blobInfo.Hash, blobInfo.CompressedSize)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to download: %w", err)
|
|
}
|
|
defer func() { _ = reader.Close() }()
|
|
|
|
// Get decryptor
|
|
decryptor, err := v.GetDecryptor()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get decryptor: %w", err)
|
|
}
|
|
|
|
// Hash the encrypted blob data as it streams through to decryption
|
|
blobHasher := sha256.New()
|
|
teeReader := io.TeeReader(reader, blobHasher)
|
|
|
|
// Decrypt blob (reading through teeReader to hash encrypted data)
|
|
decryptedReader, err := decryptor.DecryptStream(teeReader)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to decrypt: %w", err)
|
|
}
|
|
|
|
// Decompress blob
|
|
decompressor, err := zstd.NewReader(decryptedReader)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to decompress: %w", err)
|
|
}
|
|
defer decompressor.Close()
|
|
|
|
chunkCount, err := v.verifyBlobChunks(db, blobInfo.Hash, decompressor)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := v.verifyBlobFinalIntegrity(decompressor, blobHasher, blobInfo.Hash); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Info("Blob verified",
|
|
"hash", blobInfo.Hash[:16]+"...",
|
|
"chunks", chunkCount,
|
|
"size", humanize.Bytes(uint64(blobInfo.CompressedSize)),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// verifyBlobChunks queries blob chunks from the database and verifies each chunk's hash
|
|
// against the decompressed blob stream
|
|
func (v *Vaultik) verifyBlobChunks(db *sql.DB, blobHash string, decompressor io.Reader) (int, error) {
|
|
query := `
|
|
SELECT bc.chunk_hash, bc.offset, bc.length
|
|
FROM blob_chunks bc
|
|
JOIN blobs b ON bc.blob_id = b.id
|
|
WHERE b.blob_hash = ?
|
|
ORDER BY bc.offset
|
|
`
|
|
rows, err := db.QueryContext(v.ctx, query, blobHash)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to query blob chunks: %w", err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
|
|
var lastOffset int64 = -1
|
|
chunkCount := 0
|
|
totalRead := int64(0)
|
|
|
|
// Verify each chunk in the blob
|
|
for rows.Next() {
|
|
var chunkHash string
|
|
var offset, length int64
|
|
if err := rows.Scan(&chunkHash, &offset, &length); err != nil {
|
|
return 0, fmt.Errorf("failed to scan chunk row: %w", err)
|
|
}
|
|
|
|
// Verify chunk ordering
|
|
if offset <= lastOffset {
|
|
return 0, fmt.Errorf("chunks out of order: offset %d after %d", offset, lastOffset)
|
|
}
|
|
lastOffset = offset
|
|
|
|
// Read chunk data from decompressed stream
|
|
if offset > totalRead {
|
|
// Skip to the correct offset
|
|
skipBytes := offset - totalRead
|
|
if _, err := io.CopyN(io.Discard, decompressor, skipBytes); err != nil {
|
|
return 0, fmt.Errorf("failed to skip to offset %d: %w", offset, err)
|
|
}
|
|
totalRead = offset
|
|
}
|
|
|
|
// Read chunk data
|
|
chunkData := make([]byte, length)
|
|
if _, err := io.ReadFull(decompressor, chunkData); err != nil {
|
|
return 0, fmt.Errorf("failed to read chunk at offset %d: %w", offset, err)
|
|
}
|
|
totalRead += length
|
|
|
|
// Verify chunk hash
|
|
hasher := sha256.New()
|
|
hasher.Write(chunkData)
|
|
calculatedHash := hex.EncodeToString(hasher.Sum(nil))
|
|
|
|
if calculatedHash != chunkHash {
|
|
return 0, fmt.Errorf("chunk hash mismatch at offset %d: calculated %s, expected %s",
|
|
offset, calculatedHash, chunkHash)
|
|
}
|
|
|
|
chunkCount++
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return 0, fmt.Errorf("error iterating blob chunks: %w", err)
|
|
}
|
|
|
|
return chunkCount, nil
|
|
}
|
|
|
|
// verifyBlobFinalIntegrity checks that no trailing data exists in the decompressed stream
|
|
// and that the encrypted blob hash matches the expected value
|
|
func (v *Vaultik) verifyBlobFinalIntegrity(decompressor io.Reader, blobHasher hash.Hash, expectedHash string) error {
|
|
// Verify no remaining data in blob - if chunk list is accurate, blob should be fully consumed
|
|
remaining, err := io.Copy(io.Discard, decompressor)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check for remaining blob data: %w", err)
|
|
}
|
|
if remaining > 0 {
|
|
return fmt.Errorf("blob has %d unexpected trailing bytes not covered by chunk list", remaining)
|
|
}
|
|
|
|
// Verify blob hash matches the encrypted data we downloaded
|
|
calculatedBlobHash := hex.EncodeToString(blobHasher.Sum(nil))
|
|
if calculatedBlobHash != expectedHash {
|
|
return fmt.Errorf("blob hash mismatch: calculated %s, expected %s",
|
|
calculatedBlobHash, expectedHash)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// getBlobsFromDatabase gets all blobs for the snapshot from the database
|
|
func (v *Vaultik) getBlobsFromDatabase(snapshotID string, db *sql.DB) ([]snapshot.BlobInfo, error) {
|
|
query := `
|
|
SELECT b.blob_hash, b.compressed_size
|
|
FROM snapshot_blobs sb
|
|
JOIN blobs b ON sb.blob_hash = b.blob_hash
|
|
WHERE sb.snapshot_id = ?
|
|
ORDER BY b.blob_hash
|
|
`
|
|
rows, err := db.QueryContext(v.ctx, query, snapshotID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query snapshot blobs: %w", err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
|
|
var blobs []snapshot.BlobInfo
|
|
for rows.Next() {
|
|
var hash string
|
|
var size int64
|
|
if err := rows.Scan(&hash, &size); err != nil {
|
|
return nil, fmt.Errorf("failed to scan blob row: %w", err)
|
|
}
|
|
blobs = append(blobs, snapshot.BlobInfo{
|
|
Hash: hash,
|
|
CompressedSize: size,
|
|
})
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating blobs: %w", err)
|
|
}
|
|
|
|
return blobs, nil
|
|
}
|
|
|
|
// verifyManifestAgainstDatabase verifies the manifest matches the authoritative database
|
|
func (v *Vaultik) verifyManifestAgainstDatabase(manifest *snapshot.Manifest, dbBlobs []snapshot.BlobInfo) error {
|
|
log.Info("Verifying manifest against database")
|
|
|
|
// Build map of database blobs
|
|
dbBlobMap := make(map[string]int64)
|
|
for _, blob := range dbBlobs {
|
|
dbBlobMap[blob.Hash] = blob.CompressedSize
|
|
}
|
|
|
|
// Build map of manifest blobs
|
|
manifestBlobMap := make(map[string]int64)
|
|
for _, blob := range manifest.Blobs {
|
|
manifestBlobMap[blob.Hash] = blob.CompressedSize
|
|
}
|
|
|
|
// Check counts match
|
|
if len(dbBlobMap) != len(manifestBlobMap) {
|
|
log.Warn("Manifest blob count mismatch",
|
|
"database_blobs", len(dbBlobMap),
|
|
"manifest_blobs", len(manifestBlobMap),
|
|
)
|
|
// This is a warning, not an error - database is authoritative
|
|
}
|
|
|
|
// Check each manifest blob exists in database with correct size
|
|
for hash, manifestSize := range manifestBlobMap {
|
|
dbSize, exists := dbBlobMap[hash]
|
|
if !exists {
|
|
return fmt.Errorf("manifest contains blob %s not in database", hash)
|
|
}
|
|
if dbSize != manifestSize {
|
|
return fmt.Errorf("blob %s size mismatch: database has %d bytes, manifest has %d bytes",
|
|
hash, dbSize, manifestSize)
|
|
}
|
|
}
|
|
|
|
log.Info("✓ Manifest verified against database",
|
|
"manifest_blobs", len(manifestBlobMap),
|
|
"database_blobs", len(dbBlobMap),
|
|
)
|
|
return nil
|
|
}
|
|
|
|
// verifyBlobExistenceFromDB checks that all blobs from database exist in S3
|
|
func (v *Vaultik) verifyBlobExistenceFromDB(blobs []snapshot.BlobInfo) error {
|
|
log.Info("Verifying blob existence in S3", "blob_count", len(blobs))
|
|
|
|
for i, blob := range blobs {
|
|
// Construct blob path
|
|
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blob.Hash[:2], blob.Hash[2:4], blob.Hash)
|
|
|
|
// Check blob exists
|
|
stat, err := v.Storage.Stat(v.ctx, blobPath)
|
|
if err != nil {
|
|
return fmt.Errorf("blob %s missing from storage: %w", blob.Hash, err)
|
|
}
|
|
|
|
// Verify size matches
|
|
if stat.Size != blob.CompressedSize {
|
|
return fmt.Errorf("blob %s size mismatch: S3 has %d bytes, database has %d bytes",
|
|
blob.Hash, stat.Size, blob.CompressedSize)
|
|
}
|
|
|
|
// Progress update every 100 blobs
|
|
if (i+1)%100 == 0 || i == len(blobs)-1 {
|
|
log.Info("Blob existence check progress",
|
|
"checked", i+1,
|
|
"total", len(blobs),
|
|
"percent", fmt.Sprintf("%.1f%%", float64(i+1)/float64(len(blobs))*100),
|
|
)
|
|
}
|
|
}
|
|
|
|
log.Info("✓ All blobs exist in storage")
|
|
return nil
|
|
}
|
|
|
|
// performDeepVerificationFromDB downloads and verifies the content of each blob using database as source
|
|
func (v *Vaultik) performDeepVerificationFromDB(blobs []snapshot.BlobInfo, db *sql.DB, opts *VerifyOptions) error {
|
|
// Calculate total bytes for ETA
|
|
var totalBytesExpected int64
|
|
for _, b := range blobs {
|
|
totalBytesExpected += b.CompressedSize
|
|
}
|
|
|
|
log.Info("Starting deep verification - downloading and verifying all blobs",
|
|
"blob_count", len(blobs),
|
|
"total_size", humanize.Bytes(uint64(totalBytesExpected)),
|
|
)
|
|
|
|
startTime := time.Now()
|
|
bytesProcessed := int64(0)
|
|
|
|
for i, blobInfo := range blobs {
|
|
// Verify individual blob
|
|
if err := v.verifyBlob(blobInfo, db); err != nil {
|
|
return fmt.Errorf("blob %s verification failed: %w", blobInfo.Hash, err)
|
|
}
|
|
|
|
bytesProcessed += blobInfo.CompressedSize
|
|
elapsed := time.Since(startTime)
|
|
remaining := len(blobs) - (i + 1)
|
|
|
|
// Calculate ETA based on bytes processed
|
|
var eta time.Duration
|
|
if bytesProcessed > 0 {
|
|
bytesPerSec := float64(bytesProcessed) / elapsed.Seconds()
|
|
bytesRemaining := totalBytesExpected - bytesProcessed
|
|
if bytesPerSec > 0 {
|
|
eta = time.Duration(float64(bytesRemaining)/bytesPerSec) * time.Second
|
|
}
|
|
}
|
|
|
|
log.Info("Verification progress",
|
|
"blobs_done", i+1,
|
|
"blobs_total", len(blobs),
|
|
"blobs_remaining", remaining,
|
|
"bytes_done", bytesProcessed,
|
|
"bytes_done_human", humanize.Bytes(uint64(bytesProcessed)),
|
|
"bytes_total", totalBytesExpected,
|
|
"bytes_total_human", humanize.Bytes(uint64(totalBytesExpected)),
|
|
"elapsed", elapsed.Round(time.Second),
|
|
"eta", eta.Round(time.Second),
|
|
)
|
|
|
|
if !opts.JSON {
|
|
v.printfStdout(" Verified %d/%d blobs (%d remaining) - %s/%s - elapsed %s, eta %s\n",
|
|
i+1, len(blobs), remaining,
|
|
humanize.Bytes(uint64(bytesProcessed)),
|
|
humanize.Bytes(uint64(totalBytesExpected)),
|
|
elapsed.Round(time.Second),
|
|
eta.Round(time.Second))
|
|
}
|
|
}
|
|
|
|
totalElapsed := time.Since(startTime)
|
|
log.Info("✓ Deep verification completed successfully",
|
|
"blobs_verified", len(blobs),
|
|
"total_bytes", bytesProcessed,
|
|
"total_bytes_human", humanize.Bytes(uint64(bytesProcessed)),
|
|
"duration", totalElapsed.Round(time.Second),
|
|
)
|
|
|
|
return nil
|
|
}
|