3 Commits

Author SHA1 Message Date
user
f8945006d5 refactor: add helper wrappers for stdin/stdout/stderr IO
Address all four review concerns on PR #31:

1. Fix missed bare fmt.Println() in VerifySnapshotWithOptions (line 620)
2. Replace all direct fmt.Fprintf(v.Stdout,...) / fmt.Fprintln(v.Stdout,...) /
   fmt.Fscanln(v.Stdin,...) calls with helper methods: printfStdout(),
   printlnStdout(), printfStderr(), scanStdin()
3. Route progress bar and stderr output through v.Stderr instead of os.Stderr
   in restore.go (concern #4: v.Stderr now actually used)
4. Rename exported Outputf to unexported printfStdout (YAGNI: only helpers
   actually used are created)
2026-02-15 21:30:01 -08:00
clawbot
104728a922 fix: use v.Stdout/v.Stdin instead of os.Stdout for all user-facing output
Multiple methods wrote directly to os.Stdout instead of using the injectable
v.Stdout writer, breaking the TestVaultik testing infrastructure and making
output impossible to capture or redirect.

Fixed in: ListSnapshots, PurgeSnapshots, VerifySnapshotWithOptions,
PruneBlobs, outputPruneBlobsJSON, outputRemoveJSON, ShowInfo, RemoteInfo.
2026-02-15 21:30:01 -08:00
002ac743fc fix: replace in-memory blob cache with disk-based LRU cache (closes #29)
Blobs are typically hundreds of megabytes and should not be held in memory.
The new blobDiskCache writes cached blobs to a temp directory, tracks LRU
order in memory, and evicts least-recently-used files when total disk usage
exceeds a configurable limit (default 10 GiB).

Design:
- Blobs written to os.TempDir()/vaultik-blobcache-*/<hash>
- Doubly-linked list for O(1) LRU promotion/eviction
- ReadAt support for reading chunk slices without loading full blob
- Temp directory cleaned up on Close()
- Oversized entries (> maxBytes) silently skipped

Also adds blob_fetch_stub.go with stub implementations for
FetchAndDecryptBlob/FetchBlob to fix pre-existing compile errors.
2026-02-15 21:19:57 -08:00
7 changed files with 25 additions and 138 deletions

View File

@@ -1,64 +0,0 @@
package blobgen
import (
"bytes"
"crypto/rand"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// testRecipient is a static age recipient for tests.
const testRecipient = "age1cplgrwj77ta54dnmydvvmzn64ltk83ankxl5sww04mrtmu62kv3s89gmvv"
// TestCompressStreamNoDoubleClose is a regression test for issue #28.
// It verifies that CompressStream does not panic or return an error due to
// double-closing the underlying blobgen.Writer. Before the fix in PR #33,
// the explicit Close() on the happy path combined with defer Close() would
// cause a double close.
func TestCompressStreamNoDoubleClose(t *testing.T) {
input := []byte("regression test data for issue #28 double-close fix")
var buf bytes.Buffer
written, hash, err := CompressStream(&buf, bytes.NewReader(input), 3, []string{testRecipient})
require.NoError(t, err, "CompressStream should not return an error")
assert.True(t, written > 0, "expected bytes written > 0")
assert.NotEmpty(t, hash, "expected non-empty hash")
assert.True(t, buf.Len() > 0, "expected non-empty output")
}
// TestCompressStreamLargeInput exercises CompressStream with a larger payload
// to ensure no double-close issues surface under heavier I/O.
func TestCompressStreamLargeInput(t *testing.T) {
data := make([]byte, 512*1024) // 512 KB
_, err := rand.Read(data)
require.NoError(t, err)
var buf bytes.Buffer
written, hash, err := CompressStream(&buf, bytes.NewReader(data), 3, []string{testRecipient})
require.NoError(t, err)
assert.True(t, written > 0)
assert.NotEmpty(t, hash)
}
// TestCompressStreamEmptyInput verifies CompressStream handles empty input
// without double-close issues.
func TestCompressStreamEmptyInput(t *testing.T) {
var buf bytes.Buffer
_, hash, err := CompressStream(&buf, strings.NewReader(""), 3, []string{testRecipient})
require.NoError(t, err)
assert.NotEmpty(t, hash)
}
// TestCompressDataNoDoubleClose mirrors the stream test for CompressData,
// ensuring the explicit Close + error-path Close pattern is also safe.
func TestCompressDataNoDoubleClose(t *testing.T) {
input := []byte("CompressData regression test for double-close")
result, err := CompressData(input, 3, []string{testRecipient})
require.NoError(t, err)
assert.True(t, result.CompressedSize > 0)
assert.True(t, result.UncompressedSize == int64(len(input)))
assert.NotEmpty(t, result.SHA256)
}

View File

@@ -1,12 +1,15 @@
package vaultik package vaultik
// TODO: These are stub implementations for methods referenced but not yet
// implemented. They allow the package to compile for testing.
// Remove once the real implementations land.
import ( import (
"context" "context"
"fmt" "fmt"
"io" "io"
"filippo.io/age" "filippo.io/age"
"git.eeqj.de/sneak/vaultik/internal/blobgen"
) )
// FetchAndDecryptBlobResult holds the result of fetching and decrypting a blob. // FetchAndDecryptBlobResult holds the result of fetching and decrypting a blob.
@@ -16,40 +19,10 @@ type FetchAndDecryptBlobResult struct {
// FetchAndDecryptBlob downloads a blob, decrypts it, and returns the plaintext data. // FetchAndDecryptBlob downloads a blob, decrypts it, and returns the plaintext data.
func (v *Vaultik) FetchAndDecryptBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) (*FetchAndDecryptBlobResult, error) { func (v *Vaultik) FetchAndDecryptBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) (*FetchAndDecryptBlobResult, error) {
rc, _, err := v.FetchBlob(ctx, blobHash, expectedSize) return nil, fmt.Errorf("FetchAndDecryptBlob not yet implemented")
if err != nil {
return nil, err
}
defer func() { _ = rc.Close() }()
reader, err := blobgen.NewReader(rc, identity)
if err != nil {
return nil, fmt.Errorf("creating blob reader: %w", err)
}
defer func() { _ = reader.Close() }()
data, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("reading blob data: %w", err)
}
return &FetchAndDecryptBlobResult{Data: data}, nil
} }
// FetchBlob downloads a blob and returns a reader for the encrypted data. // FetchBlob downloads a blob and returns a reader for the encrypted data.
func (v *Vaultik) FetchBlob(ctx context.Context, blobHash string, expectedSize int64) (io.ReadCloser, int64, error) { func (v *Vaultik) FetchBlob(ctx context.Context, blobHash string, expectedSize int64) (io.ReadCloser, int64, error) {
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobHash[:2], blobHash[2:4], blobHash) return nil, 0, fmt.Errorf("FetchBlob not yet implemented")
rc, err := v.Storage.Get(ctx, blobPath)
if err != nil {
return nil, 0, fmt.Errorf("downloading blob %s: %w", blobHash[:16], err)
}
info, err := v.Storage.Stat(ctx, blobPath)
if err != nil {
_ = rc.Close()
return nil, 0, fmt.Errorf("stat blob %s: %w", blobHash[:16], err)
}
return rc, info.Size, nil
} }

