diff --git a/internal/vaultik/blobcache.go b/internal/vaultik/blobcache.go index 397d366..553b187 100644 --- a/internal/vaultik/blobcache.go +++ b/internal/vaultik/blobcache.go @@ -18,6 +18,11 @@ type blobDiskCacheEntry struct { // blobDiskCache is an LRU cache that stores blobs on disk instead of in memory. // Blobs are written to a temp directory keyed by their hash. When total size // exceeds maxBytes, the least-recently-used entries are evicted (deleted from disk). +// +// The Get/ReadAt/peak-Len counters are debugging instrumentation used by +// tests to assert that the restore code path uses ReadAt (which reads +// only the requested slice of a blob) rather than Get (which reads the +// full blob into memory). type blobDiskCache struct { mu sync.Mutex dir string @@ -26,6 +31,11 @@ type blobDiskCache struct { items map[string]*blobDiskCacheEntry head *blobDiskCacheEntry // most recent tail *blobDiskCacheEntry // least recent + + // Instrumentation. Mutated under mu; readable via the methods below. + getCalls int + readAtCalls int + peakLen int } // newBlobDiskCache creates a new disk-based blob cache with the given max size. @@ -115,12 +125,17 @@ func (c *blobDiskCache) Put(key string, data []byte) error { c.evictLRU() } + if n := len(c.items); n > c.peakLen { + c.peakLen = n + } + return nil } // Get reads a cached blob from disk. Returns data and true on hit. func (c *blobDiskCache) Get(key string) ([]byte, bool) { c.mu.Lock() + c.getCalls++ e, ok := c.items[key] if !ok { c.mu.Unlock() @@ -147,6 +162,7 @@ func (c *blobDiskCache) Get(key string) ([]byte, bool) { // ReadAt reads a slice of a cached blob without loading the entire blob into memory. func (c *blobDiskCache) ReadAt(key string, offset, length int64) ([]byte, error) { c.mu.Lock() + c.readAtCalls++ e, ok := c.items[key] if !ok { c.mu.Unlock() @@ -223,6 +239,28 @@ func (c *blobDiskCache) Len() int { return len(c.items) } +// GetCalls returns the number of times Get has been called. +func (c *blobDiskCache) GetCalls() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.getCalls +} + +// ReadAtCalls returns the number of times ReadAt has been called. +func (c *blobDiskCache) ReadAtCalls() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.readAtCalls +} + +// PeakLen returns the maximum number of cached entries ever held at +// once during this cache's lifetime. +func (c *blobDiskCache) PeakLen() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.peakLen +} + // Close removes the cache directory and all cached blobs. func (c *blobDiskCache) Close() error { c.mu.Lock() diff --git a/internal/vaultik/restore.go b/internal/vaultik/restore.go index 5adaf9e..fb04d7a 100644 --- a/internal/vaultik/restore.go +++ b/internal/vaultik/restore.go @@ -177,7 +177,15 @@ func (v *Vaultik) restoreAllFiles( if err != nil { return nil, fmt.Errorf("creating blob cache: %w", err) } - defer func() { _ = blobCache.Close() }() + if v.restoreCacheObserver != nil { + v.restoreCacheObserver(blobCache) + } + defer func() { + if v.restoreCacheObserver != nil { + v.restoreCacheObserver(blobCache) + } + _ = blobCache.Close() + }() // Per-restore sweep state: every blob_size_limit/100 bytes written, // scan the cache and delete any blob whose remaining file references diff --git a/internal/vaultik/restore_locality_test.go b/internal/vaultik/restore_locality_test.go new file mode 100644 index 0000000..4a29bc2 --- /dev/null +++ b/internal/vaultik/restore_locality_test.go @@ -0,0 +1,315 @@ +package vaultik + +import ( + "bytes" + "context" + "crypto/rand" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "sync" + "testing" + + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sneak.berlin/go/vaultik/internal/config" + "sneak.berlin/go/vaultik/internal/database" + "sneak.berlin/go/vaultik/internal/log" + "sneak.berlin/go/vaultik/internal/snapshot" + "sneak.berlin/go/vaultik/internal/storage" + "sneak.berlin/go/vaultik/internal/ui" +) + +// TestRestoreLocalityAndReadAt asserts three properties of the restore +// hot path that together produce acceptable throughput on real-world +// snapshots. All three currently fail on main: +// +// 1. Peak blob cache occupancy ≤ 1. +// Restore order must respect blob locality: every file fully +// contained within the currently cached blob should be restored +// before any other blob is downloaded. The sweeper then frees +// each blob as soon as its file set is exhausted. Without smart +// ordering, path-order interleaves blobs and the cache holds +// every touched blob until the last file referencing it lands. +// +// 2. Each remote blob is fetched exactly once. +// Counted via wrapping the Storer. +// +// 3. blobDiskCache.Get is never called during restore. +// Chunk extraction from a cached blob must go through ReadAt, +// which reads only the chunk's bytes from disk. Get reads the +// entire blob (up to 50 GB in production) into memory just to +// slice out a few KB — currently the dominant cost in restore. +// +// The test deliberately constructs an adversarial scenario: three +// blobs A/B/C of ~6 MB each, nine files distributed across them, and +// path-ordered names that interleave the blobs (a1, b1, c1, a2, b2, +// c2, …) so naive path-order processing would touch every blob before +// finishing any of them. +func TestRestoreLocalityAndReadAt(t *testing.T) { + log.Initialize(log.Config{}) + + fs := afero.NewOsFs() + tempDir, err := os.MkdirTemp("", "vaultik-locality-") + require.NoError(t, err) + defer func() { _ = os.RemoveAll(tempDir) }() + + dataDir := filepath.Join(tempDir, "source") + storeDir := filepath.Join(tempDir, "remote") + restoreDir := filepath.Join(tempDir, "restored") + dbPath := filepath.Join(tempDir, "index.sqlite") + + require.NoError(t, fs.MkdirAll(dataDir, 0o755)) + + // Layout: 15 source files of exactly 1 MiB each. With + // chunkSize (avg) = 4 MiB the chunker's minSize is 1 MiB, so any + // file of 1 MiB becomes a single chunk. With a 5 MiB blob limit + // the packer fits exactly 5 chunks per blob, producing 3 blobs + // containing src-001..005, src-006..010, src-011..015. + // + // Then add 9 "copy" files — byte-for-byte clones of three of the + // sources (one from each blob group) — with interleaved names + // (cp-001-A, cp-002-B, cp-003-C, cp-004-A, …) so a naive + // path-ordered restore would touch all three blobs before + // finishing any of them. + const ( + srcBytes = 1024 * 1024 + srcCount = 15 + blobsCount = 3 + perBlob = srcCount / blobsCount + ) + + type source struct { + path string + data []byte + } + sources := make([]*source, srcCount) + for i := 0; i < srcCount; i++ { + s := &source{ + path: fmt.Sprintf("src-%03d.bin", i+1), + data: randomBytes(t, srcBytes), + } + sources[i] = s + require.NoError(t, afero.WriteFile(fs, filepath.Join(dataDir, s.path), s.data, 0o644)) + } + + // Pick one representative source per blob group (src-001 → blob + // 1, src-006 → blob 2, src-011 → blob 3) and create 3 copies of + // each with interleaved alphabetical names. + type copyFile struct { + path string + data []byte + sourceBlob int // 0, 1, or 2 + sourceIndex int // index into sources slice + } + groupReps := []int{0, perBlob, 2 * perBlob} // 0, 5, 10 + letters := []byte{'A', 'B', 'C'} + var copies []copyFile + for i := 0; i < 3; i++ { + for j := 0; j < blobsCount; j++ { + seq := i*blobsCount + j + 1 + name := fmt.Sprintf("cp-%03d-%c.bin", seq, letters[j]) + path := filepath.Join(dataDir, name) + src := sources[groupReps[j]] + require.NoError(t, afero.WriteFile(fs, path, src.data, 0o644)) + copies = append(copies, copyFile{path: path, data: src.data, sourceBlob: j, sourceIndex: groupReps[j]}) + } + } + + // chunkSize avg = 4 MiB makes minSize = 1 MiB, so a 1 MiB file + // becomes one chunk. maxBlobSize = 5 MiB packs exactly 5 chunks + // per blob, yielding 3 blobs from 15 source files. + chunkSize := int64(4 * 1024 * 1024) + maxBlobSize := int64(5 * 1024 * 1024) + + storer, err := storage.NewFileStorer(storeDir) + require.NoError(t, err) + + agePublicKey := "age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg" + ageSecretKey := "AGE-SECRET-KEY-19CR5YSFW59HM4TLD6GXVEDMZFTVVF7PPHKUT68TXSFPK7APHXA2QS2NJA5" + + cfg := &config.Config{ + AgeRecipients: []string{agePublicKey}, + AgeSecretKey: ageSecretKey, + CompressionLevel: 3, + Hostname: "test-host", + BlobSizeLimit: config.Size(maxBlobSize), + } + + ctx := context.Background() + + db, err := database.New(ctx, dbPath) + require.NoError(t, err) + defer func() { _ = db.Close() }() + + repos := database.NewRepositories(db) + + sm := snapshot.NewSnapshotManager(snapshot.SnapshotManagerParams{ + Repos: repos, + Storage: storer, + Config: cfg, + }) + sm.SetFilesystem(fs) + + scanner := snapshot.NewScanner(snapshot.ScannerConfig{ + FS: fs, + Storage: storer, + ChunkSize: chunkSize, + MaxBlobSize: maxBlobSize, + CompressionLevel: cfg.CompressionLevel, + AgeRecipients: cfg.AgeRecipients, + Repositories: repos, + }) + + snapshotID, err := sm.CreateSnapshotWithName(ctx, cfg.Hostname, "locality", "test-version", "test-git") + require.NoError(t, err) + + _, err = scanner.Scan(ctx, dataDir, snapshotID) + require.NoError(t, err) + + require.NoError(t, sm.CompleteSnapshot(ctx, snapshotID)) + require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, snapshotID)) + + blobsOnDisk := listBlobKeys(t, storeDir) + t.Logf("backup produced %d blobs", len(blobsOnDisk)) + require.GreaterOrEqual(t, len(blobsOnDisk), 3, "expected at least 3 blobs from 3 filler groups") + + require.NoError(t, db.Close()) + + // Wrap the storer so we can count downloads per blob key. + counter := newCountingStorer(storer) + + // Capture the restore-side cache for instrumentation inspection. + // The observer fires twice (immediately after creation and + // immediately before close) so we read PeakLen and call counters + // from the same instance the production code used. + var cacheRef *blobDiskCache + v := &Vaultik{ + Config: cfg, + Storage: counter, + Fs: fs, + Stdout: io.Discard, + Stderr: io.Discard, + UI: ui.NewWithColor(io.Discard, false), + restoreCacheObserver: func(c *blobDiskCache) { + cacheRef = c + }, + } + v.SetContext(ctx) + + require.NoError(t, v.Restore(&RestoreOptions{ + SnapshotID: snapshotID, + TargetDir: restoreDir, + })) + + require.NotNil(t, cacheRef, "restoreCacheObserver must fire during restore") + + // Verify restored content matches. + for _, s := range sources { + restored := filepath.Join(restoreDir, dataDir, s.path) + got, err := afero.ReadFile(fs, restored) + require.NoErrorf(t, err, "source missing after restore: %s", s.path) + require.Truef(t, bytes.Equal(got, s.data), "byte mismatch for source %s", s.path) + } + for _, c := range copies { + restored := filepath.Join(restoreDir, c.path) + got, err := afero.ReadFile(fs, restored) + require.NoErrorf(t, err, "copy missing after restore: %s", c.path) + require.Truef(t, bytes.Equal(got, c.data), "byte mismatch for copy %s", c.path) + } + + // (1) Each blob fetched exactly once. + for key, n := range counter.snapshot() { + if !filterBlobKey(key) { + continue + } + assert.Equalf(t, 1, n, "blob %s fetched %d times, want exactly 1", key, n) + } + + // (2) Peak cache size ≤ 1. The sweeper plus locality-aware + // ordering should free each blob before the next one downloads. + assert.LessOrEqualf(t, cacheRef.PeakLen(), 1, + "peak cached blobs was %d; expected ≤ 1 with locality-ordered restore", cacheRef.PeakLen()) + + // (3) Cache.Get must never be called during restore — chunk + // extraction has to go through ReadAt so we never read the whole + // blob from disk to grab a few KB slice. + assert.Equalf(t, 0, cacheRef.GetCalls(), + "blobDiskCache.Get was called %d times during restore; restore must use ReadAt exclusively", cacheRef.GetCalls()) + + t.Logf("blob cache stats: peak_len=%d get_calls=%d readat_calls=%d", + cacheRef.PeakLen(), cacheRef.GetCalls(), cacheRef.ReadAtCalls()) +} + +// randomBytes returns n bytes of random data. Used to make sure the +// chunker picks non-degenerate FastCDC boundaries. +func randomBytes(t *testing.T, n int) []byte { + t.Helper() + b := make([]byte, n) + _, err := rand.Read(b) + require.NoError(t, err) + return b +} + +// listBlobKeys walks the FileStorer blobs/ tree and returns the +// relative keys for every blob file present. +func listBlobKeys(t *testing.T, storeDir string) []string { + t.Helper() + var keys []string + root := filepath.Join(storeDir, "blobs") + err := filepath.Walk(root, func(p string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + rel, _ := filepath.Rel(storeDir, p) + keys = append(keys, rel) + return nil + }) + require.NoError(t, err) + sort.Strings(keys) + return keys +} + +// filterBlobKey returns true when key looks like a blob storage path +// (rather than a snapshot metadata path). +func filterBlobKey(key string) bool { + return len(key) > 6 && key[:6] == "blobs/" +} + +// countingStorerInternal wraps a storage.Storer and records the number +// of Get calls per key, so the locality test can assert each blob is +// fetched exactly once. Defined here (rather than reusing the one in +// the integration_test package) because this test lives in package +// vaultik for access to unexported cache internals. +type countingStorerInternal struct { + storage.Storer + mu sync.Mutex + counts map[string]int +} + +func newCountingStorer(inner storage.Storer) *countingStorerInternal { + return &countingStorerInternal{Storer: inner, counts: make(map[string]int)} +} + +func (c *countingStorerInternal) Get(ctx context.Context, key string) (io.ReadCloser, error) { + c.mu.Lock() + c.counts[key]++ + c.mu.Unlock() + return c.Storer.Get(ctx, key) +} + +func (c *countingStorerInternal) snapshot() map[string]int { + c.mu.Lock() + defer c.mu.Unlock() + out := make(map[string]int, len(c.counts)) + for k, v := range c.counts { + out[k] = v + } + return out +} diff --git a/internal/vaultik/vaultik.go b/internal/vaultik/vaultik.go index 7ffbfee..91c93c1 100644 --- a/internal/vaultik/vaultik.go +++ b/internal/vaultik/vaultik.go @@ -44,6 +44,13 @@ type Vaultik struct { // writer wrapping Stdout; the cli layer replaces it with a discarding // writer in --cron mode. UI *ui.Writer + + // restoreCacheObserver, if non-nil, is invoked once with the + // restore-side blob disk cache immediately after the cache is + // created and again immediately before it is closed. Only + // internal-package tests set this; the type is unexported so + // callers outside this package can't reach it. + restoreCacheObserver func(*blobDiskCache) } // VaultikParams contains all parameters for New that can be provided by fx