package vaultik

import (
	"crypto/sha256"
	"database/sql"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"time"

	"git.eeqj.de/sneak/vaultik/internal/log"
	"git.eeqj.de/sneak/vaultik/internal/snapshot"
	"github.com/dustin/go-humanize"
	"github.com/klauspost/compress/zstd"
	_ "github.com/mattn/go-sqlite3"
)

// VerifyOptions contains options for the verify command
type VerifyOptions struct {
	// Deep requests content verification (download + hash every blob),
	// not just an existence/size check.
	Deep bool
	// JSON requests machine-readable output instead of human-readable text.
	JSON bool
}

// VerifyResult contains the result of a snapshot verification
type VerifyResult struct {
	SnapshotID   string `json:"snapshot_id"`
	Status       string `json:"status"` // "ok" or "failed"
	Mode         string `json:"mode"`   // "shallow" or "deep"
	BlobCount    int    `json:"blob_count"`
	TotalSize    int64  `json:"total_size"`
	Verified     int    `json:"verified"`
	Missing      int    `json:"missing"`
	MissingSize  int64  `json:"missing_size,omitempty"`
	ErrorMessage string `json:"error,omitempty"`
}

// RunDeepVerify executes deep verification operation.
//
// It downloads the snapshot manifest, downloads and decrypts the snapshot
// database (the authoritative source of blob metadata), cross-checks the
// manifest against the database, confirms every blob exists in remote
// storage with the expected size, and finally downloads and
// cryptographically verifies the content of every blob. Decryption
// capability (VAULTIK_AGE_SECRET_KEY) is required.
func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
	result := &VerifyResult{
		SnapshotID: snapshotID,
		Mode:       "deep",
	}

	// Check for decryption capability
	if !v.CanDecrypt() {
		result.Status = "failed"
		result.ErrorMessage = "VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification"
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification")
	}

	log.Info("Starting snapshot verification",
		"snapshot_id", snapshotID,
		"mode", "deep",
	)
	if !opts.JSON {
		v.printfStdout("Deep verification of snapshot: %s\n\n", snapshotID)
	}

	// Step 1: Download manifest
	manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
	log.Info("Downloading manifest", "path", manifestPath)
	if !opts.JSON {
		v.printfStdout("Downloading manifest...\n")
	}
	manifestReader, err := v.Storage.Get(v.ctx, manifestPath)
	if err != nil {
		result.Status = "failed"
		result.ErrorMessage = fmt.Sprintf("failed to download manifest: %v", err)
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("failed to download manifest: %w", err)
	}
	defer func() { _ = manifestReader.Close() }()

	// Decompress manifest
	manifest, err := snapshot.DecodeManifest(manifestReader)
	if err != nil {
		result.Status = "failed"
		result.ErrorMessage = fmt.Sprintf("failed to decode manifest: %v", err)
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("failed to decode manifest: %w", err)
	}

	log.Info("Manifest loaded",
		"manifest_blob_count", manifest.BlobCount,
		"manifest_total_size", humanize.Bytes(uint64(manifest.TotalCompressedSize)),
	)
	if !opts.JSON {
		v.printfStdout("Manifest loaded: %d blobs (%s)\n", manifest.BlobCount, humanize.Bytes(uint64(manifest.TotalCompressedSize)))
	}

	// Step 2: Download and decrypt database (authoritative source)
	dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
	log.Info("Downloading encrypted database", "path", dbPath)
	if !opts.JSON {
		v.printfStdout("Downloading and decrypting database...\n")
	}
	dbReader, err := v.Storage.Get(v.ctx, dbPath)
	if err != nil {
		result.Status = "failed"
		result.ErrorMessage = fmt.Sprintf("failed to download database: %v", err)
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("failed to download database: %w", err)
	}
	defer func() { _ = dbReader.Close() }()

	// Decrypt and decompress database.
	// Renamed from "tempDB" to avoid shadowing the tempDB type below.
	snapDB, err := v.decryptAndLoadDatabase(dbReader, v.Config.AgeSecretKey)
	if err != nil {
		result.Status = "failed"
		result.ErrorMessage = fmt.Sprintf("failed to decrypt database: %v", err)
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("failed to decrypt database: %w", err)
	}
	defer func() {
		if snapDB != nil {
			_ = snapDB.Close()
		}
	}()

	// Step 3: Get authoritative blob list from database
	dbBlobs, err := v.getBlobsFromDatabase(snapshotID, snapDB.DB)
	if err != nil {
		result.Status = "failed"
		result.ErrorMessage = fmt.Sprintf("failed to get blobs from database: %v", err)
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return fmt.Errorf("failed to get blobs from database: %w", err)
	}

	result.BlobCount = len(dbBlobs)
	var totalSize int64
	for _, blob := range dbBlobs {
		totalSize += blob.CompressedSize
	}
	result.TotalSize = totalSize

	log.Info("Database loaded",
		"db_blob_count", len(dbBlobs),
		"db_total_size", humanize.Bytes(uint64(totalSize)),
	)
	if !opts.JSON {
		v.printfStdout("Database loaded: %d blobs (%s)\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
		v.printfStdout("Verifying manifest against database...\n")
	}

	// Step 4: Verify manifest matches database
	if err := v.verifyManifestAgainstDatabase(manifest, dbBlobs); err != nil {
		result.Status = "failed"
		result.ErrorMessage = err.Error()
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return err
	}

	// Step 5: Verify all blobs exist in S3 (using database as source)
	if !opts.JSON {
		v.printfStdout("Manifest verified.\n")
		v.printfStdout("Checking blob existence in remote storage...\n")
	}
	if err := v.verifyBlobExistenceFromDB(dbBlobs); err != nil {
		result.Status = "failed"
		result.ErrorMessage = err.Error()
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return err
	}

	// Step 6: Deep verification - download and verify blob contents
	if !opts.JSON {
		v.printfStdout("All blobs exist.\n")
		v.printfStdout("Downloading and verifying blob contents (%d blobs, %s)...\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
	}
	if err := v.performDeepVerificationFromDB(dbBlobs, snapDB.DB, opts); err != nil {
		result.Status = "failed"
		result.ErrorMessage = err.Error()
		if opts.JSON {
			return v.outputVerifyJSON(result)
		}
		return err
	}

	// Success
	result.Status = "ok"
	result.Verified = len(dbBlobs)
	if opts.JSON {
		return v.outputVerifyJSON(result)
	}

	log.Info("✓ Verification completed successfully",
		"snapshot_id", snapshotID,
		"mode", "deep",
		"blobs_verified", len(dbBlobs),
	)
	v.printfStdout("\n✓ Verification completed successfully\n")
	v.printfStdout(" Snapshot: %s\n", snapshotID)
	v.printfStdout(" Blobs verified: %d\n", len(dbBlobs))
	v.printfStdout(" Total size: %s\n", humanize.Bytes(uint64(totalSize)))
	return nil
}

// tempDB wraps sql.DB with cleanup: Close also removes the backing
// temporary file from disk.
type tempDB struct {
	*sql.DB
	tempPath string
}

// Close closes the database and best-effort removes the temp file.
// The close error (if any) takes precedence over removal failures.
func (t *tempDB) Close() error {
	err := t.DB.Close()
	_ = os.Remove(t.tempPath)
	return err
}

// decryptAndLoadDatabase decrypts and loads the binary SQLite database from
// the encrypted stream. The stream is decrypted (age), decompressed (zstd),
// and written to a temporary file which is then opened with the sqlite3
// driver. The caller owns the returned tempDB and must Close it.
//
// NOTE(review): secretKey is currently unused — the decryption key is
// sourced via v.GetDecryptor(). Parameter kept for signature compatibility;
// consider removing it or wiring it through.
func (v *Vaultik) decryptAndLoadDatabase(reader io.ReadCloser, secretKey string) (*tempDB, error) {
	// Get decryptor
	decryptor, err := v.GetDecryptor()
	if err != nil {
		return nil, fmt.Errorf("failed to get decryptor: %w", err)
	}

	// Decrypt the stream
	decryptedReader, err := decryptor.DecryptStream(reader)
	if err != nil {
		return nil, fmt.Errorf("failed to decrypt database: %w", err)
	}

	// Decompress the binary database
	decompressor, err := zstd.NewReader(decryptedReader)
	if err != nil {
		return nil, fmt.Errorf("failed to create decompressor: %w", err)
	}
	defer decompressor.Close()

	// Create temporary file for the database
	tempFile, err := os.CreateTemp("", "vaultik-verify-*.db")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp file: %w", err)
	}
	tempPath := tempFile.Name()

	// Stream decompress directly to file (constant memory, no buffering
	// of the whole database).
	log.Info("Decompressing database...")
	written, err := io.Copy(tempFile, decompressor)
	if err != nil {
		_ = tempFile.Close()
		_ = os.Remove(tempPath)
		return nil, fmt.Errorf("failed to decompress database: %w", err)
	}
	_ = tempFile.Close()
	log.Info("Database decompressed", "size", humanize.Bytes(uint64(written)))

	// Open the database
	db, err := sql.Open("sqlite3", tempPath)
	if err != nil {
		_ = os.Remove(tempPath)
		return nil, fmt.Errorf("failed to open database: %w", err)
	}

	return &tempDB{
		DB:       db,
		tempPath: tempPath,
	}, nil
}