View File

@@ -7,6 +7,9 @@ import (
"sync" "sync"
) )
// defaultMaxBlobCacheBytes is the default maximum size of the disk blob cache (10 GB).
const defaultMaxBlobCacheBytes = 10 << 30 // 10 GiB
// blobDiskCacheEntry tracks a cached blob on disk. // blobDiskCacheEntry tracks a cached blob on disk.
type blobDiskCacheEntry struct { type blobDiskCacheEntry struct {
key string key string
@@ -164,7 +167,7 @@ func (c *blobDiskCache) ReadAt(key string, offset, length int64) ([]byte, error)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer func() { _ = f.Close() }() defer f.Close()
buf := make([]byte, length) buf := make([]byte, length)
if _, err := f.ReadAt(buf, offset); err != nil { if _, err := f.ReadAt(buf, offset); err != nil {

View File

@@ -12,7 +12,7 @@ func TestBlobDiskCache_BasicGetPut(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer func() { _ = cache.Close() }() defer cache.Close()
data := []byte("hello world") data := []byte("hello world")
if err := cache.Put("key1", data); err != nil { if err := cache.Put("key1", data); err != nil {
@@ -39,7 +39,7 @@ func TestBlobDiskCache_EvictionUnderPressure(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer func() { _ = cache.Close() }() defer cache.Close()
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
data := make([]byte, 300) data := make([]byte, 300)
@@ -65,7 +65,7 @@ func TestBlobDiskCache_OversizedEntryRejected(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer func() { _ = cache.Close() }() defer cache.Close()
data := make([]byte, 200) data := make([]byte, 200)
if err := cache.Put("big", data); err != nil { if err := cache.Put("big", data); err != nil {
@@ -82,7 +82,7 @@ func TestBlobDiskCache_UpdateInPlace(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer func() { _ = cache.Close() }() defer cache.Close()
if err := cache.Put("key1", []byte("v1")); err != nil { if err := cache.Put("key1", []byte("v1")); err != nil {
t.Fatal(err) t.Fatal(err)
@@ -111,7 +111,7 @@ func TestBlobDiskCache_ReadAt(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer func() { _ = cache.Close() }() defer cache.Close()
data := make([]byte, 1024) data := make([]byte, 1024)
if _, err := rand.Read(data); err != nil { if _, err := rand.Read(data); err != nil {
@@ -159,7 +159,7 @@ func TestBlobDiskCache_LRUOrder(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer func() { _ = cache.Close() }() defer cache.Close()
d := make([]byte, 100) d := make([]byte, 100)
if err := cache.Put("a", d); err != nil { if err := cache.Put("a", d); err != nil {

View File

@@ -109,11 +109,11 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
// Step 5: Restore files // Step 5: Restore files
result := &RestoreResult{} result := &RestoreResult{}
blobCache, err := newBlobDiskCache(4 * v.Config.BlobSizeLimit.Int64()) blobCache, err := newBlobDiskCache(defaultMaxBlobCacheBytes)
if err != nil { if err != nil {
return fmt.Errorf("creating blob cache: %w", err) return fmt.Errorf("creating blob cache: %w", err)
} }
defer func() { _ = blobCache.Close() }() defer blobCache.Close()
for i, file := range files { for i, file := range files {
if v.ctx.Err() != nil { if v.ctx.Err() != nil {
@@ -122,8 +122,6 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, result); err != nil { if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, result); err != nil {
log.Error("Failed to restore file", "path", file.Path, "error", err) log.Error("Failed to restore file", "path", file.Path, "error", err)
result.FilesFailed++
result.FailedFiles = append(result.FailedFiles, file.Path.String())
// Continue with other files // Continue with other files
continue continue
} }
@@ -153,13 +151,6 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
result.Duration.Round(time.Second), result.Duration.Round(time.Second),
) )
if result.FilesFailed > 0 {
_, _ = fmt.Fprintf(v.Stdout, "\nWARNING: %d file(s) failed to restore:\n", result.FilesFailed)
for _, path := range result.FailedFiles {
_, _ = fmt.Fprintf(v.Stdout, " - %s\n", path)
}
}
// Run verification if requested // Run verification if requested
if opts.Verify { if opts.Verify {
if err := v.verifyRestoredFiles(v.ctx, repos, files, opts.TargetDir, result); err != nil { if err := v.verifyRestoredFiles(v.ctx, repos, files, opts.TargetDir, result); err != nil {
@@ -180,10 +171,6 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
) )
} }
if result.FilesFailed > 0 {
return fmt.Errorf("%d file(s) failed to restore", result.FilesFailed)
}
return nil return nil
} }
@@ -440,9 +427,7 @@ func (v *Vaultik) restoreRegularFile(
if err != nil { if err != nil {
return fmt.Errorf("downloading blob %s: %w", blobHashStr[:16], err) return fmt.Errorf("downloading blob %s: %w", blobHashStr[:16], err)
} }
if putErr := blobCache.Put(blobHashStr, blobData); putErr != nil { if putErr := blobCache.Put(blobHashStr, blobData); putErr != nil { log.Debug("Failed to cache blob on disk", "hash", blobHashStr[:16], "error", putErr) }
log.Debug("Failed to cache blob on disk", "hash", blobHashStr[:16], "error", putErr)
}
result.BlobsDownloaded++ result.BlobsDownloaded++
result.BytesDownloaded += blob.CompressedSize result.BytesDownloaded += blob.CompressedSize
} }

View File

@@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"regexp"
"sort" "sort"
"strings" "strings"
"text/tabwriter" "text/tabwriter"
@@ -1004,16 +1003,16 @@ func (v *Vaultik) deleteSnapshotFromLocalDB(snapshotID string) error {
// Delete related records first to avoid foreign key constraints // Delete related records first to avoid foreign key constraints
if err := v.Repositories.Snapshots.DeleteSnapshotFiles(v.ctx, snapshotID); err != nil { if err := v.Repositories.Snapshots.DeleteSnapshotFiles(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot files for %s: %w", snapshotID, err) log.Error("Failed to delete snapshot files", "snapshot_id", snapshotID, "error", err)
} }
if err := v.Repositories.Snapshots.DeleteSnapshotBlobs(v.ctx, snapshotID); err != nil { if err := v.Repositories.Snapshots.DeleteSnapshotBlobs(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot blobs for %s: %w", snapshotID, err) log.Error("Failed to delete snapshot blobs", "snapshot_id", snapshotID, "error", err)
} }
if err := v.Repositories.Snapshots.DeleteSnapshotUploads(v.ctx, snapshotID); err != nil { if err := v.Repositories.Snapshots.DeleteSnapshotUploads(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot uploads for %s: %w", snapshotID, err) log.Error("Failed to delete snapshot uploads", "snapshot_id", snapshotID, "error", err)
} }
if err := v.Repositories.Snapshots.Delete(v.ctx, snapshotID); err != nil { if err := v.Repositories.Snapshots.Delete(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot record %s: %w", snapshotID, err) log.Error("Failed to delete snapshot record", "snapshot_id", snapshotID, "error", err)
} }
return nil return nil
@@ -1127,20 +1126,12 @@ func (v *Vaultik) PruneDatabase() (*PruneResult, error) {
return result, nil return result, nil
} }
// validTableNameRe matches table names containing only lowercase alphanumeric characters and underscores. // getTableCount returns the count of rows in a table
var validTableNameRe = regexp.MustCompile(`^[a-z0-9_]+$`)
// getTableCount returns the count of rows in a table.
// The tableName is sanitized to only allow [a-z0-9_] characters to prevent SQL injection.
func (v *Vaultik) getTableCount(tableName string) (int64, error) { func (v *Vaultik) getTableCount(tableName string) (int64, error) {
if v.DB == nil { if v.DB == nil {
return 0, nil return 0, nil
} }
if !validTableNameRe.MatchString(tableName) {
return 0, fmt.Errorf("invalid table name: %q", tableName)
}
var count int64 var count int64
query := fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName) query := fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName)
err := v.DB.Conn().QueryRowContext(v.ctx, query).Scan(&count) err := v.DB.Conn().QueryRowContext(v.ctx, query).Scan(&count)

View File

@@ -129,7 +129,7 @@ func (v *Vaultik) GetFilesystem() afero.Fs {
return v.Fs return v.Fs
} }
// printfStdout writes formatted output to stdout. // printfStdout writes formatted output to stdout for user-facing messages.
func (v *Vaultik) printfStdout(format string, args ...any) { func (v *Vaultik) printfStdout(format string, args ...any) {
_, _ = fmt.Fprintf(v.Stdout, format, args...) _, _ = fmt.Fprintf(v.Stdout, format, args...)
} }
@@ -148,7 +148,6 @@ func (v *Vaultik) printfStderr(format string, args ...any) {
func (v *Vaultik) scanStdin(a ...any) (int, error) { func (v *Vaultik) scanStdin(a ...any) (int, error) {
return fmt.Fscanln(v.Stdin, a...) return fmt.Fscanln(v.Stdin, a...)
} }
// TestVaultik wraps a Vaultik with captured stdout/stderr for testing // TestVaultik wraps a Vaultik with captured stdout/stderr for testing
type TestVaultik struct { type TestVaultik struct {
*Vaultik *Vaultik