Bound blob cache during restore with LRU eviction (closes #29) #34
83
internal/vaultik/blobcache.go
Normal file
83
internal/vaultik/blobcache.go
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
package vaultik
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/list"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// defaultMaxBlobCacheBytes is the default maximum size of the blob cache (1 GB).
|
||||||
|
const defaultMaxBlobCacheBytes = 1 << 30 // 1 GiB
|
||||||
|
|
||||||
|
// blobCacheEntry is an entry in the LRU blob cache.
|
||||||
|
type blobCacheEntry struct {
|
||||||
|
key string
|
||||||
|
data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// blobLRUCache is a simple LRU cache for downloaded blob data, bounded by
|
||||||
|
// total byte size to prevent memory exhaustion during large restores.
|
||||||
|
type blobLRUCache struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
maxBytes int64
|
||||||
|
curBytes int64
|
||||||
|
ll *list.List
|
||||||
|
items map[string]*list.Element
|
||||||
|
}
|
||||||
|
|
||||||
|
// newBlobLRUCache creates a new LRU blob cache with the given maximum size in bytes.
|
||||||
|
func newBlobLRUCache(maxBytes int64) *blobLRUCache {
|
||||||
|
return &blobLRUCache{
|
||||||
|
maxBytes: maxBytes,
|
||||||
|
ll: list.New(),
|
||||||
|
items: make(map[string]*list.Element),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the blob data for the given key, or nil if not cached.
|
||||||
|
func (c *blobLRUCache) Get(key string) ([]byte, bool) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if ele, ok := c.items[key]; ok {
|
||||||
|
c.ll.MoveToFront(ele)
|
||||||
|
return ele.Value.(*blobCacheEntry).data, true
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put adds a blob to the cache, evicting least-recently-used entries if needed.
|
||||||
|
func (c *blobLRUCache) Put(key string, data []byte) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
entrySize := int64(len(data))
|
||||||
|
|
||||||
|
// If this single entry exceeds max, don't cache it
|
||||||
|
if entrySize > c.maxBytes {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If already present, update in place
|
||||||
|
if ele, ok := c.items[key]; ok {
|
||||||
|
c.ll.MoveToFront(ele)
|
||||||
|
old := ele.Value.(*blobCacheEntry)
|
||||||
|
c.curBytes += entrySize - int64(len(old.data))
|
||||||
|
old.data = data
|
||||||
|
} else {
|
||||||
|
ele := c.ll.PushFront(&blobCacheEntry{key: key, data: data})
|
||||||
|
c.items[key] = ele
|
||||||
|
c.curBytes += entrySize
|
||||||
|
}
|
||||||
|
|
||||||
|
// Evict from back until under limit
|
||||||
|
for c.curBytes > c.maxBytes && c.ll.Len() > 0 {
|
||||||
|
oldest := c.ll.Back()
|
||||||
|
if oldest == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
entry := oldest.Value.(*blobCacheEntry)
|
||||||
|
c.ll.Remove(oldest)
|
||||||
|
delete(c.items, entry.key)
|
||||||
|
c.curBytes -= int64(len(entry.data))
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -109,7 +109,7 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
|
|||||||
|
|
||||||
// Step 5: Restore files
|
// Step 5: Restore files
|
||||||
result := &RestoreResult{}
|
result := &RestoreResult{}
|
||||||
blobCache := make(map[string][]byte) // Cache downloaded and decrypted blobs
|
blobCache := newBlobLRUCache(defaultMaxBlobCacheBytes) // LRU cache bounded to ~1 GiB
|
||||||
|
|
||||||
for i, file := range files {
|
for i, file := range files {
|
||||||
if v.ctx.Err() != nil {
|
if v.ctx.Err() != nil {
|
||||||
@ -299,7 +299,7 @@ func (v *Vaultik) restoreFile(
|
|||||||
targetDir string,
|
targetDir string,
|
||||||
identity age.Identity,
|
identity age.Identity,
|
||||||
chunkToBlobMap map[string]*database.BlobChunk,
|
chunkToBlobMap map[string]*database.BlobChunk,
|
||||||
blobCache map[string][]byte,
|
blobCache *blobLRUCache,
|
||||||
result *RestoreResult,
|
result *RestoreResult,
|
||||||
) error {
|
) error {
|
||||||
// Calculate target path - use full original path under target directory
|
// Calculate target path - use full original path under target directory
|
||||||
@ -383,7 +383,7 @@ func (v *Vaultik) restoreRegularFile(
|
|||||||
targetPath string,
|
targetPath string,
|
||||||
identity age.Identity,
|
identity age.Identity,
|
||||||
chunkToBlobMap map[string]*database.BlobChunk,
|
chunkToBlobMap map[string]*database.BlobChunk,
|
||||||
blobCache map[string][]byte,
|
blobCache *blobLRUCache,
|
||||||
result *RestoreResult,
|
result *RestoreResult,
|
||||||
) error {
|
) error {
|
||||||
// Get file chunks in order
|
// Get file chunks in order
|
||||||
@ -417,13 +417,13 @@ func (v *Vaultik) restoreRegularFile(
|
|||||||
|
|
||||||
// Download and decrypt blob if not cached
|
// Download and decrypt blob if not cached
|
||||||
blobHashStr := blob.Hash.String()
|
blobHashStr := blob.Hash.String()
|
||||||
blobData, ok := blobCache[blobHashStr]
|
blobData, ok := blobCache.Get(blobHashStr)
|
||||||
if !ok {
|
if !ok {
|
||||||
blobData, err = v.downloadBlob(ctx, blobHashStr, blob.CompressedSize, identity)
|
blobData, err = v.downloadBlob(ctx, blobHashStr, blob.CompressedSize, identity)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("downloading blob %s: %w", blobHashStr[:16], err)
|
return fmt.Errorf("downloading blob %s: %w", blobHashStr[:16], err)
|
||||||
}
|
}
|
||||||
blobCache[blobHashStr] = blobData
|
blobCache.Put(blobHashStr, blobData)
|
||||||
result.BlobsDownloaded++
|
result.BlobsDownloaded++
|
||||||
result.BytesDownloaded += blob.CompressedSize
|
result.BytesDownloaded += blob.CompressedSize
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user