// verifyBlob downloads and verifies a single blob: the encrypted bytes are
// hashed while streaming (to check the blob hash), then decrypted and
// decompressed, and every chunk listed in the database is re-hashed and
// compared against its recorded hash, offset, and length.
func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
	// Download blob using shared fetch method
	reader, _, err := v.FetchBlob(v.ctx, blobInfo.Hash, blobInfo.CompressedSize)
	if err != nil {
		return fmt.Errorf("failed to download: %w", err)
	}
	defer func() { _ = reader.Close() }()

	// Get decryptor
	decryptor, err := v.GetDecryptor()
	if err != nil {
		return fmt.Errorf("failed to get decryptor: %w", err)
	}

	// Hash the encrypted blob data as it streams through to decryption
	blobHasher := sha256.New()
	teeReader := io.TeeReader(reader, blobHasher)

	// Decrypt blob (reading through teeReader to hash encrypted data)
	decryptedReader, err := decryptor.DecryptStream(teeReader)
	if err != nil {
		return fmt.Errorf("failed to decrypt: %w", err)
	}

	// Decompress blob
	decompressor, err := zstd.NewReader(decryptedReader)
	if err != nil {
		return fmt.Errorf("failed to decompress: %w", err)
	}
	defer decompressor.Close()

	// Query blob chunks from database to get offsets and lengths
	query := `
		SELECT bc.chunk_hash, bc.offset, bc.length
		FROM blob_chunks bc
		JOIN blobs b ON bc.blob_id = b.id
		WHERE b.blob_hash = ?
		ORDER BY bc.offset
	`
	rows, err := db.QueryContext(v.ctx, query, blobInfo.Hash)
	if err != nil {
		return fmt.Errorf("failed to query blob chunks: %w", err)
	}
	defer func() { _ = rows.Close() }()

	var lastOffset int64 = -1
	chunkCount := 0
	totalRead := int64(0)

	// Verify each chunk in the blob
	for rows.Next() {
		var chunkHash string
		var offset, length int64
		if err := rows.Scan(&chunkHash, &offset, &length); err != nil {
			return fmt.Errorf("failed to scan chunk row: %w", err)
		}

		// Verify chunk ordering (strictly increasing offsets)
		if offset <= lastOffset {
			return fmt.Errorf("chunks out of order: offset %d after %d", offset, lastOffset)
		}
		lastOffset = offset

		// Read chunk data from decompressed stream
		if offset > totalRead {
			// Skip to the correct offset (gap between recorded chunks)
			skipBytes := offset - totalRead
			if _, err := io.CopyN(io.Discard, decompressor, skipBytes); err != nil {
				return fmt.Errorf("failed to skip to offset %d: %w", offset, err)
			}
			totalRead = offset
		}

		// Read chunk data
		chunkData := make([]byte, length)
		if _, err := io.ReadFull(decompressor, chunkData); err != nil {
			return fmt.Errorf("failed to read chunk at offset %d: %w", offset, err)
		}
		totalRead += length

		// Verify chunk hash
		hasher := sha256.New()
		hasher.Write(chunkData)
		calculatedHash := hex.EncodeToString(hasher.Sum(nil))
		if calculatedHash != chunkHash {
			return fmt.Errorf("chunk hash mismatch at offset %d: calculated %s, expected %s", offset, calculatedHash, chunkHash)
		}
		chunkCount++
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("error iterating blob chunks: %w", err)
	}

	// Verify no remaining data in blob - if chunk list is accurate, blob should be fully consumed
	remaining, err := io.Copy(io.Discard, decompressor)
	if err != nil {
		return fmt.Errorf("failed to check for remaining blob data: %w", err)
	}
	if remaining > 0 {
		return fmt.Errorf("blob has %d unexpected trailing bytes not covered by chunk list", remaining)
	}

	// Verify blob hash matches the encrypted data we downloaded.
	// NOTE(review): this assumes draining the decompressor causes the
	// decryptor to consume the encrypted stream to EOF so the tee hasher
	// has seen every byte — confirm against the age stream implementation.
	calculatedBlobHash := hex.EncodeToString(blobHasher.Sum(nil))
	if calculatedBlobHash != blobInfo.Hash {
		return fmt.Errorf("blob hash mismatch: calculated %s, expected %s", calculatedBlobHash, blobInfo.Hash)
	}

	log.Info("Blob verified",
		"hash", blobInfo.Hash[:16]+"...",
		"chunks", chunkCount,
		"size", humanize.Bytes(uint64(blobInfo.CompressedSize)),
	)
	return nil
}

