Compare commits

..

1 Commits

Author SHA1 Message Date
clawbot
ee161fb3a3 feat: add progress bar to restore operation
- Add progress bar for restore and verify operations using progressbar/v3
- Replace in-memory blob cache with disk-based LRU cache (closes #29)
- Add helper wrappers for stdin/stdout/stderr IO
- Move FetchAndDecryptBlob into restore.go, remove blob_fetch_stub.go
- Use v.Stdout/v.Stdin instead of os.Stdout for all user-facing output

Rebased onto main, squashed to resolve conflicts.
2026-02-20 02:23:12 -08:00
7 changed files with 143 additions and 189 deletions

View File

@ -1,64 +0,0 @@
package blobgen
import (
"bytes"
"crypto/rand"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// testRecipient is a static age recipient for tests.
const testRecipient = "age1cplgrwj77ta54dnmydvvmzn64ltk83ankxl5sww04mrtmu62kv3s89gmvv"
// TestCompressStreamNoDoubleClose is a regression test for issue #28.
// It verifies that CompressStream does not panic or return an error due to
// double-closing the underlying blobgen.Writer. Before the fix in PR #33,
// the explicit Close() on the happy path combined with defer Close() would
// cause a double close.
func TestCompressStreamNoDoubleClose(t *testing.T) {
input := []byte("regression test data for issue #28 double-close fix")
var buf bytes.Buffer
written, hash, err := CompressStream(&buf, bytes.NewReader(input), 3, []string{testRecipient})
require.NoError(t, err, "CompressStream should not return an error")
assert.True(t, written > 0, "expected bytes written > 0")
assert.NotEmpty(t, hash, "expected non-empty hash")
assert.True(t, buf.Len() > 0, "expected non-empty output")
}
// TestCompressStreamLargeInput exercises CompressStream with a larger payload
// to ensure no double-close issues surface under heavier I/O.
func TestCompressStreamLargeInput(t *testing.T) {
data := make([]byte, 512*1024) // 512 KB
_, err := rand.Read(data)
require.NoError(t, err)
var buf bytes.Buffer
written, hash, err := CompressStream(&buf, bytes.NewReader(data), 3, []string{testRecipient})
require.NoError(t, err)
assert.True(t, written > 0)
assert.NotEmpty(t, hash)
}
// TestCompressStreamEmptyInput verifies CompressStream handles empty input
// without double-close issues.
func TestCompressStreamEmptyInput(t *testing.T) {
var buf bytes.Buffer
_, hash, err := CompressStream(&buf, strings.NewReader(""), 3, []string{testRecipient})
require.NoError(t, err)
assert.NotEmpty(t, hash)
}
// TestCompressDataNoDoubleClose mirrors the stream test for CompressData,
// ensuring the explicit Close + error-path Close pattern is also safe.
func TestCompressDataNoDoubleClose(t *testing.T) {
input := []byte("CompressData regression test for double-close")
result, err := CompressData(input, 3, []string{testRecipient})
require.NoError(t, err)
assert.True(t, result.CompressedSize > 0)
assert.True(t, result.UncompressedSize == int64(len(input)))
assert.NotEmpty(t, result.SHA256)
}

View File

@ -1,55 +0,0 @@
package vaultik
import (
"context"
"fmt"
"io"
"filippo.io/age"
"git.eeqj.de/sneak/vaultik/internal/blobgen"
)
// FetchAndDecryptBlobResult holds the result of fetching and decrypting a blob.
type FetchAndDecryptBlobResult struct {
Data []byte
}
// FetchAndDecryptBlob downloads a blob, decrypts it, and returns the plaintext data.
func (v *Vaultik) FetchAndDecryptBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) (*FetchAndDecryptBlobResult, error) {
rc, _, err := v.FetchBlob(ctx, blobHash, expectedSize)
if err != nil {
return nil, err
}
defer func() { _ = rc.Close() }()
reader, err := blobgen.NewReader(rc, identity)
if err != nil {
return nil, fmt.Errorf("creating blob reader: %w", err)
}
defer func() { _ = reader.Close() }()
data, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("reading blob data: %w", err)
}
return &FetchAndDecryptBlobResult{Data: data}, nil
}
// FetchBlob downloads a blob and returns a reader for the encrypted data.
func (v *Vaultik) FetchBlob(ctx context.Context, blobHash string, expectedSize int64) (io.ReadCloser, int64, error) {
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobHash[:2], blobHash[2:4], blobHash)
rc, err := v.Storage.Get(ctx, blobPath)
if err != nil {
return nil, 0, fmt.Errorf("downloading blob %s: %w", blobHash[:16], err)
}
info, err := v.Storage.Stat(ctx, blobPath)
if err != nil {
_ = rc.Close()
return nil, 0, fmt.Errorf("stat blob %s: %w", blobHash[:16], err)
}
return rc, info.Size, nil
}

