diff --git a/internal/vaultik/blobcache.go b/internal/vaultik/blobcache.go index cdcee69..397d366 100644 --- a/internal/vaultik/blobcache.go +++ b/internal/vaultik/blobcache.go @@ -181,6 +181,34 @@ func (c *blobDiskCache) Has(key string) bool { return ok } +// Delete removes a blob from the cache and its disk file. No-op if absent. +// Used by restore's sweep logic to free blobs whose chunks have all been +// restored (so they will never be needed again during this restore). +func (c *blobDiskCache) Delete(key string) { + c.mu.Lock() + defer c.mu.Unlock() + e, ok := c.items[key] + if !ok { + return + } + c.unlink(e) + delete(c.items, key) + c.curBytes -= e.size + _ = os.Remove(c.path(key)) +} + +// Keys returns a snapshot of all cached keys. Safe for iteration without +// holding the cache lock; the cache may change concurrently. +func (c *blobDiskCache) Keys() []string { + c.mu.Lock() + defer c.mu.Unlock() + keys := make([]string, 0, len(c.items)) + for k := range c.items { + keys = append(keys, k) + } + return keys +} + // Size returns current total cached bytes. func (c *blobDiskCache) Size() int64 { c.mu.Lock() diff --git a/internal/vaultik/restore.go b/internal/vaultik/restore.go index 8233f20..17d7b3c 100644 --- a/internal/vaultik/restore.go +++ b/internal/vaultik/restore.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "fmt" "io" + "math" "os" "path/filepath" "time" @@ -176,12 +177,22 @@ func (v *Vaultik) restoreAllFiles( chunkToBlobMap map[string]*database.BlobChunk, ) (*RestoreResult, error) { result := &RestoreResult{} - blobCache, err := newBlobDiskCache(4 * v.Config.BlobSizeLimit.Int64()) + + // The restore-side blob cache is unbounded — restores may read any + // blob many times across deduplicated files and we want to avoid + // re-downloading until we can prove a blob is no longer needed. + // Cleanup is driven by the sweeper below, not by LRU. + blobCache, err := newBlobDiskCache(math.MaxInt64) if err != nil { return nil, fmt.Errorf("creating blob cache: %w", err) } defer func() { _ = blobCache.Close() }() + // Per-restore sweep state: every blob_size_limit/100 bytes written, + // scan the cache and delete any blob whose remaining file references + // are all already restored. + sweeper := newRestoreSweeper(v.ctx, repos, blobCache, v.Config.BlobSizeLimit.Int64()/100) + // Calculate total bytes for progress bar var totalBytesExpected int64 for _, file := range files { @@ -196,7 +207,7 @@ func (v *Vaultik) restoreAllFiles( return nil, v.ctx.Err() } - if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, result); err != nil { + if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, sweeper, result); err != nil { log.Error("Failed to restore file", "path", file.Path, "error", err) if !opts.SkipErrors { return nil, fmt.Errorf("restoring %s: %w (pass --skip-errors to continue past restore failures)", file.Path, err) @@ -211,6 +222,10 @@ func (v *Vaultik) restoreAllFiles( continue } + // Record the file as restored so the sweeper can free blobs once + // all referencing files are done. + sweeper.fileRestored(file.ID.String()) + // Update progress bar if bar != nil { _ = bar.Add64(file.Size) @@ -388,6 +403,7 @@ func (v *Vaultik) restoreFile( identity age.Identity, chunkToBlobMap map[string]*database.BlobChunk, blobCache *blobDiskCache, + sweeper *restoreSweeper, result *RestoreResult, ) error { // Calculate target path - use full original path under target directory @@ -410,7 +426,7 @@ func (v *Vaultik) restoreFile( } // Handle regular files - return v.restoreRegularFile(ctx, repos, file, targetPath, identity, chunkToBlobMap, blobCache, result) + return v.restoreRegularFile(ctx, repos, file, targetPath, identity, chunkToBlobMap, blobCache, sweeper, result) } // restoreSymlink restores a symbolic link @@ -472,6 +488,7 @@ func (v *Vaultik) restoreRegularFile( identity age.Identity, chunkToBlobMap map[string]*database.BlobChunk, blobCache *blobDiskCache, + sweeper *restoreSweeper, result *RestoreResult, ) error { // Get file chunks in order @@ -531,6 +548,11 @@ func (v *Vaultik) restoreRegularFile( return fmt.Errorf("writing chunk: %w", err) } bytesWritten += int64(n) + + // Tell the sweeper about the bytes we just restored so it can + // run an eviction sweep once the accumulated total crosses its + // threshold (config.BlobSizeLimit/100). + sweeper.chunkRestored(int64(n)) } // Close file before setting metadata diff --git a/internal/vaultik/restore_sweeper.go b/internal/vaultik/restore_sweeper.go new file mode 100644 index 0000000..2594844 --- /dev/null +++ b/internal/vaultik/restore_sweeper.go @@ -0,0 +1,118 @@ +package vaultik + +import ( + "context" + "fmt" + + "sneak.berlin/go/vaultik/internal/database" + "sneak.berlin/go/vaultik/internal/log" +) + +// restoreSweeper frees cached blobs once all files that reference any of +// their chunks have been restored. It works as follows: +// +// 1. Callers add a file's ID to an in-memory restored set via +// fileRestored once the file is fully written to disk. +// 2. After each chunk is restored, chunkRestored accumulates a running +// byte count. +// 3. When the accumulator crosses a threshold (one hundredth of the +// configured blob size — so a sweep runs about a hundred times per +// blob's worth of restored bytes), the sweeper iterates every key in +// the cache. For each cached blob it asks the DB which files +// reference any chunk in that blob, then compares that list against +// the in-memory restored set. If any referencing file is missing +// from the set the blob is kept; otherwise the cache entry is +// deleted. +// +// All DB reads happen against the snapshot's temporary metadata DB, +// which is local, indexed, and not under contention — the queries are +// cheap and run at most once per blob per sweep interval. +type restoreSweeper struct { + ctx context.Context + repos *database.Repositories + cache *blobDiskCache + threshold int64 + bytesAccum int64 + restored map[string]struct{} +} + +// newRestoreSweeper returns a sweeper that triggers eviction every +// `threshold` bytes restored. Callers should pass blob_size_limit/100. +func newRestoreSweeper(ctx context.Context, repos *database.Repositories, cache *blobDiskCache, threshold int64) *restoreSweeper { + if threshold <= 0 { + threshold = 1 + } + return &restoreSweeper{ + ctx: ctx, + repos: repos, + cache: cache, + threshold: threshold, + restored: make(map[string]struct{}), + } +} + +// fileRestored records a file as fully restored. After this call, any +// blob whose only remaining references come from files in the restored +// set may be evicted on the next sweep. +func (s *restoreSweeper) fileRestored(fileID string) { + s.restored[fileID] = struct{}{} +} + +// chunkRestored accounts n bytes against the sweep threshold and runs a +// sweep if the threshold has been crossed since the last sweep. +func (s *restoreSweeper) chunkRestored(n int64) { + s.bytesAccum += n + if s.bytesAccum < s.threshold { + return + } + s.bytesAccum = 0 + s.sweep() +} + +// sweep deletes any cached blob whose chunks are no longer referenced +// by an unrestored file. Per-blob DB failures are logged and the blob +// is kept — we'd rather hold a blob longer than risk a re-download. +func (s *restoreSweeper) sweep() { + for _, blobHash := range s.cache.Keys() { + needed, err := s.blobStillNeeded(blobHash) + if err != nil { + log.Debug("sweeper referencing-files query failed", "blob_hash", blobHash[:16], "error", err) + continue + } + if !needed { + s.cache.Delete(blobHash) + } + } +} + +// blobStillNeeded returns true if any file that references a chunk in +// this blob has not yet been restored. On any error the function +// returns true — keeping the blob is always the safe answer because we +// can't prove we're done with it. +func (s *restoreSweeper) blobStillNeeded(blobHash string) (bool, error) { + rows, err := s.repos.DB().Conn().QueryContext(s.ctx, ` + SELECT DISTINCT fc.file_id + FROM file_chunks fc + JOIN blob_chunks bc ON bc.chunk_hash = fc.chunk_hash + JOIN blobs b ON b.id = bc.blob_id + WHERE b.blob_hash = ? + `, blobHash) + if err != nil { + return true, fmt.Errorf("querying referencing files: %w", err) + } + defer func() { _ = rows.Close() }() + + for rows.Next() { + var fileID string + if err := rows.Scan(&fileID); err != nil { + return true, fmt.Errorf("scanning file_id: %w", err) + } + if _, ok := s.restored[fileID]; !ok { + return true, nil + } + } + if err := rows.Err(); err != nil { + return true, err + } + return false, nil +}