Merge fix/dedup-only-snapshot-restore
All checks were successful
check / check (push) Successful in 1m58s
All checks were successful
check / check (push) Successful in 1m58s
This commit is contained in:
@@ -331,6 +331,43 @@ func (r *SnapshotRepository) AddFilesByIDBatch(ctx context.Context, tx *sql.Tx,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PopulateReferencedBlobs ensures snapshot_blobs contains an entry for
|
||||||
|
// every blob that holds a chunk referenced by any file in the snapshot.
|
||||||
|
// This is necessary because the AddBlob hook only runs when a blob is
|
||||||
|
// newly uploaded during a snapshot — fully-deduplicated snapshots (where
|
||||||
|
// every chunk already exists in storage from a prior run) would otherwise
|
||||||
|
// have an empty snapshot_blobs set and be impossible to restore.
|
||||||
|
//
|
||||||
|
// Returns the number of rows inserted (i.e. blobs that were previously
|
||||||
|
// referenced indirectly via file_chunks but not yet recorded in
|
||||||
|
// snapshot_blobs for this snapshot).
|
||||||
|
func (r *SnapshotRepository) PopulateReferencedBlobs(ctx context.Context, tx *sql.Tx, snapshotID string) (int64, error) {
|
||||||
|
query := `
|
||||||
|
INSERT OR IGNORE INTO snapshot_blobs (snapshot_id, blob_id, blob_hash)
|
||||||
|
SELECT DISTINCT ?, blobs.id, blobs.blob_hash
|
||||||
|
FROM blobs
|
||||||
|
JOIN blob_chunks ON blob_chunks.blob_id = blobs.id
|
||||||
|
JOIN file_chunks ON file_chunks.chunk_hash = blob_chunks.chunk_hash
|
||||||
|
JOIN snapshot_files ON snapshot_files.file_id = file_chunks.file_id
|
||||||
|
WHERE snapshot_files.snapshot_id = ?
|
||||||
|
AND blobs.blob_hash IS NOT NULL
|
||||||
|
`
|
||||||
|
|
||||||
|
var result sql.Result
|
||||||
|
var err error
|
||||||
|
if tx != nil {
|
||||||
|
result, err = tx.ExecContext(ctx, query, snapshotID, snapshotID)
|
||||||
|
} else {
|
||||||
|
result, err = r.db.ExecWithLog(ctx, query, snapshotID, snapshotID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("populating referenced blobs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n, _ := result.RowsAffected()
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
// AddBlob adds a blob to a snapshot
|
// AddBlob adds a blob to a snapshot
|
||||||
func (r *SnapshotRepository) AddBlob(ctx context.Context, tx *sql.Tx, snapshotID string, blobID types.BlobID, blobHash types.BlobHash) error {
|
func (r *SnapshotRepository) AddBlob(ctx context.Context, tx *sql.Tx, snapshotID string, blobID types.BlobID, blobHash types.BlobHash) error {
|
||||||
query := `
|
query := `
|
||||||
|
|||||||
@@ -180,10 +180,20 @@ func (sm *SnapshotManager) UpdateSnapshotStatsExtended(ctx context.Context, snap
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompleteSnapshot marks a snapshot as completed and exports its metadata
|
// CompleteSnapshot marks a snapshot as completed and ensures snapshot_blobs
|
||||||
|
// is populated with every blob holding any chunk referenced by the
|
||||||
|
// snapshot's files (including deduplicated blobs uploaded by prior
|
||||||
|
// snapshots). Without this, fully-deduplicated snapshots are unrestorable.
|
||||||
func (sm *SnapshotManager) CompleteSnapshot(ctx context.Context, snapshotID string) error {
|
func (sm *SnapshotManager) CompleteSnapshot(ctx context.Context, snapshotID string) error {
|
||||||
// Mark the snapshot as completed
|
|
||||||
err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||||
|
added, err := sm.repos.Snapshots.PopulateReferencedBlobs(ctx, tx, snapshotID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if added > 0 {
|
||||||
|
log.Info("Populated snapshot_blobs with dedup-referenced blobs",
|
||||||
|
"snapshot_id", snapshotID, "added", added)
|
||||||
|
}
|
||||||
return sm.repos.Snapshots.MarkComplete(ctx, tx, snapshotID)
|
return sm.repos.Snapshots.MarkComplete(ctx, tx, snapshotID)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -703,6 +703,123 @@ func TestEndToEndFileStorage(t *testing.T) {
|
|||||||
assert.Equal(t, "small.txt", target, "symlink target should be preserved")
|
assert.Equal(t, "small.txt", target, "symlink target should be preserved")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestDedupOnlySnapshotRestores backs up the same directory twice without
|
||||||
|
// touching it between runs, then restores the SECOND (fully-deduplicated)
|
||||||
|
// snapshot. The second snapshot uploads no new blobs — every chunk is
|
||||||
|
// already in storage from the first run. This test guards against the
|
||||||
|
// regression where snapshot_blobs was populated only for blobs uploaded
|
||||||
|
// during the snapshot, leaving fully-deduplicated snapshots unrestorable
|
||||||
|
// with "chunk X not found in any blob" errors.
|
||||||
|
func TestDedupOnlySnapshotRestores(t *testing.T) {
|
||||||
|
log.Initialize(log.Config{})
|
||||||
|
|
||||||
|
fs := afero.NewOsFs()
|
||||||
|
tempDir, err := os.MkdirTemp("", "vaultik-dedup-")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() { _ = os.RemoveAll(tempDir) }()
|
||||||
|
|
||||||
|
dataDir := filepath.Join(tempDir, "source")
|
||||||
|
storeDir := filepath.Join(tempDir, "remote")
|
||||||
|
restoreDir := filepath.Join(tempDir, "restored")
|
||||||
|
dbPath := filepath.Join(tempDir, "index.sqlite")
|
||||||
|
|
||||||
|
chunkSize := int64(64 * 1024)
|
||||||
|
maxBlobSize := int64(512 * 1024)
|
||||||
|
|
||||||
|
testFiles := map[string][]byte{
|
||||||
|
filepath.Join(dataDir, "a.bin"): bytesPattern("a-", int(chunkSize*3)),
|
||||||
|
filepath.Join(dataDir, "b.bin"): bytesPattern("b-", int(chunkSize*2)),
|
||||||
|
}
|
||||||
|
for path, content := range testFiles {
|
||||||
|
require.NoError(t, fs.MkdirAll(filepath.Dir(path), 0o755))
|
||||||
|
require.NoError(t, afero.WriteFile(fs, path, content, 0o644))
|
||||||
|
}
|
||||||
|
|
||||||
|
storer, err := storage.NewFileStorer(storeDir)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
agePublicKey := "age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"
|
||||||
|
ageSecretKey := "AGE-SECRET-KEY-19CR5YSFW59HM4TLD6GXVEDMZFTVVF7PPHKUT68TXSFPK7APHXA2QS2NJA5"
|
||||||
|
|
||||||
|
cfg := &config.Config{
|
||||||
|
AgeRecipients: []string{agePublicKey},
|
||||||
|
AgeSecretKey: ageSecretKey,
|
||||||
|
CompressionLevel: 3,
|
||||||
|
Hostname: "test-host",
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
db, err := database.New(ctx, dbPath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() { _ = db.Close() }()
|
||||||
|
repos := database.NewRepositories(db)
|
||||||
|
|
||||||
|
makeScanner := func() *snapshot.Scanner {
|
||||||
|
return snapshot.NewScanner(snapshot.ScannerConfig{
|
||||||
|
FS: fs,
|
||||||
|
Storage: storer,
|
||||||
|
ChunkSize: chunkSize,
|
||||||
|
MaxBlobSize: maxBlobSize,
|
||||||
|
CompressionLevel: cfg.CompressionLevel,
|
||||||
|
AgeRecipients: cfg.AgeRecipients,
|
||||||
|
Repositories: repos,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
sm := snapshot.NewSnapshotManager(snapshot.SnapshotManagerParams{
|
||||||
|
Repos: repos, Storage: storer, Config: cfg,
|
||||||
|
})
|
||||||
|
sm.SetFilesystem(fs)
|
||||||
|
|
||||||
|
// First snapshot — uploads all blobs.
|
||||||
|
id1, err := sm.CreateSnapshotWithName(ctx, cfg.Hostname, "dedup", "v", "g")
|
||||||
|
require.NoError(t, err)
|
||||||
|
r1, err := makeScanner().Scan(ctx, dataDir, id1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Greater(t, r1.BlobsCreated, 0, "first snapshot should upload at least one blob")
|
||||||
|
require.NoError(t, sm.CompleteSnapshot(ctx, id1))
|
||||||
|
require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, id1))
|
||||||
|
|
||||||
|
// Second snapshot — same data, every chunk dedups. Sleep past the
|
||||||
|
// second-precision timestamp so the snapshot IDs differ.
|
||||||
|
time.Sleep(1100 * time.Millisecond)
|
||||||
|
id2, err := sm.CreateSnapshotWithName(ctx, cfg.Hostname, "dedup", "v", "g")
|
||||||
|
require.NoError(t, err)
|
||||||
|
r2, err := makeScanner().Scan(ctx, dataDir, id2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, r2.BlobsCreated, "second snapshot should upload zero new blobs (fully dedup'd)")
|
||||||
|
require.NoError(t, sm.CompleteSnapshot(ctx, id2))
|
||||||
|
require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, id2))
|
||||||
|
|
||||||
|
// snapshot_blobs for id2 must be populated despite no uploads.
|
||||||
|
blobHashes, err := repos.Snapshots.GetBlobHashes(ctx, id2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEmpty(t, blobHashes, "snapshot_blobs for fully-dedup'd snapshot must reference blobs uploaded by prior snapshot")
|
||||||
|
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
|
restoreVaultik := &vaultik.Vaultik{
|
||||||
|
Config: cfg,
|
||||||
|
Storage: storer,
|
||||||
|
Fs: fs,
|
||||||
|
Stdout: io.Discard,
|
||||||
|
Stderr: io.Discard,
|
||||||
|
}
|
||||||
|
restoreVaultik.SetContext(ctx)
|
||||||
|
|
||||||
|
require.NoError(t, restoreVaultik.Restore(&vaultik.RestoreOptions{
|
||||||
|
SnapshotID: id2,
|
||||||
|
TargetDir: restoreDir,
|
||||||
|
Verify: true,
|
||||||
|
}))
|
||||||
|
|
||||||
|
for origPath, expected := range testFiles {
|
||||||
|
restoredPath := filepath.Join(restoreDir, origPath)
|
||||||
|
got, err := afero.ReadFile(fs, restoredPath)
|
||||||
|
require.NoError(t, err, "restored file missing: %s", restoredPath)
|
||||||
|
require.Equalf(t, expected, got, "byte-equality failed for %s", origPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// bytesPattern returns a deterministic byte slice of length n with a tag prefix,
|
// bytesPattern returns a deterministic byte slice of length n with a tag prefix,
|
||||||
// useful for forcing chunker behavior with reproducible content.
|
// useful for forcing chunker behavior with reproducible content.
|
||||||
func bytesPattern(tag string, n int) []byte {
|
func bytesPattern(tag string, n int) []byte {
|
||||||
|
|||||||
Reference in New Issue
Block a user