Compare commits
9 Commits
39d5d21d48
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 017ad7d3a6 | |||
| fd759a921a | |||
| a84b911155 | |||
| 5ce1dfa39e | |||
| aa3e8f081b | |||
| 1f22b9c603 | |||
| 60abeb636a | |||
| 7ae49a1b2c | |||
| a92b1a82ad |
104
README.md
104
README.md
@@ -100,9 +100,8 @@ vaultik [--config <path>] snapshot list [--json]
|
||||
vaultik [--config <path>] snapshot verify <snapshot-id> [--deep] [--json]
|
||||
vaultik [--config <path>] snapshot purge [--keep-latest | --older-than <duration>] [--snapshot <name>...] [--force]
|
||||
vaultik [--config <path>] snapshot remove <snapshot-id|--all> [--dry-run] [--force] [--remote] [--json]
|
||||
vaultik [--config <path>] snapshot prune
|
||||
vaultik [--config <path>] snapshot cleanup
|
||||
vaultik [--config <path>] restore <snapshot-id> <target-dir> [paths...] [--verify]
|
||||
vaultik [--config <path>] snapshot restore <snapshot-id> <target-dir> [paths...] [--verify]
|
||||
vaultik [--config <path>] prune [--force] [--json]
|
||||
vaultik [--config <path>] info
|
||||
vaultik [--config <path>] remote info [--json]
|
||||
@@ -123,7 +122,7 @@ vaultik version
|
||||
|
||||
### environment variables
|
||||
|
||||
* `VAULTIK_AGE_SECRET_KEY`: Age private key for decryption (required for `restore` and `verify --deep`)
|
||||
* `VAULTIK_AGE_SECRET_KEY`: Age private key for decryption (required for `snapshot restore` and `snapshot verify --deep`)
|
||||
* `VAULTIK_CONFIG`: Path to config file (overridden by `--config`)
|
||||
* `VAULTIK_INDEX_PATH`: Override local SQLite index path
|
||||
|
||||
@@ -157,11 +156,13 @@ existing file. Created with mode `0600` since it will contain credentials.
|
||||
**`config edit`**: Open the config file in `$EDITOR` (falls back to `vi`).
|
||||
|
||||
**`config get`**: Print a config value addressed by dotted YAML path
|
||||
(e.g. `vaultik config get s3.bucket`). Non-scalar values print as YAML.
|
||||
(e.g. `vaultik config get storage_url`). Non-scalar values print as YAML.
|
||||
|
||||
**`config set`**: Set a scalar config value by dotted YAML path
|
||||
(e.g. `vaultik config set compression_level 9`). Comments and formatting
|
||||
in the file are preserved; intermediate maps are created as needed.
|
||||
(e.g. `vaultik config set compression_level 9`,
|
||||
`vaultik config set storage_url "file:///mnt/backups"`). Comments and
|
||||
formatting in the file are preserved; intermediate maps are created as
|
||||
needed.
|
||||
|
||||
**`snapshot create`**: Perform incremental backup of configured snapshots.
|
||||
* Optional snapshot names argument to create specific snapshots (default: all)
|
||||
@@ -176,7 +177,11 @@ in the file are preserved; intermediate maps are created as needed.
|
||||
* `--keep-newer-than <duration>`: With `--prune`, keep snapshots newer than
|
||||
this duration instead of only the latest (e.g. `4w`, `30d`, `6mo`, `1y`)
|
||||
|
||||
**`snapshot list`**: List all snapshots with their timestamps and sizes.
|
||||
**`snapshot list`**: Show every snapshot known to the destination
|
||||
store with timestamps and three sizes per snapshot (compressed
|
||||
remote size; total uncompressed chunk size; size of chunks newly
|
||||
referenced by that snapshot). The uncompressed and "new chunk"
|
||||
columns show `<remote only>` for snapshots not in the local index.
|
||||
* `--json`: Output in JSON format
|
||||
|
||||
**`snapshot verify`**: Verify snapshot integrity.
|
||||
@@ -194,28 +199,31 @@ latest globally).
|
||||
* `--force`: Skip confirmation prompt
|
||||
|
||||
**`snapshot remove`**: Remove a specific snapshot from the local database.
|
||||
Automatically cleans up local rows (files, chunks, blobs) that the removed
|
||||
snapshot was the last referrer for — you don't need a separate prune step
|
||||
after removal.
|
||||
* `--remote`: Also remove snapshot metadata from remote storage
|
||||
* `--all`: Remove all snapshots (requires `--force`)
|
||||
* `--dry-run`: Show what would be deleted without deleting
|
||||
* `--force`: Skip confirmation prompt
|
||||
* `--json`: Output result as JSON
|
||||
|
||||
**`snapshot prune`**: Clean orphaned data from the local database (files,
|
||||
chunks, blobs not referenced by any snapshot).
|
||||
|
||||
**`snapshot cleanup`**: Remove stale local snapshot records that have no
|
||||
corresponding metadata in remote storage. These are typically left behind
|
||||
by incomplete or interrupted backups. Does not touch remote storage.
|
||||
|
||||
**`restore`**: Restore files from a backup snapshot.
|
||||
**`snapshot restore`**: Restore files from a backup snapshot.
|
||||
* Requires `VAULTIK_AGE_SECRET_KEY` environment variable
|
||||
* Optional path arguments to restore specific files/directories (default: all)
|
||||
* Preserves file permissions, timestamps, ownership (ownership requires root),
|
||||
symlinks, and empty directories
|
||||
* `--verify`: After restoring, verify every file's chunk hashes match
|
||||
|
||||
**`prune`**: Remove unreferenced blobs from remote storage.
|
||||
* Scans all snapshot manifests for referenced blobs, deletes any blob not referenced
|
||||
**`prune`**: Tidy up everything that isn't needed. Removes orphaned local
|
||||
database rows (files, chunks, blobs no longer referenced by any completed
|
||||
snapshot) AND deletes unreferenced blobs from remote storage. `snapshot
|
||||
create --prune`, `snapshot remove`, and `snapshot purge` run the same
|
||||
cleanup automatically; this is the manual entry point for the same work.
|
||||
* `--force`: Skip confirmation prompt
|
||||
* `--json`: Output stats as JSON
|
||||
|
||||
@@ -385,13 +393,71 @@ Key fields:
|
||||
|
||||
## roadmap
|
||||
|
||||
Items for future releases:
|
||||
Items still to do before / shortly after 1.0. Loosely ordered by
|
||||
priority.
|
||||
|
||||
* Error-condition tests (network failures, disk full, corrupted/missing blobs)
|
||||
* Parallel blob downloads during restore
|
||||
* Bandwidth limiting (`--bwlimit`)
|
||||
* Security audit of encryption implementation
|
||||
* Man pages and richer `--help` examples
|
||||
### correctness and operability
|
||||
|
||||
* **Security audit of the encryption implementation.** Pre-1.0
|
||||
blocker if we're advertising "secure" at the top of this README.
|
||||
age + zstd + content-defined chunking is mostly off-the-shelf
|
||||
pieces, but the seams (key handling, recipient parsing, manifest
|
||||
trust boundary, restore-time identity validation) need an outside
|
||||
read.
|
||||
* **Error-condition tests.** Today's coverage is the happy path
|
||||
plus a few specific regressions. Need fault-injection coverage:
|
||||
network failures mid-blob, disk-full during restore, corrupted /
|
||||
truncated / missing blobs, partial uploads, kill -9 between
|
||||
manifest and db.zst.age writes.
|
||||
* **Verify restored content end-to-end in CI.** The current
|
||||
integration test does this for a small synthetic snapshot but
|
||||
not at scale. A nightly job against a multi-GB representative
|
||||
snapshot would catch silent regressions in the chunker, packer,
|
||||
or restore planner.
|
||||
|
||||
### performance
|
||||
|
||||
* **Parallel blob downloads during restore.** Single-stream right
|
||||
now. With a fast S3 endpoint and a multi-core machine restore is
|
||||
bound by per-blob fetch + decrypt + decompress; running N of
|
||||
those in parallel against the disk cache would close most of the
|
||||
remaining gap. Needs to interact correctly with the locality
|
||||
planner and sweeper.
|
||||
* **Bandwidth limiting (`--bwlimit`).** Both upload and download.
|
||||
Useful for backing up over a shared link. Tricky to make work
|
||||
correctly with the parallel-download story.
|
||||
* **Restart of interrupted restore.** Today restore is restartable
|
||||
in the sense that re-running it overwrites partial output; it
|
||||
doesn't resume from where it stopped or skip already-present
|
||||
files. A `--resume` mode that checks targets before fetching
|
||||
blobs would matter for very large restores.
|
||||
|
||||
### usability
|
||||
|
||||
* **Man pages and richer `--help` examples.** Cobra generates
|
||||
basic help; man pages would be a separate target.
|
||||
* **`--bwlimit` style human-readable size flags** across the
|
||||
command surface where they're currently raw integers.
|
||||
* **`vaultik snapshot diff <a> <b>`** — show which files changed
|
||||
between two snapshots without restoring either.
|
||||
* **Status reporting hook for `--cron`.** When a backup fails
|
||||
silently in cron, the user has no idea. A configurable
|
||||
webhook / email / `notify-send` hook on completion (success and
|
||||
failure) would close the loop.
|
||||
|
||||
### infrastructure
|
||||
|
||||
* **Cross-machine restore documentation.** The "restore from
|
||||
another host" workflow works but isn't documented as a
|
||||
first-class operation in this README. Worth a dedicated section
|
||||
once it's settled.
|
||||
* **Schema migrations.** Currently nonexistent — pre-1.0 schema
|
||||
changes are handled by `vaultik database purge` plus a full
|
||||
re-scan. Post-1.0 we'll need a migration story to keep existing
|
||||
index databases usable across upgrades.
|
||||
* **Storage backend coverage tests.** S3, file://, and rclone://
|
||||
all share the Storer interface but the rclone path is the least
|
||||
exercised in CI.
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -285,7 +285,7 @@ func newConfigEditCommand() *cobra.Command {
|
||||
func newConfigGetCommand() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "get <key>",
|
||||
Short: "Print a config value by dotted path (e.g. s3.bucket)",
|
||||
Short: "Print a config value by dotted path (e.g. storage_url, compression_level)",
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
path, err := ResolveConfigPath()
|
||||
@@ -328,9 +328,10 @@ the file back, preserving comments and formatting. Intermediate maps
|
||||
are created as needed.
|
||||
|
||||
Examples:
|
||||
vaultik config set storage_url "file:///mnt/backups"
|
||||
vaultik config set storage_url "s3://bucket/prefix?endpoint=host®ion=us-east-1"
|
||||
vaultik config set compression_level 9
|
||||
vaultik config set s3.bucket mybucket
|
||||
vaultik config set storage_url "file:///mnt/backups"`,
|
||||
vaultik config set s3.bucket mybucket # legacy S3 fields still supported`,
|
||||
Args: cobra.ExactArgs(2),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
path, err := ResolveConfigPath()
|
||||
|
||||
@@ -16,14 +16,19 @@ func NewPruneCommand() *cobra.Command {
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "prune",
|
||||
Short: "Remove unreferenced blobs",
|
||||
Long: `Removes blobs that are not referenced by any snapshot.
|
||||
Short: "Tidy local database and remote storage",
|
||||
Long: `Removes orphaned data from both the local index database and
|
||||
unreferenced blobs from the backup destination store.
|
||||
|
||||
This command scans all snapshots and their manifests to build a list of
|
||||
referenced blobs, then removes any blobs in storage that are not in this list.
|
||||
Local cleanup drops incomplete snapshots and any files, chunks, or
|
||||
blobs no longer referenced by a completed snapshot. Remote cleanup
|
||||
scans every snapshot manifest in the destination store, builds the
|
||||
set of still-referenced blob hashes, and deletes any blob not in that
|
||||
set.
|
||||
|
||||
Use this command after deleting snapshots with 'vaultik purge' to reclaim
|
||||
storage space.`,
|
||||
Snapshot create --prune and snapshot remove run the same cleanup
|
||||
automatically; this command is the manual entry point for the same
|
||||
work (e.g. after a crashed backup or to reclaim storage).`,
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
// Use unified config resolution
|
||||
@@ -49,7 +54,7 @@ storage space.`,
|
||||
// Start the prune operation in a goroutine
|
||||
go func() {
|
||||
// Run the prune operation
|
||||
if err := v.PruneBlobs(opts); err != nil {
|
||||
if err := v.Prune(opts); err != nil {
|
||||
if err != context.Canceled {
|
||||
if !opts.JSON {
|
||||
log.Error("Prune operation failed", "error", err)
|
||||
|
||||
@@ -25,7 +25,6 @@ func NewSnapshotCommand() *cobra.Command {
|
||||
cmd.AddCommand(newSnapshotPurgeCommand())
|
||||
cmd.AddCommand(newSnapshotVerifyCommand())
|
||||
cmd.AddCommand(newSnapshotRemoveCommand())
|
||||
cmd.AddCommand(newSnapshotPruneCommand())
|
||||
cmd.AddCommand(newSnapshotCleanupCommand())
|
||||
cmd.AddCommand(newSnapshotRestoreCommand())
|
||||
|
||||
@@ -415,64 +414,6 @@ Use --all --force to remove all snapshots.`,
|
||||
return cmd
|
||||
}
|
||||
|
||||
// newSnapshotPruneCommand creates the 'snapshot prune' subcommand
|
||||
func newSnapshotPruneCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "prune",
|
||||
Short: "Remove orphaned data from local database",
|
||||
Long: `Removes orphaned files, chunks, and blobs from the local database.
|
||||
|
||||
This cleans up data that is no longer referenced by any snapshot, which can
|
||||
accumulate from incomplete backups or deleted snapshots.`,
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
// Use unified config resolution
|
||||
configPath, err := ResolveConfigPath()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rootFlags := GetRootFlags()
|
||||
return RunWithApp(cmd.Context(), AppOptions{
|
||||
ConfigPath: configPath,
|
||||
LogOptions: log.LogOptions{
|
||||
Verbose: rootFlags.Verbose,
|
||||
Debug: rootFlags.Debug,
|
||||
Quiet: rootFlags.Quiet,
|
||||
},
|
||||
Modules: []fx.Option{},
|
||||
Invokes: []fx.Option{
|
||||
fx.Invoke(func(v *vaultik.Vaultik, lc fx.Lifecycle) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
go func() {
|
||||
if _, err := v.PruneDatabase(); err != nil {
|
||||
if err != context.Canceled {
|
||||
log.Error("Failed to prune database", "error", err)
|
||||
ReportError("Failed to prune database: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
if err := v.Shutdowner.Shutdown(); err != nil {
|
||||
log.Error("Failed to shutdown", "error", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
v.Cancel()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}),
|
||||
},
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// newSnapshotCleanupCommand creates the 'snapshot cleanup' subcommand
|
||||
func newSnapshotCleanupCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
|
||||
@@ -130,6 +130,51 @@ func (r *BlobRepository) GetByID(ctx context.Context, id string) (*Blob, error)
|
||||
return &blob, nil
|
||||
}
|
||||
|
||||
// GetAll returns every blob row keyed by blob ID. Useful at restore
|
||||
// start to translate the per-chunk blob_id references in chunkToBlobMap
|
||||
// into blob hashes without doing one GetByID query per chunk.
|
||||
func (r *BlobRepository) GetAll(ctx context.Context) (map[string]*Blob, error) {
|
||||
query := `
|
||||
SELECT id, blob_hash, created_ts, finished_ts, uncompressed_size, compressed_size, uploaded_ts
|
||||
FROM blobs
|
||||
`
|
||||
|
||||
rows, err := r.db.conn.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying blobs: %w", err)
|
||||
}
|
||||
defer CloseRows(rows)
|
||||
|
||||
out := make(map[string]*Blob)
|
||||
for rows.Next() {
|
||||
var blob Blob
|
||||
var createdTSUnix int64
|
||||
var finishedTSUnix, uploadedTSUnix sql.NullInt64
|
||||
if err := rows.Scan(
|
||||
&blob.ID,
|
||||
&blob.Hash,
|
||||
&createdTSUnix,
|
||||
&finishedTSUnix,
|
||||
&blob.UncompressedSize,
|
||||
&blob.CompressedSize,
|
||||
&uploadedTSUnix,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("scanning blob: %w", err)
|
||||
}
|
||||
blob.CreatedTS = time.Unix(createdTSUnix, 0).UTC()
|
||||
if finishedTSUnix.Valid {
|
||||
ts := time.Unix(finishedTSUnix.Int64, 0).UTC()
|
||||
blob.FinishedTS = &ts
|
||||
}
|
||||
if uploadedTSUnix.Valid {
|
||||
ts := time.Unix(uploadedTSUnix.Int64, 0).UTC()
|
||||
blob.UploadedTS = &ts
|
||||
}
|
||||
out[blob.ID.String()] = &blob
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// UpdateFinished updates a blob when it's finalized
|
||||
func (r *BlobRepository) UpdateFinished(ctx context.Context, tx *sql.Tx, id string, hash string, uncompressedSize, compressedSize int64) error {
|
||||
query := `
|
||||
|
||||
@@ -46,8 +46,12 @@ func Initialize(cfg Config) {
|
||||
var level slog.Level
|
||||
|
||||
if cfg.Cron || cfg.Quiet {
|
||||
// In quiet/cron mode, only show errors
|
||||
level = slog.LevelError
|
||||
// In cron/quiet mode keep warnings and errors visible — the
|
||||
// whole point of --cron is to stay silent only on total
|
||||
// success, so that anything cron emails to root is genuinely
|
||||
// "something went wrong, look at it." A backup with stuck
|
||||
// permission errors or skipped files should NOT be silent.
|
||||
level = slog.LevelWarn
|
||||
} else if cfg.Debug || strings.Contains(os.Getenv("GODEBUG"), "vaultik") {
|
||||
level = slog.LevelDebug
|
||||
} else if cfg.Verbose {
|
||||
|
||||
40
internal/snapshot/remotekey.go
Normal file
40
internal/snapshot/remotekey.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package snapshot
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
)
|
||||
|
||||
// remoteKeyPrefix is mixed into the snapshot ID hash so the resulting
|
||||
// hex digest is domain-separated from any other "double SHA256 of a
|
||||
// string" identifier the user might also use. Keeping this stable is a
|
||||
// hard compatibility requirement: changing it invalidates every
|
||||
// existing snapshot's remote storage path.
|
||||
const remoteKeyPrefix = "vaultik|"
|
||||
|
||||
// RemoteSnapshotKey returns the storage-side identifier for a snapshot
|
||||
// given its human snapshot ID. It is hex(SHA256(SHA256(prefix + id))).
|
||||
// The two SHA256 rounds match Bitcoin's "hash256" convention so the
|
||||
// output looks like a 64-character hex blob with no exploitable
|
||||
// structure visible to a remote observer.
|
||||
//
|
||||
// We use this in three places:
|
||||
//
|
||||
// - the "metadata/<remote-key>/..." subdirectory on the storage
|
||||
// backend so a directory listing of the bucket / file:// dest
|
||||
// doesn't reveal hostnames, configured snapshot names, or backup
|
||||
// timestamps;
|
||||
// - the `snapshot_id` field of the unencrypted manifest.json.zst
|
||||
// for the same reason;
|
||||
// - any code path that needs to translate a known local snapshot ID
|
||||
// into the path it would occupy on remote storage.
|
||||
//
|
||||
// The human ID stays the user-visible handle everywhere else — local
|
||||
// database joins, CLI arguments, summary lines, log fields — because
|
||||
// it's never written to the public bytes once this function gates
|
||||
// every storage-path construction.
|
||||
func RemoteSnapshotKey(snapshotID string) string {
|
||||
first := sha256.Sum256([]byte(remoteKeyPrefix + snapshotID))
|
||||
second := sha256.Sum256(first[:])
|
||||
return hex.EncodeToString(second[:])
|
||||
}
|
||||
@@ -1177,16 +1177,17 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW
|
||||
finishedBlob := blobWithReader.FinishedBlob
|
||||
|
||||
// Check if blob already exists (deduplication after restart)
|
||||
destination := s.storage.Info().Location
|
||||
if _, err := s.storage.Stat(ctx, blobPath); err == nil {
|
||||
log.Info("Blob already exists in storage, skipping upload",
|
||||
"hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
s.ui.Info("Blob %s (%s) already exists in backup destination store. Skipping upload.",
|
||||
s.ui.Hex(finishedBlob.Hash), s.ui.Size(finishedBlob.Compressed))
|
||||
s.ui.Info("Blob %s (%s) already exists at %s. Skipping upload.",
|
||||
s.ui.Hex(finishedBlob.Hash), s.ui.Size(finishedBlob.Compressed), s.ui.Path(destination))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
s.ui.Begin("Uploading blob %s (%s) to backup destination store.",
|
||||
s.ui.Hex(finishedBlob.Hash), s.ui.Size(finishedBlob.Compressed))
|
||||
s.ui.Begin("Uploading blob %s (%s) to %s.",
|
||||
s.ui.Hex(finishedBlob.Hash), s.ui.Size(finishedBlob.Compressed), s.ui.Path(destination))
|
||||
|
||||
progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob, startTime)
|
||||
|
||||
|
||||
@@ -314,10 +314,17 @@ func (sm *SnapshotManager) prepareExportDB(ctx context.Context, dbPath, snapshot
|
||||
return finalData, tempDBPath, nil
|
||||
}
|
||||
|
||||
// uploadSnapshotArtifacts uploads the database backup and blob manifest to S3
|
||||
// uploadSnapshotArtifacts uploads the database backup and blob manifest
|
||||
// to remote storage at metadata/<remote-key>/, where remote-key is the
|
||||
// double-SHA256 derivation of the snapshot ID (see RemoteSnapshotKey).
|
||||
// We never write the human-readable snapshot ID into any unencrypted
|
||||
// part of remote storage so a listing of the destination bucket leaks
|
||||
// no host, configuration, or scheduling information.
|
||||
func (sm *SnapshotManager) uploadSnapshotArtifacts(ctx context.Context, snapshotID string, dbData, manifestData []byte) error {
|
||||
remoteKey := RemoteSnapshotKey(snapshotID)
|
||||
|
||||
// Upload database backup (compressed and encrypted)
|
||||
dbKey := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
|
||||
dbKey := fmt.Sprintf("metadata/%s/db.zst.age", remoteKey)
|
||||
|
||||
dbUploadStart := time.Now()
|
||||
if err := sm.storage.Put(ctx, dbKey, bytes.NewReader(dbData)); err != nil {
|
||||
@@ -332,7 +339,7 @@ func (sm *SnapshotManager) uploadSnapshotArtifacts(ctx context.Context, snapshot
|
||||
"speed", humanize.SI(dbUploadSpeed, "bps"))
|
||||
|
||||
// Upload blob manifest (compressed only, not encrypted)
|
||||
manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", remoteKey)
|
||||
manifestUploadStart := time.Now()
|
||||
if err := sm.storage.Put(ctx, manifestKey, bytes.NewReader(manifestData)); err != nil {
|
||||
return fmt.Errorf("uploading blob manifest: %w", err)
|
||||
@@ -607,9 +614,11 @@ func (sm *SnapshotManager) generateBlobManifest(ctx context.Context, dbPath stri
|
||||
}
|
||||
}
|
||||
|
||||
// Create manifest
|
||||
// Create manifest. SnapshotID in the unencrypted manifest is the
|
||||
// double-SHA256 remote key, not the human ID, so the public bytes
|
||||
// don't reveal hostname/snapshot-name/timestamp metadata.
|
||||
manifest := &Manifest{
|
||||
SnapshotID: snapshotID,
|
||||
SnapshotID: RemoteSnapshotKey(snapshotID),
|
||||
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
||||
BlobCount: len(blobs),
|
||||
TotalCompressedSize: totalCompressedSize,
|
||||
@@ -680,8 +689,9 @@ func (sm *SnapshotManager) CleanupIncompleteSnapshots(ctx context.Context, hostn
|
||||
|
||||
// Check each incomplete snapshot for metadata in storage
|
||||
for _, snapshot := range incompleteSnapshots {
|
||||
// Check if metadata exists in storage
|
||||
metadataKey := fmt.Sprintf("metadata/%s/db.zst", snapshot.ID)
|
||||
// Check if metadata exists in storage (paths use the hashed
|
||||
// remote key so we don't leak host info to the listing).
|
||||
metadataKey := fmt.Sprintf("metadata/%s/db.zst", RemoteSnapshotKey(snapshot.ID.String()))
|
||||
_, err := sm.storage.Stat(ctx, metadataKey)
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -19,15 +19,20 @@ type FileStorer struct {
|
||||
}
|
||||
|
||||
// NewFileStorer creates a new filesystem storage backend.
|
||||
// The basePath directory will be created if it doesn't exist.
|
||||
// Uses the real OS filesystem by default; call SetFilesystem to override for testing.
|
||||
//
|
||||
// Construction is intentionally cheap and does not touch the filesystem.
|
||||
// The basePath is recorded; the directory is created lazily on first
|
||||
// write. Reads (Get/Stat/List) tolerate a missing basePath — a missing
|
||||
// or unmounted destination during `snapshot list` should NOT block the
|
||||
// command, it should degrade to "no remote snapshots reachable" with a
|
||||
// warning. Write operations (Put/PutWithProgress) call MkdirAll for the
|
||||
// per-blob parent directory, which also covers basePath on first use.
|
||||
//
|
||||
// Uses the real OS filesystem by default; call SetFilesystem to
|
||||
// override for testing.
|
||||
func NewFileStorer(basePath string) (*FileStorer, error) {
|
||||
fs := afero.NewOsFs()
|
||||
if err := fs.MkdirAll(basePath, 0755); err != nil {
|
||||
return nil, fmt.Errorf("file:// storage: cannot create or access %s: %w (check that the volume is mounted and writable)", basePath, err)
|
||||
}
|
||||
return &FileStorer{
|
||||
fs: fs,
|
||||
fs: afero.NewOsFs(),
|
||||
basePath: basePath,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package vaultik
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
@@ -15,9 +16,22 @@ type blobDiskCacheEntry struct {
|
||||
next *blobDiskCacheEntry
|
||||
}
|
||||
|
||||
// blobDiskCache is an LRU cache that stores blobs on disk instead of in memory.
|
||||
// Blobs are written to a temp directory keyed by their hash. When total size
|
||||
// exceeds maxBytes, the least-recently-used entries are evicted (deleted from disk).
|
||||
// blobDiskCache stores blobs on disk keyed by hash. It exposes ReadAt
|
||||
// for slice reads (the restore path uses this so chunk extraction
|
||||
// never reads a whole blob into memory) plus Get/Put for whole-blob
|
||||
// access.
|
||||
//
|
||||
// Eviction policy is caller-controlled. The cache keeps an LRU list
|
||||
// internally and will fall back to LRU eviction if curBytes exceeds
|
||||
// maxBytes. Restore passes math.MaxInt64 as maxBytes and drives
|
||||
// eviction itself via Delete() through restoreSweeper, which deletes
|
||||
// each blob the moment every file that references its chunks has been
|
||||
// written. LRU never fires under that configuration; it is kept as a
|
||||
// safety net for callers that don't manage eviction themselves.
|
||||
//
|
||||
// Get/ReadAt/peak-Len counters are debugging instrumentation used by
|
||||
// tests to assert that the restore code path uses ReadAt rather than
|
||||
// Get and to bound peak disk-cache occupancy.
|
||||
type blobDiskCache struct {
|
||||
mu sync.Mutex
|
||||
dir string
|
||||
@@ -26,6 +40,11 @@ type blobDiskCache struct {
|
||||
items map[string]*blobDiskCacheEntry
|
||||
head *blobDiskCacheEntry // most recent
|
||||
tail *blobDiskCacheEntry // least recent
|
||||
|
||||
// Instrumentation. Mutated under mu; readable via the methods below.
|
||||
getCalls int
|
||||
readAtCalls int
|
||||
peakLen int
|
||||
}
|
||||
|
||||
// newBlobDiskCache creates a new disk-based blob cache with the given max size.
|
||||
@@ -115,12 +134,77 @@ func (c *blobDiskCache) Put(key string, data []byte) error {
|
||||
c.evictLRU()
|
||||
}
|
||||
|
||||
if n := len(c.items); n > c.peakLen {
|
||||
c.peakLen = n
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PutFromReader streams r into the cache file for key, returning the
|
||||
// total number of bytes written. Unlike Put, the data never has to
|
||||
// reside fully in memory at any point — io.Copy uses an internal
|
||||
// 32 KiB buffer. Used by restore to land a freshly decrypted blob on
|
||||
// disk without buffering its entire plaintext (which may be tens of GB)
|
||||
// in RAM.
|
||||
func (c *blobDiskCache) PutFromReader(key string, r io.Reader) (int64, error) {
|
||||
c.mu.Lock()
|
||||
// Remove any prior entry first; we'll re-link after the file is
|
||||
// written successfully.
|
||||
if e, ok := c.items[key]; ok {
|
||||
c.unlink(e)
|
||||
c.curBytes -= e.size
|
||||
_ = os.Remove(c.path(key))
|
||||
delete(c.items, key)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
f, err := os.OpenFile(c.path(key), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("creating cache file: %w", err)
|
||||
}
|
||||
written, copyErr := io.Copy(f, r)
|
||||
closeErr := f.Close()
|
||||
if copyErr != nil {
|
||||
_ = os.Remove(c.path(key))
|
||||
return written, fmt.Errorf("streaming to cache file: %w", copyErr)
|
||||
}
|
||||
if closeErr != nil {
|
||||
_ = os.Remove(c.path(key))
|
||||
return written, fmt.Errorf("closing cache file: %w", closeErr)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// If the entry would exceed maxBytes outright, drop it on the
|
||||
// floor — but the restore path passes math.MaxInt64 as maxBytes
|
||||
// so this branch is effectively unreachable there.
|
||||
if written > c.maxBytes {
|
||||
_ = os.Remove(c.path(key))
|
||||
return written, nil
|
||||
}
|
||||
|
||||
e := &blobDiskCacheEntry{key: key, size: written}
|
||||
c.pushFront(e)
|
||||
c.items[key] = e
|
||||
c.curBytes += written
|
||||
|
||||
for c.curBytes > c.maxBytes && c.tail != nil {
|
||||
c.evictLRU()
|
||||
}
|
||||
|
||||
if n := len(c.items); n > c.peakLen {
|
||||
c.peakLen = n
|
||||
}
|
||||
|
||||
return written, nil
|
||||
}
|
||||
|
||||
// Get reads a cached blob from disk. Returns data and true on hit.
|
||||
func (c *blobDiskCache) Get(key string) ([]byte, bool) {
|
||||
c.mu.Lock()
|
||||
c.getCalls++
|
||||
e, ok := c.items[key]
|
||||
if !ok {
|
||||
c.mu.Unlock()
|
||||
@@ -147,6 +231,7 @@ func (c *blobDiskCache) Get(key string) ([]byte, bool) {
|
||||
// ReadAt reads a slice of a cached blob without loading the entire blob into memory.
|
||||
func (c *blobDiskCache) ReadAt(key string, offset, length int64) ([]byte, error) {
|
||||
c.mu.Lock()
|
||||
c.readAtCalls++
|
||||
e, ok := c.items[key]
|
||||
if !ok {
|
||||
c.mu.Unlock()
|
||||
@@ -223,6 +308,28 @@ func (c *blobDiskCache) Len() int {
|
||||
return len(c.items)
|
||||
}
|
||||
|
||||
// GetCalls returns the number of times Get has been called.
|
||||
func (c *blobDiskCache) GetCalls() int {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.getCalls
|
||||
}
|
||||
|
||||
// ReadAtCalls returns the number of times ReadAt has been called.
|
||||
func (c *blobDiskCache) ReadAtCalls() int {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.readAtCalls
|
||||
}
|
||||
|
||||
// PeakLen returns the maximum number of cached entries ever held at
|
||||
// once during this cache's lifetime.
|
||||
func (c *blobDiskCache) PeakLen() int {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.peakLen
|
||||
}
|
||||
|
||||
// Close removes the cache directory and all cached blobs.
|
||||
func (c *blobDiskCache) Close() error {
|
||||
c.mu.Lock()
|
||||
|
||||
@@ -22,14 +22,29 @@ func (v *Vaultik) ShowInfo() error {
|
||||
v.printfStdout("Go Version: %s\n", runtime.Version())
|
||||
v.printlnStdout()
|
||||
|
||||
// Storage Configuration
|
||||
// Storage Configuration. The backend is selected by storage_url
|
||||
// (s3://, file://, rclone://); the legacy s3.* fields are only
|
||||
// printed when they're actually populated, since the URL scheme
|
||||
// is the primary configuration.
|
||||
v.printfStdout("=== Storage Configuration ===\n")
|
||||
v.printfStdout("S3 Bucket: %s\n", v.Config.S3.Bucket)
|
||||
storageInfo := v.Storage.Info()
|
||||
v.printfStdout("Type: %s\n", storageInfo.Type)
|
||||
v.printfStdout("Location: %s\n", storageInfo.Location)
|
||||
if v.Config.StorageURL != "" {
|
||||
v.printfStdout("Storage URL: %s\n", v.Config.StorageURL)
|
||||
}
|
||||
if v.Config.S3.Bucket != "" {
|
||||
v.printfStdout("S3 Bucket: %s\n", v.Config.S3.Bucket)
|
||||
}
|
||||
if v.Config.S3.Prefix != "" {
|
||||
v.printfStdout("S3 Prefix: %s\n", v.Config.S3.Prefix)
|
||||
}
|
||||
v.printfStdout("S3 Endpoint: %s\n", v.Config.S3.Endpoint)
|
||||
v.printfStdout("S3 Region: %s\n", v.Config.S3.Region)
|
||||
if v.Config.S3.Endpoint != "" {
|
||||
v.printfStdout("S3 Endpoint: %s\n", v.Config.S3.Endpoint)
|
||||
}
|
||||
if v.Config.S3.Region != "" {
|
||||
v.printfStdout("S3 Region: %s\n", v.Config.S3.Region)
|
||||
}
|
||||
v.printlnStdout()
|
||||
|
||||
// Backup Settings
|
||||
@@ -337,7 +352,7 @@ func (v *Vaultik) printRemoteInfoTable(result *RemoteInfoResult) {
|
||||
humanize.Comma(int64(result.OrphanedBlobCount)), humanize.Bytes(uint64(result.OrphanedBlobSize)))
|
||||
|
||||
if result.OrphanedBlobCount > 0 {
|
||||
v.printfStdout("\nRun 'vaultik prune --remote' to remove orphaned blobs.\n")
|
||||
v.printfStdout("\nRun 'vaultik prune' to remove orphaned blobs.\n")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -651,10 +651,12 @@ func TestEndToEndFileStorage(t *testing.T) {
|
||||
require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, snapshotID))
|
||||
|
||||
// Verify the backup actually landed on disk under blobs/ and metadata/.
|
||||
// The metadata subdirectory uses the hashed remote key, not the human
|
||||
// snapshot ID, so the on-disk structure doesn't leak hostname/name/time.
|
||||
blobInfo, err := os.Stat(filepath.Join(storeDir, "blobs"))
|
||||
require.NoError(t, err)
|
||||
require.True(t, blobInfo.IsDir())
|
||||
metaInfo, err := os.Stat(filepath.Join(storeDir, "metadata", snapshotID))
|
||||
metaInfo, err := os.Stat(filepath.Join(storeDir, "metadata", snapshot.RemoteSnapshotKey(snapshotID)))
|
||||
require.NoError(t, err)
|
||||
require.True(t, metaInfo.IsDir())
|
||||
|
||||
|
||||
@@ -48,6 +48,19 @@ type PruneBlobsResult struct {
|
||||
BytesFreed int64 `json:"bytes_freed"`
|
||||
}
|
||||
|
||||
// Prune removes orphaned data from the local index database AND
|
||||
// unreferenced blobs from the backup destination store. This is the
|
||||
// single user-facing prune entry point — the split between local and
|
||||
// remote cleanup is an implementation detail. Calling code should
|
||||
// prefer this method over PruneDatabase or PruneBlobs individually
|
||||
// unless it specifically wants one half.
|
||||
func (v *Vaultik) Prune(opts *PruneOptions) error {
|
||||
if _, err := v.PruneDatabase(); err != nil {
|
||||
return fmt.Errorf("pruning local database: %w", err)
|
||||
}
|
||||
return v.PruneBlobs(opts)
|
||||
}
|
||||
|
||||
// PruneBlobs removes unreferenced blobs from storage
|
||||
func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
log.Info("Starting prune operation")
|
||||
@@ -110,20 +123,22 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
// collectReferencedBlobs downloads all manifests and returns the set of referenced blob hashes
|
||||
func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) {
|
||||
log.Info("Listing remote snapshots")
|
||||
snapshotIDs, err := v.listUniqueSnapshotIDs()
|
||||
// IDs returned by listUniqueSnapshotIDs are remote keys (hashed
|
||||
// subdirectories under metadata/), not human snapshot IDs.
|
||||
remoteKeys, err := v.listUniqueSnapshotIDs()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listing snapshot IDs: %w", err)
|
||||
return nil, fmt.Errorf("listing snapshot keys: %w", err)
|
||||
}
|
||||
log.Info("Found manifests in remote storage", "count", len(snapshotIDs))
|
||||
log.Info("Found manifests in remote storage", "count", len(remoteKeys))
|
||||
|
||||
allBlobsReferenced := make(map[string]bool)
|
||||
manifestCount := 0
|
||||
|
||||
for _, snapshotID := range snapshotIDs {
|
||||
log.Debug("Processing manifest", "snapshot_id", snapshotID)
|
||||
manifest, err := v.downloadManifest(snapshotID)
|
||||
for _, remoteKey := range remoteKeys {
|
||||
log.Debug("Processing manifest", "remote_key", remoteKey)
|
||||
manifest, err := v.downloadManifestByKey(remoteKey)
|
||||
if err != nil {
|
||||
log.Error("Failed to download manifest", "snapshot_id", snapshotID, "error", err)
|
||||
log.Error("Failed to download manifest", "remote_key", remoteKey, "error", err)
|
||||
continue
|
||||
}
|
||||
for _, blob := range manifest.Blobs {
|
||||
|
||||
@@ -132,7 +132,9 @@ func (s *testStorer) Info() storage.StorageInfo {
|
||||
}
|
||||
}
|
||||
|
||||
// addManifest creates a compressed manifest in storage
|
||||
// addManifest creates a compressed manifest in storage at the same
|
||||
// hashed path the production code uses. snapshotID is the human ID;
|
||||
// the storage path uses RemoteSnapshotKey(id).
|
||||
func addManifest(t *testing.T, store *testStorer, snapshotID string, blobHashes []string) {
|
||||
t.Helper()
|
||||
|
||||
@@ -144,8 +146,9 @@ func addManifest(t *testing.T, store *testStorer, snapshotID string, blobHashes
|
||||
}
|
||||
}
|
||||
|
||||
remoteKey := snapshot.RemoteSnapshotKey(snapshotID)
|
||||
manifest := &snapshot.Manifest{
|
||||
SnapshotID: snapshotID,
|
||||
SnapshotID: remoteKey,
|
||||
BlobCount: len(blobs),
|
||||
Blobs: blobs,
|
||||
}
|
||||
@@ -153,11 +156,19 @@ func addManifest(t *testing.T, store *testStorer, snapshotID string, blobHashes
|
||||
data, err := snapshot.EncodeManifest(manifest, 3)
|
||||
require.NoError(t, err)
|
||||
|
||||
key := "metadata/" + snapshotID + "/manifest.json.zst"
|
||||
key := "metadata/" + remoteKey + "/manifest.json.zst"
|
||||
err = store.Put(context.Background(), key, bytes.NewReader(data))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// remoteKeyPath returns the storage-relative path to a snapshot's
|
||||
// metadata directory or manifest under the hashed remote-key scheme.
|
||||
// Tests use this in hasKey/asserts to avoid scattering RemoteSnapshotKey
|
||||
// calls throughout.
|
||||
func remoteKeyPath(snapshotID, suffix string) string {
|
||||
return "metadata/" + snapshot.RemoteSnapshotKey(snapshotID) + "/" + suffix
|
||||
}
|
||||
|
||||
// addBlob adds a fake blob to storage
|
||||
func addBlob(t *testing.T, store *testStorer, hash string) {
|
||||
t.Helper()
|
||||
@@ -198,7 +209,7 @@ func TestRemoveSnapshot_LocalOnly(t *testing.T) {
|
||||
// Blobs should NOT be deleted (that's what prune is for)
|
||||
assert.True(t, store.hasKey("blobs/aa/aa/"+blobA))
|
||||
// Remote metadata should NOT be deleted (no --remote flag)
|
||||
assert.True(t, store.hasKey("metadata/snapshot-001/manifest.json.zst"))
|
||||
assert.True(t, store.hasKey(remoteKeyPath("snapshot-001", "manifest.json.zst")))
|
||||
|
||||
// Verify output
|
||||
assert.Contains(t, tv.Stdout.String(), "Removed snapshot 'snapshot-001' from local database")
|
||||
@@ -225,7 +236,7 @@ func TestRemoveSnapshot_WithRemote(t *testing.T) {
|
||||
// Blobs should NOT be deleted
|
||||
assert.True(t, store.hasKey("blobs/aa/aa/"+blobA))
|
||||
// Remote metadata SHOULD be deleted
|
||||
assert.False(t, store.hasKey("metadata/snapshot-001/manifest.json.zst"))
|
||||
assert.False(t, store.hasKey(remoteKeyPath("snapshot-001", "manifest.json.zst")))
|
||||
|
||||
// Verify output mentions prune
|
||||
assert.Contains(t, tv.Stdout.String(), "Removed snapshot 'snapshot-001' from local database")
|
||||
@@ -255,7 +266,7 @@ func TestRemoveSnapshot_DryRun(t *testing.T) {
|
||||
// Nothing should be deleted
|
||||
assert.Equal(t, initialCount, store.keyCount())
|
||||
assert.True(t, store.hasKey("blobs/aa/aa/"+blobA))
|
||||
assert.True(t, store.hasKey("metadata/snapshot-001/manifest.json.zst"))
|
||||
assert.True(t, store.hasKey(remoteKeyPath("snapshot-001", "manifest.json.zst")))
|
||||
|
||||
// Verify dry run message
|
||||
assert.Contains(t, tv.Stdout.String(), "[Dry run - no changes made]")
|
||||
@@ -299,8 +310,8 @@ func TestRemoveAllSnapshots_WithForce(t *testing.T) {
|
||||
// Blobs should NOT be deleted
|
||||
assert.True(t, store.hasKey("blobs/aa/aa/"+blobA))
|
||||
// Remote metadata SHOULD be deleted
|
||||
assert.False(t, store.hasKey("metadata/snapshot-001/manifest.json.zst"))
|
||||
assert.False(t, store.hasKey("metadata/snapshot-002/manifest.json.zst"))
|
||||
assert.False(t, store.hasKey(remoteKeyPath("snapshot-001", "manifest.json.zst")))
|
||||
assert.False(t, store.hasKey(remoteKeyPath("snapshot-002", "manifest.json.zst")))
|
||||
|
||||
// Verify output
|
||||
assert.Contains(t, tv.Stdout.String(), "Removed 2 snapshot(s)")
|
||||
@@ -318,7 +329,10 @@ func TestRemoveAllSnapshots_DryRun(t *testing.T) {
|
||||
|
||||
tv := vaultik.NewForTesting(store)
|
||||
|
||||
opts := &vaultik.RemoveOptions{All: true, Force: true, DryRun: true}
|
||||
// --remote is required to enumerate orphan remote keys; without
|
||||
// it, RemoveAll only acts on local snapshots, and NewForTesting
|
||||
// has no local DB.
|
||||
opts := &vaultik.RemoveOptions{All: true, Force: true, DryRun: true, Remote: true}
|
||||
result, err := tv.RemoveAllSnapshots(opts)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"sneak.berlin/go/vaultik/internal/blobgen"
|
||||
"sneak.berlin/go/vaultik/internal/database"
|
||||
"sneak.berlin/go/vaultik/internal/log"
|
||||
"sneak.berlin/go/vaultik/internal/snapshot"
|
||||
"sneak.berlin/go/vaultik/internal/types"
|
||||
)
|
||||
|
||||
@@ -159,7 +160,12 @@ func (v *Vaultik) prepareRestoreIdentity() (age.Identity, error) {
|
||||
return identity, nil
|
||||
}
|
||||
|
||||
// restoreAllFiles iterates over files and restores each one, tracking progress and failures
|
||||
// restoreAllFiles processes files in blob-locality order: drain every
|
||||
// file whose blob set is on disk, download the missing blobs for the
|
||||
// pending file with the smallest uncached count, repeat. This keeps
|
||||
// peak cache occupancy near 1 even on snapshots whose path order
|
||||
// interleaves blobs, and lets the sweeper free each blob the moment
|
||||
// its file set is exhausted.
|
||||
func (v *Vaultik) restoreAllFiles(
|
||||
files []*database.File,
|
||||
repos *database.Repositories,
|
||||
@@ -177,13 +183,47 @@ func (v *Vaultik) restoreAllFiles(
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating blob cache: %w", err)
|
||||
}
|
||||
defer func() { _ = blobCache.Close() }()
|
||||
if v.restoreCacheObserver != nil {
|
||||
v.restoreCacheObserver(blobCache)
|
||||
}
|
||||
defer func() {
|
||||
if v.restoreCacheObserver != nil {
|
||||
v.restoreCacheObserver(blobCache)
|
||||
}
|
||||
_ = blobCache.Close()
|
||||
}()
|
||||
|
||||
// Per-restore sweep state: every blob_size_limit/100 bytes written,
|
||||
// scan the cache and delete any blob whose remaining file references
|
||||
// are all already restored.
|
||||
sweeper := newRestoreSweeper(v.ctx, repos, blobCache, v.Config.BlobSizeLimit.Int64()/100)
|
||||
|
||||
// Pre-fetch every blob row once so chunk extraction can map a
|
||||
// blob_id to its hash without a DB round-trip per chunk.
|
||||
blobsByID, err := repos.Blobs.GetAll(v.ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetching blob index: %w", err)
|
||||
}
|
||||
blobIDToHash := make(map[string]string, len(blobsByID))
|
||||
blobByHash := make(map[string]*database.Blob, len(blobsByID))
|
||||
for id, blob := range blobsByID {
|
||||
hash := blob.Hash.String()
|
||||
blobIDToHash[id] = hash
|
||||
blobByHash[hash] = blob
|
||||
}
|
||||
|
||||
plan, err := newRestorePlan(v.ctx, repos, files, chunkToBlobMap, blobIDToHash)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("building restore plan: %w", err)
|
||||
}
|
||||
|
||||
// Index files by ID so the loop can look them up by the IDs the
|
||||
// plan hands back.
|
||||
filesByID := make(map[types.FileID]*database.File, len(files))
|
||||
for _, f := range files {
|
||||
filesByID[f.ID] = f
|
||||
}
|
||||
|
||||
// Calculate total bytes expected for percentage / ETA arithmetic.
|
||||
var totalBytesExpected int64
|
||||
for _, file := range files {
|
||||
@@ -195,17 +235,65 @@ func (v *Vaultik) restoreAllFiles(
|
||||
v.UI.Size(totalBytesExpected),
|
||||
v.UI.Path(opts.TargetDir))
|
||||
|
||||
session := &restoreSession{
|
||||
v: v,
|
||||
ctx: v.ctx,
|
||||
repos: repos,
|
||||
opts: opts,
|
||||
identity: identity,
|
||||
chunkToBlobMap: chunkToBlobMap,
|
||||
blobByHash: blobByHash,
|
||||
blobIDToHash: blobIDToHash,
|
||||
blobCache: blobCache,
|
||||
sweeper: sweeper,
|
||||
result: result,
|
||||
}
|
||||
|
||||
// Periodic progress output, matching the snapshot create cadence.
|
||||
startTime := time.Now()
|
||||
lastStatusTime := startTime
|
||||
const statusInterval = 15 * time.Second
|
||||
|
||||
for i, file := range files {
|
||||
processed := 0
|
||||
for plan.hasPending() {
|
||||
if v.ctx.Err() != nil {
|
||||
return nil, v.ctx.Err()
|
||||
}
|
||||
|
||||
if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, sweeper, result); err != nil {
|
||||
fileID, ready := plan.popReady()
|
||||
if !ready {
|
||||
// No file is fully cache-served. First free any blobs
|
||||
// whose file sets are exhausted — without this, the
|
||||
// blob whose last file we just finished would still be
|
||||
// cached when we Put the next one, briefly pushing
|
||||
// peak occupancy from 1 to 2.
|
||||
sweeper.sweep()
|
||||
|
||||
// Pick the pending file with the smallest uncached
|
||||
// blob set and download its blobs. After each blob
|
||||
// lands, the plan moves any pending file whose set
|
||||
// just emptied onto the ready queue.
|
||||
next := plan.pickNextDownload()
|
||||
if next.IsZero() {
|
||||
break
|
||||
}
|
||||
for _, hash := range plan.blobsNeeded(next) {
|
||||
blob, ok := blobByHash[hash]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("blob hash %s missing from blob index", hash[:16])
|
||||
}
|
||||
if err := session.downloadBlobToCache(hash, blob.CompressedSize); err != nil {
|
||||
return nil, fmt.Errorf("downloading blob %s: %w", hash[:16], err)
|
||||
}
|
||||
result.BlobsDownloaded++
|
||||
result.BytesDownloaded += blob.CompressedSize
|
||||
plan.markBlobCached(hash)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
file := filesByID[fileID]
|
||||
if err := session.restoreFile(file); err != nil {
|
||||
log.Error("Failed to restore file", "path", file.Path, "error", err)
|
||||
if !opts.SkipErrors {
|
||||
return nil, fmt.Errorf("restoring %s: %w (pass --skip-errors to continue past restore failures)", file.Path, err)
|
||||
@@ -213,22 +301,26 @@ func (v *Vaultik) restoreAllFiles(
|
||||
v.UI.Error("Failed to restore %s: %v. Skipping (--skip-errors).", v.UI.Path(file.Path.String()), err)
|
||||
result.FilesFailed++
|
||||
result.FailedFiles = append(result.FailedFiles, file.Path.String())
|
||||
plan.finishFile(fileID)
|
||||
continue
|
||||
}
|
||||
|
||||
// Record the file as restored so the sweeper can free blobs once
|
||||
// all referencing files are done.
|
||||
sweeper.fileRestored(file.ID.String())
|
||||
// Record the file as restored so the sweeper can free blobs
|
||||
// once all referencing files are done, and drop it from the
|
||||
// plan's indexes so future picks ignore it.
|
||||
sweeper.fileRestored(fileID.String())
|
||||
plan.finishFile(fileID)
|
||||
processed++
|
||||
|
||||
if time.Since(lastStatusTime) >= statusInterval {
|
||||
v.printRestoreProgress(i+1, len(files), result.BytesRestored, totalBytesExpected, startTime)
|
||||
v.printRestoreProgress(processed, len(files), result.BytesRestored, totalBytesExpected, startTime)
|
||||
lastStatusTime = time.Now()
|
||||
}
|
||||
|
||||
// Structured progress log for --verbose / JSON consumers.
|
||||
if (i+1)%100 == 0 || i+1 == len(files) {
|
||||
if processed%100 == 0 || processed == len(files) {
|
||||
log.Info("Restore progress",
|
||||
"files", fmt.Sprintf("%d/%d", i+1, len(files)),
|
||||
"files", fmt.Sprintf("%d/%d", processed, len(files)),
|
||||
"bytes", humanize.Bytes(uint64(result.BytesRestored)),
|
||||
)
|
||||
}
|
||||
@@ -303,10 +395,12 @@ func (v *Vaultik) handleRestoreVerification(
|
||||
return nil
|
||||
}
|
||||
|
||||
// downloadSnapshotDB downloads and decrypts the snapshot metadata database
|
||||
// downloadSnapshotDB downloads and decrypts the snapshot metadata
|
||||
// database. The snapshotID is the human ID; we hash it to the remote
|
||||
// key for the storage path.
|
||||
func (v *Vaultik) downloadSnapshotDB(snapshotID string, identity age.Identity) (*database.DB, error) {
|
||||
// Download encrypted database from storage
|
||||
dbKey := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
|
||||
dbKey := fmt.Sprintf("metadata/%s/db.zst.age", snapshot.RemoteSnapshotKey(snapshotID))
|
||||
|
||||
reader, err := v.Storage.Get(v.ctx, dbKey)
|
||||
if err != nil {
|
||||
@@ -424,183 +518,128 @@ func (v *Vaultik) buildChunkToBlobMap(ctx context.Context, repos *database.Repos
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
// restoreFile restores a single file
|
||||
func (v *Vaultik) restoreFile(
|
||||
ctx context.Context,
|
||||
repos *database.Repositories,
|
||||
file *database.File,
|
||||
targetDir string,
|
||||
identity age.Identity,
|
||||
chunkToBlobMap map[string]*database.BlobChunk,
|
||||
blobCache *blobDiskCache,
|
||||
sweeper *restoreSweeper,
|
||||
result *RestoreResult,
|
||||
) error {
|
||||
// Calculate target path - use full original path under target directory
|
||||
targetPath := filepath.Join(targetDir, file.Path.String())
|
||||
|
||||
// Create parent directories
|
||||
parentDir := filepath.Dir(targetPath)
|
||||
if err := v.Fs.MkdirAll(parentDir, 0755); err != nil {
|
||||
return fmt.Errorf("creating parent directory: %w", err)
|
||||
}
|
||||
|
||||
// Handle symlinks
|
||||
if file.IsSymlink() {
|
||||
return v.restoreSymlink(file, targetPath, result)
|
||||
}
|
||||
|
||||
// Handle directories
|
||||
if file.Mode&uint32(os.ModeDir) != 0 {
|
||||
return v.restoreDirectory(file, targetPath, result)
|
||||
}
|
||||
|
||||
// Handle regular files
|
||||
return v.restoreRegularFile(ctx, repos, file, targetPath, identity, chunkToBlobMap, blobCache, sweeper, result)
|
||||
// restoreSession holds every piece of per-restore state shared by the
|
||||
// restore-time methods. Each restore builds one of these from the
|
||||
// snapshot's metadata and then drives the file loop through methods on
|
||||
// it. Keeping this state on the struct rather than threading it
|
||||
// through every function signature keeps the inner-loop call sites
|
||||
// readable: restoreFile(file) instead of a ten-argument helper.
|
||||
type restoreSession struct {
|
||||
v *Vaultik
|
||||
ctx context.Context
|
||||
repos *database.Repositories
|
||||
opts *RestoreOptions
|
||||
identity age.Identity
|
||||
chunkToBlobMap map[string]*database.BlobChunk
|
||||
blobByHash map[string]*database.Blob
|
||||
blobIDToHash map[string]string
|
||||
blobCache *blobDiskCache
|
||||
sweeper *restoreSweeper
|
||||
result *RestoreResult
|
||||
}
|
||||
|
||||
// restoreSymlink restores a symbolic link
|
||||
func (v *Vaultik) restoreSymlink(file *database.File, targetPath string, result *RestoreResult) error {
|
||||
// Remove existing file if it exists
|
||||
_ = v.Fs.Remove(targetPath)
|
||||
// restoreFile dispatches to the right per-kind restorer.
|
||||
func (s *restoreSession) restoreFile(file *database.File) error {
|
||||
targetPath := filepath.Join(s.opts.TargetDir, file.Path.String())
|
||||
parentDir := filepath.Dir(targetPath)
|
||||
if err := s.v.Fs.MkdirAll(parentDir, 0755); err != nil {
|
||||
return fmt.Errorf("creating parent directory: %w", err)
|
||||
}
|
||||
if file.IsSymlink() {
|
||||
return s.restoreSymlink(file, targetPath)
|
||||
}
|
||||
if file.Mode&uint32(os.ModeDir) != 0 {
|
||||
return s.restoreDirectory(file, targetPath)
|
||||
}
|
||||
return s.restoreRegularFile(file, targetPath)
|
||||
}
|
||||
|
||||
// Create symlink
|
||||
// Note: afero.MemMapFs doesn't support symlinks, so we use os for real filesystems
|
||||
if osFs, ok := v.Fs.(*afero.OsFs); ok {
|
||||
_ = osFs // silence unused variable warning
|
||||
// restoreSymlink restores a symbolic link.
|
||||
func (s *restoreSession) restoreSymlink(file *database.File, targetPath string) error {
|
||||
_ = s.v.Fs.Remove(targetPath)
|
||||
// afero.MemMapFs doesn't support symlinks, so route real-FS
|
||||
// symlinks through os.
|
||||
if _, ok := s.v.Fs.(*afero.OsFs); ok {
|
||||
if err := os.Symlink(file.LinkTarget.String(), targetPath); err != nil {
|
||||
return fmt.Errorf("creating symlink: %w", err)
|
||||
}
|
||||
} else {
|
||||
log.Debug("Symlink creation not supported on this filesystem", "path", file.Path, "target", file.LinkTarget)
|
||||
}
|
||||
|
||||
result.FilesRestored++
|
||||
s.result.FilesRestored++
|
||||
log.Debug("Restored symlink", "path", file.Path, "target", file.LinkTarget)
|
||||
return nil
|
||||
}
|
||||
|
||||
// restoreDirectory restores a directory with proper permissions
|
||||
func (v *Vaultik) restoreDirectory(file *database.File, targetPath string, result *RestoreResult) error {
|
||||
// Create directory
|
||||
if err := v.Fs.MkdirAll(targetPath, os.FileMode(file.Mode)); err != nil {
|
||||
// restoreDirectory restores a directory with its permissions, mtime,
|
||||
// and (on real filesystems, with sufficient privileges) ownership.
|
||||
func (s *restoreSession) restoreDirectory(file *database.File, targetPath string) error {
|
||||
if err := s.v.Fs.MkdirAll(targetPath, os.FileMode(file.Mode)); err != nil {
|
||||
return fmt.Errorf("creating directory: %w", err)
|
||||
}
|
||||
|
||||
// Set permissions
|
||||
if err := v.Fs.Chmod(targetPath, os.FileMode(file.Mode)); err != nil {
|
||||
if err := s.v.Fs.Chmod(targetPath, os.FileMode(file.Mode)); err != nil {
|
||||
log.Debug("Failed to set directory permissions", "path", targetPath, "error", err)
|
||||
}
|
||||
|
||||
// Set ownership (requires root)
|
||||
if osFs, ok := v.Fs.(*afero.OsFs); ok {
|
||||
_ = osFs
|
||||
if _, ok := s.v.Fs.(*afero.OsFs); ok {
|
||||
if err := os.Chown(targetPath, int(file.UID), int(file.GID)); err != nil {
|
||||
log.Debug("Failed to set directory ownership", "path", targetPath, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Set mtime
|
||||
if err := v.Fs.Chtimes(targetPath, file.MTime, file.MTime); err != nil {
|
||||
if err := s.v.Fs.Chtimes(targetPath, file.MTime, file.MTime); err != nil {
|
||||
log.Debug("Failed to set directory mtime", "path", targetPath, "error", err)
|
||||
}
|
||||
|
||||
result.FilesRestored++
|
||||
s.result.FilesRestored++
|
||||
return nil
|
||||
}
|
||||
|
||||
// restoreRegularFile restores a regular file by reconstructing it from chunks
|
||||
func (v *Vaultik) restoreRegularFile(
|
||||
ctx context.Context,
|
||||
repos *database.Repositories,
|
||||
file *database.File,
|
||||
targetPath string,
|
||||
identity age.Identity,
|
||||
chunkToBlobMap map[string]*database.BlobChunk,
|
||||
blobCache *blobDiskCache,
|
||||
sweeper *restoreSweeper,
|
||||
result *RestoreResult,
|
||||
) error {
|
||||
// restoreRegularFile reconstructs a regular file by reading chunks
|
||||
// directly out of cached blobs via ReadAt. The expectation when this
|
||||
// method runs is that every blob this file needs is already in the
|
||||
// disk cache — the planner guarantees that by only marking files
|
||||
// "ready" once their full blob set is on disk.
|
||||
func (s *restoreSession) restoreRegularFile(file *database.File, targetPath string) error {
|
||||
fileStart := time.Now()
|
||||
|
||||
// Get file chunks in order
|
||||
t0 := time.Now()
|
||||
fileChunks, err := repos.FileChunks.GetByFileID(ctx, file.ID)
|
||||
fileChunks, err := s.repos.FileChunks.GetByFileID(s.ctx, file.ID)
|
||||
fileChunksQueryDur := time.Since(t0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting file chunks: %w", err)
|
||||
}
|
||||
|
||||
// Create output file
|
||||
t0 = time.Now()
|
||||
outFile, err := v.Fs.Create(targetPath)
|
||||
outFile, err := s.v.Fs.Create(targetPath)
|
||||
createDur := time.Since(t0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating output file: %w", err)
|
||||
}
|
||||
defer func() { _ = outFile.Close() }()
|
||||
|
||||
// Per-file timing buckets so --debug shows exactly where seconds go.
|
||||
var (
|
||||
blobDBLookupDur time.Duration
|
||||
cacheGetDur time.Duration
|
||||
downloadDur time.Duration
|
||||
cachePutDur time.Duration
|
||||
writeDur time.Duration
|
||||
sweeperDur time.Duration
|
||||
downloadCount int
|
||||
cacheHitCount int
|
||||
bytesWritten int64
|
||||
readAtDur time.Duration
|
||||
writeDur time.Duration
|
||||
sweeperDur time.Duration
|
||||
bytesWritten int64
|
||||
)
|
||||
|
||||
for _, fc := range fileChunks {
|
||||
// Find which blob contains this chunk
|
||||
chunkHashStr := fc.ChunkHash.String()
|
||||
blobChunk, ok := chunkToBlobMap[chunkHashStr]
|
||||
blobChunk, ok := s.chunkToBlobMap[chunkHashStr]
|
||||
if !ok {
|
||||
return fmt.Errorf("chunk %s not found in any blob", chunkHashStr[:16])
|
||||
}
|
||||
|
||||
// Get the blob's hash from the database (runs per chunk).
|
||||
t0 = time.Now()
|
||||
blob, err := repos.Blobs.GetByID(ctx, blobChunk.BlobID.String())
|
||||
blobDBLookupDur += time.Since(t0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting blob %s: %w", blobChunk.BlobID, err)
|
||||
}
|
||||
|
||||
// Download and decrypt blob if not cached
|
||||
blobHashStr := blob.Hash.String()
|
||||
t0 = time.Now()
|
||||
blobData, ok := blobCache.Get(blobHashStr)
|
||||
cacheGetDur += time.Since(t0)
|
||||
blobHash, ok := s.blobIDToHash[blobChunk.BlobID.String()]
|
||||
if !ok {
|
||||
t0 = time.Now()
|
||||
blobData, err = v.downloadBlob(ctx, blobHashStr, blob.CompressedSize, identity)
|
||||
downloadDur += time.Since(t0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("downloading blob %s: %w", blobHashStr[:16], err)
|
||||
}
|
||||
t0 = time.Now()
|
||||
if putErr := blobCache.Put(blobHashStr, blobData); putErr != nil {
|
||||
log.Debug("Failed to cache blob on disk", "hash", blobHashStr[:16], "error", putErr)
|
||||
}
|
||||
cachePutDur += time.Since(t0)
|
||||
downloadCount++
|
||||
result.BlobsDownloaded++
|
||||
result.BytesDownloaded += blob.CompressedSize
|
||||
} else {
|
||||
cacheHitCount++
|
||||
return fmt.Errorf("blob id %s missing from hash index", blobChunk.BlobID)
|
||||
}
|
||||
|
||||
// Extract chunk from blob
|
||||
if blobChunk.Offset+blobChunk.Length > int64(len(blobData)) {
|
||||
return fmt.Errorf("chunk %s extends beyond blob data (offset=%d, length=%d, blob_size=%d)",
|
||||
fc.ChunkHash[:16], blobChunk.Offset, blobChunk.Length, len(blobData))
|
||||
t0 = time.Now()
|
||||
chunkData, err := s.blobCache.ReadAt(blobHash, blobChunk.Offset, blobChunk.Length)
|
||||
readAtDur += time.Since(t0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reading chunk %s from cached blob %s: %w", fc.ChunkHash[:16], blobHash[:16], err)
|
||||
}
|
||||
chunkData := blobData[blobChunk.Offset : blobChunk.Offset+blobChunk.Length]
|
||||
|
||||
// Write chunk to output file
|
||||
t0 = time.Now()
|
||||
n, err := outFile.Write(chunkData)
|
||||
writeDur += time.Since(t0)
|
||||
@@ -609,11 +648,8 @@ func (v *Vaultik) restoreRegularFile(
|
||||
}
|
||||
bytesWritten += int64(n)
|
||||
|
||||
// Tell the sweeper about the bytes we just restored so it can
|
||||
// run an eviction sweep once the accumulated total crosses its
|
||||
// threshold (config.BlobSizeLimit/100).
|
||||
t0 = time.Now()
|
||||
sweeper.chunkRestored(int64(n))
|
||||
s.sweeper.chunkRestored(int64(n))
|
||||
sweeperDur += time.Since(t0)
|
||||
}
|
||||
|
||||
@@ -621,89 +657,72 @@ func (v *Vaultik) restoreRegularFile(
|
||||
"path", file.Path,
|
||||
"chunks", len(fileChunks),
|
||||
"bytes_written", bytesWritten,
|
||||
"downloads", downloadCount,
|
||||
"cache_hits", cacheHitCount,
|
||||
"ms_total", time.Since(fileStart).Milliseconds(),
|
||||
"ms_file_chunks_query", fileChunksQueryDur.Milliseconds(),
|
||||
"ms_create", createDur.Milliseconds(),
|
||||
"ms_blob_db_lookups", blobDBLookupDur.Milliseconds(),
|
||||
"ms_cache_gets", cacheGetDur.Milliseconds(),
|
||||
"ms_cache_puts", cachePutDur.Milliseconds(),
|
||||
"ms_downloads", downloadDur.Milliseconds(),
|
||||
"ms_readat", readAtDur.Milliseconds(),
|
||||
"ms_writes", writeDur.Milliseconds(),
|
||||
"ms_sweeper", sweeperDur.Milliseconds(),
|
||||
)
|
||||
|
||||
// Close file before setting metadata
|
||||
if err := outFile.Close(); err != nil {
|
||||
return fmt.Errorf("closing output file: %w", err)
|
||||
}
|
||||
|
||||
// Set permissions
|
||||
if err := v.Fs.Chmod(targetPath, os.FileMode(file.Mode)); err != nil {
|
||||
if err := s.v.Fs.Chmod(targetPath, os.FileMode(file.Mode)); err != nil {
|
||||
log.Debug("Failed to set file permissions", "path", targetPath, "error", err)
|
||||
}
|
||||
|
||||
// Set ownership (requires root)
|
||||
if osFs, ok := v.Fs.(*afero.OsFs); ok {
|
||||
_ = osFs
|
||||
if _, ok := s.v.Fs.(*afero.OsFs); ok {
|
||||
if err := os.Chown(targetPath, int(file.UID), int(file.GID)); err != nil {
|
||||
log.Debug("Failed to set file ownership", "path", targetPath, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Set mtime
|
||||
if err := v.Fs.Chtimes(targetPath, file.MTime, file.MTime); err != nil {
|
||||
if err := s.v.Fs.Chtimes(targetPath, file.MTime, file.MTime); err != nil {
|
||||
log.Debug("Failed to set file mtime", "path", targetPath, "error", err)
|
||||
}
|
||||
|
||||
result.FilesRestored++
|
||||
result.BytesRestored += bytesWritten
|
||||
s.result.FilesRestored++
|
||||
s.result.BytesRestored += bytesWritten
|
||||
|
||||
log.Debug("Restored file", "path", file.Path, "size", humanize.Bytes(uint64(bytesWritten)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// downloadBlob downloads and decrypts a blob, returning the plaintext.
|
||||
// Emits a debug log line splitting time spent in the network fetch (Get
|
||||
// + Stat round-trips) from the streaming decrypt/decompress/read phase
|
||||
// so --debug shows which side of the wire is the bottleneck.
|
||||
func (v *Vaultik) downloadBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) ([]byte, error) {
|
||||
// downloadBlobToCache streams a blob from remote storage straight into
|
||||
// the disk cache, decrypting and decompressing on the fly. The
|
||||
// plaintext never lives fully in memory — io.Copy through
|
||||
// blobDiskCache.PutFromReader uses a 32 KiB buffer regardless of blob
|
||||
// size, which is what makes multi-GB blobs tractable on machines with
|
||||
// less RAM than the blob.
|
||||
func (s *restoreSession) downloadBlobToCache(blobHash string, expectedSize int64) error {
|
||||
start := time.Now()
|
||||
|
||||
t0 := time.Now()
|
||||
rc, err := v.FetchAndDecryptBlob(ctx, blobHash, expectedSize, identity)
|
||||
rc, err := s.v.FetchAndDecryptBlob(s.ctx, blobHash, expectedSize, s.identity)
|
||||
fetchSetupDur := time.Since(t0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
t0 = time.Now()
|
||||
data, err := io.ReadAll(rc)
|
||||
readAllDur := time.Since(t0)
|
||||
if err != nil {
|
||||
_ = rc.Close()
|
||||
return nil, fmt.Errorf("reading blob data: %w", err)
|
||||
written, copyErr := s.blobCache.PutFromReader(blobHash, rc)
|
||||
streamDur := time.Since(t0)
|
||||
closeErr := rc.Close()
|
||||
if copyErr != nil {
|
||||
return copyErr
|
||||
}
|
||||
if closeErr != nil {
|
||||
return closeErr
|
||||
}
|
||||
|
||||
// Close triggers hash verification
|
||||
t0 = time.Now()
|
||||
if err := rc.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
closeDur := time.Since(t0)
|
||||
|
||||
log.Debug("Downloaded and decrypted blob (timings)",
|
||||
log.Debug("Streamed blob into disk cache",
|
||||
"hash", blobHash[:16],
|
||||
"compressed_bytes", expectedSize,
|
||||
"plaintext_bytes", len(data),
|
||||
"plaintext_bytes", written,
|
||||
"ms_total", time.Since(start).Milliseconds(),
|
||||
"ms_fetch_setup", fetchSetupDur.Milliseconds(),
|
||||
"ms_read_decrypt_decompress", readAllDur.Milliseconds(),
|
||||
"ms_close_verify", closeDur.Milliseconds(),
|
||||
"ms_stream_decrypt_decompress", streamDur.Milliseconds(),
|
||||
)
|
||||
|
||||
return data, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyRestoredFiles verifies that all restored files match their expected chunk hashes
|
||||
|
||||
315
internal/vaultik/restore_locality_test.go
Normal file
315
internal/vaultik/restore_locality_test.go
Normal file
@@ -0,0 +1,315 @@
|
||||
package vaultik
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/spf13/afero"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"sneak.berlin/go/vaultik/internal/config"
|
||||
"sneak.berlin/go/vaultik/internal/database"
|
||||
"sneak.berlin/go/vaultik/internal/log"
|
||||
"sneak.berlin/go/vaultik/internal/snapshot"
|
||||
"sneak.berlin/go/vaultik/internal/storage"
|
||||
"sneak.berlin/go/vaultik/internal/ui"
|
||||
)
|
||||
|
||||
// TestRestoreLocalityAndReadAt asserts three properties of the restore
|
||||
// hot path that together produce acceptable throughput on real-world
|
||||
// snapshots. All three currently fail on main:
|
||||
//
|
||||
// 1. Peak blob cache occupancy ≤ 1.
|
||||
// Restore order must respect blob locality: every file fully
|
||||
// contained within the currently cached blob should be restored
|
||||
// before any other blob is downloaded. The sweeper then frees
|
||||
// each blob as soon as its file set is exhausted. Without smart
|
||||
// ordering, path-order interleaves blobs and the cache holds
|
||||
// every touched blob until the last file referencing it lands.
|
||||
//
|
||||
// 2. Each remote blob is fetched exactly once.
|
||||
// Counted via wrapping the Storer.
|
||||
//
|
||||
// 3. blobDiskCache.Get is never called during restore.
|
||||
// Chunk extraction from a cached blob must go through ReadAt,
|
||||
// which reads only the chunk's bytes from disk. Get reads the
|
||||
// entire blob (up to 50 GB in production) into memory just to
|
||||
// slice out a few KB — currently the dominant cost in restore.
|
||||
//
|
||||
// The test deliberately constructs an adversarial scenario: three
|
||||
// blobs A/B/C of ~6 MB each, nine files distributed across them, and
|
||||
// path-ordered names that interleave the blobs (a1, b1, c1, a2, b2,
|
||||
// c2, …) so naive path-order processing would touch every blob before
|
||||
// finishing any of them.
|
||||
func TestRestoreLocalityAndReadAt(t *testing.T) {
|
||||
log.Initialize(log.Config{})
|
||||
|
||||
fs := afero.NewOsFs()
|
||||
tempDir, err := os.MkdirTemp("", "vaultik-locality-")
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = os.RemoveAll(tempDir) }()
|
||||
|
||||
dataDir := filepath.Join(tempDir, "source")
|
||||
storeDir := filepath.Join(tempDir, "remote")
|
||||
restoreDir := filepath.Join(tempDir, "restored")
|
||||
dbPath := filepath.Join(tempDir, "index.sqlite")
|
||||
|
||||
require.NoError(t, fs.MkdirAll(dataDir, 0o755))
|
||||
|
||||
// Layout: 15 source files of exactly 1 MiB each. With
|
||||
// chunkSize (avg) = 4 MiB the chunker's minSize is 1 MiB, so any
|
||||
// file of 1 MiB becomes a single chunk. With a 5 MiB blob limit
|
||||
// the packer fits exactly 5 chunks per blob, producing 3 blobs
|
||||
// containing src-001..005, src-006..010, src-011..015.
|
||||
//
|
||||
// Then add 9 "copy" files — byte-for-byte clones of three of the
|
||||
// sources (one from each blob group) — with interleaved names
|
||||
// (cp-001-A, cp-002-B, cp-003-C, cp-004-A, …) so a naive
|
||||
// path-ordered restore would touch all three blobs before
|
||||
// finishing any of them.
|
||||
const (
|
||||
srcBytes = 1024 * 1024
|
||||
srcCount = 15
|
||||
blobsCount = 3
|
||||
perBlob = srcCount / blobsCount
|
||||
)
|
||||
|
||||
type source struct {
|
||||
path string
|
||||
data []byte
|
||||
}
|
||||
sources := make([]*source, srcCount)
|
||||
for i := 0; i < srcCount; i++ {
|
||||
s := &source{
|
||||
path: fmt.Sprintf("src-%03d.bin", i+1),
|
||||
data: randomBytes(t, srcBytes),
|
||||
}
|
||||
sources[i] = s
|
||||
require.NoError(t, afero.WriteFile(fs, filepath.Join(dataDir, s.path), s.data, 0o644))
|
||||
}
|
||||
|
||||
// Pick one representative source per blob group (src-001 → blob
|
||||
// 1, src-006 → blob 2, src-011 → blob 3) and create 3 copies of
|
||||
// each with interleaved alphabetical names.
|
||||
type copyFile struct {
|
||||
path string
|
||||
data []byte
|
||||
sourceBlob int // 0, 1, or 2
|
||||
sourceIndex int // index into sources slice
|
||||
}
|
||||
groupReps := []int{0, perBlob, 2 * perBlob} // 0, 5, 10
|
||||
letters := []byte{'A', 'B', 'C'}
|
||||
var copies []copyFile
|
||||
for i := 0; i < 3; i++ {
|
||||
for j := 0; j < blobsCount; j++ {
|
||||
seq := i*blobsCount + j + 1
|
||||
name := fmt.Sprintf("cp-%03d-%c.bin", seq, letters[j])
|
||||
path := filepath.Join(dataDir, name)
|
||||
src := sources[groupReps[j]]
|
||||
require.NoError(t, afero.WriteFile(fs, path, src.data, 0o644))
|
||||
copies = append(copies, copyFile{path: path, data: src.data, sourceBlob: j, sourceIndex: groupReps[j]})
|
||||
}
|
||||
}
|
||||
|
||||
// chunkSize avg = 4 MiB makes minSize = 1 MiB, so a 1 MiB file
|
||||
// becomes one chunk. maxBlobSize = 5 MiB packs exactly 5 chunks
|
||||
// per blob, yielding 3 blobs from 15 source files.
|
||||
chunkSize := int64(4 * 1024 * 1024)
|
||||
maxBlobSize := int64(5 * 1024 * 1024)
|
||||
|
||||
storer, err := storage.NewFileStorer(storeDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
agePublicKey := "age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"
|
||||
ageSecretKey := "AGE-SECRET-KEY-19CR5YSFW59HM4TLD6GXVEDMZFTVVF7PPHKUT68TXSFPK7APHXA2QS2NJA5"
|
||||
|
||||
cfg := &config.Config{
|
||||
AgeRecipients: []string{agePublicKey},
|
||||
AgeSecretKey: ageSecretKey,
|
||||
CompressionLevel: 3,
|
||||
Hostname: "test-host",
|
||||
BlobSizeLimit: config.Size(maxBlobSize),
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
db, err := database.New(ctx, dbPath)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = db.Close() }()
|
||||
|
||||
repos := database.NewRepositories(db)
|
||||
|
||||
sm := snapshot.NewSnapshotManager(snapshot.SnapshotManagerParams{
|
||||
Repos: repos,
|
||||
Storage: storer,
|
||||
Config: cfg,
|
||||
})
|
||||
sm.SetFilesystem(fs)
|
||||
|
||||
scanner := snapshot.NewScanner(snapshot.ScannerConfig{
|
||||
FS: fs,
|
||||
Storage: storer,
|
||||
ChunkSize: chunkSize,
|
||||
MaxBlobSize: maxBlobSize,
|
||||
CompressionLevel: cfg.CompressionLevel,
|
||||
AgeRecipients: cfg.AgeRecipients,
|
||||
Repositories: repos,
|
||||
})
|
||||
|
||||
snapshotID, err := sm.CreateSnapshotWithName(ctx, cfg.Hostname, "locality", "test-version", "test-git")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = scanner.Scan(ctx, dataDir, snapshotID)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, sm.CompleteSnapshot(ctx, snapshotID))
|
||||
require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, snapshotID))
|
||||
|
||||
blobsOnDisk := listBlobKeys(t, storeDir)
|
||||
t.Logf("backup produced %d blobs", len(blobsOnDisk))
|
||||
require.GreaterOrEqual(t, len(blobsOnDisk), 3, "expected at least 3 blobs from 3 filler groups")
|
||||
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
// Wrap the storer so we can count downloads per blob key.
|
||||
counter := newCountingStorer(storer)
|
||||
|
||||
// Capture the restore-side cache for instrumentation inspection.
|
||||
// The observer fires twice (immediately after creation and
|
||||
// immediately before close) so we read PeakLen and call counters
|
||||
// from the same instance the production code used.
|
||||
var cacheRef *blobDiskCache
|
||||
v := &Vaultik{
|
||||
Config: cfg,
|
||||
Storage: counter,
|
||||
Fs: fs,
|
||||
Stdout: io.Discard,
|
||||
Stderr: io.Discard,
|
||||
UI: ui.NewWithColor(io.Discard, false),
|
||||
restoreCacheObserver: func(c *blobDiskCache) {
|
||||
cacheRef = c
|
||||
},
|
||||
}
|
||||
v.SetContext(ctx)
|
||||
|
||||
require.NoError(t, v.Restore(&RestoreOptions{
|
||||
SnapshotID: snapshotID,
|
||||
TargetDir: restoreDir,
|
||||
}))
|
||||
|
||||
require.NotNil(t, cacheRef, "restoreCacheObserver must fire during restore")
|
||||
|
||||
// Verify restored content matches.
|
||||
for _, s := range sources {
|
||||
restored := filepath.Join(restoreDir, dataDir, s.path)
|
||||
got, err := afero.ReadFile(fs, restored)
|
||||
require.NoErrorf(t, err, "source missing after restore: %s", s.path)
|
||||
require.Truef(t, bytes.Equal(got, s.data), "byte mismatch for source %s", s.path)
|
||||
}
|
||||
for _, c := range copies {
|
||||
restored := filepath.Join(restoreDir, c.path)
|
||||
got, err := afero.ReadFile(fs, restored)
|
||||
require.NoErrorf(t, err, "copy missing after restore: %s", c.path)
|
||||
require.Truef(t, bytes.Equal(got, c.data), "byte mismatch for copy %s", c.path)
|
||||
}
|
||||
|
||||
// (1) Each blob fetched exactly once.
|
||||
for key, n := range counter.snapshot() {
|
||||
if !filterBlobKey(key) {
|
||||
continue
|
||||
}
|
||||
assert.Equalf(t, 1, n, "blob %s fetched %d times, want exactly 1", key, n)
|
||||
}
|
||||
|
||||
// (2) Peak cache size ≤ 1. The sweeper plus locality-aware
|
||||
// ordering should free each blob before the next one downloads.
|
||||
assert.LessOrEqualf(t, cacheRef.PeakLen(), 1,
|
||||
"peak cached blobs was %d; expected ≤ 1 with locality-ordered restore", cacheRef.PeakLen())
|
||||
|
||||
// (3) Cache.Get must never be called during restore — chunk
|
||||
// extraction has to go through ReadAt so we never read the whole
|
||||
// blob from disk to grab a few KB slice.
|
||||
assert.Equalf(t, 0, cacheRef.GetCalls(),
|
||||
"blobDiskCache.Get was called %d times during restore; restore must use ReadAt exclusively", cacheRef.GetCalls())
|
||||
|
||||
t.Logf("blob cache stats: peak_len=%d get_calls=%d readat_calls=%d",
|
||||
cacheRef.PeakLen(), cacheRef.GetCalls(), cacheRef.ReadAtCalls())
|
||||
}
|
||||
|
||||
// randomBytes returns n bytes of random data. Used to make sure the
|
||||
// chunker picks non-degenerate FastCDC boundaries.
|
||||
func randomBytes(t *testing.T, n int) []byte {
|
||||
t.Helper()
|
||||
b := make([]byte, n)
|
||||
_, err := rand.Read(b)
|
||||
require.NoError(t, err)
|
||||
return b
|
||||
}
|
||||
|
||||
// listBlobKeys walks the FileStorer blobs/ tree and returns the
|
||||
// relative keys for every blob file present.
|
||||
func listBlobKeys(t *testing.T, storeDir string) []string {
|
||||
t.Helper()
|
||||
var keys []string
|
||||
root := filepath.Join(storeDir, "blobs")
|
||||
err := filepath.Walk(root, func(p string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
rel, _ := filepath.Rel(storeDir, p)
|
||||
keys = append(keys, rel)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
sort.Strings(keys)
|
||||
return keys
|
||||
}
|
||||
|
||||
// filterBlobKey returns true when key looks like a blob storage path
|
||||
// (rather than a snapshot metadata path).
|
||||
func filterBlobKey(key string) bool {
|
||||
return len(key) > 6 && key[:6] == "blobs/"
|
||||
}
|
||||
|
||||
// countingStorerInternal wraps a storage.Storer and records the number
|
||||
// of Get calls per key, so the locality test can assert each blob is
|
||||
// fetched exactly once. Defined here (rather than reusing the one in
|
||||
// the integration_test package) because this test lives in package
|
||||
// vaultik for access to unexported cache internals.
|
||||
type countingStorerInternal struct {
|
||||
storage.Storer
|
||||
mu sync.Mutex
|
||||
counts map[string]int
|
||||
}
|
||||
|
||||
func newCountingStorer(inner storage.Storer) *countingStorerInternal {
|
||||
return &countingStorerInternal{Storer: inner, counts: make(map[string]int)}
|
||||
}
|
||||
|
||||
func (c *countingStorerInternal) Get(ctx context.Context, key string) (io.ReadCloser, error) {
|
||||
c.mu.Lock()
|
||||
c.counts[key]++
|
||||
c.mu.Unlock()
|
||||
return c.Storer.Get(ctx, key)
|
||||
}
|
||||
|
||||
func (c *countingStorerInternal) snapshot() map[string]int {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
out := make(map[string]int, len(c.counts))
|
||||
for k, v := range c.counts {
|
||||
out[k] = v
|
||||
}
|
||||
return out
|
||||
}
|
||||
185
internal/vaultik/restore_plan.go
Normal file
185
internal/vaultik/restore_plan.go
Normal file
@@ -0,0 +1,185 @@
|
||||
package vaultik
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
|
||||
"sneak.berlin/go/vaultik/internal/database"
|
||||
"sneak.berlin/go/vaultik/internal/types"
|
||||
)
|
||||
|
||||
// restorePlan orders restore-time file processing by blob locality. The
|
||||
// goal is to keep the blob disk cache occupancy as small as possible:
|
||||
// download one blob, drain every file referencing only that blob, let
|
||||
// the sweeper free the blob, then move on. Files that span multiple
|
||||
// blobs are processed when their full blob set is on disk.
|
||||
//
|
||||
// The plan keeps two indexes:
|
||||
//
|
||||
// - fileBlobs: for each pending file, the set of blob hashes it
|
||||
// still needs that are NOT yet in the cache. Files with an empty
|
||||
// set are "ready" — they can be restored from the current cache
|
||||
// with no further downloads.
|
||||
// - blobFiles: for each blob, the set of pending files referencing
|
||||
// it. Used to short-circuit "when this blob lands, which files
|
||||
// become ready" without a global scan.
|
||||
type restorePlan struct {
|
||||
fileBlobs map[types.FileID]map[string]struct{}
|
||||
blobFiles map[string]map[types.FileID]struct{}
|
||||
ready []types.FileID
|
||||
cached map[string]struct{}
|
||||
}
|
||||
|
||||
// newRestorePlan builds the file→blob index for the given files. Files
|
||||
// whose chunks reference no blobs (symlinks, directories) start in the
|
||||
// ready queue immediately.
|
||||
func newRestorePlan(
|
||||
ctx context.Context,
|
||||
repos *database.Repositories,
|
||||
files []*database.File,
|
||||
chunkToBlobMap map[string]*database.BlobChunk,
|
||||
blobIDToHash map[string]string,
|
||||
) (*restorePlan, error) {
|
||||
p := &restorePlan{
|
||||
fileBlobs: make(map[types.FileID]map[string]struct{}, len(files)),
|
||||
blobFiles: make(map[string]map[types.FileID]struct{}),
|
||||
ready: make([]types.FileID, 0, len(files)),
|
||||
cached: make(map[string]struct{}),
|
||||
}
|
||||
for _, f := range files {
|
||||
if f.IsSymlink() || f.Mode&uint32(os.ModeDir) != 0 {
|
||||
// No chunks to fetch — restore can run immediately.
|
||||
p.fileBlobs[f.ID] = nil
|
||||
p.ready = append(p.ready, f.ID)
|
||||
continue
|
||||
}
|
||||
fileChunks, err := repos.FileChunks.GetByFileID(ctx, f.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("planning %s: %w", f.Path, err)
|
||||
}
|
||||
blobs := make(map[string]struct{})
|
||||
for _, fc := range fileChunks {
|
||||
bc, ok := chunkToBlobMap[fc.ChunkHash.String()]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("planning %s: chunk %s missing from blob map",
|
||||
f.Path, fc.ChunkHash.String()[:16])
|
||||
}
|
||||
hash, ok := blobIDToHash[bc.BlobID.String()]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("planning %s: blob id %s missing from id-to-hash map",
|
||||
f.Path, bc.BlobID)
|
||||
}
|
||||
blobs[hash] = struct{}{}
|
||||
}
|
||||
p.fileBlobs[f.ID] = blobs
|
||||
for hash := range blobs {
|
||||
set, ok := p.blobFiles[hash]
|
||||
if !ok {
|
||||
set = make(map[types.FileID]struct{})
|
||||
p.blobFiles[hash] = set
|
||||
}
|
||||
set[f.ID] = struct{}{}
|
||||
}
|
||||
if len(blobs) == 0 {
|
||||
p.ready = append(p.ready, f.ID)
|
||||
}
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// markBlobCached records that the named blob is now resident in the
|
||||
// disk cache and moves any pending file whose remaining-uncached-blobs
|
||||
// set just dropped to empty onto the ready queue.
|
||||
func (p *restorePlan) markBlobCached(blobHash string) {
|
||||
if _, already := p.cached[blobHash]; already {
|
||||
return
|
||||
}
|
||||
p.cached[blobHash] = struct{}{}
|
||||
for fileID := range p.blobFiles[blobHash] {
|
||||
blobs := p.fileBlobs[fileID]
|
||||
delete(blobs, blobHash)
|
||||
if len(blobs) == 0 {
|
||||
p.ready = append(p.ready, fileID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// popReady returns the next ready file, removing it from the queue. If
|
||||
// no file is ready, the second return value is false.
|
||||
func (p *restorePlan) popReady() (types.FileID, bool) {
|
||||
if len(p.ready) == 0 {
|
||||
return types.FileID{}, false
|
||||
}
|
||||
id := p.ready[0]
|
||||
p.ready = p.ready[1:]
|
||||
return id, true
|
||||
}
|
||||
|
||||
// finishFile drops a restored file from both indexes so subsequent
|
||||
// planning calls don't reconsider it.
|
||||
func (p *restorePlan) finishFile(fileID types.FileID) {
|
||||
for hash := range p.fileBlobs[fileID] {
|
||||
if set, ok := p.blobFiles[hash]; ok {
|
||||
delete(set, fileID)
|
||||
if len(set) == 0 {
|
||||
delete(p.blobFiles, hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
delete(p.fileBlobs, fileID)
|
||||
// Also scrub the file from any blobFiles entries where it might
|
||||
// still appear even after its uncached-blob set was emptied.
|
||||
for hash, set := range p.blobFiles {
|
||||
if _, ok := set[fileID]; ok {
|
||||
delete(set, fileID)
|
||||
if len(set) == 0 {
|
||||
delete(p.blobFiles, hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pickNextDownload returns the pending file whose remaining-uncached
|
||||
// blob set is smallest (with ties broken by FileID string compare so
|
||||
// the choice is deterministic across runs). This file's blobs are
|
||||
// downloaded next, after which it — together with any other pending
|
||||
// files whose blob sets become empty — moves to the ready queue.
|
||||
//
|
||||
// The zero FileID return means nothing is pending.
|
||||
func (p *restorePlan) pickNextDownload() types.FileID {
|
||||
var best types.FileID
|
||||
bestCount := math.MaxInt
|
||||
var bestID string
|
||||
for id, blobs := range p.fileBlobs {
|
||||
n := len(blobs)
|
||||
if n == 0 {
|
||||
// Already-ready files should have been popped via
|
||||
// popReady; ignore here just in case.
|
||||
continue
|
||||
}
|
||||
idStr := id.String()
|
||||
if n < bestCount || (n == bestCount && (best.IsZero() || idStr < bestID)) {
|
||||
best = id
|
||||
bestCount = n
|
||||
bestID = idStr
|
||||
}
|
||||
}
|
||||
return best
|
||||
}
|
||||
|
||||
// blobsNeeded returns the uncached blob hashes for fileID in any order.
|
||||
func (p *restorePlan) blobsNeeded(fileID types.FileID) []string {
|
||||
blobs := p.fileBlobs[fileID]
|
||||
out := make([]string, 0, len(blobs))
|
||||
for h := range blobs {
|
||||
out = append(out, h)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// hasPending reports whether any unfinished files remain.
|
||||
func (p *restorePlan) hasPending() bool {
|
||||
return len(p.fileBlobs) > 0
|
||||
}
|
||||
@@ -8,16 +8,13 @@ import (
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"sneak.berlin/go/vaultik/internal/database"
|
||||
"sneak.berlin/go/vaultik/internal/log"
|
||||
"sneak.berlin/go/vaultik/internal/snapshot"
|
||||
"sneak.berlin/go/vaultik/internal/types"
|
||||
)
|
||||
|
||||
// SnapshotCreateOptions contains options for the snapshot create command
|
||||
@@ -92,8 +89,13 @@ func (v *Vaultik) CreateSnapshot(opts *SnapshotCreateOptions) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Terminus must obey the --cron invariant: silent on total
|
||||
// success only. UI.Complete is dropped in cron/quiet mode (that's
|
||||
// the success path), but if any warnings fired during the run we
|
||||
// emit the summary via UI.Warning so cron actually delivers
|
||||
// something for the user to look at.
|
||||
if v.UI.WarningCount() > 0 {
|
||||
v.UI.Complete("Finished (with %d warnings).", v.UI.WarningCount())
|
||||
v.UI.Warning("Finished with %d warning(s) — review the output above.", v.UI.WarningCount())
|
||||
} else {
|
||||
v.UI.Complete("Finished successfully.")
|
||||
}
|
||||
@@ -378,25 +380,43 @@ func (v *Vaultik) getSnapshotBlobSizes(snapshotID string) (compressed int64, unc
|
||||
return compressed, uncompressed
|
||||
}
|
||||
|
||||
// ListSnapshots lists all snapshots
|
||||
// ListSnapshots prints the table of snapshots, plus any reconciliation
|
||||
// warnings/notes between the local index and the backup destination
|
||||
// store.
|
||||
//
|
||||
// The local index database is always the primary source for the
|
||||
// table — it has the human snapshot IDs, timestamps, and per-snapshot
|
||||
// stats.
|
||||
//
|
||||
// If an age secret key is configured AND remote listing succeeds, we
|
||||
// cross-reference: any local snapshot whose hashed key isn't visible
|
||||
// remotely gets a "local-only" cleanup hint, and any remote key that
|
||||
// doesn't correspond to a known local snapshot gets reported in a
|
||||
// NOTE.
|
||||
//
|
||||
// If no age key is set the local machine is assumed write-only
|
||||
// (backup-only), so we skip remote listing entirely — there's no
|
||||
// value showing keys the user couldn't restore anyway.
|
||||
//
|
||||
// If remote listing fails (unmounted volume, permission denied,
|
||||
// network), we degrade to local-only with a warning. List never
|
||||
// fails just because the destination is unreachable.
|
||||
func (v *Vaultik) ListSnapshots(jsonOutput bool) error {
|
||||
log.Info("Listing snapshots")
|
||||
remoteSnapshots, err := v.listRemoteSnapshotIDs()
|
||||
|
||||
localSnaps, err := v.Repositories.Snapshots.ListRecent(v.ctx, 10000)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("listing local snapshots: %w", err)
|
||||
}
|
||||
|
||||
localSnapshotMap, err := v.reconcileLocalWithRemote(remoteSnapshots)
|
||||
if err != nil {
|
||||
return err
|
||||
snapshots := make([]SnapshotInfo, 0, len(localSnaps))
|
||||
for _, ls := range localSnaps {
|
||||
if ls.CompletedAt == nil {
|
||||
continue
|
||||
}
|
||||
snapshots = append(snapshots, v.snapshotInfoFromLocal(ls))
|
||||
}
|
||||
|
||||
snapshots, err := v.buildSnapshotInfoList(remoteSnapshots, localSnapshotMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Sort by timestamp (newest first)
|
||||
sort.Slice(snapshots, func(i, j int) bool {
|
||||
return snapshots[i].Timestamp.After(snapshots[j].Timestamp)
|
||||
})
|
||||
@@ -411,173 +431,85 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Warn about local snapshots that don't exist in remote storage.
|
||||
var stale []string
|
||||
for id := range localSnapshotMap {
|
||||
if !remoteSnapshots[id] {
|
||||
stale = append(stale, id)
|
||||
if v.Config.AgeSecretKey == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
remoteKeys, err := v.listAllRemoteSnapshotKeys()
|
||||
if err != nil {
|
||||
v.UI.Warning("Could not list backup destination store: %v.", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
localKeys := make(map[string]string, len(localSnaps))
|
||||
for _, ls := range localSnaps {
|
||||
if ls.CompletedAt == nil {
|
||||
continue
|
||||
}
|
||||
localKeys[snapshot.RemoteSnapshotKey(ls.ID.String())] = ls.ID.String()
|
||||
}
|
||||
remoteSet := make(map[string]bool, len(remoteKeys))
|
||||
for _, k := range remoteKeys {
|
||||
remoteSet[k] = true
|
||||
}
|
||||
|
||||
var localOnly []string
|
||||
for key, humanID := range localKeys {
|
||||
if !remoteSet[key] {
|
||||
localOnly = append(localOnly, humanID)
|
||||
}
|
||||
}
|
||||
if len(stale) > 0 {
|
||||
v.UI.Warning("%d local snapshot record(s) not found in backup destination store:", len(stale))
|
||||
for _, id := range stale {
|
||||
var remoteOnlyCount int
|
||||
for key := range remoteSet {
|
||||
if _, ok := localKeys[key]; !ok {
|
||||
remoteOnlyCount++
|
||||
}
|
||||
}
|
||||
|
||||
if len(localOnly) > 0 {
|
||||
v.UI.Warning("%d local snapshot record(s) not found in backup destination store:", len(localOnly))
|
||||
for _, id := range localOnly {
|
||||
v.UI.Info("%s", v.UI.Snapshot(id))
|
||||
}
|
||||
v.UI.Info("Run 'vaultik snapshot cleanup' to remove stale local records.")
|
||||
}
|
||||
if remoteOnlyCount > 0 {
|
||||
v.UI.Notice("NOTE: %d remote snapshot(s) found in backup destination store but not in local database.", remoteOnlyCount)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// listRemoteSnapshotIDs returns a set of snapshot IDs found in remote storage
|
||||
func (v *Vaultik) listRemoteSnapshotIDs() (map[string]bool, error) {
|
||||
remoteSnapshots := make(map[string]bool)
|
||||
objectCh := v.Storage.ListStream(v.ctx, "metadata/")
|
||||
// snapshotInfoFromLocal builds a SnapshotInfo row from a local snapshot
|
||||
// record. Failures from any per-snapshot stat query degrade that
|
||||
// column to its snapshot-row fallback but never fail the listing.
|
||||
func (v *Vaultik) snapshotInfoFromLocal(ls *database.Snapshot) SnapshotInfo {
|
||||
idStr := ls.ID.String()
|
||||
|
||||
for object := range objectCh {
|
||||
if object.Err != nil {
|
||||
return nil, fmt.Errorf("listing remote snapshots: %w", object.Err)
|
||||
}
|
||||
|
||||
parts := strings.Split(object.Key, "/")
|
||||
if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" {
|
||||
if strings.HasPrefix(parts[1], ".") {
|
||||
continue
|
||||
}
|
||||
remoteSnapshots[parts[1]] = true
|
||||
}
|
||||
}
|
||||
|
||||
return remoteSnapshots, nil
|
||||
}
|
||||
|
||||
// reconcileLocalWithRemote builds a map of local snapshots keyed by ID for cross-referencing with remote
|
||||
func (v *Vaultik) reconcileLocalWithRemote(remoteSnapshots map[string]bool) (map[string]*database.Snapshot, error) {
|
||||
localSnapshots, err := v.Repositories.Snapshots.ListRecent(v.ctx, 10000)
|
||||
totalSize, err := v.Repositories.Snapshots.GetSnapshotTotalCompressedSize(v.ctx, idStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listing local snapshots: %w", err)
|
||||
log.Warn("Failed to get total compressed size", "id", idStr, "error", err)
|
||||
totalSize = ls.BlobSize
|
||||
}
|
||||
|
||||
localSnapshotMap := make(map[string]*database.Snapshot)
|
||||
for _, s := range localSnapshots {
|
||||
localSnapshotMap[s.ID.String()] = s
|
||||
uncompressedSize, err := v.Repositories.Snapshots.GetSnapshotUncompressedChunkSize(v.ctx, idStr)
|
||||
if err != nil {
|
||||
log.Warn("Failed to get uncompressed chunk size", "id", idStr, "error", err)
|
||||
}
|
||||
|
||||
return localSnapshotMap, nil
|
||||
}
|
||||
|
||||
// buildSnapshotInfoList constructs SnapshotInfo entries from remote IDs and local data
|
||||
func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSnapshotMap map[string]*database.Snapshot) ([]SnapshotInfo, error) {
|
||||
snapshots := make([]SnapshotInfo, 0, len(remoteSnapshots))
|
||||
|
||||
// remoteOnly collects snapshot IDs that need a manifest download.
|
||||
var remoteOnly []string
|
||||
|
||||
for snapshotID := range remoteSnapshots {
|
||||
if localSnap, exists := localSnapshotMap[snapshotID]; exists && localSnap.CompletedAt != nil {
|
||||
totalSize, err := v.Repositories.Snapshots.GetSnapshotTotalCompressedSize(v.ctx, snapshotID)
|
||||
if err != nil {
|
||||
log.Warn("Failed to get total compressed size", "id", snapshotID, "error", err)
|
||||
totalSize = localSnap.BlobSize
|
||||
}
|
||||
|
||||
uncompressedSize, err := v.Repositories.Snapshots.GetSnapshotUncompressedChunkSize(v.ctx, snapshotID)
|
||||
if err != nil {
|
||||
log.Warn("Failed to get uncompressed chunk size", "id", snapshotID, "error", err)
|
||||
}
|
||||
|
||||
newChunkSize, err := v.Repositories.Snapshots.GetSnapshotNewChunkSize(v.ctx, snapshotID)
|
||||
if err != nil {
|
||||
log.Warn("Failed to get new chunk size", "id", snapshotID, "error", err)
|
||||
}
|
||||
|
||||
snapshots = append(snapshots, SnapshotInfo{
|
||||
ID: localSnap.ID,
|
||||
Timestamp: localSnap.StartedAt,
|
||||
CompressedSize: totalSize,
|
||||
UncompressedSize: uncompressedSize,
|
||||
NewChunkSize: newChunkSize,
|
||||
LocallyTracked: true,
|
||||
})
|
||||
} else {
|
||||
timestamp, err := parseSnapshotTimestamp(snapshotID)
|
||||
if err != nil {
|
||||
log.Warn("Failed to parse snapshot timestamp", "id", snapshotID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Pre-add with zero size; will be filled by concurrent downloads.
|
||||
snapshots = append(snapshots, SnapshotInfo{
|
||||
ID: types.SnapshotID(snapshotID),
|
||||
Timestamp: timestamp,
|
||||
CompressedSize: 0,
|
||||
LocallyTracked: false,
|
||||
})
|
||||
remoteOnly = append(remoteOnly, snapshotID)
|
||||
}
|
||||
newChunkSize, err := v.Repositories.Snapshots.GetSnapshotNewChunkSize(v.ctx, idStr)
|
||||
if err != nil {
|
||||
log.Warn("Failed to get new chunk size", "id", idStr, "error", err)
|
||||
}
|
||||
|
||||
// Download manifests concurrently for remote-only snapshots.
|
||||
if len(remoteOnly) > 0 {
|
||||
// maxConcurrentManifestDownloads bounds parallel manifest fetches to
|
||||
// avoid overwhelming the S3 endpoint while still being much faster
|
||||
// than serial downloads.
|
||||
const maxConcurrentManifestDownloads = 10
|
||||
|
||||
type manifestResult struct {
|
||||
snapshotID string
|
||||
size int64
|
||||
}
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
results []manifestResult
|
||||
)
|
||||
|
||||
g, gctx := errgroup.WithContext(v.ctx)
|
||||
g.SetLimit(maxConcurrentManifestDownloads)
|
||||
|
||||
for _, sid := range remoteOnly {
|
||||
g.Go(func() error {
|
||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", sid)
|
||||
reader, err := v.Storage.Get(gctx, manifestPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("downloading manifest for %s: %w", sid, err)
|
||||
}
|
||||
defer func() { _ = reader.Close() }()
|
||||
|
||||
manifest, err := snapshot.DecodeManifest(reader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decoding manifest for %s: %w", sid, err)
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
results = append(results, manifestResult{
|
||||
snapshotID: sid,
|
||||
size: manifest.TotalCompressedSize,
|
||||
})
|
||||
mu.Unlock()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
return nil, fmt.Errorf("fetching manifest sizes: %w", err)
|
||||
}
|
||||
|
||||
// Build a lookup from results and patch the pre-added entries.
|
||||
sizeMap := make(map[string]int64, len(results))
|
||||
for _, r := range results {
|
||||
sizeMap[r.snapshotID] = r.size
|
||||
}
|
||||
for i := range snapshots {
|
||||
if sz, ok := sizeMap[string(snapshots[i].ID)]; ok {
|
||||
snapshots[i].CompressedSize = sz
|
||||
}
|
||||
}
|
||||
return SnapshotInfo{
|
||||
ID: ls.ID,
|
||||
Timestamp: ls.StartedAt,
|
||||
CompressedSize: totalSize,
|
||||
UncompressedSize: uncompressedSize,
|
||||
NewChunkSize: newChunkSize,
|
||||
LocallyTracked: true,
|
||||
}
|
||||
|
||||
return snapshots, nil
|
||||
}
|
||||
|
||||
// printSnapshotTable renders the snapshot list as a formatted table
|
||||
@@ -763,14 +695,23 @@ func (v *Vaultik) confirmAndExecutePurge(toDelete []SnapshotInfo, force, quiet b
|
||||
if err := v.deleteSnapshotFromLocalDB(snapshotID); err != nil {
|
||||
log.Error("Failed to delete from local database", "snapshot_id", snapshotID, "error", err)
|
||||
}
|
||||
if err := v.deleteSnapshotFromRemote(snapshotID); err != nil {
|
||||
if err := v.deleteRemoteSnapshotByKey(snapshot.RemoteSnapshotKey(snapshotID)); err != nil {
|
||||
return fmt.Errorf("deleting snapshot %s from remote: %w", snapshotID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Tidy up local DB orphans now so users don't have to run a
|
||||
// separate command after a purge. Guarded against nil for tests
|
||||
// that don't wire up a SnapshotManager.
|
||||
if v.SnapshotManager != nil {
|
||||
if err := v.SnapshotManager.CleanupOrphanedData(v.ctx); err != nil {
|
||||
log.Warn("Failed to clean up orphaned local data after purge", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
if !quiet {
|
||||
v.printfStdout("Deleted %d snapshot(s)\n", len(toDelete))
|
||||
v.printlnStdout("\nNote: Run 'vaultik prune' to clean up unreferenced blobs.")
|
||||
v.printlnStdout("\nNote: Run 'vaultik prune' to clean up unreferenced remote blobs.")
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -799,8 +740,9 @@ func (v *Vaultik) VerifySnapshotWithOptions(snapshotID string, opts *VerifyOptio
|
||||
|
||||
v.printVerifyHeader(snapshotID, opts)
|
||||
|
||||
// Download and parse manifest
|
||||
manifest, err := v.downloadManifest(snapshotID)
|
||||
// Download and parse manifest. The caller supplies a human
|
||||
// snapshot ID; we hash it to address remote storage.
|
||||
manifest, err := v.downloadManifestByKey(snapshot.RemoteSnapshotKey(snapshotID))
|
||||
if err != nil {
|
||||
if opts.JSON {
|
||||
result.Status = "failed"
|
||||
@@ -915,12 +857,18 @@ func (v *Vaultik) outputVerifyJSON(result *VerifyResult) error {
|
||||
|
||||
// CleanupLocalSnapshots removes local snapshot records that have no
|
||||
// corresponding metadata in remote storage. These are typically left
|
||||
// behind by incomplete or interrupted backups.
|
||||
// behind by incomplete or interrupted backups. Each local snapshot's
|
||||
// human ID is hashed via RemoteSnapshotKey and compared against the
|
||||
// remote listing.
|
||||
func (v *Vaultik) CleanupLocalSnapshots() error {
|
||||
remoteSnapshots, err := v.listRemoteSnapshotIDs()
|
||||
remoteKeys, err := v.listAllRemoteSnapshotKeys()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
remoteSet := make(map[string]bool, len(remoteKeys))
|
||||
for _, k := range remoteKeys {
|
||||
remoteSet[k] = true
|
||||
}
|
||||
|
||||
localSnapshots, err := v.Repositories.Snapshots.ListRecent(v.ctx, 10000)
|
||||
if err != nil {
|
||||
@@ -930,7 +878,7 @@ func (v *Vaultik) CleanupLocalSnapshots() error {
|
||||
var removed int
|
||||
for _, snap := range localSnapshots {
|
||||
id := snap.ID.String()
|
||||
if !remoteSnapshots[id] {
|
||||
if !remoteSet[snapshot.RemoteSnapshotKey(id)] {
|
||||
v.printfStdout("Removing stale local record: %s\n", id)
|
||||
if err := v.deleteSnapshotFromLocalDB(id); err != nil {
|
||||
log.Error("Failed to delete local snapshot", "snapshot_id", id, "error", err)
|
||||
@@ -950,8 +898,12 @@ func (v *Vaultik) CleanupLocalSnapshots() error {
|
||||
|
||||
// Helper methods that were previously on SnapshotApp
|
||||
|
||||
func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error) {
|
||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
// downloadManifestByKey fetches the manifest at
|
||||
// metadata/<remoteKey>/manifest.json.zst. The remoteKey is the double-
|
||||
// SHA256 derivation produced by snapshot.RemoteSnapshotKey, not the
|
||||
// human snapshot ID. Callers that have a human ID must hash first.
|
||||
func (v *Vaultik) downloadManifestByKey(remoteKey string) (*snapshot.Manifest, error) {
|
||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", remoteKey)
|
||||
|
||||
reader, err := v.Storage.Get(v.ctx, manifestPath)
|
||||
if err != nil {
|
||||
@@ -1086,12 +1038,22 @@ func (v *Vaultik) RemoveSnapshot(snapshotID string, opts *RemoveOptions) (*Remov
|
||||
// If --remote, also remove from remote storage
|
||||
if opts.Remote {
|
||||
log.Info("Removing snapshot metadata from remote storage", "snapshot_id", snapshotID)
|
||||
if err := v.deleteSnapshotFromRemote(snapshotID); err != nil {
|
||||
if err := v.deleteRemoteSnapshotByKey(snapshot.RemoteSnapshotKey(snapshotID)); err != nil {
|
||||
return result, fmt.Errorf("removing from remote storage: %w", err)
|
||||
}
|
||||
result.RemoteRemoved = true
|
||||
}
|
||||
|
||||
// Clean up the local rows that just became orphaned (files, chunks,
|
||||
// blob_chunks, blobs no longer referenced by any snapshot). This
|
||||
// used to be a separate `vaultik snapshot prune` step; running it
|
||||
// inline means `snapshot remove` leaves no ghost rows behind.
|
||||
if v.SnapshotManager != nil {
|
||||
if err := v.SnapshotManager.CleanupOrphanedData(v.ctx); err != nil {
|
||||
log.Warn("Failed to clean up orphaned local data after removal", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Output result
|
||||
if opts.JSON {
|
||||
return result, v.outputRemoveJSON(result)
|
||||
@@ -1101,20 +1063,48 @@ func (v *Vaultik) RemoveSnapshot(snapshotID string, opts *RemoveOptions) (*Remov
|
||||
v.printfStdout("Removed snapshot '%s' from local database\n", snapshotID)
|
||||
if opts.Remote {
|
||||
v.printlnStdout("Removed snapshot metadata from remote storage")
|
||||
v.printlnStdout("\nNote: Blobs were not removed. Run 'vaultik prune' to remove orphaned blobs.")
|
||||
v.printlnStdout("\nNote: Remote blobs were not removed. Run 'vaultik prune' to remove orphaned blobs.")
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// RemoveAllSnapshots removes all snapshots from local database and optionally from remote
|
||||
// RemoveAllSnapshots removes every snapshot known to the local
|
||||
// database from the local index, and (with --remote) every snapshot
|
||||
// metadata directory in remote storage. Both sides are processed so a
|
||||
// "remove --all" leaves nothing behind, even when the local DB and
|
||||
// remote storage have diverged.
|
||||
func (v *Vaultik) RemoveAllSnapshots(opts *RemoveOptions) (*RemoveResult, error) {
|
||||
snapshotIDs, err := v.listAllRemoteSnapshotIDs()
|
||||
localSnaps, err := v.localSnapshotIDs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("listing local snapshots: %w", err)
|
||||
}
|
||||
|
||||
if len(snapshotIDs) == 0 {
|
||||
// remoteKeys is the set of metadata/<key>/ subdirectories on the
|
||||
// destination store; failures are downgraded to a warning so a
|
||||
// permission-denied or unreachable remote can't block a local-only
|
||||
// remove.
|
||||
remoteKeys, remoteErr := v.listAllRemoteSnapshotKeys()
|
||||
if remoteErr != nil {
|
||||
log.Warn("Could not list remote snapshots", "error", remoteErr)
|
||||
v.UI.Warning("Could not list remote snapshots: %v.", remoteErr)
|
||||
}
|
||||
|
||||
// Anything visible on the remote that doesn't correspond to a
|
||||
// known local human ID is treated as an orphan key — handled only
|
||||
// when --remote is in effect.
|
||||
knownLocalKeys := make(map[string]string, len(localSnaps))
|
||||
for _, id := range localSnaps {
|
||||
knownLocalKeys[snapshot.RemoteSnapshotKey(id)] = id
|
||||
}
|
||||
var orphanRemoteKeys []string
|
||||
for _, key := range remoteKeys {
|
||||
if _, known := knownLocalKeys[key]; !known {
|
||||
orphanRemoteKeys = append(orphanRemoteKeys, key)
|
||||
}
|
||||
}
|
||||
|
||||
if len(localSnaps) == 0 && len(orphanRemoteKeys) == 0 {
|
||||
if !opts.JSON {
|
||||
v.printlnStdout("No snapshots found")
|
||||
}
|
||||
@@ -1122,19 +1112,42 @@ func (v *Vaultik) RemoveAllSnapshots(opts *RemoveOptions) (*RemoveResult, error)
|
||||
}
|
||||
|
||||
if opts.DryRun {
|
||||
return v.handleRemoveAllDryRun(snapshotIDs, opts)
|
||||
return v.handleRemoveAllDryRun(localSnaps, orphanRemoteKeys, opts)
|
||||
}
|
||||
|
||||
return v.executeRemoveAll(snapshotIDs, opts)
|
||||
return v.executeRemoveAll(localSnaps, orphanRemoteKeys, opts)
|
||||
}
|
||||
|
||||
// listAllRemoteSnapshotIDs collects all unique snapshot IDs from remote storage
|
||||
func (v *Vaultik) listAllRemoteSnapshotIDs() ([]string, error) {
|
||||
log.Info("Listing all snapshots")
|
||||
// localSnapshotIDs returns every snapshot ID present in the local
|
||||
// index database, sorted for deterministic iteration. Empty slice if
|
||||
// the database has no Repositories (e.g. tests).
|
||||
func (v *Vaultik) localSnapshotIDs() ([]string, error) {
|
||||
if v.Repositories == nil {
|
||||
return nil, nil
|
||||
}
|
||||
snaps, err := v.Repositories.Snapshots.ListRecent(v.ctx, 100000)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ids := make([]string, 0, len(snaps))
|
||||
for _, s := range snaps {
|
||||
ids = append(ids, s.ID.String())
|
||||
}
|
||||
sort.Strings(ids)
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
// listAllRemoteSnapshotKeys collects the hashed remote keys
|
||||
// (subdirectories under metadata/) currently present in the
|
||||
// destination store. Returns (nil, err) when the store cannot be
|
||||
// listed; callers must treat that as "no remote info available," not
|
||||
// fatal.
|
||||
func (v *Vaultik) listAllRemoteSnapshotKeys() ([]string, error) {
|
||||
log.Info("Listing all remote snapshots")
|
||||
objectCh := v.Storage.ListStream(v.ctx, "metadata/")
|
||||
|
||||
seen := make(map[string]bool)
|
||||
var snapshotIDs []string
|
||||
var keys []string
|
||||
for object := range objectCh {
|
||||
if object.Err != nil {
|
||||
return nil, fmt.Errorf("listing remote snapshots: %w", object.Err)
|
||||
@@ -1147,30 +1160,36 @@ func (v *Vaultik) listAllRemoteSnapshotIDs() ([]string, error) {
|
||||
continue
|
||||
}
|
||||
if strings.HasSuffix(object.Key, "/") || strings.Contains(object.Key, "/manifest.json.zst") {
|
||||
sid := parts[1]
|
||||
if !seen[sid] {
|
||||
seen[sid] = true
|
||||
snapshotIDs = append(snapshotIDs, sid)
|
||||
key := parts[1]
|
||||
if !seen[key] {
|
||||
seen[key] = true
|
||||
keys = append(keys, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return snapshotIDs, nil
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
// handleRemoveAllDryRun handles the dry-run mode for removing all snapshots
|
||||
func (v *Vaultik) handleRemoveAllDryRun(snapshotIDs []string, opts *RemoveOptions) (*RemoveResult, error) {
|
||||
result := &RemoveResult{
|
||||
DryRun: true,
|
||||
SnapshotsRemoved: snapshotIDs,
|
||||
func (v *Vaultik) handleRemoveAllDryRun(localSnaps, orphanRemoteKeys []string, opts *RemoveOptions) (*RemoveResult, error) {
|
||||
result := &RemoveResult{DryRun: true}
|
||||
result.SnapshotsRemoved = append(result.SnapshotsRemoved, localSnaps...)
|
||||
if opts.Remote {
|
||||
result.SnapshotsRemoved = append(result.SnapshotsRemoved, orphanRemoteKeys...)
|
||||
}
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Would remove %d snapshot(s):\n", len(snapshotIDs))
|
||||
for _, id := range snapshotIDs {
|
||||
v.printfStdout("Would remove %d local snapshot(s):\n", len(localSnaps))
|
||||
for _, id := range localSnaps {
|
||||
v.printfStdout(" %s\n", id)
|
||||
}
|
||||
if opts.Remote {
|
||||
if opts.Remote && len(orphanRemoteKeys) > 0 {
|
||||
v.printfStdout("Would also remove %d orphan remote snapshot key(s):\n", len(orphanRemoteKeys))
|
||||
for _, key := range orphanRemoteKeys {
|
||||
v.printfStdout(" %s\n", key)
|
||||
}
|
||||
} else if opts.Remote {
|
||||
v.printlnStdout("Would also remove from remote storage")
|
||||
}
|
||||
v.printlnStdout("[Dry run - no changes made]")
|
||||
@@ -1181,17 +1200,19 @@ func (v *Vaultik) handleRemoveAllDryRun(snapshotIDs []string, opts *RemoveOption
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// executeRemoveAll removes all snapshots from local database and optionally from remote storage
|
||||
func (v *Vaultik) executeRemoveAll(snapshotIDs []string, opts *RemoveOptions) (*RemoveResult, error) {
|
||||
// executeRemoveAll deletes every local snapshot (and, with --remote,
|
||||
// every corresponding remote metadata directory plus any orphan remote
|
||||
// keys that don't match a local snapshot).
|
||||
func (v *Vaultik) executeRemoveAll(localSnaps, orphanRemoteKeys []string, opts *RemoveOptions) (*RemoveResult, error) {
|
||||
// --all requires --force
|
||||
if !opts.Force {
|
||||
return nil, fmt.Errorf("--all requires --force")
|
||||
}
|
||||
|
||||
log.Info("Removing all snapshots", "count", len(snapshotIDs))
|
||||
log.Info("Removing all snapshots", "local_count", len(localSnaps), "orphan_remote_count", len(orphanRemoteKeys))
|
||||
|
||||
result := &RemoveResult{}
|
||||
for _, snapshotID := range snapshotIDs {
|
||||
for _, snapshotID := range localSnaps {
|
||||
log.Info("Removing snapshot", "snapshot_id", snapshotID)
|
||||
|
||||
if err := v.deleteSnapshotFromLocalDB(snapshotID); err != nil {
|
||||
@@ -1200,7 +1221,7 @@ func (v *Vaultik) executeRemoveAll(snapshotIDs []string, opts *RemoveOptions) (*
|
||||
}
|
||||
|
||||
if opts.Remote {
|
||||
if err := v.deleteSnapshotFromRemote(snapshotID); err != nil {
|
||||
if err := v.deleteRemoteSnapshotByKey(snapshot.RemoteSnapshotKey(snapshotID)); err != nil {
|
||||
log.Error("Failed to remove from remote", "snapshot_id", snapshotID, "error", err)
|
||||
continue
|
||||
}
|
||||
@@ -1209,10 +1230,29 @@ func (v *Vaultik) executeRemoveAll(snapshotIDs []string, opts *RemoveOptions) (*
|
||||
result.SnapshotsRemoved = append(result.SnapshotsRemoved, snapshotID)
|
||||
}
|
||||
|
||||
if opts.Remote {
|
||||
for _, key := range orphanRemoteKeys {
|
||||
log.Info("Removing orphan remote snapshot", "remote_key", key)
|
||||
if err := v.deleteRemoteSnapshotByKey(key); err != nil {
|
||||
log.Error("Failed to remove orphan from remote", "remote_key", key, "error", err)
|
||||
continue
|
||||
}
|
||||
result.SnapshotsRemoved = append(result.SnapshotsRemoved, key)
|
||||
}
|
||||
}
|
||||
|
||||
if opts.Remote {
|
||||
result.RemoteRemoved = true
|
||||
}
|
||||
|
||||
// Clean up everything that just became orphaned locally so the
|
||||
// index database doesn't carry 39k ghost rows after a wipe.
|
||||
if v.SnapshotManager != nil {
|
||||
if err := v.SnapshotManager.CleanupOrphanedData(v.ctx); err != nil {
|
||||
log.Warn("Failed to clean up orphaned local data after bulk removal", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
if opts.JSON {
|
||||
return result, v.outputRemoveJSON(result)
|
||||
}
|
||||
@@ -1220,7 +1260,7 @@ func (v *Vaultik) executeRemoveAll(snapshotIDs []string, opts *RemoveOptions) (*
|
||||
v.printfStdout("Removed %d snapshot(s)\n", len(result.SnapshotsRemoved))
|
||||
if opts.Remote {
|
||||
v.printlnStdout("Removed snapshot metadata from remote storage")
|
||||
v.printlnStdout("\nNote: Blobs were not removed. Run 'vaultik prune' to remove orphaned blobs.")
|
||||
v.printlnStdout("\nNote: Remote blobs were not removed. Run 'vaultik prune' to remove orphaned blobs.")
|
||||
}
|
||||
|
||||
return result, nil
|
||||
@@ -1249,9 +1289,13 @@ func (v *Vaultik) deleteSnapshotFromLocalDB(snapshotID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteSnapshotFromRemote removes snapshot metadata files from remote storage
|
||||
func (v *Vaultik) deleteSnapshotFromRemote(snapshotID string) error {
|
||||
prefix := fmt.Sprintf("metadata/%s/", snapshotID)
|
||||
// deleteRemoteSnapshotByKey removes everything under
|
||||
// metadata/<remoteKey>/ on the destination store. The argument is a
|
||||
// remote key (double-SHA256 derivation), not a human snapshot ID;
|
||||
// callers that have a human ID must hash via snapshot.RemoteSnapshotKey
|
||||
// first.
|
||||
func (v *Vaultik) deleteRemoteSnapshotByKey(remoteKey string) error {
|
||||
prefix := fmt.Sprintf("metadata/%s/", remoteKey)
|
||||
objectCh := v.Storage.ListStream(v.ctx, prefix)
|
||||
|
||||
var objectsToDelete []string
|
||||
|
||||
@@ -44,6 +44,13 @@ type Vaultik struct {
|
||||
// writer wrapping Stdout; the cli layer replaces it with a discarding
|
||||
// writer in --cron mode.
|
||||
UI *ui.Writer
|
||||
|
||||
// restoreCacheObserver, if non-nil, is invoked once with the
|
||||
// restore-side blob disk cache immediately after the cache is
|
||||
// created and again immediately before it is closed. Only
|
||||
// internal-package tests set this; the type is unexported so
|
||||
// callers outside this package can't reach it.
|
||||
restoreCacheObserver func(*blobDiskCache)
|
||||
}
|
||||
|
||||
// VaultikParams contains all parameters for New that can be provided by fx
|
||||
|
||||
@@ -106,8 +106,11 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
|
||||
// loadVerificationData downloads manifest, database, and blob list for verification
|
||||
func (v *Vaultik) loadVerificationData(snapshotID string, opts *VerifyOptions, result *VerifyResult) (*snapshot.Manifest, *tempDB, []snapshot.BlobInfo, error) {
|
||||
// All remote paths use the hashed key derived from the human ID.
|
||||
remoteKey := snapshot.RemoteSnapshotKey(snapshotID)
|
||||
|
||||
// Download manifest
|
||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", remoteKey)
|
||||
log.Info("Downloading manifest", "path", manifestPath)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Downloading manifest...\n")
|
||||
@@ -136,7 +139,7 @@ func (v *Vaultik) loadVerificationData(snapshotID string, opts *VerifyOptions, r
|
||||
}
|
||||
|
||||
// Download and decrypt database
|
||||
dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
|
||||
dbPath := fmt.Sprintf("metadata/%s/db.zst.age", remoteKey)
|
||||
log.Info("Downloading encrypted database", "path", dbPath)
|
||||
dbReader, err := v.Storage.Get(v.ctx, dbPath)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user