Merge test/restore-locality-and-readat
All checks were successful
check / check (push) Successful in 2m17s
All checks were successful
check / check (push) Successful in 2m17s
This commit is contained in:
@@ -130,6 +130,51 @@ func (r *BlobRepository) GetByID(ctx context.Context, id string) (*Blob, error)
|
|||||||
return &blob, nil
|
return &blob, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetAll returns every blob row keyed by blob ID. Useful at restore
|
||||||
|
// start to translate the per-chunk blob_id references in chunkToBlobMap
|
||||||
|
// into blob hashes without doing one GetByID query per chunk.
|
||||||
|
func (r *BlobRepository) GetAll(ctx context.Context) (map[string]*Blob, error) {
|
||||||
|
query := `
|
||||||
|
SELECT id, blob_hash, created_ts, finished_ts, uncompressed_size, compressed_size, uploaded_ts
|
||||||
|
FROM blobs
|
||||||
|
`
|
||||||
|
|
||||||
|
rows, err := r.db.conn.QueryContext(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("querying blobs: %w", err)
|
||||||
|
}
|
||||||
|
defer CloseRows(rows)
|
||||||
|
|
||||||
|
out := make(map[string]*Blob)
|
||||||
|
for rows.Next() {
|
||||||
|
var blob Blob
|
||||||
|
var createdTSUnix int64
|
||||||
|
var finishedTSUnix, uploadedTSUnix sql.NullInt64
|
||||||
|
if err := rows.Scan(
|
||||||
|
&blob.ID,
|
||||||
|
&blob.Hash,
|
||||||
|
&createdTSUnix,
|
||||||
|
&finishedTSUnix,
|
||||||
|
&blob.UncompressedSize,
|
||||||
|
&blob.CompressedSize,
|
||||||
|
&uploadedTSUnix,
|
||||||
|
); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning blob: %w", err)
|
||||||
|
}
|
||||||
|
blob.CreatedTS = time.Unix(createdTSUnix, 0).UTC()
|
||||||
|
if finishedTSUnix.Valid {
|
||||||
|
ts := time.Unix(finishedTSUnix.Int64, 0).UTC()
|
||||||
|
blob.FinishedTS = &ts
|
||||||
|
}
|
||||||
|
if uploadedTSUnix.Valid {
|
||||||
|
ts := time.Unix(uploadedTSUnix.Int64, 0).UTC()
|
||||||
|
blob.UploadedTS = &ts
|
||||||
|
}
|
||||||
|
out[blob.ID.String()] = &blob
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateFinished updates a blob when it's finalized
|
// UpdateFinished updates a blob when it's finalized
|
||||||
func (r *BlobRepository) UpdateFinished(ctx context.Context, tx *sql.Tx, id string, hash string, uncompressedSize, compressedSize int64) error {
|
func (r *BlobRepository) UpdateFinished(ctx context.Context, tx *sql.Tx, id string, hash string, uncompressedSize, compressedSize int64) error {
|
||||||
query := `
|
query := `
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package vaultik
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -18,6 +19,11 @@ type blobDiskCacheEntry struct {
|
|||||||
// blobDiskCache is an LRU cache that stores blobs on disk instead of in memory.
|
// blobDiskCache is an LRU cache that stores blobs on disk instead of in memory.
|
||||||
// Blobs are written to a temp directory keyed by their hash. When total size
|
// Blobs are written to a temp directory keyed by their hash. When total size
|
||||||
// exceeds maxBytes, the least-recently-used entries are evicted (deleted from disk).
|
// exceeds maxBytes, the least-recently-used entries are evicted (deleted from disk).
|
||||||
|
//
|
||||||
|
// The Get/ReadAt/peak-Len counters are debugging instrumentation used by
|
||||||
|
// tests to assert that the restore code path uses ReadAt (which reads
|
||||||
|
// only the requested slice of a blob) rather than Get (which reads the
|
||||||
|
// full blob into memory).
|
||||||
type blobDiskCache struct {
|
type blobDiskCache struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
dir string
|
dir string
|
||||||
@@ -26,6 +32,11 @@ type blobDiskCache struct {
|
|||||||
items map[string]*blobDiskCacheEntry
|
items map[string]*blobDiskCacheEntry
|
||||||
head *blobDiskCacheEntry // most recent
|
head *blobDiskCacheEntry // most recent
|
||||||
tail *blobDiskCacheEntry // least recent
|
tail *blobDiskCacheEntry // least recent
|
||||||
|
|
||||||
|
// Instrumentation. Mutated under mu; readable via the methods below.
|
||||||
|
getCalls int
|
||||||
|
readAtCalls int
|
||||||
|
peakLen int
|
||||||
}
|
}
|
||||||
|
|
||||||
// newBlobDiskCache creates a new disk-based blob cache with the given max size.
|
// newBlobDiskCache creates a new disk-based blob cache with the given max size.
|
||||||
@@ -115,12 +126,77 @@ func (c *blobDiskCache) Put(key string, data []byte) error {
|
|||||||
c.evictLRU()
|
c.evictLRU()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if n := len(c.items); n > c.peakLen {
|
||||||
|
c.peakLen = n
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutFromReader streams r into the cache file for key, returning the
|
||||||
|
// total number of bytes written. Unlike Put, the data never has to
|
||||||
|
// reside fully in memory at any point — io.Copy uses an internal
|
||||||
|
// 32 KiB buffer. Used by restore to land a freshly decrypted blob on
|
||||||
|
// disk without buffering its entire plaintext (which may be tens of GB)
|
||||||
|
// in RAM.
|
||||||
|
func (c *blobDiskCache) PutFromReader(key string, r io.Reader) (int64, error) {
|
||||||
|
c.mu.Lock()
|
||||||
|
// Remove any prior entry first; we'll re-link after the file is
|
||||||
|
// written successfully.
|
||||||
|
if e, ok := c.items[key]; ok {
|
||||||
|
c.unlink(e)
|
||||||
|
c.curBytes -= e.size
|
||||||
|
_ = os.Remove(c.path(key))
|
||||||
|
delete(c.items, key)
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
f, err := os.OpenFile(c.path(key), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("creating cache file: %w", err)
|
||||||
|
}
|
||||||
|
written, copyErr := io.Copy(f, r)
|
||||||
|
closeErr := f.Close()
|
||||||
|
if copyErr != nil {
|
||||||
|
_ = os.Remove(c.path(key))
|
||||||
|
return written, fmt.Errorf("streaming to cache file: %w", copyErr)
|
||||||
|
}
|
||||||
|
if closeErr != nil {
|
||||||
|
_ = os.Remove(c.path(key))
|
||||||
|
return written, fmt.Errorf("closing cache file: %w", closeErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
// If the entry would exceed maxBytes outright, drop it on the
|
||||||
|
// floor — but the restore path passes math.MaxInt64 as maxBytes
|
||||||
|
// so this branch is effectively unreachable there.
|
||||||
|
if written > c.maxBytes {
|
||||||
|
_ = os.Remove(c.path(key))
|
||||||
|
return written, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
e := &blobDiskCacheEntry{key: key, size: written}
|
||||||
|
c.pushFront(e)
|
||||||
|
c.items[key] = e
|
||||||
|
c.curBytes += written
|
||||||
|
|
||||||
|
for c.curBytes > c.maxBytes && c.tail != nil {
|
||||||
|
c.evictLRU()
|
||||||
|
}
|
||||||
|
|
||||||
|
if n := len(c.items); n > c.peakLen {
|
||||||
|
c.peakLen = n
|
||||||
|
}
|
||||||
|
|
||||||
|
return written, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Get reads a cached blob from disk. Returns data and true on hit.
|
// Get reads a cached blob from disk. Returns data and true on hit.
|
||||||
func (c *blobDiskCache) Get(key string) ([]byte, bool) {
|
func (c *blobDiskCache) Get(key string) ([]byte, bool) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
c.getCalls++
|
||||||
e, ok := c.items[key]
|
e, ok := c.items[key]
|
||||||
if !ok {
|
if !ok {
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
@@ -147,6 +223,7 @@ func (c *blobDiskCache) Get(key string) ([]byte, bool) {
|
|||||||
// ReadAt reads a slice of a cached blob without loading the entire blob into memory.
|
// ReadAt reads a slice of a cached blob without loading the entire blob into memory.
|
||||||
func (c *blobDiskCache) ReadAt(key string, offset, length int64) ([]byte, error) {
|
func (c *blobDiskCache) ReadAt(key string, offset, length int64) ([]byte, error) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
c.readAtCalls++
|
||||||
e, ok := c.items[key]
|
e, ok := c.items[key]
|
||||||
if !ok {
|
if !ok {
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
@@ -223,6 +300,28 @@ func (c *blobDiskCache) Len() int {
|
|||||||
return len(c.items)
|
return len(c.items)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetCalls returns the number of times Get has been called.
|
||||||
|
func (c *blobDiskCache) GetCalls() int {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
return c.getCalls
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadAtCalls returns the number of times ReadAt has been called.
|
||||||
|
func (c *blobDiskCache) ReadAtCalls() int {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
return c.readAtCalls
|
||||||
|
}
|
||||||
|
|
||||||
|
// PeakLen returns the maximum number of cached entries ever held at
|
||||||
|
// once during this cache's lifetime.
|
||||||
|
func (c *blobDiskCache) PeakLen() int {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
return c.peakLen
|
||||||
|
}
|
||||||
|
|
||||||
// Close removes the cache directory and all cached blobs.
|
// Close removes the cache directory and all cached blobs.
|
||||||
func (c *blobDiskCache) Close() error {
|
func (c *blobDiskCache) Close() error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
|||||||
@@ -159,7 +159,12 @@ func (v *Vaultik) prepareRestoreIdentity() (age.Identity, error) {
|
|||||||
return identity, nil
|
return identity, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// restoreAllFiles iterates over files and restores each one, tracking progress and failures
|
// restoreAllFiles processes files in blob-locality order: drain every
|
||||||
|
// file whose blob set is on disk, download the missing blobs for the
|
||||||
|
// pending file with the smallest uncached count, repeat. This keeps
|
||||||
|
// peak cache occupancy near 1 even on snapshots whose path order
|
||||||
|
// interleaves blobs, and lets the sweeper free each blob the moment
|
||||||
|
// its file set is exhausted.
|
||||||
func (v *Vaultik) restoreAllFiles(
|
func (v *Vaultik) restoreAllFiles(
|
||||||
files []*database.File,
|
files []*database.File,
|
||||||
repos *database.Repositories,
|
repos *database.Repositories,
|
||||||
@@ -177,13 +182,47 @@ func (v *Vaultik) restoreAllFiles(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("creating blob cache: %w", err)
|
return nil, fmt.Errorf("creating blob cache: %w", err)
|
||||||
}
|
}
|
||||||
defer func() { _ = blobCache.Close() }()
|
if v.restoreCacheObserver != nil {
|
||||||
|
v.restoreCacheObserver(blobCache)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if v.restoreCacheObserver != nil {
|
||||||
|
v.restoreCacheObserver(blobCache)
|
||||||
|
}
|
||||||
|
_ = blobCache.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
// Per-restore sweep state: every blob_size_limit/100 bytes written,
|
// Per-restore sweep state: every blob_size_limit/100 bytes written,
|
||||||
// scan the cache and delete any blob whose remaining file references
|
// scan the cache and delete any blob whose remaining file references
|
||||||
// are all already restored.
|
// are all already restored.
|
||||||
sweeper := newRestoreSweeper(v.ctx, repos, blobCache, v.Config.BlobSizeLimit.Int64()/100)
|
sweeper := newRestoreSweeper(v.ctx, repos, blobCache, v.Config.BlobSizeLimit.Int64()/100)
|
||||||
|
|
||||||
|
// Pre-fetch every blob row once so chunk extraction can map a
|
||||||
|
// blob_id to its hash without a DB round-trip per chunk.
|
||||||
|
blobsByID, err := repos.Blobs.GetAll(v.ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("fetching blob index: %w", err)
|
||||||
|
}
|
||||||
|
blobIDToHash := make(map[string]string, len(blobsByID))
|
||||||
|
blobByHash := make(map[string]*database.Blob, len(blobsByID))
|
||||||
|
for id, blob := range blobsByID {
|
||||||
|
hash := blob.Hash.String()
|
||||||
|
blobIDToHash[id] = hash
|
||||||
|
blobByHash[hash] = blob
|
||||||
|
}
|
||||||
|
|
||||||
|
plan, err := newRestorePlan(v.ctx, repos, files, chunkToBlobMap, blobIDToHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("building restore plan: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index files by ID so the loop can look them up by the IDs the
|
||||||
|
// plan hands back.
|
||||||
|
filesByID := make(map[types.FileID]*database.File, len(files))
|
||||||
|
for _, f := range files {
|
||||||
|
filesByID[f.ID] = f
|
||||||
|
}
|
||||||
|
|
||||||
// Calculate total bytes expected for percentage / ETA arithmetic.
|
// Calculate total bytes expected for percentage / ETA arithmetic.
|
||||||
var totalBytesExpected int64
|
var totalBytesExpected int64
|
||||||
for _, file := range files {
|
for _, file := range files {
|
||||||
@@ -195,17 +234,65 @@ func (v *Vaultik) restoreAllFiles(
|
|||||||
v.UI.Size(totalBytesExpected),
|
v.UI.Size(totalBytesExpected),
|
||||||
v.UI.Path(opts.TargetDir))
|
v.UI.Path(opts.TargetDir))
|
||||||
|
|
||||||
|
session := &restoreSession{
|
||||||
|
v: v,
|
||||||
|
ctx: v.ctx,
|
||||||
|
repos: repos,
|
||||||
|
opts: opts,
|
||||||
|
identity: identity,
|
||||||
|
chunkToBlobMap: chunkToBlobMap,
|
||||||
|
blobByHash: blobByHash,
|
||||||
|
blobIDToHash: blobIDToHash,
|
||||||
|
blobCache: blobCache,
|
||||||
|
sweeper: sweeper,
|
||||||
|
result: result,
|
||||||
|
}
|
||||||
|
|
||||||
// Periodic progress output, matching the snapshot create cadence.
|
// Periodic progress output, matching the snapshot create cadence.
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
lastStatusTime := startTime
|
lastStatusTime := startTime
|
||||||
const statusInterval = 15 * time.Second
|
const statusInterval = 15 * time.Second
|
||||||
|
|
||||||
for i, file := range files {
|
processed := 0
|
||||||
|
for plan.hasPending() {
|
||||||
if v.ctx.Err() != nil {
|
if v.ctx.Err() != nil {
|
||||||
return nil, v.ctx.Err()
|
return nil, v.ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, sweeper, result); err != nil {
|
fileID, ready := plan.popReady()
|
||||||
|
if !ready {
|
||||||
|
// No file is fully cache-served. First free any blobs
|
||||||
|
// whose file sets are exhausted — without this, the
|
||||||
|
// blob whose last file we just finished would still be
|
||||||
|
// cached when we Put the next one, briefly pushing
|
||||||
|
// peak occupancy from 1 to 2.
|
||||||
|
sweeper.sweep()
|
||||||
|
|
||||||
|
// Pick the pending file with the smallest uncached
|
||||||
|
// blob set and download its blobs. After each blob
|
||||||
|
// lands, the plan moves any pending file whose set
|
||||||
|
// just emptied onto the ready queue.
|
||||||
|
next := plan.pickNextDownload()
|
||||||
|
if next.IsZero() {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for _, hash := range plan.blobsNeeded(next) {
|
||||||
|
blob, ok := blobByHash[hash]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("blob hash %s missing from blob index", hash[:16])
|
||||||
|
}
|
||||||
|
if err := session.downloadBlobToCache(hash, blob.CompressedSize); err != nil {
|
||||||
|
return nil, fmt.Errorf("downloading blob %s: %w", hash[:16], err)
|
||||||
|
}
|
||||||
|
result.BlobsDownloaded++
|
||||||
|
result.BytesDownloaded += blob.CompressedSize
|
||||||
|
plan.markBlobCached(hash)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
file := filesByID[fileID]
|
||||||
|
if err := session.restoreFile(file); err != nil {
|
||||||
log.Error("Failed to restore file", "path", file.Path, "error", err)
|
log.Error("Failed to restore file", "path", file.Path, "error", err)
|
||||||
if !opts.SkipErrors {
|
if !opts.SkipErrors {
|
||||||
return nil, fmt.Errorf("restoring %s: %w (pass --skip-errors to continue past restore failures)", file.Path, err)
|
return nil, fmt.Errorf("restoring %s: %w (pass --skip-errors to continue past restore failures)", file.Path, err)
|
||||||
@@ -213,22 +300,26 @@ func (v *Vaultik) restoreAllFiles(
|
|||||||
v.UI.Error("Failed to restore %s: %v. Skipping (--skip-errors).", v.UI.Path(file.Path.String()), err)
|
v.UI.Error("Failed to restore %s: %v. Skipping (--skip-errors).", v.UI.Path(file.Path.String()), err)
|
||||||
result.FilesFailed++
|
result.FilesFailed++
|
||||||
result.FailedFiles = append(result.FailedFiles, file.Path.String())
|
result.FailedFiles = append(result.FailedFiles, file.Path.String())
|
||||||
|
plan.finishFile(fileID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record the file as restored so the sweeper can free blobs once
|
// Record the file as restored so the sweeper can free blobs
|
||||||
// all referencing files are done.
|
// once all referencing files are done, and drop it from the
|
||||||
sweeper.fileRestored(file.ID.String())
|
// plan's indexes so future picks ignore it.
|
||||||
|
sweeper.fileRestored(fileID.String())
|
||||||
|
plan.finishFile(fileID)
|
||||||
|
processed++
|
||||||
|
|
||||||
if time.Since(lastStatusTime) >= statusInterval {
|
if time.Since(lastStatusTime) >= statusInterval {
|
||||||
v.printRestoreProgress(i+1, len(files), result.BytesRestored, totalBytesExpected, startTime)
|
v.printRestoreProgress(processed, len(files), result.BytesRestored, totalBytesExpected, startTime)
|
||||||
lastStatusTime = time.Now()
|
lastStatusTime = time.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Structured progress log for --verbose / JSON consumers.
|
// Structured progress log for --verbose / JSON consumers.
|
||||||
if (i+1)%100 == 0 || i+1 == len(files) {
|
if processed%100 == 0 || processed == len(files) {
|
||||||
log.Info("Restore progress",
|
log.Info("Restore progress",
|
||||||
"files", fmt.Sprintf("%d/%d", i+1, len(files)),
|
"files", fmt.Sprintf("%d/%d", processed, len(files)),
|
||||||
"bytes", humanize.Bytes(uint64(result.BytesRestored)),
|
"bytes", humanize.Bytes(uint64(result.BytesRestored)),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -424,183 +515,128 @@ func (v *Vaultik) buildChunkToBlobMap(ctx context.Context, repos *database.Repos
|
|||||||
return result, rows.Err()
|
return result, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// restoreFile restores a single file
|
// restoreSession holds every piece of per-restore state shared by the
|
||||||
func (v *Vaultik) restoreFile(
|
// restore-time methods. Each restore builds one of these from the
|
||||||
ctx context.Context,
|
// snapshot's metadata and then drives the file loop through methods on
|
||||||
repos *database.Repositories,
|
// it. Keeping this state on the struct rather than threading it
|
||||||
file *database.File,
|
// through every function signature keeps the inner-loop call sites
|
||||||
targetDir string,
|
// readable: restoreFile(file) instead of a ten-argument helper.
|
||||||
identity age.Identity,
|
type restoreSession struct {
|
||||||
chunkToBlobMap map[string]*database.BlobChunk,
|
v *Vaultik
|
||||||
blobCache *blobDiskCache,
|
ctx context.Context
|
||||||
sweeper *restoreSweeper,
|
repos *database.Repositories
|
||||||
result *RestoreResult,
|
opts *RestoreOptions
|
||||||
) error {
|
identity age.Identity
|
||||||
// Calculate target path - use full original path under target directory
|
chunkToBlobMap map[string]*database.BlobChunk
|
||||||
targetPath := filepath.Join(targetDir, file.Path.String())
|
blobByHash map[string]*database.Blob
|
||||||
|
blobIDToHash map[string]string
|
||||||
|
blobCache *blobDiskCache
|
||||||
|
sweeper *restoreSweeper
|
||||||
|
result *RestoreResult
|
||||||
|
}
|
||||||
|
|
||||||
// Create parent directories
|
// restoreFile dispatches to the right per-kind restorer.
|
||||||
|
func (s *restoreSession) restoreFile(file *database.File) error {
|
||||||
|
targetPath := filepath.Join(s.opts.TargetDir, file.Path.String())
|
||||||
parentDir := filepath.Dir(targetPath)
|
parentDir := filepath.Dir(targetPath)
|
||||||
if err := v.Fs.MkdirAll(parentDir, 0755); err != nil {
|
if err := s.v.Fs.MkdirAll(parentDir, 0755); err != nil {
|
||||||
return fmt.Errorf("creating parent directory: %w", err)
|
return fmt.Errorf("creating parent directory: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle symlinks
|
|
||||||
if file.IsSymlink() {
|
if file.IsSymlink() {
|
||||||
return v.restoreSymlink(file, targetPath, result)
|
return s.restoreSymlink(file, targetPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle directories
|
|
||||||
if file.Mode&uint32(os.ModeDir) != 0 {
|
if file.Mode&uint32(os.ModeDir) != 0 {
|
||||||
return v.restoreDirectory(file, targetPath, result)
|
return s.restoreDirectory(file, targetPath)
|
||||||
|
}
|
||||||
|
return s.restoreRegularFile(file, targetPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle regular files
|
// restoreSymlink restores a symbolic link.
|
||||||
return v.restoreRegularFile(ctx, repos, file, targetPath, identity, chunkToBlobMap, blobCache, sweeper, result)
|
func (s *restoreSession) restoreSymlink(file *database.File, targetPath string) error {
|
||||||
}
|
_ = s.v.Fs.Remove(targetPath)
|
||||||
|
// afero.MemMapFs doesn't support symlinks, so route real-FS
|
||||||
// restoreSymlink restores a symbolic link
|
// symlinks through os.
|
||||||
func (v *Vaultik) restoreSymlink(file *database.File, targetPath string, result *RestoreResult) error {
|
if _, ok := s.v.Fs.(*afero.OsFs); ok {
|
||||||
// Remove existing file if it exists
|
|
||||||
_ = v.Fs.Remove(targetPath)
|
|
||||||
|
|
||||||
// Create symlink
|
|
||||||
// Note: afero.MemMapFs doesn't support symlinks, so we use os for real filesystems
|
|
||||||
if osFs, ok := v.Fs.(*afero.OsFs); ok {
|
|
||||||
_ = osFs // silence unused variable warning
|
|
||||||
if err := os.Symlink(file.LinkTarget.String(), targetPath); err != nil {
|
if err := os.Symlink(file.LinkTarget.String(), targetPath); err != nil {
|
||||||
return fmt.Errorf("creating symlink: %w", err)
|
return fmt.Errorf("creating symlink: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Debug("Symlink creation not supported on this filesystem", "path", file.Path, "target", file.LinkTarget)
|
log.Debug("Symlink creation not supported on this filesystem", "path", file.Path, "target", file.LinkTarget)
|
||||||
}
|
}
|
||||||
|
s.result.FilesRestored++
|
||||||
result.FilesRestored++
|
|
||||||
log.Debug("Restored symlink", "path", file.Path, "target", file.LinkTarget)
|
log.Debug("Restored symlink", "path", file.Path, "target", file.LinkTarget)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// restoreDirectory restores a directory with proper permissions
|
// restoreDirectory restores a directory with its permissions, mtime,
|
||||||
func (v *Vaultik) restoreDirectory(file *database.File, targetPath string, result *RestoreResult) error {
|
// and (on real filesystems, with sufficient privileges) ownership.
|
||||||
// Create directory
|
func (s *restoreSession) restoreDirectory(file *database.File, targetPath string) error {
|
||||||
if err := v.Fs.MkdirAll(targetPath, os.FileMode(file.Mode)); err != nil {
|
if err := s.v.Fs.MkdirAll(targetPath, os.FileMode(file.Mode)); err != nil {
|
||||||
return fmt.Errorf("creating directory: %w", err)
|
return fmt.Errorf("creating directory: %w", err)
|
||||||
}
|
}
|
||||||
|
if err := s.v.Fs.Chmod(targetPath, os.FileMode(file.Mode)); err != nil {
|
||||||
// Set permissions
|
|
||||||
if err := v.Fs.Chmod(targetPath, os.FileMode(file.Mode)); err != nil {
|
|
||||||
log.Debug("Failed to set directory permissions", "path", targetPath, "error", err)
|
log.Debug("Failed to set directory permissions", "path", targetPath, "error", err)
|
||||||
}
|
}
|
||||||
|
if _, ok := s.v.Fs.(*afero.OsFs); ok {
|
||||||
// Set ownership (requires root)
|
|
||||||
if osFs, ok := v.Fs.(*afero.OsFs); ok {
|
|
||||||
_ = osFs
|
|
||||||
if err := os.Chown(targetPath, int(file.UID), int(file.GID)); err != nil {
|
if err := os.Chown(targetPath, int(file.UID), int(file.GID)); err != nil {
|
||||||
log.Debug("Failed to set directory ownership", "path", targetPath, "error", err)
|
log.Debug("Failed to set directory ownership", "path", targetPath, "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if err := s.v.Fs.Chtimes(targetPath, file.MTime, file.MTime); err != nil {
|
||||||
// Set mtime
|
|
||||||
if err := v.Fs.Chtimes(targetPath, file.MTime, file.MTime); err != nil {
|
|
||||||
log.Debug("Failed to set directory mtime", "path", targetPath, "error", err)
|
log.Debug("Failed to set directory mtime", "path", targetPath, "error", err)
|
||||||
}
|
}
|
||||||
|
s.result.FilesRestored++
|
||||||
result.FilesRestored++
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// restoreRegularFile restores a regular file by reconstructing it from chunks
|
// restoreRegularFile reconstructs a regular file by reading chunks
|
||||||
func (v *Vaultik) restoreRegularFile(
|
// directly out of cached blobs via ReadAt. The expectation when this
|
||||||
ctx context.Context,
|
// method runs is that every blob this file needs is already in the
|
||||||
repos *database.Repositories,
|
// disk cache — the planner guarantees that by only marking files
|
||||||
file *database.File,
|
// "ready" once their full blob set is on disk.
|
||||||
targetPath string,
|
func (s *restoreSession) restoreRegularFile(file *database.File, targetPath string) error {
|
||||||
identity age.Identity,
|
|
||||||
chunkToBlobMap map[string]*database.BlobChunk,
|
|
||||||
blobCache *blobDiskCache,
|
|
||||||
sweeper *restoreSweeper,
|
|
||||||
result *RestoreResult,
|
|
||||||
) error {
|
|
||||||
fileStart := time.Now()
|
fileStart := time.Now()
|
||||||
|
|
||||||
// Get file chunks in order
|
|
||||||
t0 := time.Now()
|
t0 := time.Now()
|
||||||
fileChunks, err := repos.FileChunks.GetByFileID(ctx, file.ID)
|
fileChunks, err := s.repos.FileChunks.GetByFileID(s.ctx, file.ID)
|
||||||
fileChunksQueryDur := time.Since(t0)
|
fileChunksQueryDur := time.Since(t0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("getting file chunks: %w", err)
|
return fmt.Errorf("getting file chunks: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create output file
|
|
||||||
t0 = time.Now()
|
t0 = time.Now()
|
||||||
outFile, err := v.Fs.Create(targetPath)
|
outFile, err := s.v.Fs.Create(targetPath)
|
||||||
createDur := time.Since(t0)
|
createDur := time.Since(t0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("creating output file: %w", err)
|
return fmt.Errorf("creating output file: %w", err)
|
||||||
}
|
}
|
||||||
defer func() { _ = outFile.Close() }()
|
defer func() { _ = outFile.Close() }()
|
||||||
|
|
||||||
// Per-file timing buckets so --debug shows exactly where seconds go.
|
|
||||||
var (
|
var (
|
||||||
blobDBLookupDur time.Duration
|
readAtDur time.Duration
|
||||||
cacheGetDur time.Duration
|
|
||||||
downloadDur time.Duration
|
|
||||||
cachePutDur time.Duration
|
|
||||||
writeDur time.Duration
|
writeDur time.Duration
|
||||||
sweeperDur time.Duration
|
sweeperDur time.Duration
|
||||||
downloadCount int
|
|
||||||
cacheHitCount int
|
|
||||||
bytesWritten int64
|
bytesWritten int64
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, fc := range fileChunks {
|
for _, fc := range fileChunks {
|
||||||
// Find which blob contains this chunk
|
|
||||||
chunkHashStr := fc.ChunkHash.String()
|
chunkHashStr := fc.ChunkHash.String()
|
||||||
blobChunk, ok := chunkToBlobMap[chunkHashStr]
|
blobChunk, ok := s.chunkToBlobMap[chunkHashStr]
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("chunk %s not found in any blob", chunkHashStr[:16])
|
return fmt.Errorf("chunk %s not found in any blob", chunkHashStr[:16])
|
||||||
}
|
}
|
||||||
|
blobHash, ok := s.blobIDToHash[blobChunk.BlobID.String()]
|
||||||
// Get the blob's hash from the database (runs per chunk).
|
|
||||||
t0 = time.Now()
|
|
||||||
blob, err := repos.Blobs.GetByID(ctx, blobChunk.BlobID.String())
|
|
||||||
blobDBLookupDur += time.Since(t0)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("getting blob %s: %w", blobChunk.BlobID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Download and decrypt blob if not cached
|
|
||||||
blobHashStr := blob.Hash.String()
|
|
||||||
t0 = time.Now()
|
|
||||||
blobData, ok := blobCache.Get(blobHashStr)
|
|
||||||
cacheGetDur += time.Since(t0)
|
|
||||||
if !ok {
|
if !ok {
|
||||||
|
return fmt.Errorf("blob id %s missing from hash index", blobChunk.BlobID)
|
||||||
|
}
|
||||||
|
|
||||||
t0 = time.Now()
|
t0 = time.Now()
|
||||||
blobData, err = v.downloadBlob(ctx, blobHashStr, blob.CompressedSize, identity)
|
chunkData, err := s.blobCache.ReadAt(blobHash, blobChunk.Offset, blobChunk.Length)
|
||||||
downloadDur += time.Since(t0)
|
readAtDur += time.Since(t0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("downloading blob %s: %w", blobHashStr[:16], err)
|
return fmt.Errorf("reading chunk %s from cached blob %s: %w", fc.ChunkHash[:16], blobHash[:16], err)
|
||||||
}
|
|
||||||
t0 = time.Now()
|
|
||||||
if putErr := blobCache.Put(blobHashStr, blobData); putErr != nil {
|
|
||||||
log.Debug("Failed to cache blob on disk", "hash", blobHashStr[:16], "error", putErr)
|
|
||||||
}
|
|
||||||
cachePutDur += time.Since(t0)
|
|
||||||
downloadCount++
|
|
||||||
result.BlobsDownloaded++
|
|
||||||
result.BytesDownloaded += blob.CompressedSize
|
|
||||||
} else {
|
|
||||||
cacheHitCount++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract chunk from blob
|
|
||||||
if blobChunk.Offset+blobChunk.Length > int64(len(blobData)) {
|
|
||||||
return fmt.Errorf("chunk %s extends beyond blob data (offset=%d, length=%d, blob_size=%d)",
|
|
||||||
fc.ChunkHash[:16], blobChunk.Offset, blobChunk.Length, len(blobData))
|
|
||||||
}
|
|
||||||
chunkData := blobData[blobChunk.Offset : blobChunk.Offset+blobChunk.Length]
|
|
||||||
|
|
||||||
// Write chunk to output file
|
|
||||||
t0 = time.Now()
|
t0 = time.Now()
|
||||||
n, err := outFile.Write(chunkData)
|
n, err := outFile.Write(chunkData)
|
||||||
writeDur += time.Since(t0)
|
writeDur += time.Since(t0)
|
||||||
@@ -609,11 +645,8 @@ func (v *Vaultik) restoreRegularFile(
|
|||||||
}
|
}
|
||||||
bytesWritten += int64(n)
|
bytesWritten += int64(n)
|
||||||
|
|
||||||
// Tell the sweeper about the bytes we just restored so it can
|
|
||||||
// run an eviction sweep once the accumulated total crosses its
|
|
||||||
// threshold (config.BlobSizeLimit/100).
|
|
||||||
t0 = time.Now()
|
t0 = time.Now()
|
||||||
sweeper.chunkRestored(int64(n))
|
s.sweeper.chunkRestored(int64(n))
|
||||||
sweeperDur += time.Since(t0)
|
sweeperDur += time.Since(t0)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -621,89 +654,72 @@ func (v *Vaultik) restoreRegularFile(
|
|||||||
"path", file.Path,
|
"path", file.Path,
|
||||||
"chunks", len(fileChunks),
|
"chunks", len(fileChunks),
|
||||||
"bytes_written", bytesWritten,
|
"bytes_written", bytesWritten,
|
||||||
"downloads", downloadCount,
|
|
||||||
"cache_hits", cacheHitCount,
|
|
||||||
"ms_total", time.Since(fileStart).Milliseconds(),
|
"ms_total", time.Since(fileStart).Milliseconds(),
|
||||||
"ms_file_chunks_query", fileChunksQueryDur.Milliseconds(),
|
"ms_file_chunks_query", fileChunksQueryDur.Milliseconds(),
|
||||||
"ms_create", createDur.Milliseconds(),
|
"ms_create", createDur.Milliseconds(),
|
||||||
"ms_blob_db_lookups", blobDBLookupDur.Milliseconds(),
|
"ms_readat", readAtDur.Milliseconds(),
|
||||||
"ms_cache_gets", cacheGetDur.Milliseconds(),
|
|
||||||
"ms_cache_puts", cachePutDur.Milliseconds(),
|
|
||||||
"ms_downloads", downloadDur.Milliseconds(),
|
|
||||||
"ms_writes", writeDur.Milliseconds(),
|
"ms_writes", writeDur.Milliseconds(),
|
||||||
"ms_sweeper", sweeperDur.Milliseconds(),
|
"ms_sweeper", sweeperDur.Milliseconds(),
|
||||||
)
|
)
|
||||||
|
|
||||||
// Close file before setting metadata
|
|
||||||
if err := outFile.Close(); err != nil {
|
if err := outFile.Close(); err != nil {
|
||||||
return fmt.Errorf("closing output file: %w", err)
|
return fmt.Errorf("closing output file: %w", err)
|
||||||
}
|
}
|
||||||
|
if err := s.v.Fs.Chmod(targetPath, os.FileMode(file.Mode)); err != nil {
|
||||||
// Set permissions
|
|
||||||
if err := v.Fs.Chmod(targetPath, os.FileMode(file.Mode)); err != nil {
|
|
||||||
log.Debug("Failed to set file permissions", "path", targetPath, "error", err)
|
log.Debug("Failed to set file permissions", "path", targetPath, "error", err)
|
||||||
}
|
}
|
||||||
|
if _, ok := s.v.Fs.(*afero.OsFs); ok {
|
||||||
// Set ownership (requires root)
|
|
||||||
if osFs, ok := v.Fs.(*afero.OsFs); ok {
|
|
||||||
_ = osFs
|
|
||||||
if err := os.Chown(targetPath, int(file.UID), int(file.GID)); err != nil {
|
if err := os.Chown(targetPath, int(file.UID), int(file.GID)); err != nil {
|
||||||
log.Debug("Failed to set file ownership", "path", targetPath, "error", err)
|
log.Debug("Failed to set file ownership", "path", targetPath, "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if err := s.v.Fs.Chtimes(targetPath, file.MTime, file.MTime); err != nil {
|
||||||
// Set mtime
|
|
||||||
if err := v.Fs.Chtimes(targetPath, file.MTime, file.MTime); err != nil {
|
|
||||||
log.Debug("Failed to set file mtime", "path", targetPath, "error", err)
|
log.Debug("Failed to set file mtime", "path", targetPath, "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
result.FilesRestored++
|
s.result.FilesRestored++
|
||||||
result.BytesRestored += bytesWritten
|
s.result.BytesRestored += bytesWritten
|
||||||
|
|
||||||
log.Debug("Restored file", "path", file.Path, "size", humanize.Bytes(uint64(bytesWritten)))
|
log.Debug("Restored file", "path", file.Path, "size", humanize.Bytes(uint64(bytesWritten)))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// downloadBlob downloads and decrypts a blob, returning the plaintext.
|
// downloadBlobToCache streams a blob from remote storage straight into
|
||||||
// Emits a debug log line splitting time spent in the network fetch (Get
|
// the disk cache, decrypting and decompressing on the fly. The
|
||||||
// + Stat round-trips) from the streaming decrypt/decompress/read phase
|
// plaintext never lives fully in memory — io.Copy through
|
||||||
// so --debug shows which side of the wire is the bottleneck.
|
// blobDiskCache.PutFromReader uses a 32 KiB buffer regardless of blob
|
||||||
func (v *Vaultik) downloadBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) ([]byte, error) {
|
// size, which is what makes multi-GB blobs tractable on machines with
|
||||||
|
// less RAM than the blob.
|
||||||
|
func (s *restoreSession) downloadBlobToCache(blobHash string, expectedSize int64) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
t0 := time.Now()
|
t0 := time.Now()
|
||||||
rc, err := v.FetchAndDecryptBlob(ctx, blobHash, expectedSize, identity)
|
rc, err := s.v.FetchAndDecryptBlob(s.ctx, blobHash, expectedSize, s.identity)
|
||||||
fetchSetupDur := time.Since(t0)
|
fetchSetupDur := time.Since(t0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
t0 = time.Now()
|
t0 = time.Now()
|
||||||
data, err := io.ReadAll(rc)
|
written, copyErr := s.blobCache.PutFromReader(blobHash, rc)
|
||||||
readAllDur := time.Since(t0)
|
streamDur := time.Since(t0)
|
||||||
if err != nil {
|
closeErr := rc.Close()
|
||||||
_ = rc.Close()
|
if copyErr != nil {
|
||||||
return nil, fmt.Errorf("reading blob data: %w", err)
|
return copyErr
|
||||||
|
}
|
||||||
|
if closeErr != nil {
|
||||||
|
return closeErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close triggers hash verification
|
log.Debug("Streamed blob into disk cache",
|
||||||
t0 = time.Now()
|
|
||||||
if err := rc.Close(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
closeDur := time.Since(t0)
|
|
||||||
|
|
||||||
log.Debug("Downloaded and decrypted blob (timings)",
|
|
||||||
"hash", blobHash[:16],
|
"hash", blobHash[:16],
|
||||||
"compressed_bytes", expectedSize,
|
"compressed_bytes", expectedSize,
|
||||||
"plaintext_bytes", len(data),
|
"plaintext_bytes", written,
|
||||||
"ms_total", time.Since(start).Milliseconds(),
|
"ms_total", time.Since(start).Milliseconds(),
|
||||||
"ms_fetch_setup", fetchSetupDur.Milliseconds(),
|
"ms_fetch_setup", fetchSetupDur.Milliseconds(),
|
||||||
"ms_read_decrypt_decompress", readAllDur.Milliseconds(),
|
"ms_stream_decrypt_decompress", streamDur.Milliseconds(),
|
||||||
"ms_close_verify", closeDur.Milliseconds(),
|
|
||||||
)
|
)
|
||||||
|
return nil
|
||||||
return data, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// verifyRestoredFiles verifies that all restored files match their expected chunk hashes
|
// verifyRestoredFiles verifies that all restored files match their expected chunk hashes
|
||||||
|
|||||||
315
internal/vaultik/restore_locality_test.go
Normal file
315
internal/vaultik/restore_locality_test.go
Normal file
@@ -0,0 +1,315 @@
|
|||||||
|
package vaultik
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/spf13/afero"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"sneak.berlin/go/vaultik/internal/config"
|
||||||
|
"sneak.berlin/go/vaultik/internal/database"
|
||||||
|
"sneak.berlin/go/vaultik/internal/log"
|
||||||
|
"sneak.berlin/go/vaultik/internal/snapshot"
|
||||||
|
"sneak.berlin/go/vaultik/internal/storage"
|
||||||
|
"sneak.berlin/go/vaultik/internal/ui"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestRestoreLocalityAndReadAt asserts three properties of the restore
// hot path that together produce acceptable throughput on real-world
// snapshots. The note that originally stood here said all three failed
// on main at the time of writing.
// NOTE(review): presumably they pass now that restore is
// locality-ordered — confirm, and keep them as regression guards.
//
//  1. Peak blob cache occupancy ≤ 1.
//     Restore order must respect blob locality: every file fully
//     contained within the currently cached blob should be restored
//     before any other blob is downloaded. The sweeper then frees
//     each blob as soon as its file set is exhausted. Without smart
//     ordering, path-order interleaves blobs and the cache holds
//     every touched blob until the last file referencing it lands.
//
//  2. Each remote blob is fetched exactly once.
//     Counted via wrapping the Storer.
//
//  3. blobDiskCache.Get is never called during restore.
//     Chunk extraction from a cached blob must go through ReadAt,
//     which reads only the chunk's bytes from disk. Get reads the
//     entire blob (up to 50 GB in production) into memory just to
//     slice out a few KB.
//
// The test deliberately constructs an adversarial scenario: fifteen
// 1 MiB source files intended to pack into three blobs of five chunks
// each, plus nine copy files cloned from one source per blob group and
// given path-ordered names that interleave the blobs (cp-001-A,
// cp-002-B, cp-003-C, cp-004-A, …), so naive path-order processing
// would touch every blob before finishing any of them.
func TestRestoreLocalityAndReadAt(t *testing.T) {
	log.Initialize(log.Config{})

	// Real OS filesystem: the backup, the file-backed store and the
	// restore target all live under one temp directory.
	fs := afero.NewOsFs()
	tempDir, err := os.MkdirTemp("", "vaultik-locality-")
	require.NoError(t, err)
	defer func() { _ = os.RemoveAll(tempDir) }()

	dataDir := filepath.Join(tempDir, "source")
	storeDir := filepath.Join(tempDir, "remote")
	restoreDir := filepath.Join(tempDir, "restored")
	dbPath := filepath.Join(tempDir, "index.sqlite")

	require.NoError(t, fs.MkdirAll(dataDir, 0o755))

	// Layout: 15 source files of exactly 1 MiB each. With
	// chunkSize (avg) = 4 MiB the chunker's minSize is 1 MiB, so any
	// file of 1 MiB becomes a single chunk. With a 5 MiB blob limit
	// the packer fits exactly 5 chunks per blob, producing 3 blobs
	// containing src-001..005, src-006..010, src-011..015.
	//
	// Then add 9 "copy" files — byte-for-byte clones of three of the
	// sources (one from each blob group) — with interleaved names
	// (cp-001-A, cp-002-B, cp-003-C, cp-004-A, …) so a naive
	// path-ordered restore would touch all three blobs before
	// finishing any of them.
	const (
		srcBytes   = 1024 * 1024
		srcCount   = 15
		blobsCount = 3
		perBlob    = srcCount / blobsCount
	)

	type source struct {
		path string
		data []byte
	}
	sources := make([]*source, srcCount)
	for i := 0; i < srcCount; i++ {
		s := &source{
			path: fmt.Sprintf("src-%03d.bin", i+1),
			data: randomBytes(t, srcBytes),
		}
		sources[i] = s
		require.NoError(t, afero.WriteFile(fs, filepath.Join(dataDir, s.path), s.data, 0o644))
	}

	// Pick one representative source per blob group (src-001 → blob
	// 1, src-006 → blob 2, src-011 → blob 3) and create 3 copies of
	// each with interleaved alphabetical names.
	type copyFile struct {
		path        string
		data        []byte
		sourceBlob  int // 0, 1, or 2
		sourceIndex int // index into sources slice
	}
	groupReps := []int{0, perBlob, 2 * perBlob} // 0, 5, 10
	letters := []byte{'A', 'B', 'C'}
	var copies []copyFile
	for i := 0; i < 3; i++ {
		for j := 0; j < blobsCount; j++ {
			// seq runs 1..9 across both loops, so consecutive names
			// cycle through groups A, B, C.
			seq := i*blobsCount + j + 1
			name := fmt.Sprintf("cp-%03d-%c.bin", seq, letters[j])
			path := filepath.Join(dataDir, name)
			src := sources[groupReps[j]]
			require.NoError(t, afero.WriteFile(fs, path, src.data, 0o644))
			copies = append(copies, copyFile{path: path, data: src.data, sourceBlob: j, sourceIndex: groupReps[j]})
		}
	}

	// chunkSize avg = 4 MiB makes minSize = 1 MiB, so a 1 MiB file
	// becomes one chunk. maxBlobSize = 5 MiB packs exactly 5 chunks
	// per blob, yielding 3 blobs from 15 source files.
	chunkSize := int64(4 * 1024 * 1024)
	maxBlobSize := int64(5 * 1024 * 1024)

	storer, err := storage.NewFileStorer(storeDir)
	require.NoError(t, err)

	// Fixed age keypair used only by this test.
	agePublicKey := "age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"
	ageSecretKey := "AGE-SECRET-KEY-19CR5YSFW59HM4TLD6GXVEDMZFTVVF7PPHKUT68TXSFPK7APHXA2QS2NJA5"

	cfg := &config.Config{
		AgeRecipients:    []string{agePublicKey},
		AgeSecretKey:     ageSecretKey,
		CompressionLevel: 3,
		Hostname:         "test-host",
		BlobSizeLimit:    config.Size(maxBlobSize),
	}

	ctx := context.Background()

	db, err := database.New(ctx, dbPath)
	require.NoError(t, err)
	defer func() { _ = db.Close() }()

	repos := database.NewRepositories(db)

	sm := snapshot.NewSnapshotManager(snapshot.SnapshotManagerParams{
		Repos:   repos,
		Storage: storer,
		Config:  cfg,
	})
	sm.SetFilesystem(fs)

	scanner := snapshot.NewScanner(snapshot.ScannerConfig{
		FS:               fs,
		Storage:          storer,
		ChunkSize:        chunkSize,
		MaxBlobSize:      maxBlobSize,
		CompressionLevel: cfg.CompressionLevel,
		AgeRecipients:    cfg.AgeRecipients,
		Repositories:     repos,
	})

	// Backup phase: create the snapshot, scan the source tree into it,
	// then complete it and export its metadata.
	snapshotID, err := sm.CreateSnapshotWithName(ctx, cfg.Hostname, "locality", "test-version", "test-git")
	require.NoError(t, err)

	_, err = scanner.Scan(ctx, dataDir, snapshotID)
	require.NoError(t, err)

	require.NoError(t, sm.CompleteSnapshot(ctx, snapshotID))
	require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, snapshotID))

	blobsOnDisk := listBlobKeys(t, storeDir)
	t.Logf("backup produced %d blobs", len(blobsOnDisk))
	require.GreaterOrEqual(t, len(blobsOnDisk), 3, "expected at least 3 blobs from 3 filler groups")

	// Close the index database explicitly before restoring. The
	// deferred Close above then runs a second time with its error
	// ignored.
	require.NoError(t, db.Close())

	// Wrap the storer so we can count downloads per blob key.
	counter := newCountingStorer(storer)

	// Capture the restore-side cache for instrumentation inspection.
	// The observer fires twice (immediately after creation and
	// immediately before close) so we read PeakLen and call counters
	// from the same instance the production code used.
	var cacheRef *blobDiskCache
	v := &Vaultik{
		Config:  cfg,
		Storage: counter,
		Fs:      fs,
		Stdout:  io.Discard,
		Stderr:  io.Discard,
		UI:      ui.NewWithColor(io.Discard, false),
		restoreCacheObserver: func(c *blobDiskCache) {
			cacheRef = c
		},
	}
	v.SetContext(ctx)

	require.NoError(t, v.Restore(&RestoreOptions{
		SnapshotID: snapshotID,
		TargetDir:  restoreDir,
	}))

	require.NotNil(t, cacheRef, "restoreCacheObserver must fire during restore")

	// Verify restored content matches. Restored files are looked up
	// at restoreDir joined with their original absolute path.
	for _, s := range sources {
		restored := filepath.Join(restoreDir, dataDir, s.path)
		got, err := afero.ReadFile(fs, restored)
		require.NoErrorf(t, err, "source missing after restore: %s", s.path)
		require.Truef(t, bytes.Equal(got, s.data), "byte mismatch for source %s", s.path)
	}
	for _, c := range copies {
		// c.path already includes dataDir (see where copies is built).
		restored := filepath.Join(restoreDir, c.path)
		got, err := afero.ReadFile(fs, restored)
		require.NoErrorf(t, err, "copy missing after restore: %s", c.path)
		require.Truef(t, bytes.Equal(got, c.data), "byte mismatch for copy %s", c.path)
	}

	// (2) Each blob fetched exactly once.
	// NOTE(review): this loop passes vacuously if no recorded key
	// satisfies filterBlobKey — consider also asserting how many blob
	// keys were seen.
	for key, n := range counter.snapshot() {
		if !filterBlobKey(key) {
			continue
		}
		assert.Equalf(t, 1, n, "blob %s fetched %d times, want exactly 1", key, n)
	}

	// (1) Peak cache size ≤ 1. The sweeper plus locality-aware
	// ordering should free each blob before the next one downloads.
	assert.LessOrEqualf(t, cacheRef.PeakLen(), 1,
		"peak cached blobs was %d; expected ≤ 1 with locality-ordered restore", cacheRef.PeakLen())

	// (3) Cache.Get must never be called during restore — chunk
	// extraction has to go through ReadAt so we never read the whole
	// blob from disk to grab a few KB slice.
	assert.Equalf(t, 0, cacheRef.GetCalls(),
		"blobDiskCache.Get was called %d times during restore; restore must use ReadAt exclusively", cacheRef.GetCalls())

	t.Logf("blob cache stats: peak_len=%d get_calls=%d readat_calls=%d",
		cacheRef.PeakLen(), cacheRef.GetCalls(), cacheRef.ReadAtCalls())
}
|
||||||
|
|
||||||
|
// randomBytes returns n bytes of random data. Used to make sure the
|
||||||
|
// chunker picks non-degenerate FastCDC boundaries.
|
||||||
|
func randomBytes(t *testing.T, n int) []byte {
|
||||||
|
t.Helper()
|
||||||
|
b := make([]byte, n)
|
||||||
|
_, err := rand.Read(b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
// listBlobKeys walks the FileStorer blobs/ tree and returns the
|
||||||
|
// relative keys for every blob file present.
|
||||||
|
func listBlobKeys(t *testing.T, storeDir string) []string {
|
||||||
|
t.Helper()
|
||||||
|
var keys []string
|
||||||
|
root := filepath.Join(storeDir, "blobs")
|
||||||
|
err := filepath.Walk(root, func(p string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if info.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rel, _ := filepath.Rel(storeDir, p)
|
||||||
|
keys = append(keys, rel)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
sort.Strings(keys)
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
|
||||||
|
// filterBlobKey reports whether key looks like a blob storage path
// (rather than a snapshot metadata path): it must start with "blobs/"
// and have at least one more character after that prefix, so the bare
// string "blobs/" does not qualify.
func filterBlobKey(key string) bool {
	const prefix = "blobs/"
	if len(key) <= len(prefix) {
		return false
	}
	return key[:len(prefix)] == prefix
}
|
||||||
|
|
||||||
|
// countingStorerInternal wraps a storage.Storer and records the number
// of Get calls per key, so the locality test can assert each blob is
// fetched exactly once. Defined here (rather than reusing the one in
// the integration_test package) because this test lives in package
// vaultik for access to unexported cache internals.
type countingStorerInternal struct {
	// Storer is the wrapped store. It is embedded, so its methods are
	// promoted unless this type overrides them.
	storage.Storer
	// mu guards counts.
	mu sync.Mutex
	// counts holds the number of Get calls seen for each key.
	counts map[string]int
}
|
||||||
|
|
||||||
|
func newCountingStorer(inner storage.Storer) *countingStorerInternal {
|
||||||
|
return &countingStorerInternal{Storer: inner, counts: make(map[string]int)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *countingStorerInternal) Get(ctx context.Context, key string) (io.ReadCloser, error) {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.counts[key]++
|
||||||
|
c.mu.Unlock()
|
||||||
|
return c.Storer.Get(ctx, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *countingStorerInternal) snapshot() map[string]int {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
out := make(map[string]int, len(c.counts))
|
||||||
|
for k, v := range c.counts {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
185
internal/vaultik/restore_plan.go
Normal file
185
internal/vaultik/restore_plan.go
Normal file
@@ -0,0 +1,185 @@
|
|||||||
|
package vaultik
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"sneak.berlin/go/vaultik/internal/database"
|
||||||
|
"sneak.berlin/go/vaultik/internal/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// restorePlan orders restore-time file processing by blob locality. The
// goal is to keep the blob disk cache occupancy as small as possible:
// download one blob, drain every file referencing only that blob, let
// the sweeper free the blob, then move on. Files that span multiple
// blobs are processed when their full blob set is on disk.
//
// The plan keeps two indexes:
//
//   - fileBlobs: for each pending file, the set of blob hashes it
//     still needs that are NOT yet in the cache. Files with an empty
//     set are "ready" — they can be restored from the current cache
//     with no further downloads.
//   - blobFiles: for each blob, the set of pending files referencing
//     it. Used to short-circuit "when this blob lands, which files
//     become ready" without a global scan.
//
// Alongside the indexes it holds a queue of ready files and the set of
// blobs already reported as cached.
type restorePlan struct {
	// fileBlobs maps each unfinished file to the blob hashes it still
	// waits on; an entry stays (possibly empty or nil) until finishFile
	// removes it.
	fileBlobs map[types.FileID]map[string]struct{}
	// blobFiles maps a blob hash to the unfinished files that
	// reference it.
	blobFiles map[string]map[types.FileID]struct{}
	// ready is a queue of files with nothing left to download:
	// appended at the tail, consumed from the head by popReady.
	ready []types.FileID
	// cached is the set of blob hashes already passed to
	// markBlobCached; it makes repeated calls for one blob a no-op.
	cached map[string]struct{}
}
|
||||||
|
|
||||||
|
// newRestorePlan builds the file→blob index for the given files. Files
|
||||||
|
// whose chunks reference no blobs (symlinks, directories) start in the
|
||||||
|
// ready queue immediately.
|
||||||
|
func newRestorePlan(
|
||||||
|
ctx context.Context,
|
||||||
|
repos *database.Repositories,
|
||||||
|
files []*database.File,
|
||||||
|
chunkToBlobMap map[string]*database.BlobChunk,
|
||||||
|
blobIDToHash map[string]string,
|
||||||
|
) (*restorePlan, error) {
|
||||||
|
p := &restorePlan{
|
||||||
|
fileBlobs: make(map[types.FileID]map[string]struct{}, len(files)),
|
||||||
|
blobFiles: make(map[string]map[types.FileID]struct{}),
|
||||||
|
ready: make([]types.FileID, 0, len(files)),
|
||||||
|
cached: make(map[string]struct{}),
|
||||||
|
}
|
||||||
|
for _, f := range files {
|
||||||
|
if f.IsSymlink() || f.Mode&uint32(os.ModeDir) != 0 {
|
||||||
|
// No chunks to fetch — restore can run immediately.
|
||||||
|
p.fileBlobs[f.ID] = nil
|
||||||
|
p.ready = append(p.ready, f.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fileChunks, err := repos.FileChunks.GetByFileID(ctx, f.ID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("planning %s: %w", f.Path, err)
|
||||||
|
}
|
||||||
|
blobs := make(map[string]struct{})
|
||||||
|
for _, fc := range fileChunks {
|
||||||
|
bc, ok := chunkToBlobMap[fc.ChunkHash.String()]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("planning %s: chunk %s missing from blob map",
|
||||||
|
f.Path, fc.ChunkHash.String()[:16])
|
||||||
|
}
|
||||||
|
hash, ok := blobIDToHash[bc.BlobID.String()]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("planning %s: blob id %s missing from id-to-hash map",
|
||||||
|
f.Path, bc.BlobID)
|
||||||
|
}
|
||||||
|
blobs[hash] = struct{}{}
|
||||||
|
}
|
||||||
|
p.fileBlobs[f.ID] = blobs
|
||||||
|
for hash := range blobs {
|
||||||
|
set, ok := p.blobFiles[hash]
|
||||||
|
if !ok {
|
||||||
|
set = make(map[types.FileID]struct{})
|
||||||
|
p.blobFiles[hash] = set
|
||||||
|
}
|
||||||
|
set[f.ID] = struct{}{}
|
||||||
|
}
|
||||||
|
if len(blobs) == 0 {
|
||||||
|
p.ready = append(p.ready, f.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// markBlobCached records that the named blob is now resident in the
|
||||||
|
// disk cache and moves any pending file whose remaining-uncached-blobs
|
||||||
|
// set just dropped to empty onto the ready queue.
|
||||||
|
func (p *restorePlan) markBlobCached(blobHash string) {
|
||||||
|
if _, already := p.cached[blobHash]; already {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.cached[blobHash] = struct{}{}
|
||||||
|
for fileID := range p.blobFiles[blobHash] {
|
||||||
|
blobs := p.fileBlobs[fileID]
|
||||||
|
delete(blobs, blobHash)
|
||||||
|
if len(blobs) == 0 {
|
||||||
|
p.ready = append(p.ready, fileID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// popReady returns the next ready file, removing it from the queue. If
|
||||||
|
// no file is ready, the second return value is false.
|
||||||
|
func (p *restorePlan) popReady() (types.FileID, bool) {
|
||||||
|
if len(p.ready) == 0 {
|
||||||
|
return types.FileID{}, false
|
||||||
|
}
|
||||||
|
id := p.ready[0]
|
||||||
|
p.ready = p.ready[1:]
|
||||||
|
return id, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// finishFile drops a restored file from both indexes so subsequent
|
||||||
|
// planning calls don't reconsider it.
|
||||||
|
func (p *restorePlan) finishFile(fileID types.FileID) {
|
||||||
|
for hash := range p.fileBlobs[fileID] {
|
||||||
|
if set, ok := p.blobFiles[hash]; ok {
|
||||||
|
delete(set, fileID)
|
||||||
|
if len(set) == 0 {
|
||||||
|
delete(p.blobFiles, hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(p.fileBlobs, fileID)
|
||||||
|
// Also scrub the file from any blobFiles entries where it might
|
||||||
|
// still appear even after its uncached-blob set was emptied.
|
||||||
|
for hash, set := range p.blobFiles {
|
||||||
|
if _, ok := set[fileID]; ok {
|
||||||
|
delete(set, fileID)
|
||||||
|
if len(set) == 0 {
|
||||||
|
delete(p.blobFiles, hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pickNextDownload returns the pending file whose remaining-uncached
|
||||||
|
// blob set is smallest (with ties broken by FileID string compare so
|
||||||
|
// the choice is deterministic across runs). This file's blobs are
|
||||||
|
// downloaded next, after which it — together with any other pending
|
||||||
|
// files whose blob sets become empty — moves to the ready queue.
|
||||||
|
//
|
||||||
|
// The zero FileID return means nothing is pending.
|
||||||
|
func (p *restorePlan) pickNextDownload() types.FileID {
|
||||||
|
var best types.FileID
|
||||||
|
bestCount := math.MaxInt
|
||||||
|
var bestID string
|
||||||
|
for id, blobs := range p.fileBlobs {
|
||||||
|
n := len(blobs)
|
||||||
|
if n == 0 {
|
||||||
|
// Already-ready files should have been popped via
|
||||||
|
// popReady; ignore here just in case.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
idStr := id.String()
|
||||||
|
if n < bestCount || (n == bestCount && (best.IsZero() || idStr < bestID)) {
|
||||||
|
best = id
|
||||||
|
bestCount = n
|
||||||
|
bestID = idStr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return best
|
||||||
|
}
|
||||||
|
|
||||||
|
// blobsNeeded returns the uncached blob hashes for fileID in any order.
|
||||||
|
func (p *restorePlan) blobsNeeded(fileID types.FileID) []string {
|
||||||
|
blobs := p.fileBlobs[fileID]
|
||||||
|
out := make([]string, 0, len(blobs))
|
||||||
|
for h := range blobs {
|
||||||
|
out = append(out, h)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// hasPending reports whether any unfinished files remain.
|
||||||
|
func (p *restorePlan) hasPending() bool {
|
||||||
|
return len(p.fileBlobs) > 0
|
||||||
|
}
|
||||||
@@ -44,6 +44,13 @@ type Vaultik struct {
|
|||||||
// writer wrapping Stdout; the cli layer replaces it with a discarding
|
// writer wrapping Stdout; the cli layer replaces it with a discarding
|
||||||
// writer in --cron mode.
|
// writer in --cron mode.
|
||||||
UI *ui.Writer
|
UI *ui.Writer
|
||||||
|
|
||||||
|
// restoreCacheObserver, if non-nil, is invoked once with the
|
||||||
|
// restore-side blob disk cache immediately after the cache is
|
||||||
|
// created and again immediately before it is closed. Only
|
||||||
|
// internal-package tests set this; the type is unexported so
|
||||||
|
// callers outside this package can't reach it.
|
||||||
|
restoreCacheObserver func(*blobDiskCache)
|
||||||
}
|
}
|
||||||
|
|
||||||
// VaultikParams contains all parameters for New that can be provided by fx
|
// VaultikParams contains all parameters for New that can be provided by fx
|
||||||
|
|||||||
Reference in New Issue
Block a user