Merge fix/restore-blob-cache-eviction
All checks were successful
check / check (push) Successful in 1m57s
All checks were successful
check / check (push) Successful in 1m57s
This commit is contained in:
@@ -181,6 +181,34 @@ func (c *blobDiskCache) Has(key string) bool {
|
|||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete removes a blob from the cache and its disk file. No-op if absent.
|
||||||
|
// Used by restore's sweep logic to free blobs whose chunks have all been
|
||||||
|
// restored (so they will never be needed again during this restore).
|
||||||
|
func (c *blobDiskCache) Delete(key string) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
e, ok := c.items[key]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.unlink(e)
|
||||||
|
delete(c.items, key)
|
||||||
|
c.curBytes -= e.size
|
||||||
|
_ = os.Remove(c.path(key))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keys returns a snapshot of all cached keys. Safe for iteration without
|
||||||
|
// holding the cache lock; the cache may change concurrently.
|
||||||
|
func (c *blobDiskCache) Keys() []string {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
keys := make([]string, 0, len(c.items))
|
||||||
|
for k := range c.items {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
|
||||||
// Size returns current total cached bytes.
|
// Size returns current total cached bytes.
|
||||||
func (c *blobDiskCache) Size() int64 {
|
func (c *blobDiskCache) Size() int64 {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
@@ -176,12 +177,22 @@ func (v *Vaultik) restoreAllFiles(
|
|||||||
chunkToBlobMap map[string]*database.BlobChunk,
|
chunkToBlobMap map[string]*database.BlobChunk,
|
||||||
) (*RestoreResult, error) {
|
) (*RestoreResult, error) {
|
||||||
result := &RestoreResult{}
|
result := &RestoreResult{}
|
||||||
blobCache, err := newBlobDiskCache(4 * v.Config.BlobSizeLimit.Int64())
|
|
||||||
|
// The restore-side blob cache is unbounded — restores may read any
|
||||||
|
// blob many times across deduplicated files and we want to avoid
|
||||||
|
// re-downloading until we can prove a blob is no longer needed.
|
||||||
|
// Cleanup is driven by the sweeper below, not by LRU.
|
||||||
|
blobCache, err := newBlobDiskCache(math.MaxInt64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("creating blob cache: %w", err)
|
return nil, fmt.Errorf("creating blob cache: %w", err)
|
||||||
}
|
}
|
||||||
defer func() { _ = blobCache.Close() }()
|
defer func() { _ = blobCache.Close() }()
|
||||||
|
|
||||||
|
// Per-restore sweep state: every blob_size_limit/100 bytes written,
|
||||||
|
// scan the cache and delete any blob whose remaining file references
|
||||||
|
// are all already restored.
|
||||||
|
sweeper := newRestoreSweeper(v.ctx, repos, blobCache, v.Config.BlobSizeLimit.Int64()/100)
|
||||||
|
|
||||||
// Calculate total bytes for progress bar
|
// Calculate total bytes for progress bar
|
||||||
var totalBytesExpected int64
|
var totalBytesExpected int64
|
||||||
for _, file := range files {
|
for _, file := range files {
|
||||||
@@ -196,7 +207,7 @@ func (v *Vaultik) restoreAllFiles(
|
|||||||
return nil, v.ctx.Err()
|
return nil, v.ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, result); err != nil {
|
if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, sweeper, result); err != nil {
|
||||||
log.Error("Failed to restore file", "path", file.Path, "error", err)
|
log.Error("Failed to restore file", "path", file.Path, "error", err)
|
||||||
if !opts.SkipErrors {
|
if !opts.SkipErrors {
|
||||||
return nil, fmt.Errorf("restoring %s: %w (pass --skip-errors to continue past restore failures)", file.Path, err)
|
return nil, fmt.Errorf("restoring %s: %w (pass --skip-errors to continue past restore failures)", file.Path, err)
|
||||||
@@ -211,6 +222,10 @@ func (v *Vaultik) restoreAllFiles(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Record the file as restored so the sweeper can free blobs once
|
||||||
|
// all referencing files are done.
|
||||||
|
sweeper.fileRestored(file.ID.String())
|
||||||
|
|
||||||
// Update progress bar
|
// Update progress bar
|
||||||
if bar != nil {
|
if bar != nil {
|
||||||
_ = bar.Add64(file.Size)
|
_ = bar.Add64(file.Size)
|
||||||
@@ -388,6 +403,7 @@ func (v *Vaultik) restoreFile(
|
|||||||
identity age.Identity,
|
identity age.Identity,
|
||||||
chunkToBlobMap map[string]*database.BlobChunk,
|
chunkToBlobMap map[string]*database.BlobChunk,
|
||||||
blobCache *blobDiskCache,
|
blobCache *blobDiskCache,
|
||||||
|
sweeper *restoreSweeper,
|
||||||
result *RestoreResult,
|
result *RestoreResult,
|
||||||
) error {
|
) error {
|
||||||
// Calculate target path - use full original path under target directory
|
// Calculate target path - use full original path under target directory
|
||||||
@@ -410,7 +426,7 @@ func (v *Vaultik) restoreFile(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Handle regular files
|
// Handle regular files
|
||||||
return v.restoreRegularFile(ctx, repos, file, targetPath, identity, chunkToBlobMap, blobCache, result)
|
return v.restoreRegularFile(ctx, repos, file, targetPath, identity, chunkToBlobMap, blobCache, sweeper, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
// restoreSymlink restores a symbolic link
|
// restoreSymlink restores a symbolic link
|
||||||
@@ -472,6 +488,7 @@ func (v *Vaultik) restoreRegularFile(
|
|||||||
identity age.Identity,
|
identity age.Identity,
|
||||||
chunkToBlobMap map[string]*database.BlobChunk,
|
chunkToBlobMap map[string]*database.BlobChunk,
|
||||||
blobCache *blobDiskCache,
|
blobCache *blobDiskCache,
|
||||||
|
sweeper *restoreSweeper,
|
||||||
result *RestoreResult,
|
result *RestoreResult,
|
||||||
) error {
|
) error {
|
||||||
// Get file chunks in order
|
// Get file chunks in order
|
||||||
@@ -531,6 +548,11 @@ func (v *Vaultik) restoreRegularFile(
|
|||||||
return fmt.Errorf("writing chunk: %w", err)
|
return fmt.Errorf("writing chunk: %w", err)
|
||||||
}
|
}
|
||||||
bytesWritten += int64(n)
|
bytesWritten += int64(n)
|
||||||
|
|
||||||
|
// Tell the sweeper about the bytes we just restored so it can
|
||||||
|
// run an eviction sweep once the accumulated total crosses its
|
||||||
|
// threshold (config.BlobSizeLimit/100).
|
||||||
|
sweeper.chunkRestored(int64(n))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close file before setting metadata
|
// Close file before setting metadata
|
||||||
|
|||||||
118
internal/vaultik/restore_sweeper.go
Normal file
118
internal/vaultik/restore_sweeper.go
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
package vaultik
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"sneak.berlin/go/vaultik/internal/database"
|
||||||
|
"sneak.berlin/go/vaultik/internal/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// restoreSweeper frees cached blobs once all files that reference any of
|
||||||
|
// their chunks have been restored. It works as follows:
|
||||||
|
//
|
||||||
|
// 1. Callers add a file's ID to an in-memory restored set via
|
||||||
|
// fileRestored once the file is fully written to disk.
|
||||||
|
// 2. After each chunk is restored, chunkRestored accumulates a running
|
||||||
|
// byte count.
|
||||||
|
// 3. When the accumulator crosses a threshold (one hundredth of the
|
||||||
|
// configured blob size — so a sweep runs about a hundred times per
|
||||||
|
// blob's worth of restored bytes), the sweeper iterates every key in
|
||||||
|
// the cache. For each cached blob it asks the DB which files
|
||||||
|
// reference any chunk in that blob, then compares that list against
|
||||||
|
// the in-memory restored set. If any referencing file is missing
|
||||||
|
// from the set the blob is kept; otherwise the cache entry is
|
||||||
|
// deleted.
|
||||||
|
//
|
||||||
|
// All DB reads happen against the snapshot's temporary metadata DB,
|
||||||
|
// which is local, indexed, and not under contention — the queries are
|
||||||
|
// cheap and run at most once per blob per sweep interval.
|
||||||
|
type restoreSweeper struct {
|
||||||
|
ctx context.Context
|
||||||
|
repos *database.Repositories
|
||||||
|
cache *blobDiskCache
|
||||||
|
threshold int64
|
||||||
|
bytesAccum int64
|
||||||
|
restored map[string]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newRestoreSweeper returns a sweeper that triggers eviction every
|
||||||
|
// `threshold` bytes restored. Callers should pass blob_size_limit/100.
|
||||||
|
func newRestoreSweeper(ctx context.Context, repos *database.Repositories, cache *blobDiskCache, threshold int64) *restoreSweeper {
|
||||||
|
if threshold <= 0 {
|
||||||
|
threshold = 1
|
||||||
|
}
|
||||||
|
return &restoreSweeper{
|
||||||
|
ctx: ctx,
|
||||||
|
repos: repos,
|
||||||
|
cache: cache,
|
||||||
|
threshold: threshold,
|
||||||
|
restored: make(map[string]struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fileRestored records a file as fully restored. After this call, any
|
||||||
|
// blob whose only remaining references come from files in the restored
|
||||||
|
// set may be evicted on the next sweep.
|
||||||
|
func (s *restoreSweeper) fileRestored(fileID string) {
|
||||||
|
s.restored[fileID] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// chunkRestored accounts n bytes against the sweep threshold and runs a
|
||||||
|
// sweep if the threshold has been crossed since the last sweep.
|
||||||
|
func (s *restoreSweeper) chunkRestored(n int64) {
|
||||||
|
s.bytesAccum += n
|
||||||
|
if s.bytesAccum < s.threshold {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.bytesAccum = 0
|
||||||
|
s.sweep()
|
||||||
|
}
|
||||||
|
|
||||||
|
// sweep deletes any cached blob whose chunks are no longer referenced
|
||||||
|
// by an unrestored file. Per-blob DB failures are logged and the blob
|
||||||
|
// is kept — we'd rather hold a blob longer than risk a re-download.
|
||||||
|
func (s *restoreSweeper) sweep() {
|
||||||
|
for _, blobHash := range s.cache.Keys() {
|
||||||
|
needed, err := s.blobStillNeeded(blobHash)
|
||||||
|
if err != nil {
|
||||||
|
log.Debug("sweeper referencing-files query failed", "blob_hash", blobHash[:16], "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !needed {
|
||||||
|
s.cache.Delete(blobHash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// blobStillNeeded returns true if any file that references a chunk in
|
||||||
|
// this blob has not yet been restored. On any error the function
|
||||||
|
// returns true — keeping the blob is always the safe answer because we
|
||||||
|
// can't prove we're done with it.
|
||||||
|
func (s *restoreSweeper) blobStillNeeded(blobHash string) (bool, error) {
|
||||||
|
rows, err := s.repos.DB().Conn().QueryContext(s.ctx, `
|
||||||
|
SELECT DISTINCT fc.file_id
|
||||||
|
FROM file_chunks fc
|
||||||
|
JOIN blob_chunks bc ON bc.chunk_hash = fc.chunk_hash
|
||||||
|
JOIN blobs b ON b.id = bc.blob_id
|
||||||
|
WHERE b.blob_hash = ?
|
||||||
|
`, blobHash)
|
||||||
|
if err != nil {
|
||||||
|
return true, fmt.Errorf("querying referencing files: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = rows.Close() }()
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var fileID string
|
||||||
|
if err := rows.Scan(&fileID); err != nil {
|
||||||
|
return true, fmt.Errorf("scanning file_id: %w", err)
|
||||||
|
}
|
||||||
|
if _, ok := s.restored[fileID]; !ok {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user