This commit is contained in:
248
internal/vaultik/restore_sweeper_integration_test.go
Normal file
248
internal/vaultik/restore_sweeper_integration_test.go
Normal file
@@ -0,0 +1,248 @@
|
||||
package vaultik_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/spf13/afero"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"sneak.berlin/go/vaultik/internal/config"
|
||||
"sneak.berlin/go/vaultik/internal/database"
|
||||
"sneak.berlin/go/vaultik/internal/log"
|
||||
"sneak.berlin/go/vaultik/internal/snapshot"
|
||||
"sneak.berlin/go/vaultik/internal/storage"
|
||||
"sneak.berlin/go/vaultik/internal/ui"
|
||||
"sneak.berlin/go/vaultik/internal/vaultik"
|
||||
)
|
||||
|
||||
// TestRestoreSweeperEvictsBlobs exercises the reference-counted blob
|
||||
// disk cache eviction during restore.
|
||||
//
|
||||
// The scenario: 30 unique 1 MB random files plus 10 duplicates of those
|
||||
// (40 files total, 30 MB of unique content) get backed up with a 10 MB
|
||||
// blob_size_limit. After backup the snapshot's encrypted blobs are
|
||||
// restored through Vaultik.Restore, and per-key Get counts on the
|
||||
// storage layer are recorded. Each blob in the snapshot MUST be
|
||||
// downloaded exactly once — re-downloads would mean the sweeper either
|
||||
// evicted a blob that was still needed (LRU regression) or that the
|
||||
// cache held nothing at all (broken cache).
|
||||
//
|
||||
// The duplicates ensure deduplicated files share blobs with their
|
||||
// originals; the sweeper must keep each blob alive until BOTH the
|
||||
// original AND every duplicate referencing its chunks have been
|
||||
// restored.
|
||||
func TestRestoreSweeperEvictsBlobs(t *testing.T) {
|
||||
log.Initialize(log.Config{})
|
||||
|
||||
fs := afero.NewOsFs()
|
||||
tempDir, err := os.MkdirTemp("", "vaultik-sweeper-")
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = os.RemoveAll(tempDir) }()
|
||||
|
||||
dataDir := filepath.Join(tempDir, "source")
|
||||
storeDir := filepath.Join(tempDir, "remote")
|
||||
restoreDir := filepath.Join(tempDir, "restored")
|
||||
dbPath := filepath.Join(tempDir, "index.sqlite")
|
||||
|
||||
require.NoError(t, fs.MkdirAll(dataDir, 0o755))
|
||||
|
||||
// Generate 30 unique 1 MB random files. The PRNG seed is fixed so
|
||||
// failures are reproducible; the entropy is what matters here — the
|
||||
// FastCDC chunker needs realistic-looking data to pick chunk
|
||||
// boundaries naturally.
|
||||
const (
|
||||
uniqueFiles = 30
|
||||
duplicateFiles = 10
|
||||
fileSize = 1 * 1024 * 1024
|
||||
)
|
||||
rng := rand.New(rand.NewSource(42))
|
||||
|
||||
type sourceFile struct {
|
||||
path string
|
||||
data []byte
|
||||
}
|
||||
uniques := make([]sourceFile, 0, uniqueFiles)
|
||||
expected := make(map[string][]byte, uniqueFiles+duplicateFiles)
|
||||
|
||||
for i := 0; i < uniqueFiles; i++ {
|
||||
data := make([]byte, fileSize)
|
||||
_, err := rng.Read(data)
|
||||
require.NoError(t, err)
|
||||
path := filepath.Join(dataDir, fmt.Sprintf("unique-%02d.bin", i))
|
||||
require.NoError(t, afero.WriteFile(fs, path, data, 0o644))
|
||||
uniques = append(uniques, sourceFile{path: path, data: data})
|
||||
expected[path] = data
|
||||
}
|
||||
|
||||
// Pick 10 of the originals and copy each to a fresh path so the
|
||||
// chunker dedups them against the originals' blobs.
|
||||
for i, idx := range rng.Perm(uniqueFiles)[:duplicateFiles] {
|
||||
src := uniques[idx]
|
||||
dstPath := filepath.Join(dataDir, fmt.Sprintf("dup-%02d.bin", i))
|
||||
require.NoError(t, afero.WriteFile(fs, dstPath, src.data, 0o644))
|
||||
expected[dstPath] = src.data
|
||||
}
|
||||
|
||||
chunkSize := int64(64 * 1024)
|
||||
maxBlobSize := int64(10 * 1024 * 1024)
|
||||
|
||||
storer, err := storage.NewFileStorer(storeDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
agePublicKey := "age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"
|
||||
ageSecretKey := "AGE-SECRET-KEY-19CR5YSFW59HM4TLD6GXVEDMZFTVVF7PPHKUT68TXSFPK7APHXA2QS2NJA5"
|
||||
|
||||
cfg := &config.Config{
|
||||
AgeRecipients: []string{agePublicKey},
|
||||
AgeSecretKey: ageSecretKey,
|
||||
CompressionLevel: 3,
|
||||
Hostname: "test-host",
|
||||
BlobSizeLimit: config.Size(maxBlobSize),
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
db, err := database.New(ctx, dbPath)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = db.Close() }()
|
||||
|
||||
repos := database.NewRepositories(db)
|
||||
|
||||
sm := snapshot.NewSnapshotManager(snapshot.SnapshotManagerParams{
|
||||
Repos: repos,
|
||||
Storage: storer,
|
||||
Config: cfg,
|
||||
})
|
||||
sm.SetFilesystem(fs)
|
||||
|
||||
scanner := snapshot.NewScanner(snapshot.ScannerConfig{
|
||||
FS: fs,
|
||||
Storage: storer,
|
||||
ChunkSize: chunkSize,
|
||||
MaxBlobSize: maxBlobSize,
|
||||
CompressionLevel: cfg.CompressionLevel,
|
||||
AgeRecipients: cfg.AgeRecipients,
|
||||
Repositories: repos,
|
||||
})
|
||||
|
||||
snapshotID, err := sm.CreateSnapshotWithName(ctx, cfg.Hostname, "sweeper", "test-version", "test-git")
|
||||
require.NoError(t, err)
|
||||
|
||||
scanResult, err := scanner.Scan(ctx, dataDir, snapshotID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uniqueFiles+duplicateFiles, scanResult.FilesScanned)
|
||||
require.Greater(t, scanResult.BlobsCreated, 1, "30 MB of unique data at 10 MB blob size should yield multiple blobs")
|
||||
|
||||
require.NoError(t, sm.CompleteSnapshot(ctx, snapshotID))
|
||||
require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, snapshotID))
|
||||
|
||||
// Count blobs actually present on disk; this is the ground-truth
|
||||
// figure each blob's GET count must equal exactly once.
|
||||
blobCount := countBlobsOnDisk(t, storeDir)
|
||||
require.Greater(t, blobCount, 1, "expected more than one blob")
|
||||
t.Logf("backup produced %d blobs from %d files (%d unique + %d duplicates)",
|
||||
blobCount, uniqueFiles+duplicateFiles, uniqueFiles, duplicateFiles)
|
||||
|
||||
// Force restore to operate without the source-side index, exactly
|
||||
// as a real restore on a fresh machine would.
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
counter := newCountingStorer(storer)
|
||||
|
||||
restoreVaultik := &vaultik.Vaultik{
|
||||
Config: cfg,
|
||||
Storage: counter,
|
||||
Fs: fs,
|
||||
Stdout: io.Discard,
|
||||
Stderr: io.Discard,
|
||||
UI: ui.NewWithColor(io.Discard, false),
|
||||
}
|
||||
restoreVaultik.SetContext(ctx)
|
||||
|
||||
require.NoError(t, restoreVaultik.Restore(&vaultik.RestoreOptions{
|
||||
SnapshotID: snapshotID,
|
||||
TargetDir: restoreDir,
|
||||
}))
|
||||
|
||||
// Verify every restored file byte-matches its source.
|
||||
for origPath, want := range expected {
|
||||
restoredPath := filepath.Join(restoreDir, origPath)
|
||||
got, err := afero.ReadFile(fs, restoredPath)
|
||||
require.NoErrorf(t, err, "restored file missing: %s", restoredPath)
|
||||
require.Equalf(t, want, got, "byte mismatch for %s", origPath)
|
||||
}
|
||||
|
||||
// Each blob must have been downloaded exactly once. >1 means the
|
||||
// sweeper evicted a still-needed blob; 0 means the cache silently
|
||||
// stopped being consulted.
|
||||
blobDownloads := 0
|
||||
for key, count := range counter.snapshot() {
|
||||
if !strings.HasPrefix(key, "blobs/") {
|
||||
continue
|
||||
}
|
||||
assert.Equalf(t, 1, count,
|
||||
"blob %s should have been downloaded exactly once during restore, got %d", key, count)
|
||||
blobDownloads++
|
||||
}
|
||||
assert.Equal(t, blobCount, blobDownloads,
|
||||
"every blob on disk should have been fetched exactly once during restore")
|
||||
t.Logf("restore downloaded %d blobs, each exactly once", blobDownloads)
|
||||
}
|
||||
|
||||
// countingStorer wraps a Storer and records the number of Get calls per
|
||||
// key. Used to verify that the restore-side blob cache + sweeper avoid
|
||||
// re-downloading blobs that are evicted while still needed.
|
||||
type countingStorer struct {
|
||||
storage.Storer
|
||||
mu sync.Mutex
|
||||
counts map[string]int
|
||||
}
|
||||
|
||||
func newCountingStorer(inner storage.Storer) *countingStorer {
|
||||
return &countingStorer{Storer: inner, counts: make(map[string]int)}
|
||||
}
|
||||
|
||||
func (c *countingStorer) Get(ctx context.Context, key string) (io.ReadCloser, error) {
|
||||
c.mu.Lock()
|
||||
c.counts[key]++
|
||||
c.mu.Unlock()
|
||||
return c.Storer.Get(ctx, key)
|
||||
}
|
||||
|
||||
func (c *countingStorer) snapshot() map[string]int {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
out := make(map[string]int, len(c.counts))
|
||||
for k, v := range c.counts {
|
||||
out[k] = v
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// countBlobsOnDisk walks the blobs/ tree of a FileStorer-backed store
|
||||
// and returns the total number of blob files. Used to ground-truth the
|
||||
// expected number of restore-time downloads.
|
||||
func countBlobsOnDisk(t *testing.T, storeDir string) int {
|
||||
t.Helper()
|
||||
count := 0
|
||||
root := filepath.Join(storeDir, "blobs")
|
||||
err := filepath.Walk(root, func(_ string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.IsDir() {
|
||||
count++
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
return count
|
||||
}
|
||||
Reference in New Issue
Block a user