Compare commits
4 Commits
301ea217e8
...
64c69cd8e3
| Author | SHA1 | Date | |
|---|---|---|---|
| 64c69cd8e3 | |||
| 132f7149ca | |||
| f1ce085972 | |||
| d8edf90fac |
@@ -95,7 +95,7 @@ vaultik [--config <path>] config init
|
||||
vaultik [--config <path>] config edit
|
||||
vaultik [--config <path>] config get <key>
|
||||
vaultik [--config <path>] config set <key> <value>
|
||||
vaultik [--config <path>] snapshot create [snapshot-names...] [--cron] [--prune] [--keep-newer-than <duration>] [--skip-errors]
|
||||
vaultik [--config <path>] snapshot create [snapshot-names...] [--cron] [--prune] [--keep-newer-than <duration>]
|
||||
vaultik [--config <path>] snapshot list [--json]
|
||||
vaultik [--config <path>] snapshot verify <snapshot-id> [--deep] [--json]
|
||||
vaultik [--config <path>] snapshot purge [--keep-latest | --older-than <duration>] [--snapshot <name>...] [--force]
|
||||
@@ -117,7 +117,8 @@ vaultik version
|
||||
* `--config <path>`: Path to config file (default: `$VAULTIK_CONFIG`, then platform config dir, then `/etc/vaultik/config.yml`)
|
||||
* `--verbose`, `-v`: Enable verbose output
|
||||
* `--debug`: Enable debug output
|
||||
* `--quiet`, `-q`: Suppress non-error output
|
||||
* `--quiet`, `-q`: Suppress non-error output (also suppresses startup banner)
|
||||
* `--skip-errors`: Continue past per-file errors instead of aborting (applies to `snapshot create` and `restore`)
|
||||
|
||||
### environment variables
|
||||
|
||||
@@ -173,7 +174,6 @@ in the file are preserved; intermediate maps are created as needed.
|
||||
snapshot per name; use `--keep-newer-than` for a rolling window.
|
||||
* `--keep-newer-than <duration>`: With `--prune`, keep snapshots newer than
|
||||
this duration instead of only the latest (e.g. `4w`, `30d`, `6mo`, `1y`)
|
||||
* `--skip-errors`: Skip file read errors (log them loudly but continue)
|
||||
|
||||
**`snapshot list`**: List all snapshots with their timestamps and sizes.
|
||||
* `--json`: Output in JSON format
|
||||
|
||||
@@ -127,6 +127,7 @@ func buildRestoreInvokes(snapshotID string, opts *RestoreOptions) []fx.Option {
|
||||
TargetDir: opts.TargetDir,
|
||||
Paths: opts.Paths,
|
||||
Verify: opts.Verify,
|
||||
SkipErrors: GetRootFlags().SkipErrors,
|
||||
}
|
||||
if err := app.Vaultik.Restore(restoreOpts); err != nil {
|
||||
if err != context.Canceled {
|
||||
|
||||
@@ -45,6 +45,7 @@ type RootFlags struct {
|
||||
Verbose bool
|
||||
Debug bool
|
||||
Quiet bool
|
||||
SkipErrors bool
|
||||
}
|
||||
|
||||
var rootFlags RootFlags
|
||||
@@ -84,6 +85,7 @@ on the source system.`,
|
||||
cmd.PersistentFlags().BoolVarP(&rootFlags.Verbose, "verbose", "v", false, "Enable verbose output")
|
||||
cmd.PersistentFlags().BoolVar(&rootFlags.Debug, "debug", false, "Enable debug output")
|
||||
cmd.PersistentFlags().BoolVarP(&rootFlags.Quiet, "quiet", "q", false, "Suppress non-error output")
|
||||
cmd.PersistentFlags().BoolVar(&rootFlags.SkipErrors, "skip-errors", false, "Continue past per-file errors instead of aborting (applies to snapshot create and restore)")
|
||||
|
||||
// Add subcommands
|
||||
cmd.AddCommand(
|
||||
|
||||
@@ -49,6 +49,8 @@ specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
// Pass snapshot names from args
|
||||
opts.Snapshots = args
|
||||
// --skip-errors is a global flag on the root command.
|
||||
opts.SkipErrors = rootFlags.SkipErrors
|
||||
// Use unified config resolution
|
||||
configPath, err := ResolveConfigPath()
|
||||
if err != nil {
|
||||
@@ -103,7 +105,6 @@ specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
|
||||
cmd.Flags().BoolVar(&opts.Cron, "cron", false, "Run in cron mode (silent unless error)")
|
||||
cmd.Flags().BoolVar(&opts.Prune, "prune", false, "After backup, drop older snapshots of the same name and remove orphaned blobs")
|
||||
cmd.Flags().StringVar(&opts.KeepNewerThan, "keep-newer-than", "", "With --prune: keep snapshots newer than this duration (e.g. 4w, 30d, 6mo) instead of only the latest")
|
||||
cmd.Flags().BoolVar(&opts.SkipErrors, "skip-errors", false, "Skip file read errors (log them loudly but continue)")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
@@ -331,6 +331,43 @@ func (r *SnapshotRepository) AddFilesByIDBatch(ctx context.Context, tx *sql.Tx,
|
||||
return nil
|
||||
}
|
||||
|
||||
// PopulateReferencedBlobs ensures snapshot_blobs contains an entry for
|
||||
// every blob that holds a chunk referenced by any file in the snapshot.
|
||||
// This is necessary because the AddBlob hook only runs when a blob is
|
||||
// newly uploaded during a snapshot — fully-deduplicated snapshots (where
|
||||
// every chunk already exists in storage from a prior run) would otherwise
|
||||
// have an empty snapshot_blobs set and be impossible to restore.
|
||||
//
|
||||
// Returns the number of rows inserted (i.e. blobs that were previously
|
||||
// referenced indirectly via file_chunks but not yet recorded in
|
||||
// snapshot_blobs for this snapshot).
|
||||
func (r *SnapshotRepository) PopulateReferencedBlobs(ctx context.Context, tx *sql.Tx, snapshotID string) (int64, error) {
|
||||
query := `
|
||||
INSERT OR IGNORE INTO snapshot_blobs (snapshot_id, blob_id, blob_hash)
|
||||
SELECT DISTINCT ?, blobs.id, blobs.blob_hash
|
||||
FROM blobs
|
||||
JOIN blob_chunks ON blob_chunks.blob_id = blobs.id
|
||||
JOIN file_chunks ON file_chunks.chunk_hash = blob_chunks.chunk_hash
|
||||
JOIN snapshot_files ON snapshot_files.file_id = file_chunks.file_id
|
||||
WHERE snapshot_files.snapshot_id = ?
|
||||
AND blobs.blob_hash IS NOT NULL
|
||||
`
|
||||
|
||||
var result sql.Result
|
||||
var err error
|
||||
if tx != nil {
|
||||
result, err = tx.ExecContext(ctx, query, snapshotID, snapshotID)
|
||||
} else {
|
||||
result, err = r.db.ExecWithLog(ctx, query, snapshotID, snapshotID)
|
||||
}
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("populating referenced blobs: %w", err)
|
||||
}
|
||||
|
||||
n, _ := result.RowsAffected()
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// AddBlob adds a blob to a snapshot
|
||||
func (r *SnapshotRepository) AddBlob(ctx context.Context, tx *sql.Tx, snapshotID string, blobID types.BlobID, blobHash types.BlobHash) error {
|
||||
query := `
|
||||
|
||||
@@ -180,10 +180,20 @@ func (sm *SnapshotManager) UpdateSnapshotStatsExtended(ctx context.Context, snap
|
||||
})
|
||||
}
|
||||
|
||||
// CompleteSnapshot marks a snapshot as completed and exports its metadata
|
||||
// CompleteSnapshot marks a snapshot as completed and ensures snapshot_blobs
|
||||
// is populated with every blob holding any chunk referenced by the
|
||||
// snapshot's files (including deduplicated blobs uploaded by prior
|
||||
// snapshots). Without this, fully-deduplicated snapshots are unrestorable.
|
||||
func (sm *SnapshotManager) CompleteSnapshot(ctx context.Context, snapshotID string) error {
|
||||
// Mark the snapshot as completed
|
||||
err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||
added, err := sm.repos.Snapshots.PopulateReferencedBlobs(ctx, tx, snapshotID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if added > 0 {
|
||||
log.Info("Populated snapshot_blobs with dedup-referenced blobs",
|
||||
"snapshot_id", snapshotID, "added", added)
|
||||
}
|
||||
return sm.repos.Snapshots.MarkComplete(ctx, tx, snapshotID)
|
||||
})
|
||||
|
||||
|
||||
@@ -703,6 +703,123 @@ func TestEndToEndFileStorage(t *testing.T) {
|
||||
assert.Equal(t, "small.txt", target, "symlink target should be preserved")
|
||||
}
|
||||
|
||||
// TestDedupOnlySnapshotRestores backs up the same directory twice without
|
||||
// touching it between runs, then restores the SECOND (fully-deduplicated)
|
||||
// snapshot. The second snapshot uploads no new blobs — every chunk is
|
||||
// already in storage from the first run. This test guards against the
|
||||
// regression where snapshot_blobs was populated only for blobs uploaded
|
||||
// during the snapshot, leaving fully-deduplicated snapshots unrestorable
|
||||
// with "chunk X not found in any blob" errors.
|
||||
func TestDedupOnlySnapshotRestores(t *testing.T) {
|
||||
log.Initialize(log.Config{})
|
||||
|
||||
fs := afero.NewOsFs()
|
||||
tempDir, err := os.MkdirTemp("", "vaultik-dedup-")
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = os.RemoveAll(tempDir) }()
|
||||
|
||||
dataDir := filepath.Join(tempDir, "source")
|
||||
storeDir := filepath.Join(tempDir, "remote")
|
||||
restoreDir := filepath.Join(tempDir, "restored")
|
||||
dbPath := filepath.Join(tempDir, "index.sqlite")
|
||||
|
||||
chunkSize := int64(64 * 1024)
|
||||
maxBlobSize := int64(512 * 1024)
|
||||
|
||||
testFiles := map[string][]byte{
|
||||
filepath.Join(dataDir, "a.bin"): bytesPattern("a-", int(chunkSize*3)),
|
||||
filepath.Join(dataDir, "b.bin"): bytesPattern("b-", int(chunkSize*2)),
|
||||
}
|
||||
for path, content := range testFiles {
|
||||
require.NoError(t, fs.MkdirAll(filepath.Dir(path), 0o755))
|
||||
require.NoError(t, afero.WriteFile(fs, path, content, 0o644))
|
||||
}
|
||||
|
||||
storer, err := storage.NewFileStorer(storeDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
agePublicKey := "age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"
|
||||
ageSecretKey := "AGE-SECRET-KEY-19CR5YSFW59HM4TLD6GXVEDMZFTVVF7PPHKUT68TXSFPK7APHXA2QS2NJA5"
|
||||
|
||||
cfg := &config.Config{
|
||||
AgeRecipients: []string{agePublicKey},
|
||||
AgeSecretKey: ageSecretKey,
|
||||
CompressionLevel: 3,
|
||||
Hostname: "test-host",
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
db, err := database.New(ctx, dbPath)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = db.Close() }()
|
||||
repos := database.NewRepositories(db)
|
||||
|
||||
makeScanner := func() *snapshot.Scanner {
|
||||
return snapshot.NewScanner(snapshot.ScannerConfig{
|
||||
FS: fs,
|
||||
Storage: storer,
|
||||
ChunkSize: chunkSize,
|
||||
MaxBlobSize: maxBlobSize,
|
||||
CompressionLevel: cfg.CompressionLevel,
|
||||
AgeRecipients: cfg.AgeRecipients,
|
||||
Repositories: repos,
|
||||
})
|
||||
}
|
||||
sm := snapshot.NewSnapshotManager(snapshot.SnapshotManagerParams{
|
||||
Repos: repos, Storage: storer, Config: cfg,
|
||||
})
|
||||
sm.SetFilesystem(fs)
|
||||
|
||||
// First snapshot — uploads all blobs.
|
||||
id1, err := sm.CreateSnapshotWithName(ctx, cfg.Hostname, "dedup", "v", "g")
|
||||
require.NoError(t, err)
|
||||
r1, err := makeScanner().Scan(ctx, dataDir, id1)
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, r1.BlobsCreated, 0, "first snapshot should upload at least one blob")
|
||||
require.NoError(t, sm.CompleteSnapshot(ctx, id1))
|
||||
require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, id1))
|
||||
|
||||
// Second snapshot — same data, every chunk dedups. Sleep past the
|
||||
// second-precision timestamp so the snapshot IDs differ.
|
||||
time.Sleep(1100 * time.Millisecond)
|
||||
id2, err := sm.CreateSnapshotWithName(ctx, cfg.Hostname, "dedup", "v", "g")
|
||||
require.NoError(t, err)
|
||||
r2, err := makeScanner().Scan(ctx, dataDir, id2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, r2.BlobsCreated, "second snapshot should upload zero new blobs (fully dedup'd)")
|
||||
require.NoError(t, sm.CompleteSnapshot(ctx, id2))
|
||||
require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, id2))
|
||||
|
||||
// snapshot_blobs for id2 must be populated despite no uploads.
|
||||
blobHashes, err := repos.Snapshots.GetBlobHashes(ctx, id2)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, blobHashes, "snapshot_blobs for fully-dedup'd snapshot must reference blobs uploaded by prior snapshot")
|
||||
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
restoreVaultik := &vaultik.Vaultik{
|
||||
Config: cfg,
|
||||
Storage: storer,
|
||||
Fs: fs,
|
||||
Stdout: io.Discard,
|
||||
Stderr: io.Discard,
|
||||
}
|
||||
restoreVaultik.SetContext(ctx)
|
||||
|
||||
require.NoError(t, restoreVaultik.Restore(&vaultik.RestoreOptions{
|
||||
SnapshotID: id2,
|
||||
TargetDir: restoreDir,
|
||||
Verify: true,
|
||||
}))
|
||||
|
||||
for origPath, expected := range testFiles {
|
||||
restoredPath := filepath.Join(restoreDir, origPath)
|
||||
got, err := afero.ReadFile(fs, restoredPath)
|
||||
require.NoError(t, err, "restored file missing: %s", restoredPath)
|
||||
require.Equalf(t, expected, got, "byte-equality failed for %s", origPath)
|
||||
}
|
||||
}
|
||||
|
||||
// bytesPattern returns a deterministic byte slice of length n with a tag prefix,
|
||||
// useful for forcing chunker behavior with reproducible content.
|
||||
func bytesPattern(tag string, n int) []byte {
|
||||
|
||||
@@ -35,6 +35,7 @@ type RestoreOptions struct {
|
||||
TargetDir string
|
||||
Paths []string // Optional paths to restore (empty = all)
|
||||
Verify bool // Verify restored files by checking chunk hashes
|
||||
SkipErrors bool // Continue past file-restore errors instead of aborting
|
||||
}
|
||||
|
||||
// RestoreResult contains statistics from a restore operation
|
||||
@@ -195,6 +196,10 @@ func (v *Vaultik) restoreAllFiles(
|
||||
|
||||
if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, result); err != nil {
|
||||
log.Error("Failed to restore file", "path", file.Path, "error", err)
|
||||
if !opts.SkipErrors {
|
||||
return nil, fmt.Errorf("restoring %s: %w (pass --skip-errors to continue past restore failures)", file.Path, err)
|
||||
}
|
||||
v.UI.Error("Failed to restore %s: %v. Skipping (--skip-errors).", v.UI.Path(file.Path.String()), err)
|
||||
result.FilesFailed++
|
||||
result.FailedFiles = append(result.FailedFiles, file.Path.String())
|
||||
// Update progress bar even on failure
|
||||
|
||||
Reference in New Issue
Block a user