// getBlobsFromDatabase gets all blobs for the snapshot from the database,
// ordered by blob hash for deterministic processing.
func (v *Vaultik) getBlobsFromDatabase(snapshotID string, db *sql.DB) ([]snapshot.BlobInfo, error) {
	query := `
		SELECT b.blob_hash, b.compressed_size
		FROM snapshot_blobs sb
		JOIN blobs b ON sb.blob_hash = b.blob_hash
		WHERE sb.snapshot_id = ?
		ORDER BY b.blob_hash
	`
	rows, err := db.QueryContext(v.ctx, query, snapshotID)
	if err != nil {
		return nil, fmt.Errorf("failed to query snapshot blobs: %w", err)
	}
	defer func() { _ = rows.Close() }()

	var blobs []snapshot.BlobInfo
	for rows.Next() {
		var hash string
		var size int64
		if err := rows.Scan(&hash, &size); err != nil {
			return nil, fmt.Errorf("failed to scan blob row: %w", err)
		}
		blobs = append(blobs, snapshot.BlobInfo{
			Hash:           hash,
			CompressedSize: size,
		})
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating blobs: %w", err)
	}
	return blobs, nil
}

// verifyManifestAgainstDatabase verifies the manifest matches the
// authoritative database. Every manifest blob must exist in the database
// with a matching compressed size; a count mismatch alone is only a
// warning because the database is authoritative.
func (v *Vaultik) verifyManifestAgainstDatabase(manifest *snapshot.Manifest, dbBlobs []snapshot.BlobInfo) error {
	log.Info("Verifying manifest against database")

	// Build map of database blobs
	dbBlobMap := make(map[string]int64)
	for _, blob := range dbBlobs {
		dbBlobMap[blob.Hash] = blob.CompressedSize
	}

	// Build map of manifest blobs
	manifestBlobMap := make(map[string]int64)
	for _, blob := range manifest.Blobs {
		manifestBlobMap[blob.Hash] = blob.CompressedSize
	}

	// Check counts match
	if len(dbBlobMap) != len(manifestBlobMap) {
		log.Warn("Manifest blob count mismatch",
			"database_blobs", len(dbBlobMap),
			"manifest_blobs", len(manifestBlobMap),
		)
		// This is a warning, not an error - database is authoritative
	}

	// Check each manifest blob exists in database with correct size
	for hash, manifestSize := range manifestBlobMap {
		dbSize, exists := dbBlobMap[hash]
		if !exists {
			return fmt.Errorf("manifest contains blob %s not in database", hash)
		}
		if dbSize != manifestSize {
			return fmt.Errorf("blob %s size mismatch: database has %d bytes, manifest has %d bytes", hash, dbSize, manifestSize)
		}
	}

	log.Info("✓ Manifest verified against database",
		"manifest_blobs", len(manifestBlobMap),
		"database_blobs", len(dbBlobMap),
	)
	return nil
}