View File

@ -7,6 +7,9 @@ import (
"sync"
)
// defaultMaxBlobCacheBytes is the default maximum size of the disk blob cache (10 GB).
const defaultMaxBlobCacheBytes = 10 << 30 // 10 GiB
// blobDiskCacheEntry tracks a cached blob on disk.
type blobDiskCacheEntry struct {
key string

View File

@ -109,34 +109,83 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
// Step 5: Restore files
result := &RestoreResult{}
blobCache, err := newBlobDiskCache(4 * v.Config.BlobSizeLimit.Int64())
blobCache, err := newBlobDiskCache(defaultMaxBlobCacheBytes)
if err != nil {
return fmt.Errorf("creating blob cache: %w", err)
}
defer func() { _ = blobCache.Close() }()
for i, file := range files {
// Calculate total bytes for progress bar
var totalBytes int64
for _, file := range files {
totalBytes += file.Size
}
_, _ = fmt.Fprintf(v.Stdout, "Restoring %d files (%s)...\n",
len(files),
humanize.Bytes(uint64(totalBytes)),
)
// Create progress bar if stderr is a terminal
isTTY := isTerminal(v.Stderr)
var bar *progressbar.ProgressBar
if isTTY {
bar = progressbar.NewOptions64(
totalBytes,
progressbar.OptionSetDescription("Restoring"),
progressbar.OptionSetWriter(v.Stderr),
progressbar.OptionShowBytes(true),
progressbar.OptionShowCount(),
progressbar.OptionSetWidth(40),
progressbar.OptionThrottle(100*time.Millisecond),
progressbar.OptionOnCompletion(func() {
v.printlnStderr()
}),
progressbar.OptionSetRenderBlankState(true),
)
}
filesProcessed := 0
for _, file := range files {
if v.ctx.Err() != nil {
return v.ctx.Err()
}
if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, result); err != nil {
log.Error("Failed to restore file", "path", file.Path, "error", err)
result.FilesFailed++
result.FailedFiles = append(result.FailedFiles, file.Path.String())
// Continue with other files
filesProcessed++
// Update progress bar even on failure
if bar != nil {
_ = bar.Add64(file.Size)
}
// Periodic structured log for non-terminal contexts (headless/CI)
if !isTTY && filesProcessed%100 == 0 {
log.Info("Restore progress",
"files", fmt.Sprintf("%d/%d", filesProcessed, len(files)),
"bytes_restored", humanize.Bytes(uint64(result.BytesRestored)),
)
}
continue
}
// Progress logging
if (i+1)%100 == 0 || i+1 == len(files) {
filesProcessed++
// Update progress bar
if bar != nil {
_ = bar.Add64(file.Size)
}
// Periodic structured log for non-terminal contexts (headless/CI)
if !isTTY && (filesProcessed%100 == 0 || filesProcessed == len(files)) {
log.Info("Restore progress",
"files", fmt.Sprintf("%d/%d", i+1, len(files)),
"bytes", humanize.Bytes(uint64(result.BytesRestored)),
"files", fmt.Sprintf("%d/%d", filesProcessed, len(files)),
"bytes_restored", humanize.Bytes(uint64(result.BytesRestored)),
)
}
}
if bar != nil {
_ = bar.Finish()
}
result.Duration = time.Since(startTime)
log.Info("Restore complete",
@ -153,13 +202,6 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
result.Duration.Round(time.Second),
)
if result.FilesFailed > 0 {
_, _ = fmt.Fprintf(v.Stdout, "\nWARNING: %d file(s) failed to restore:\n", result.FilesFailed)
for _, path := range result.FailedFiles {
_, _ = fmt.Fprintf(v.Stdout, " - %s\n", path)
}
}
// Run verification if requested
if opts.Verify {
if err := v.verifyRestoredFiles(v.ctx, repos, files, opts.TargetDir, result); err != nil {
@ -180,10 +222,6 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
)
}
if result.FilesFailed > 0 {
return fmt.Errorf("%d file(s) failed to restore", result.FilesFailed)
}
return nil
}
@ -492,6 +530,53 @@ func (v *Vaultik) restoreRegularFile(
return nil
}
// BlobFetchResult holds the result of fetching and decrypting a blob.
type BlobFetchResult struct {
Data []byte
CompressedSize int64
}
// FetchAndDecryptBlob downloads a blob from storage, decrypts and decompresses it.
func (v *Vaultik) FetchAndDecryptBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) (*BlobFetchResult, error) {
// Construct blob path with sharding
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobHash[:2], blobHash[2:4], blobHash)
reader, err := v.Storage.Get(ctx, blobPath)
if err != nil {
return nil, fmt.Errorf("downloading blob: %w", err)
}
defer func() { _ = reader.Close() }()
// Read encrypted data
encryptedData, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("reading blob data: %w", err)
}
// Decrypt and decompress
blobReader, err := blobgen.NewReader(bytes.NewReader(encryptedData), identity)
if err != nil {
return nil, fmt.Errorf("creating decryption reader: %w", err)
}
defer func() { _ = blobReader.Close() }()
data, err := io.ReadAll(blobReader)
if err != nil {
return nil, fmt.Errorf("decrypting blob: %w", err)
}
log.Debug("Downloaded and decrypted blob",
"hash", blobHash[:16],
"encrypted_size", humanize.Bytes(uint64(len(encryptedData))),
"decrypted_size", humanize.Bytes(uint64(len(data))),
)
return &BlobFetchResult{
Data: data,
CompressedSize: int64(len(encryptedData)),
}, nil
}
// downloadBlob downloads and decrypts a blob
func (v *Vaultik) downloadBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) ([]byte, error) {
result, err := v.FetchAndDecryptBlob(ctx, blobHash, expectedSize, identity)
@ -537,7 +622,7 @@ func (v *Vaultik) verifyRestoredFiles(
// Create progress bar if output is a terminal
var bar *progressbar.ProgressBar
if isTerminal() {
if isTerminal(v.Stderr) {
bar = progressbar.NewOptions64(
totalBytes,
progressbar.OptionSetDescription("Verifying"),
@ -645,7 +730,11 @@ func (v *Vaultik) verifyFile(
return bytesVerified, nil
}
// isTerminal returns true if stdout is a terminal
func isTerminal() bool {
return term.IsTerminal(int(os.Stdout.Fd()))
// isTerminal returns true if the given writer is connected to a terminal.
// Returns false if the writer does not expose a file descriptor (e.g. in tests).
func isTerminal(w io.Writer) bool {
if f, ok := w.(*os.File); ok {
return term.IsTerminal(int(f.Fd()))
}
return false
}

View File

@ -90,24 +90,6 @@ func (v *Vaultik) CreateSnapshot(opts *SnapshotCreateOptions) error {
v.printfStdout("\nAll %d snapshots completed in %s\n", len(snapshotNames), time.Since(overallStartTime).Round(time.Second))
}
// Prune old snapshots and unreferenced blobs if --prune was specified
if opts.Prune {
log.Info("Pruning enabled - deleting old snapshots and unreferenced blobs")
v.printlnStdout("\nPruning old snapshots (keeping latest)...")
if err := v.PurgeSnapshots(true, "", true); err != nil {
return fmt.Errorf("prune: purging old snapshots: %w", err)
}
v.printlnStdout("Pruning unreferenced blobs...")
if err := v.PruneBlobs(&PruneOptions{Force: true}); err != nil {
return fmt.Errorf("prune: removing unreferenced blobs: %w", err)
}
log.Info("Pruning complete")
}
return nil
}
@ -324,6 +306,11 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna
}
v.printfStdout("Duration: %s\n", formatDuration(snapshotDuration))
if opts.Prune {
log.Info("Pruning enabled - will delete old snapshots after snapshot")
// TODO: Implement pruning
}
return nil
}
@ -1017,16 +1004,16 @@ func (v *Vaultik) deleteSnapshotFromLocalDB(snapshotID string) error {
// Delete related records first to avoid foreign key constraints
if err := v.Repositories.Snapshots.DeleteSnapshotFiles(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot files for %s: %w", snapshotID, err)
log.Error("Failed to delete snapshot files", "snapshot_id", snapshotID, "error", err)
}
if err := v.Repositories.Snapshots.DeleteSnapshotBlobs(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot blobs for %s: %w", snapshotID, err)
log.Error("Failed to delete snapshot blobs", "snapshot_id", snapshotID, "error", err)
}
if err := v.Repositories.Snapshots.DeleteSnapshotUploads(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot uploads for %s: %w", snapshotID, err)
log.Error("Failed to delete snapshot uploads", "snapshot_id", snapshotID, "error", err)
}
if err := v.Repositories.Snapshots.Delete(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot record %s: %w", snapshotID, err)
log.Error("Failed to delete snapshot record", "snapshot_id", snapshotID, "error", err)
}
return nil

View File

@ -1,23 +0,0 @@
package vaultik
import (
"testing"
)
// TestSnapshotCreateOptions_PruneFlag verifies the Prune field exists on
// SnapshotCreateOptions and can be set.
func TestSnapshotCreateOptions_PruneFlag(t *testing.T) {
opts := &SnapshotCreateOptions{
Prune: true,
}
if !opts.Prune {
t.Error("Expected Prune to be true")
}
opts2 := &SnapshotCreateOptions{
Prune: false,
}
if opts2.Prune {
t.Error("Expected Prune to be false")
}
}

View File

@ -129,7 +129,7 @@ func (v *Vaultik) GetFilesystem() afero.Fs {
return v.Fs
}
// printfStdout writes formatted output to stdout.
// printfStdout writes formatted output to stdout for user-facing messages.
func (v *Vaultik) printfStdout(format string, args ...any) {
_, _ = fmt.Fprintf(v.Stdout, format, args...)
}
@ -139,11 +139,28 @@ func (v *Vaultik) printlnStdout(args ...any) {
_, _ = fmt.Fprintln(v.Stdout, args...)
}
// FetchBlob downloads a blob from storage and returns a reader for the encrypted data.
func (v *Vaultik) FetchBlob(ctx context.Context, blobHash string, expectedSize int64) (io.ReadCloser, int64, error) {
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobHash[:2], blobHash[2:4], blobHash)
reader, err := v.Storage.Get(ctx, blobPath)
if err != nil {
return nil, 0, fmt.Errorf("downloading blob: %w", err)
}
return reader, expectedSize, nil
}
// printfStderr writes formatted output to stderr.
func (v *Vaultik) printfStderr(format string, args ...any) {
_, _ = fmt.Fprintf(v.Stderr, format, args...)
}
// printlnStderr writes a line to stderr.
func (v *Vaultik) printlnStderr(args ...any) {
_, _ = fmt.Fprintln(v.Stderr, args...)
}
// scanStdin reads a line of input from stdin.
func (v *Vaultik) scanStdin(a ...any) (int, error) {
return fmt.Fscanln(v.Stdin, a...)