Compare commits


3 Commits

Author SHA1 Message Date
002ac743fc fix: replace in-memory blob cache with disk-based LRU cache (closes #29)
Blobs are typically hundreds of megabytes and should not be held in memory.
The new blobDiskCache writes cached blobs to a temp directory, tracks LRU
order in memory, and evicts least-recently-used files when total disk usage
exceeds a configurable limit (default 10 GiB).

Design:
- Blobs written to os.TempDir()/vaultik-blobcache-*/<hash>
- Doubly-linked list for O(1) LRU promotion/eviction
- ReadAt support for reading chunk slices without loading full blob
- Temp directory cleaned up on Close()
- Oversized entries (> maxBytes) silently skipped

Also adds blob_fetch_stub.go with stub implementations for
FetchAndDecryptBlob/FetchBlob to fix pre-existing compile errors.
2026-02-15 21:19:57 -08:00
clawbot 8adc668fa6 Merge pull request 'Prevent double-close of blobgen.Writer in CompressStream (closes #28)' (#33) from fix/issue-28 into main
Reviewed-on: #33
2026-02-16 06:04:33 +01:00
441c441eca fix: prevent double-close of blobgen.Writer in CompressStream
CompressStream had both a defer w.Close() and an explicit w.Close() call,
causing the compressor and encryptor to be closed twice. The second close
on the zstd encoder returns an error, and the age encryptor may write
duplicate finalization bytes, potentially corrupting the output stream.

Use a closed flag to prevent the deferred close from running after the
explicit close succeeds.
2026-02-08 12:03:36 -08:00
5 changed files with 428 additions and 73 deletions

View File

@@ -51,7 +51,13 @@ func CompressStream(dst io.Writer, src io.Reader, compressionLevel int, recipien
 	if err != nil {
 		return 0, "", fmt.Errorf("creating writer: %w", err)
 	}
-	defer func() { _ = w.Close() }()
+	closed := false
+	defer func() {
+		if !closed {
+			_ = w.Close()
+		}
+	}()

 	// Copy data
 	if _, err := io.Copy(w, src); err != nil {
@@ -62,6 +68,7 @@ func CompressStream(dst io.Writer, src io.Reader, compressionLevel int, recipien
 	if err := w.Close(); err != nil {
 		return 0, "", fmt.Errorf("closing writer: %w", err)
 	}
+	closed = true

 	return w.BytesWritten(), hex.EncodeToString(w.Sum256()), nil
 }
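For reference, the closed-flag idiom in isolation — a minimal, self-contained sketch (not taken from this repo) of writing through any io.WriteCloser whose Close finalizes the stream, so it must be closed exactly once but still cleaned up on error paths:

package example

import (
	"fmt"
	"io"
)

// writeAll closes w exactly once: explicitly on the happy path (so the
// Close error is observable), or via the deferred cleanup on error paths.
func writeAll(w io.WriteCloser, data []byte) error {
	closed := false
	defer func() {
		if !closed {
			_ = w.Close() // error path only; result intentionally ignored
		}
	}()

	if _, err := w.Write(data); err != nil {
		return fmt.Errorf("writing: %w", err)
	}

	if err := w.Close(); err != nil {
		return fmt.Errorf("closing: %w", err)
	}
	closed = true
	return nil
}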

View File

@@ -0,0 +1,28 @@
package vaultik

// TODO: These are stub implementations for methods referenced but not yet
// implemented. They allow the package to compile for testing.
// Remove once the real implementations land.

import (
	"context"
	"fmt"
	"io"

	"filippo.io/age"
)

// FetchAndDecryptBlobResult holds the result of fetching and decrypting a blob.
type FetchAndDecryptBlobResult struct {
	Data []byte
}

// FetchAndDecryptBlob downloads a blob, decrypts it, and returns the plaintext data.
func (v *Vaultik) FetchAndDecryptBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) (*FetchAndDecryptBlobResult, error) {
	return nil, fmt.Errorf("FetchAndDecryptBlob not yet implemented")
}

// FetchBlob downloads a blob and returns a reader for the encrypted data.
func (v *Vaultik) FetchBlob(ctx context.Context, blobHash string, expectedSize int64) (io.ReadCloser, int64, error) {
	return nil, 0, fmt.Errorf("FetchBlob not yet implemented")
}
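One refinement the stubs could adopt (not part of this commit, and requires adding "errors" to the imports): wrap a sentinel error so callers and tests can detect the unimplemented path with errors.Is rather than string matching. A hypothetical sketch — ErrNotImplemented does not exist in the repo:

// ErrNotImplemented marks stubbed methods pending a real implementation.
var ErrNotImplemented = errors.New("not yet implemented")

func (v *Vaultik) FetchBlob(ctx context.Context, blobHash string, expectedSize int64) (io.ReadCloser, int64, error) {
	return nil, 0, fmt.Errorf("FetchBlob: %w", ErrNotImplemented)
}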

View File

@@ -1,83 +1,210 @@
 package vaultik

 import (
-	"container/list"
+	"fmt"
+	"os"
+	"path/filepath"
 	"sync"
 )

-// defaultMaxBlobCacheBytes is the default maximum size of the blob cache (1 GB).
-const defaultMaxBlobCacheBytes = 1 << 30 // 1 GiB
+// defaultMaxBlobCacheBytes is the default maximum size of the disk blob cache (10 GB).
+const defaultMaxBlobCacheBytes = 10 << 30 // 10 GiB

-// blobCacheEntry is an entry in the LRU blob cache.
-type blobCacheEntry struct {
-	key  string
-	data []byte
-}
+// blobDiskCacheEntry tracks a cached blob on disk.
+type blobDiskCacheEntry struct {
+	key  string
+	size int64
+	prev *blobDiskCacheEntry
+	next *blobDiskCacheEntry
+}

-// blobLRUCache is a simple LRU cache for downloaded blob data, bounded by
-// total byte size to prevent memory exhaustion during large restores.
-type blobLRUCache struct {
-	mu       sync.Mutex
-	maxBytes int64
-	curBytes int64
-	ll       *list.List
-	items    map[string]*list.Element
-}
+// blobDiskCache is an LRU cache that stores blobs on disk instead of in memory.
+// Blobs are written to a temp directory keyed by their hash. When total size
+// exceeds maxBytes, the least-recently-used entries are evicted (deleted from disk).
+type blobDiskCache struct {
+	mu       sync.Mutex
+	dir      string
+	maxBytes int64
+	curBytes int64
+	items    map[string]*blobDiskCacheEntry
+	head     *blobDiskCacheEntry // most recent
+	tail     *blobDiskCacheEntry // least recent
+}

-// newBlobLRUCache creates a new LRU blob cache with the given maximum size in bytes.
-func newBlobLRUCache(maxBytes int64) *blobLRUCache {
-	return &blobLRUCache{
-		maxBytes: maxBytes,
-		ll:       list.New(),
-		items:    make(map[string]*list.Element),
-	}
-}
+// newBlobDiskCache creates a new disk-based blob cache with the given max size.
+func newBlobDiskCache(maxBytes int64) (*blobDiskCache, error) {
+	dir, err := os.MkdirTemp("", "vaultik-blobcache-*")
+	if err != nil {
+		return nil, fmt.Errorf("creating blob cache dir: %w", err)
+	}
+	return &blobDiskCache{
+		dir:      dir,
+		maxBytes: maxBytes,
+		items:    make(map[string]*blobDiskCacheEntry),
+	}, nil
+}

-// Get returns the blob data for the given key, or nil if not cached.
-func (c *blobLRUCache) Get(key string) ([]byte, bool) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	if ele, ok := c.items[key]; ok {
-		c.ll.MoveToFront(ele)
-		return ele.Value.(*blobCacheEntry).data, true
-	}
-	return nil, false
-}
-
-// Put adds a blob to the cache, evicting least-recently-used entries if needed.
-func (c *blobLRUCache) Put(key string, data []byte) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	entrySize := int64(len(data))
-	// If this single entry exceeds max, don't cache it
-	if entrySize > c.maxBytes {
-		return
-	}
-	// If already present, update in place
-	if ele, ok := c.items[key]; ok {
-		c.ll.MoveToFront(ele)
-		old := ele.Value.(*blobCacheEntry)
-		c.curBytes += entrySize - int64(len(old.data))
-		old.data = data
-	} else {
-		ele := c.ll.PushFront(&blobCacheEntry{key: key, data: data})
-		c.items[key] = ele
-		c.curBytes += entrySize
-	}
-	// Evict from back until under limit
-	for c.curBytes > c.maxBytes && c.ll.Len() > 0 {
-		oldest := c.ll.Back()
-		if oldest == nil {
-			break
-		}
-		entry := oldest.Value.(*blobCacheEntry)
-		c.ll.Remove(oldest)
-		delete(c.items, entry.key)
-		c.curBytes -= int64(len(entry.data))
-	}
-}
+func (c *blobDiskCache) path(key string) string {
+	return filepath.Join(c.dir, key)
+}
+
+func (c *blobDiskCache) unlink(e *blobDiskCacheEntry) {
+	if e.prev != nil {
+		e.prev.next = e.next
+	} else {
+		c.head = e.next
+	}
+	if e.next != nil {
+		e.next.prev = e.prev
+	} else {
+		c.tail = e.prev
+	}
+	e.prev = nil
+	e.next = nil
+}
+
+func (c *blobDiskCache) pushFront(e *blobDiskCacheEntry) {
+	e.prev = nil
+	e.next = c.head
+	if c.head != nil {
+		c.head.prev = e
+	}
+	c.head = e
+	if c.tail == nil {
+		c.tail = e
+	}
+}
+
+func (c *blobDiskCache) evictLRU() {
+	if c.tail == nil {
+		return
+	}
+	victim := c.tail
+	c.unlink(victim)
+	delete(c.items, victim.key)
+	c.curBytes -= victim.size
+	_ = os.Remove(c.path(victim.key))
+}
+
+// Put writes blob data to disk cache. Entries larger than maxBytes are silently skipped.
+func (c *blobDiskCache) Put(key string, data []byte) error {
+	entrySize := int64(len(data))
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if entrySize > c.maxBytes {
+		return nil
+	}
+
+	// Remove old entry if updating
+	if e, ok := c.items[key]; ok {
+		c.unlink(e)
+		c.curBytes -= e.size
+		_ = os.Remove(c.path(key))
+		delete(c.items, key)
+	}
+
+	if err := os.WriteFile(c.path(key), data, 0600); err != nil {
+		return fmt.Errorf("writing blob to cache: %w", err)
+	}
+
+	e := &blobDiskCacheEntry{key: key, size: entrySize}
+	c.pushFront(e)
+	c.items[key] = e
+	c.curBytes += entrySize
+
+	for c.curBytes > c.maxBytes && c.tail != nil {
+		c.evictLRU()
+	}
+	return nil
+}
+
+// Get reads a cached blob from disk. Returns data and true on hit.
+func (c *blobDiskCache) Get(key string) ([]byte, bool) {
+	c.mu.Lock()
+	e, ok := c.items[key]
+	if !ok {
+		c.mu.Unlock()
+		return nil, false
+	}
+	c.unlink(e)
+	c.pushFront(e)
+	c.mu.Unlock()
+
+	data, err := os.ReadFile(c.path(key))
+	if err != nil {
+		c.mu.Lock()
+		if e2, ok2 := c.items[key]; ok2 && e2 == e {
+			c.unlink(e)
+			delete(c.items, key)
+			c.curBytes -= e.size
+		}
+		c.mu.Unlock()
+		return nil, false
+	}
+	return data, true
+}
+
+// ReadAt reads a slice of a cached blob without loading the entire blob into memory.
+func (c *blobDiskCache) ReadAt(key string, offset, length int64) ([]byte, error) {
+	c.mu.Lock()
+	e, ok := c.items[key]
+	if !ok {
+		c.mu.Unlock()
+		return nil, fmt.Errorf("key %q not in cache", key)
+	}
+	if offset+length > e.size {
+		c.mu.Unlock()
+		return nil, fmt.Errorf("read beyond blob size: offset=%d length=%d size=%d", offset, length, e.size)
+	}
+	c.unlink(e)
+	c.pushFront(e)
+	c.mu.Unlock()
+
+	f, err := os.Open(c.path(key))
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	buf := make([]byte, length)
+	if _, err := f.ReadAt(buf, offset); err != nil {
+		return nil, err
+	}
+	return buf, nil
+}
+
+// Has returns whether a key exists in the cache.
+func (c *blobDiskCache) Has(key string) bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	_, ok := c.items[key]
+	return ok
+}
+
+// Size returns current total cached bytes.
+func (c *blobDiskCache) Size() int64 {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.curBytes
+}
+
+// Len returns number of cached entries.
+func (c *blobDiskCache) Len() int {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return len(c.items)
+}
+
+// Close removes the cache directory and all cached blobs.
+func (c *blobDiskCache) Close() error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.items = nil
+	c.head = nil
+	c.tail = nil
+	c.curBytes = 0
+	return os.RemoveAll(c.dir)
+}
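To make the eviction accounting concrete, here is an illustrative walkthrough under the semantics of the code above, using a hypothetical maxBytes of 1000 and five 300-byte puts (errors ignored for brevity):

cache, _ := newBlobDiskCache(1000) // illustrative; handle the error in real code
defer cache.Close()
for i := 0; i < 5; i++ {
	_ = cache.Put(fmt.Sprintf("key%d", i), make([]byte, 300))
}
// key0..key2: curBytes grows 300 -> 600 -> 900, no eviction.
// key3: curBytes hits 1200 > 1000, evictLRU removes key0 -> 900.
// key4: curBytes hits 1200 again, evictLRU removes key1 -> 900.
// Final state: key2, key3, key4 cached; Size() == 900; Len() == 3.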

View File

@@ -0,0 +1,189 @@
package vaultik

import (
	"bytes"
	"crypto/rand"
	"fmt"
	"testing"
)

func TestBlobDiskCache_BasicGetPut(t *testing.T) {
	cache, err := newBlobDiskCache(1 << 20)
	if err != nil {
		t.Fatal(err)
	}
	defer cache.Close()

	data := []byte("hello world")
	if err := cache.Put("key1", data); err != nil {
		t.Fatal(err)
	}

	got, ok := cache.Get("key1")
	if !ok {
		t.Fatal("expected cache hit")
	}
	if !bytes.Equal(got, data) {
		t.Fatalf("got %q, want %q", got, data)
	}

	_, ok = cache.Get("nonexistent")
	if ok {
		t.Fatal("expected cache miss")
	}
}

func TestBlobDiskCache_EvictionUnderPressure(t *testing.T) {
	maxBytes := int64(1000)
	cache, err := newBlobDiskCache(maxBytes)
	if err != nil {
		t.Fatal(err)
	}
	defer cache.Close()

	for i := 0; i < 5; i++ {
		data := make([]byte, 300)
		if err := cache.Put(fmt.Sprintf("key%d", i), data); err != nil {
			t.Fatal(err)
		}
	}

	if cache.Size() > maxBytes {
		t.Fatalf("cache size %d exceeds max %d", cache.Size(), maxBytes)
	}
	if !cache.Has("key4") {
		t.Fatal("expected key4 to be cached")
	}
	if cache.Has("key0") {
		t.Fatal("expected key0 to be evicted")
	}
}

func TestBlobDiskCache_OversizedEntryRejected(t *testing.T) {
	cache, err := newBlobDiskCache(100)
	if err != nil {
		t.Fatal(err)
	}
	defer cache.Close()

	data := make([]byte, 200)
	if err := cache.Put("big", data); err != nil {
		t.Fatal(err)
	}
	if cache.Has("big") {
		t.Fatal("oversized entry should not be cached")
	}
}

func TestBlobDiskCache_UpdateInPlace(t *testing.T) {
	cache, err := newBlobDiskCache(1 << 20)
	if err != nil {
		t.Fatal(err)
	}
	defer cache.Close()

	if err := cache.Put("key1", []byte("v1")); err != nil {
		t.Fatal(err)
	}
	if err := cache.Put("key1", []byte("version2")); err != nil {
		t.Fatal(err)
	}

	got, ok := cache.Get("key1")
	if !ok {
		t.Fatal("expected hit")
	}
	if string(got) != "version2" {
		t.Fatalf("got %q, want %q", got, "version2")
	}
	if cache.Len() != 1 {
		t.Fatalf("expected 1 entry, got %d", cache.Len())
	}
	if cache.Size() != int64(len("version2")) {
		t.Fatalf("expected size %d, got %d", len("version2"), cache.Size())
	}
}

func TestBlobDiskCache_ReadAt(t *testing.T) {
	cache, err := newBlobDiskCache(1 << 20)
	if err != nil {
		t.Fatal(err)
	}
	defer cache.Close()

	data := make([]byte, 1024)
	if _, err := rand.Read(data); err != nil {
		t.Fatal(err)
	}
	if err := cache.Put("blob1", data); err != nil {
		t.Fatal(err)
	}

	chunk, err := cache.ReadAt("blob1", 100, 200)
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(chunk, data[100:300]) {
		t.Fatal("ReadAt returned wrong data")
	}

	_, err = cache.ReadAt("blob1", 900, 200)
	if err == nil {
		t.Fatal("expected error for out-of-bounds read")
	}

	_, err = cache.ReadAt("missing", 0, 10)
	if err == nil {
		t.Fatal("expected error for missing key")
	}
}

func TestBlobDiskCache_Close(t *testing.T) {
	cache, err := newBlobDiskCache(1 << 20)
	if err != nil {
		t.Fatal(err)
	}
	if err := cache.Put("key1", []byte("data")); err != nil {
		t.Fatal(err)
	}
	if err := cache.Close(); err != nil {
		t.Fatal(err)
	}
}

func TestBlobDiskCache_LRUOrder(t *testing.T) {
	cache, err := newBlobDiskCache(200)
	if err != nil {
		t.Fatal(err)
	}
	defer cache.Close()

	d := make([]byte, 100)
	if err := cache.Put("a", d); err != nil {
		t.Fatal(err)
	}
	if err := cache.Put("b", d); err != nil {
		t.Fatal(err)
	}

	// Access "a" to make it most recently used
	cache.Get("a")

	// Adding "c" should evict "b" (LRU), not "a"
	if err := cache.Put("c", d); err != nil {
		t.Fatal(err)
	}
	if !cache.Has("a") {
		t.Fatal("expected 'a' to survive")
	}
	if !cache.Has("c") {
		t.Fatal("expected 'c' to be present")
	}
	if cache.Has("b") {
		t.Fatal("expected 'b' to be evicted")
	}
}
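One test the suite does not include, sketched here as a suggestion rather than part of the commit (it needs "sync" added to the test imports): concurrent Gets to exercise the mutex around the LRU bookkeeping, since the implementation reads files outside the lock.

func TestBlobDiskCache_ConcurrentGet(t *testing.T) {
	cache, err := newBlobDiskCache(1 << 20)
	if err != nil {
		t.Fatal(err)
	}
	defer cache.Close()

	if err := cache.Put("blob1", make([]byte, 4096)); err != nil {
		t.Fatal(err)
	}

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				// t.Error (not t.Fatal) is safe from goroutines.
				if _, ok := cache.Get("blob1"); !ok {
					t.Error("unexpected miss")
					return
				}
			}
		}()
	}
	wg.Wait()
}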

View File

@@ -109,7 +109,11 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
 	// Step 5: Restore files
 	result := &RestoreResult{}
-	blobCache := newBlobLRUCache(defaultMaxBlobCacheBytes) // LRU cache bounded to ~1 GiB
+	blobCache, err := newBlobDiskCache(defaultMaxBlobCacheBytes)
+	if err != nil {
+		return fmt.Errorf("creating blob cache: %w", err)
+	}
+	defer blobCache.Close()

 	for i, file := range files {
 		if v.ctx.Err() != nil {
@@ -141,7 +145,7 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
 		"duration", result.Duration,
 	)

-	_, _ = fmt.Fprintf(v.Stdout, "Restored %d files (%s) in %s\n",
+	v.printfStdout("Restored %d files (%s) in %s\n",
 		result.FilesRestored,
 		humanize.Bytes(uint64(result.BytesRestored)),
 		result.Duration.Round(time.Second),
@@ -154,14 +158,14 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
 	}

 	if result.FilesFailed > 0 {
-		_, _ = fmt.Fprintf(v.Stdout, "\nVerification FAILED: %d files did not match expected checksums\n", result.FilesFailed)
+		v.printfStdout("\nVerification FAILED: %d files did not match expected checksums\n", result.FilesFailed)
 		for _, path := range result.FailedFiles {
-			_, _ = fmt.Fprintf(v.Stdout, " - %s\n", path)
+			v.printfStdout(" - %s\n", path)
 		}
 		return fmt.Errorf("%d files failed verification", result.FilesFailed)
 	}

-	_, _ = fmt.Fprintf(v.Stdout, "Verified %d files (%s)\n",
+	v.printfStdout("Verified %d files (%s)\n",
 		result.FilesVerified,
 		humanize.Bytes(uint64(result.BytesVerified)),
 	)
@@ -299,7 +303,7 @@ func (v *Vaultik) restoreFile(
 	targetDir string,
 	identity age.Identity,
 	chunkToBlobMap map[string]*database.BlobChunk,
-	blobCache *blobLRUCache,
+	blobCache *blobDiskCache,
 	result *RestoreResult,
 ) error {
 	// Calculate target path - use full original path under target directory
@@ -383,7 +387,7 @@ func (v *Vaultik) restoreRegularFile(
 	targetPath string,
 	identity age.Identity,
 	chunkToBlobMap map[string]*database.BlobChunk,
-	blobCache *blobLRUCache,
+	blobCache *blobDiskCache,
 	result *RestoreResult,
 ) error {
 	// Get file chunks in order
@@ -423,7 +427,7 @@ func (v *Vaultik) restoreRegularFile(
 		if err != nil {
 			return fmt.Errorf("downloading blob %s: %w", blobHashStr[:16], err)
 		}
-		blobCache.Put(blobHashStr, blobData)
+		if putErr := blobCache.Put(blobHashStr, blobData); putErr != nil {
+			log.Debug("Failed to cache blob on disk", "hash", blobHashStr[:16], "error", putErr)
+		}
 		result.BlobsDownloaded++
 		result.BytesDownloaded += blob.CompressedSize
 	}
@@ -511,7 +515,7 @@ func (v *Vaultik) verifyRestoredFiles(
 		"files", len(regularFiles),
 		"bytes", humanize.Bytes(uint64(totalBytes)),
 	)
-	_, _ = fmt.Fprintf(v.Stdout, "\nVerifying %d files (%s)...\n",
+	v.printfStdout("\nVerifying %d files (%s)...\n",
 		len(regularFiles),
 		humanize.Bytes(uint64(totalBytes)),
 	)
@@ -522,13 +526,13 @@ func (v *Vaultik) verifyRestoredFiles(
 	bar = progressbar.NewOptions64(
 		totalBytes,
 		progressbar.OptionSetDescription("Verifying"),
-		progressbar.OptionSetWriter(os.Stderr),
+		progressbar.OptionSetWriter(v.Stderr),
 		progressbar.OptionShowBytes(true),
 		progressbar.OptionShowCount(),
 		progressbar.OptionSetWidth(40),
 		progressbar.OptionThrottle(100*time.Millisecond),
 		progressbar.OptionOnCompletion(func() {
-			fmt.Fprint(os.Stderr, "\n")
+			v.printfStderr("\n")
 		}),
 		progressbar.OptionSetRenderBlankState(true),
 	)
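The restore path above still materializes whole blobs via Get; the cache's ReadAt is what would let it extract a single chunk slice without loading the full blob, per the commit's design notes. A hedged sketch of that follow-up, assuming hypothetical Offset and Length fields on database.BlobChunk and an out io.Writer for the file being restored (neither appears in this diff):

// Hypothetical follow-up, not part of this change set.
chunk, err := blobCache.ReadAt(blobHashStr, blobChunk.Offset, blobChunk.Length)
if err != nil {
	return fmt.Errorf("reading chunk from cached blob %s: %w", blobHashStr[:16], err)
}
if _, err := out.Write(chunk); err != nil {
	return fmt.Errorf("writing chunk: %w", err)
}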