vaultik/internal/vaultik/verify.go
sneak 470bf648c4 Add deterministic deduplication, rclone backend, and database purge command
- Implement deterministic blob hashing using double SHA256 of uncompressed
  plaintext data, enabling deduplication even after local DB is cleared
  (see the sketch below)
- Add Stat() check before blob upload to skip existing blobs in storage
- Add rclone storage backend for additional remote storage options
- Add 'vaultik database purge' command to erase local state DB
- Add 'vaultik remote check' command to verify remote connectivity
- Show configured snapshots in 'vaultik snapshot list' output
- Skip macOS resource fork files (._*) when listing remote snapshots
- Use multi-threaded zstd compression (CPUs - 2 threads)
- Add writer tests for double hashing behavior
2026-01-28 15:50:17 -08:00
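
Below is a minimal sketch, not code from this repository, of the two writer-side techniques named in the commit message above. It assumes the deterministic blob ID is the hex-encoded SHA256 of the SHA256 of the uncompressed plaintext, and it shows a zstd encoder configured for CPUs-minus-two threads via klauspost/compress. blobID and newEncoder are illustrative names; the real implementation lives in the writer, not in verify.go.

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"runtime"

	"github.com/klauspost/compress/zstd"
)

// blobID hashes the uncompressed plaintext twice, so identical input always
// yields the same ID regardless of compression or encryption parameters -
// which is what lets deduplication survive a purged local DB.
func blobID(plaintext []byte) string {
	first := sha256.Sum256(plaintext)
	second := sha256.Sum256(first[:])
	return hex.EncodeToString(second[:])
}

// newEncoder builds a multi-threaded zstd encoder, leaving two CPUs free as
// the commit message describes (floored at one thread on small machines).
func newEncoder(w io.Writer) (*zstd.Encoder, error) {
	threads := runtime.NumCPU() - 2
	if threads < 1 {
		threads = 1
	}
	return zstd.NewWriter(w, zstd.WithEncoderConcurrency(threads))
}

func main() {
	fmt.Println(blobID([]byte("example plaintext")))
}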

package vaultik

import (
	"crypto/sha256"
	"database/sql"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"time"

	"git.eeqj.de/sneak/vaultik/internal/log"
	"git.eeqj.de/sneak/vaultik/internal/snapshot"
	"github.com/dustin/go-humanize"
	"github.com/klauspost/compress/zstd"
	_ "github.com/mattn/go-sqlite3"
)

// VerifyOptions contains options for the verify command
type VerifyOptions struct {
	Deep bool
	JSON bool
}

// VerifyResult contains the result of a snapshot verification
type VerifyResult struct {
	SnapshotID   string `json:"snapshot_id"`
	Status       string `json:"status"` // "ok" or "failed"
	Mode         string `json:"mode"`   // "shallow" or "deep"
	BlobCount    int    `json:"blob_count"`
	TotalSize    int64  `json:"total_size"`
	Verified     int    `json:"verified"`
	Missing      int    `json:"missing"`
	MissingSize  int64  `json:"missing_size,omitempty"`
	ErrorMessage string `json:"error,omitempty"`
}

// RunDeepVerify executes the deep verification operation for a snapshot.
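// Verification proceeds in six steps: download the manifest, download and
// decrypt the snapshot database, read the authoritative blob list from that
// database, cross-check the manifest against it, confirm every blob exists
// in remote storage, and finally download and verify each blob's contents
// chunk by chunk.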
func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
	result := &VerifyResult{
		SnapshotID: snapshotID,
		Mode:       "deep",
	}

	// Check for decryption capability
	if !v.CanDecrypt() {
		result.Status = "failed"
		result.ErrorMessage = "VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification"
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification")
	}

	log.Info("Starting snapshot verification",
		"snapshot_id", snapshotID,
		"mode", "deep",
	)
	if !opts.JSON {
		v.Outputf("Deep verification of snapshot: %s\n\n", snapshotID)
	}

	// Step 1: Download manifest
	manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
	log.Info("Downloading manifest", "path", manifestPath)
	if !opts.JSON {
		v.Outputf("Downloading manifest...\n")
	}
	manifestReader, err := v.Storage.Get(v.ctx, manifestPath)
	if err != nil {
		result.Status = "failed"
		result.ErrorMessage = fmt.Sprintf("failed to download manifest: %v", err)
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("failed to download manifest: %w", err)
	}
	defer func() { _ = manifestReader.Close() }()

	// Decompress manifest
	manifest, err := snapshot.DecodeManifest(manifestReader)
	if err != nil {
		result.Status = "failed"
		result.ErrorMessage = fmt.Sprintf("failed to decode manifest: %v", err)
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("failed to decode manifest: %w", err)
	}
	log.Info("Manifest loaded",
		"manifest_blob_count", manifest.BlobCount,
		"manifest_total_size", humanize.Bytes(uint64(manifest.TotalCompressedSize)),
	)
	if !opts.JSON {
		v.Outputf("Manifest loaded: %d blobs (%s)\n", manifest.BlobCount, humanize.Bytes(uint64(manifest.TotalCompressedSize)))
	}

	// Step 2: Download and decrypt database (authoritative source)
	dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
	log.Info("Downloading encrypted database", "path", dbPath)
	if !opts.JSON {
		v.Outputf("Downloading and decrypting database...\n")
	}
	dbReader, err := v.Storage.Get(v.ctx, dbPath)
	if err != nil {
		result.Status = "failed"
		result.ErrorMessage = fmt.Sprintf("failed to download database: %v", err)
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("failed to download database: %w", err)
	}
	defer func() { _ = dbReader.Close() }()

	// Decrypt and decompress database
	tempDB, err := v.decryptAndLoadDatabase(dbReader, v.Config.AgeSecretKey)
	if err != nil {
		result.Status = "failed"
		result.ErrorMessage = fmt.Sprintf("failed to decrypt database: %v", err)
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("failed to decrypt database: %w", err)
	}
	defer func() {
		if tempDB != nil {
			_ = tempDB.Close()
		}
	}()

	// Step 3: Get authoritative blob list from database
	dbBlobs, err := v.getBlobsFromDatabase(snapshotID, tempDB.DB)
	if err != nil {
		result.Status = "failed"
		result.ErrorMessage = fmt.Sprintf("failed to get blobs from database: %v", err)
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("failed to get blobs from database: %w", err)
	}
	result.BlobCount = len(dbBlobs)
	var totalSize int64
	for _, blob := range dbBlobs {
		totalSize += blob.CompressedSize
	}
	result.TotalSize = totalSize
	log.Info("Database loaded",
		"db_blob_count", len(dbBlobs),
		"db_total_size", humanize.Bytes(uint64(totalSize)),
	)
	if !opts.JSON {
		v.Outputf("Database loaded: %d blobs (%s)\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
		v.Outputf("Verifying manifest against database...\n")
	}

	// Step 4: Verify manifest matches database
	if err := v.verifyManifestAgainstDatabase(manifest, dbBlobs); err != nil {
		result.Status = "failed"
		result.ErrorMessage = err.Error()
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return err
	}

	// Step 5: Verify all blobs exist in S3 (using database as source)
	if !opts.JSON {
		v.Outputf("Manifest verified.\n")
		v.Outputf("Checking blob existence in remote storage...\n")
	}
	if err := v.verifyBlobExistenceFromDB(dbBlobs); err != nil {
		result.Status = "failed"
		result.ErrorMessage = err.Error()
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return err
	}

	// Step 6: Deep verification - download and verify blob contents
	if !opts.JSON {
		v.Outputf("All blobs exist.\n")
		v.Outputf("Downloading and verifying blob contents (%d blobs, %s)...\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
	}
	if err := v.performDeepVerificationFromDB(dbBlobs, tempDB.DB, opts); err != nil {
		result.Status = "failed"
		result.ErrorMessage = err.Error()
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return err
	}

	// Success
	result.Status = "ok"
	result.Verified = len(dbBlobs)
	if opts.JSON {
		return v.outputVerifyJSON(result)
	}
	log.Info("✓ Verification completed successfully",
		"snapshot_id", snapshotID,
		"mode", "deep",
		"blobs_verified", len(dbBlobs),
	)
	v.Outputf("\n✓ Verification completed successfully\n")
	v.Outputf(" Snapshot: %s\n", snapshotID)
	v.Outputf(" Blobs verified: %d\n", len(dbBlobs))
	v.Outputf(" Total size: %s\n", humanize.Bytes(uint64(totalSize)))
	return nil
}

// tempDB wraps sql.DB with cleanup
type tempDB struct {
	*sql.DB
	tempPath string
}

func (t *tempDB) Close() error {
	err := t.DB.Close()
	_ = os.Remove(t.tempPath)
	return err
}

// decryptAndLoadDatabase decrypts and loads the binary SQLite database from the encrypted stream
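// The object is named db.zst.age, so the layering is age encryption outside
// and zstd compression inside: decrypt first, then decompress. The result is
// written to a temporary file because the sqlite3 driver opens a file on
// disk rather than a stream; tempDB.Close removes that file again.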
func (v *Vaultik) decryptAndLoadDatabase(reader io.ReadCloser, secretKey string) (*tempDB, error) {
	// Get decryptor
	decryptor, err := v.GetDecryptor()
	if err != nil {
		return nil, fmt.Errorf("failed to get decryptor: %w", err)
	}

	// Decrypt the stream
	decryptedReader, err := decryptor.DecryptStream(reader)
	if err != nil {
		return nil, fmt.Errorf("failed to decrypt database: %w", err)
	}

	// Decompress the binary database
	decompressor, err := zstd.NewReader(decryptedReader)
	if err != nil {
		return nil, fmt.Errorf("failed to create decompressor: %w", err)
	}
	defer decompressor.Close()

	// Create temporary file for the database
	tempFile, err := os.CreateTemp("", "vaultik-verify-*.db")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp file: %w", err)
	}
	tempPath := tempFile.Name()

	// Stream decompress directly to file
	log.Info("Decompressing database...")
	written, err := io.Copy(tempFile, decompressor)
	if err != nil {
		_ = tempFile.Close()
		_ = os.Remove(tempPath)
		return nil, fmt.Errorf("failed to decompress database: %w", err)
	}
	_ = tempFile.Close()
	log.Info("Database decompressed", "size", humanize.Bytes(uint64(written)))

	// Open the database
	db, err := sql.Open("sqlite3", tempPath)
	if err != nil {
		_ = os.Remove(tempPath)
		return nil, fmt.Errorf("failed to open database: %w", err)
	}
	return &tempDB{
		DB:       db,
		tempPath: tempPath,
	}, nil
}

// verifyBlob downloads and verifies a single blob
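// The blob is checked as one streaming pipeline: the downloaded ciphertext
// is tee'd into a SHA256 hasher while feeding the age decryptor, the
// plaintext feeds a zstd decompressor, and every chunk recorded in
// blob_chunks is read from the decompressed stream and hashed in order.
// Afterwards the stream must be exactly consumed and the ciphertext hash
// must match the blob hash recorded in the database.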
func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
	// Download blob using shared fetch method
	reader, _, err := v.FetchBlob(v.ctx, blobInfo.Hash, blobInfo.CompressedSize)
	if err != nil {
		return fmt.Errorf("failed to download: %w", err)
	}
	defer func() { _ = reader.Close() }()

	// Get decryptor
	decryptor, err := v.GetDecryptor()
	if err != nil {
		return fmt.Errorf("failed to get decryptor: %w", err)
	}

	// Hash the encrypted blob data as it streams through to decryption
	blobHasher := sha256.New()
	teeReader := io.TeeReader(reader, blobHasher)

	// Decrypt blob (reading through teeReader to hash encrypted data)
	decryptedReader, err := decryptor.DecryptStream(teeReader)
	if err != nil {
		return fmt.Errorf("failed to decrypt: %w", err)
	}

	// Decompress blob
	decompressor, err := zstd.NewReader(decryptedReader)
	if err != nil {
		return fmt.Errorf("failed to decompress: %w", err)
	}
	defer decompressor.Close()

	// Query blob chunks from database to get offsets and lengths
	query := `
		SELECT bc.chunk_hash, bc.offset, bc.length
		FROM blob_chunks bc
		JOIN blobs b ON bc.blob_id = b.id
		WHERE b.blob_hash = ?
		ORDER BY bc.offset
	`
	rows, err := db.QueryContext(v.ctx, query, blobInfo.Hash)
	if err != nil {
		return fmt.Errorf("failed to query blob chunks: %w", err)
	}
	defer func() { _ = rows.Close() }()
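	// lastOffset starts at -1 so the first chunk, which begins at offset 0,
	// passes the strictly-increasing ordering check below.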
	var lastOffset int64 = -1
	chunkCount := 0
	totalRead := int64(0)

	// Verify each chunk in the blob
	for rows.Next() {
		var chunkHash string
		var offset, length int64
		if err := rows.Scan(&chunkHash, &offset, &length); err != nil {
			return fmt.Errorf("failed to scan chunk row: %w", err)
		}

		// Verify chunk ordering
		if offset <= lastOffset {
			return fmt.Errorf("chunks out of order: offset %d after %d", offset, lastOffset)
		}
		lastOffset = offset

		// Skip forward if the next chunk starts past the current read position
		if offset > totalRead {
			skipBytes := offset - totalRead
			if _, err := io.CopyN(io.Discard, decompressor, skipBytes); err != nil {
				return fmt.Errorf("failed to skip to offset %d: %w", offset, err)
			}
			totalRead = offset
		}

		// Read chunk data from the decompressed stream
		chunkData := make([]byte, length)
		if _, err := io.ReadFull(decompressor, chunkData); err != nil {
			return fmt.Errorf("failed to read chunk at offset %d: %w", offset, err)
		}
		totalRead += length

		// Verify chunk hash
		hasher := sha256.New()
		hasher.Write(chunkData)
		calculatedHash := hex.EncodeToString(hasher.Sum(nil))
		if calculatedHash != chunkHash {
			return fmt.Errorf("chunk hash mismatch at offset %d: calculated %s, expected %s",
				offset, calculatedHash, chunkHash)
		}
		chunkCount++
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("error iterating blob chunks: %w", err)
	}

	// Verify no remaining data in blob - if the chunk list is accurate, the blob should be fully consumed
	remaining, err := io.Copy(io.Discard, decompressor)
	if err != nil {
		return fmt.Errorf("failed to check for remaining blob data: %w", err)
	}
	if remaining > 0 {
		return fmt.Errorf("blob has %d unexpected trailing bytes not covered by chunk list", remaining)
	}

	// Verify blob hash matches the encrypted data we downloaded
	calculatedBlobHash := hex.EncodeToString(blobHasher.Sum(nil))
	if calculatedBlobHash != blobInfo.Hash {
		return fmt.Errorf("blob hash mismatch: calculated %s, expected %s",
			calculatedBlobHash, blobInfo.Hash)
	}

	log.Info("Blob verified",
		"hash", blobInfo.Hash[:16]+"...",
		"chunks", chunkCount,
		"size", humanize.Bytes(uint64(blobInfo.CompressedSize)),
	)
	return nil
}

// getBlobsFromDatabase gets all blobs for the snapshot from the database
func (v *Vaultik) getBlobsFromDatabase(snapshotID string, db *sql.DB) ([]snapshot.BlobInfo, error) {
	query := `
		SELECT b.blob_hash, b.compressed_size
		FROM snapshot_blobs sb
		JOIN blobs b ON sb.blob_hash = b.blob_hash
		WHERE sb.snapshot_id = ?
		ORDER BY b.blob_hash
	`
	rows, err := db.QueryContext(v.ctx, query, snapshotID)
	if err != nil {
		return nil, fmt.Errorf("failed to query snapshot blobs: %w", err)
	}
	defer func() { _ = rows.Close() }()

	var blobs []snapshot.BlobInfo
	for rows.Next() {
		var hash string
		var size int64
		if err := rows.Scan(&hash, &size); err != nil {
			return nil, fmt.Errorf("failed to scan blob row: %w", err)
		}
		blobs = append(blobs, snapshot.BlobInfo{
			Hash:           hash,
			CompressedSize: size,
		})
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating blobs: %w", err)
	}
	return blobs, nil
}

// verifyManifestAgainstDatabase verifies the manifest matches the authoritative database
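// A count mismatch between manifest and database only logs a warning, since
// the database is authoritative; a manifest blob that is absent from the
// database or carries a different size is a hard failure.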
func (v *Vaultik) verifyManifestAgainstDatabase(manifest *snapshot.Manifest, dbBlobs []snapshot.BlobInfo) error {
	log.Info("Verifying manifest against database")

	// Build map of database blobs
	dbBlobMap := make(map[string]int64)
	for _, blob := range dbBlobs {
		dbBlobMap[blob.Hash] = blob.CompressedSize
	}

	// Build map of manifest blobs
	manifestBlobMap := make(map[string]int64)
	for _, blob := range manifest.Blobs {
		manifestBlobMap[blob.Hash] = blob.CompressedSize
	}

	// Check counts match
	if len(dbBlobMap) != len(manifestBlobMap) {
		log.Warn("Manifest blob count mismatch",
			"database_blobs", len(dbBlobMap),
			"manifest_blobs", len(manifestBlobMap),
		)
		// This is a warning, not an error - database is authoritative
	}

	// Check each manifest blob exists in database with correct size
	for hash, manifestSize := range manifestBlobMap {
		dbSize, exists := dbBlobMap[hash]
		if !exists {
			return fmt.Errorf("manifest contains blob %s not in database", hash)
		}
		if dbSize != manifestSize {
			return fmt.Errorf("blob %s size mismatch: database has %d bytes, manifest has %d bytes",
				hash, dbSize, manifestSize)
		}
	}

	log.Info("✓ Manifest verified against database",
		"manifest_blobs", len(manifestBlobMap),
		"database_blobs", len(dbBlobMap),
	)
	return nil
}

// verifyBlobExistenceFromDB checks that all blobs from the database exist in S3
func (v *Vaultik) verifyBlobExistenceFromDB(blobs []snapshot.BlobInfo) error {
	log.Info("Verifying blob existence in S3", "blob_count", len(blobs))
	for i, blob := range blobs {
		// Construct blob path
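		// Blobs are sharded by the first two byte pairs of the hash,
		// e.g. blobs/ab/cd/abcd..., so no single prefix holds every object.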
		blobPath := fmt.Sprintf("blobs/%s/%s/%s", blob.Hash[:2], blob.Hash[2:4], blob.Hash)

		// Check blob exists
		stat, err := v.Storage.Stat(v.ctx, blobPath)
		if err != nil {
			return fmt.Errorf("blob %s missing from storage: %w", blob.Hash, err)
		}

		// Verify size matches
		if stat.Size != blob.CompressedSize {
			return fmt.Errorf("blob %s size mismatch: S3 has %d bytes, database has %d bytes",
				blob.Hash, stat.Size, blob.CompressedSize)
		}

		// Progress update every 100 blobs
		if (i+1)%100 == 0 || i == len(blobs)-1 {
			log.Info("Blob existence check progress",
				"checked", i+1,
				"total", len(blobs),
				"percent", fmt.Sprintf("%.1f%%", float64(i+1)/float64(len(blobs))*100),
			)
		}
	}
	log.Info("✓ All blobs exist in storage")
	return nil
}

// performDeepVerificationFromDB downloads and verifies the content of each blob, using the database as the source
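// The ETA is derived from bytes rather than blob counts, since blob sizes
// vary: throughput observed so far is extrapolated over the bytes that
// remain.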
func (v *Vaultik) performDeepVerificationFromDB(blobs []snapshot.BlobInfo, db *sql.DB, opts *VerifyOptions) error {
	// Calculate total bytes for ETA
	var totalBytesExpected int64
	for _, b := range blobs {
		totalBytesExpected += b.CompressedSize
	}
	log.Info("Starting deep verification - downloading and verifying all blobs",
		"blob_count", len(blobs),
		"total_size", humanize.Bytes(uint64(totalBytesExpected)),
	)

	startTime := time.Now()
	bytesProcessed := int64(0)
	for i, blobInfo := range blobs {
		// Verify individual blob
		if err := v.verifyBlob(blobInfo, db); err != nil {
			return fmt.Errorf("blob %s verification failed: %w", blobInfo.Hash, err)
		}
		bytesProcessed += blobInfo.CompressedSize
		elapsed := time.Since(startTime)
		remaining := len(blobs) - (i + 1)

		// Calculate ETA based on bytes processed
		var eta time.Duration
		if bytesProcessed > 0 {
			bytesPerSec := float64(bytesProcessed) / elapsed.Seconds()
			bytesRemaining := totalBytesExpected - bytesProcessed
			if bytesPerSec > 0 {
				eta = time.Duration(float64(bytesRemaining)/bytesPerSec) * time.Second
			}
		}
		log.Info("Verification progress",
			"blobs_done", i+1,
			"blobs_total", len(blobs),
			"blobs_remaining", remaining,
			"bytes_done", bytesProcessed,
			"bytes_done_human", humanize.Bytes(uint64(bytesProcessed)),
			"bytes_total", totalBytesExpected,
			"bytes_total_human", humanize.Bytes(uint64(totalBytesExpected)),
			"elapsed", elapsed.Round(time.Second),
			"eta", eta.Round(time.Second),
		)
		if !opts.JSON {
			v.Outputf(" Verified %d/%d blobs (%d remaining) - %s/%s - elapsed %s, eta %s\n",
				i+1, len(blobs), remaining,
				humanize.Bytes(uint64(bytesProcessed)),
				humanize.Bytes(uint64(totalBytesExpected)),
				elapsed.Round(time.Second),
				eta.Round(time.Second))
		}
	}

	totalElapsed := time.Since(startTime)
	log.Info("✓ Deep verification completed successfully",
		"blobs_verified", len(blobs),
		"total_bytes", bytesProcessed,
		"total_bytes_human", humanize.Bytes(uint64(bytesProcessed)),
		"duration", totalElapsed.Round(time.Second),
	)
	return nil
}