// verifyBlobExistenceFromDB checks that all blobs from database exist in S3
// and that the stored object size matches the recorded compressed size.
func (v *Vaultik) verifyBlobExistenceFromDB(blobs []snapshot.BlobInfo) error {
	log.Info("Verifying blob existence in S3", "blob_count", len(blobs))

	for i, blob := range blobs {
		// Construct blob path (sharded by first two hash-byte pairs)
		blobPath := fmt.Sprintf("blobs/%s/%s/%s", blob.Hash[:2], blob.Hash[2:4], blob.Hash)

		// Check blob exists
		stat, err := v.Storage.Stat(v.ctx, blobPath)
		if err != nil {
			return fmt.Errorf("blob %s missing from storage: %w", blob.Hash, err)
		}

		// Verify size matches
		if stat.Size != blob.CompressedSize {
			return fmt.Errorf("blob %s size mismatch: S3 has %d bytes, database has %d bytes", blob.Hash, stat.Size, blob.CompressedSize)
		}

		// Progress update every 100 blobs
		if (i+1)%100 == 0 || i == len(blobs)-1 {
			log.Info("Blob existence check progress",
				"checked", i+1,
				"total", len(blobs),
				"percent", fmt.Sprintf("%.1f%%", float64(i+1)/float64(len(blobs))*100),
			)
		}
	}

	log.Info("✓ All blobs exist in storage")
	return nil
}

// performDeepVerificationFromDB downloads and verifies the content of each
// blob using database as source, logging byte-based progress and an ETA.
func (v *Vaultik) performDeepVerificationFromDB(blobs []snapshot.BlobInfo, db *sql.DB, opts *VerifyOptions) error {
	// Calculate total bytes for ETA
	var totalBytesExpected int64
	for _, b := range blobs {
		totalBytesExpected += b.CompressedSize
	}

	log.Info("Starting deep verification - downloading and verifying all blobs",
		"blob_count", len(blobs),
		"total_size", humanize.Bytes(uint64(totalBytesExpected)),
	)

	startTime := time.Now()
	bytesProcessed := int64(0)

	for i, blobInfo := range blobs {
		// Verify individual blob
		if err := v.verifyBlob(blobInfo, db); err != nil {
			return fmt.Errorf("blob %s verification failed: %w", blobInfo.Hash, err)
		}

		bytesProcessed += blobInfo.CompressedSize
		elapsed := time.Since(startTime)
		remaining := len(blobs) - (i + 1)

		// Calculate ETA based on bytes processed. Scale inside the float
		// expression so fractional seconds are not truncated before the
		// multiply (the old form rounded down to whole seconds first).
		var eta time.Duration
		if bytesProcessed > 0 {
			bytesPerSec := float64(bytesProcessed) / elapsed.Seconds()
			bytesRemaining := totalBytesExpected - bytesProcessed
			if bytesPerSec > 0 {
				eta = time.Duration(float64(bytesRemaining) / bytesPerSec * float64(time.Second))
			}
		}

		log.Info("Verification progress",
			"blobs_done", i+1,
			"blobs_total", len(blobs),
			"blobs_remaining", remaining,
			"bytes_done", bytesProcessed,
			"bytes_done_human", humanize.Bytes(uint64(bytesProcessed)),
			"bytes_total", totalBytesExpected,
			"bytes_total_human", humanize.Bytes(uint64(totalBytesExpected)),
			"elapsed", elapsed.Round(time.Second),
			"eta", eta.Round(time.Second),
		)
		if !opts.JSON {
			v.printfStdout(" Verified %d/%d blobs (%d remaining) - %s/%s - elapsed %s, eta %s\n", i+1, len(blobs), remaining, humanize.Bytes(uint64(bytesProcessed)), humanize.Bytes(uint64(totalBytesExpected)), elapsed.Round(time.Second), eta.Round(time.Second))
		}
	}

	totalElapsed := time.Since(startTime)
	log.Info("✓ Deep verification completed successfully",
		"blobs_verified", len(blobs),
		"total_bytes", bytesProcessed,
		"total_bytes_human", humanize.Bytes(uint64(bytesProcessed)),
		"duration", totalElapsed.Round(time.Second),
	)
	return nil
}