1 Commits

Author SHA1 Message Date
clawbot
ee161fb3a3 feat: add progress bar to restore operation
- Add progress bar for restore and verify operations using progressbar/v3
- Replace in-memory blob cache with disk-based LRU cache (closes #29)
- Add helper wrappers for stdin/stdout/stderr IO
- Move FetchAndDecryptBlob into restore.go, remove blob_fetch_stub.go
- Use v.Stdout/v.Stdin instead of os.Stdout for all user-facing output

Rebased onto main, squashed to resolve conflicts.
2026-02-20 02:23:12 -08:00
7 changed files with 119 additions and 197 deletions

View File

@@ -1,64 +0,0 @@
package blobgen
import (
"bytes"
"crypto/rand"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// testRecipient is a static age recipient for tests.
const testRecipient = "age1cplgrwj77ta54dnmydvvmzn64ltk83ankxl5sww04mrtmu62kv3s89gmvv"
// TestCompressStreamNoDoubleClose is a regression test for issue #28.
// It verifies that CompressStream does not panic or return an error due to
// double-closing the underlying blobgen.Writer. Before the fix in PR #33,
// the explicit Close() on the happy path combined with defer Close() would
// cause a double close.
func TestCompressStreamNoDoubleClose(t *testing.T) {
input := []byte("regression test data for issue #28 double-close fix")
var buf bytes.Buffer
written, hash, err := CompressStream(&buf, bytes.NewReader(input), 3, []string{testRecipient})
require.NoError(t, err, "CompressStream should not return an error")
assert.True(t, written > 0, "expected bytes written > 0")
assert.NotEmpty(t, hash, "expected non-empty hash")
assert.True(t, buf.Len() > 0, "expected non-empty output")
}
// TestCompressStreamLargeInput exercises CompressStream with a larger payload
// to ensure no double-close issues surface under heavier I/O.
func TestCompressStreamLargeInput(t *testing.T) {
data := make([]byte, 512*1024) // 512 KB
_, err := rand.Read(data)
require.NoError(t, err)
var buf bytes.Buffer
written, hash, err := CompressStream(&buf, bytes.NewReader(data), 3, []string{testRecipient})
require.NoError(t, err)
assert.True(t, written > 0)
assert.NotEmpty(t, hash)
}
// TestCompressStreamEmptyInput verifies CompressStream handles empty input
// without double-close issues.
func TestCompressStreamEmptyInput(t *testing.T) {
var buf bytes.Buffer
_, hash, err := CompressStream(&buf, strings.NewReader(""), 3, []string{testRecipient})
require.NoError(t, err)
assert.NotEmpty(t, hash)
}
// TestCompressDataNoDoubleClose mirrors the stream test for CompressData,
// ensuring the explicit Close + error-path Close pattern is also safe.
func TestCompressDataNoDoubleClose(t *testing.T) {
input := []byte("CompressData regression test for double-close")
result, err := CompressData(input, 3, []string{testRecipient})
require.NoError(t, err)
assert.True(t, result.CompressedSize > 0)
assert.True(t, result.UncompressedSize == int64(len(input)))
assert.NotEmpty(t, result.SHA256)
}

View File

@@ -1,55 +0,0 @@
package vaultik
import (
"context"
"fmt"
"io"
"filippo.io/age"
"git.eeqj.de/sneak/vaultik/internal/blobgen"
)
// FetchAndDecryptBlobResult holds the result of fetching and decrypting a blob.
type FetchAndDecryptBlobResult struct {
Data []byte
}
// FetchAndDecryptBlob downloads a blob, decrypts it, and returns the plaintext data.
func (v *Vaultik) FetchAndDecryptBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) (*FetchAndDecryptBlobResult, error) {
rc, _, err := v.FetchBlob(ctx, blobHash, expectedSize)
if err != nil {
return nil, err
}
defer func() { _ = rc.Close() }()
reader, err := blobgen.NewReader(rc, identity)
if err != nil {
return nil, fmt.Errorf("creating blob reader: %w", err)
}
defer func() { _ = reader.Close() }()
data, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("reading blob data: %w", err)
}
return &FetchAndDecryptBlobResult{Data: data}, nil
}
// FetchBlob downloads a blob and returns a reader for the encrypted data.
func (v *Vaultik) FetchBlob(ctx context.Context, blobHash string, expectedSize int64) (io.ReadCloser, int64, error) {
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobHash[:2], blobHash[2:4], blobHash)
rc, err := v.Storage.Get(ctx, blobPath)
if err != nil {
return nil, 0, fmt.Errorf("downloading blob %s: %w", blobHash[:16], err)
}
info, err := v.Storage.Stat(ctx, blobPath)
if err != nil {
_ = rc.Close()
return nil, 0, fmt.Errorf("stat blob %s: %w", blobHash[:16], err)
}
return rc, info.Size, nil
}

View File

@@ -7,6 +7,9 @@ import (
"sync" "sync"
) )
// defaultMaxBlobCacheBytes is the default maximum size of the disk blob cache (10 GB).
const defaultMaxBlobCacheBytes = 10 << 30 // 10 GiB
// blobDiskCacheEntry tracks a cached blob on disk. // blobDiskCacheEntry tracks a cached blob on disk.
type blobDiskCacheEntry struct { type blobDiskCacheEntry struct {
key string key string

View File

@@ -109,58 +109,75 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
// Step 5: Restore files // Step 5: Restore files
result := &RestoreResult{} result := &RestoreResult{}
blobCache, err := newBlobDiskCache(4 * v.Config.BlobSizeLimit.Int64()) blobCache, err := newBlobDiskCache(defaultMaxBlobCacheBytes)
if err != nil { if err != nil {
return fmt.Errorf("creating blob cache: %w", err) return fmt.Errorf("creating blob cache: %w", err)
} }
defer func() { _ = blobCache.Close() }() defer func() { _ = blobCache.Close() }()
// Calculate total bytes for progress bar // Calculate total bytes for progress bar
var totalBytesExpected int64 var totalBytes int64
for _, file := range files { for _, file := range files {
totalBytesExpected += file.Size totalBytes += file.Size
} }
// Create progress bar if output is a terminal _, _ = fmt.Fprintf(v.Stdout, "Restoring %d files (%s)...\n",
len(files),
humanize.Bytes(uint64(totalBytes)),
)
// Create progress bar if stderr is a terminal
isTTY := isTerminal(v.Stderr)
var bar *progressbar.ProgressBar var bar *progressbar.ProgressBar
if isTerminal() { if isTTY {
bar = progressbar.NewOptions64( bar = progressbar.NewOptions64(
totalBytesExpected, totalBytes,
progressbar.OptionSetDescription("Restoring"), progressbar.OptionSetDescription("Restoring"),
progressbar.OptionSetWriter(os.Stderr), progressbar.OptionSetWriter(v.Stderr),
progressbar.OptionShowBytes(true), progressbar.OptionShowBytes(true),
progressbar.OptionShowCount(), progressbar.OptionShowCount(),
progressbar.OptionSetWidth(40), progressbar.OptionSetWidth(40),
progressbar.OptionThrottle(100*time.Millisecond), progressbar.OptionThrottle(100*time.Millisecond),
progressbar.OptionOnCompletion(func() { progressbar.OptionOnCompletion(func() {
fmt.Fprint(os.Stderr, "\n") v.printlnStderr()
}), }),
progressbar.OptionSetRenderBlankState(true), progressbar.OptionSetRenderBlankState(true),
) )
} }
for i, file := range files { filesProcessed := 0
for _, file := range files {
if v.ctx.Err() != nil { if v.ctx.Err() != nil {
return v.ctx.Err() return v.ctx.Err()
} }
if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, result); err != nil { if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, result); err != nil {
log.Error("Failed to restore file", "path", file.Path, "error", err) log.Error("Failed to restore file", "path", file.Path, "error", err)
result.FilesFailed++ filesProcessed++
result.FailedFiles = append(result.FailedFiles, file.Path.String()) // Update progress bar even on failure
// Continue with other files if bar != nil {
_ = bar.Add64(file.Size)
}
// Periodic structured log for non-terminal contexts (headless/CI)
if !isTTY && filesProcessed%100 == 0 {
log.Info("Restore progress",
"files", fmt.Sprintf("%d/%d", filesProcessed, len(files)),
"bytes_restored", humanize.Bytes(uint64(result.BytesRestored)),
)
}
continue
} }
filesProcessed++
// Update progress bar // Update progress bar
if bar != nil { if bar != nil {
_ = bar.Add64(file.Size) _ = bar.Add64(file.Size)
} }
// Periodic structured log for non-terminal contexts (headless/CI)
// Progress logging (for non-terminal or structured logs) if !isTTY && (filesProcessed%100 == 0 || filesProcessed == len(files)) {
if (i+1)%100 == 0 || i+1 == len(files) {
log.Info("Restore progress", log.Info("Restore progress",
"files", fmt.Sprintf("%d/%d", i+1, len(files)), "files", fmt.Sprintf("%d/%d", filesProcessed, len(files)),
"bytes", humanize.Bytes(uint64(result.BytesRestored)), "bytes_restored", humanize.Bytes(uint64(result.BytesRestored)),
) )
} }
} }
@@ -185,13 +202,6 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
result.Duration.Round(time.Second), result.Duration.Round(time.Second),
) )
if result.FilesFailed > 0 {
_, _ = fmt.Fprintf(v.Stdout, "\nWARNING: %d file(s) failed to restore:\n", result.FilesFailed)
for _, path := range result.FailedFiles {
_, _ = fmt.Fprintf(v.Stdout, " - %s\n", path)
}
}
// Run verification if requested // Run verification if requested
if opts.Verify { if opts.Verify {
if err := v.verifyRestoredFiles(v.ctx, repos, files, opts.TargetDir, result); err != nil { if err := v.verifyRestoredFiles(v.ctx, repos, files, opts.TargetDir, result); err != nil {
@@ -212,10 +222,6 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
) )
} }
if result.FilesFailed > 0 {
return fmt.Errorf("%d file(s) failed to restore", result.FilesFailed)
}
return nil return nil
} }
@@ -524,6 +530,53 @@ func (v *Vaultik) restoreRegularFile(
return nil return nil
} }
// BlobFetchResult holds the result of fetching and decrypting a blob.
type BlobFetchResult struct {
Data []byte
CompressedSize int64
}
// FetchAndDecryptBlob downloads a blob from storage, decrypts and decompresses it.
func (v *Vaultik) FetchAndDecryptBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) (*BlobFetchResult, error) {
// Construct blob path with sharding
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobHash[:2], blobHash[2:4], blobHash)
reader, err := v.Storage.Get(ctx, blobPath)
if err != nil {
return nil, fmt.Errorf("downloading blob: %w", err)
}
defer func() { _ = reader.Close() }()
// Read encrypted data
encryptedData, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("reading blob data: %w", err)
}
// Decrypt and decompress
blobReader, err := blobgen.NewReader(bytes.NewReader(encryptedData), identity)
if err != nil {
return nil, fmt.Errorf("creating decryption reader: %w", err)
}
defer func() { _ = blobReader.Close() }()
data, err := io.ReadAll(blobReader)
if err != nil {
return nil, fmt.Errorf("decrypting blob: %w", err)
}
log.Debug("Downloaded and decrypted blob",
"hash", blobHash[:16],
"encrypted_size", humanize.Bytes(uint64(len(encryptedData))),
"decrypted_size", humanize.Bytes(uint64(len(data))),
)
return &BlobFetchResult{
Data: data,
CompressedSize: int64(len(encryptedData)),
}, nil
}
// downloadBlob downloads and decrypts a blob // downloadBlob downloads and decrypts a blob
func (v *Vaultik) downloadBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) ([]byte, error) { func (v *Vaultik) downloadBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) ([]byte, error) {
result, err := v.FetchAndDecryptBlob(ctx, blobHash, expectedSize, identity) result, err := v.FetchAndDecryptBlob(ctx, blobHash, expectedSize, identity)
@@ -569,7 +622,7 @@ func (v *Vaultik) verifyRestoredFiles(
// Create progress bar if output is a terminal // Create progress bar if output is a terminal
var bar *progressbar.ProgressBar var bar *progressbar.ProgressBar
if isTerminal() { if isTerminal(v.Stderr) {
bar = progressbar.NewOptions64( bar = progressbar.NewOptions64(
totalBytes, totalBytes,
progressbar.OptionSetDescription("Verifying"), progressbar.OptionSetDescription("Verifying"),
@@ -677,7 +730,11 @@ func (v *Vaultik) verifyFile(
return bytesVerified, nil return bytesVerified, nil
} }
// isTerminal returns true if stdout is a terminal // isTerminal returns true if the given writer is connected to a terminal.
func isTerminal() bool { // Returns false if the writer does not expose a file descriptor (e.g. in tests).
return term.IsTerminal(int(os.Stdout.Fd())) func isTerminal(w io.Writer) bool {
if f, ok := w.(*os.File); ok {
return term.IsTerminal(int(f.Fd()))
}
return false
} }

View File

@@ -90,24 +90,6 @@ func (v *Vaultik) CreateSnapshot(opts *SnapshotCreateOptions) error {
v.printfStdout("\nAll %d snapshots completed in %s\n", len(snapshotNames), time.Since(overallStartTime).Round(time.Second)) v.printfStdout("\nAll %d snapshots completed in %s\n", len(snapshotNames), time.Since(overallStartTime).Round(time.Second))
} }
// Prune old snapshots and unreferenced blobs if --prune was specified
if opts.Prune {
log.Info("Pruning enabled - deleting old snapshots and unreferenced blobs")
v.printlnStdout("\nPruning old snapshots (keeping latest)...")
if err := v.PurgeSnapshots(true, "", true); err != nil {
return fmt.Errorf("prune: purging old snapshots: %w", err)
}
v.printlnStdout("Pruning unreferenced blobs...")
if err := v.PruneBlobs(&PruneOptions{Force: true}); err != nil {
return fmt.Errorf("prune: removing unreferenced blobs: %w", err)
}
log.Info("Pruning complete")
}
return nil return nil
} }
@@ -324,6 +306,11 @@ func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, sna
} }
v.printfStdout("Duration: %s\n", formatDuration(snapshotDuration)) v.printfStdout("Duration: %s\n", formatDuration(snapshotDuration))
if opts.Prune {
log.Info("Pruning enabled - will delete old snapshots after snapshot")
// TODO: Implement pruning
}
return nil return nil
} }
@@ -1017,16 +1004,16 @@ func (v *Vaultik) deleteSnapshotFromLocalDB(snapshotID string) error {
// Delete related records first to avoid foreign key constraints // Delete related records first to avoid foreign key constraints
if err := v.Repositories.Snapshots.DeleteSnapshotFiles(v.ctx, snapshotID); err != nil { if err := v.Repositories.Snapshots.DeleteSnapshotFiles(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot files for %s: %w", snapshotID, err) log.Error("Failed to delete snapshot files", "snapshot_id", snapshotID, "error", err)
} }
if err := v.Repositories.Snapshots.DeleteSnapshotBlobs(v.ctx, snapshotID); err != nil { if err := v.Repositories.Snapshots.DeleteSnapshotBlobs(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot blobs for %s: %w", snapshotID, err) log.Error("Failed to delete snapshot blobs", "snapshot_id", snapshotID, "error", err)
} }
if err := v.Repositories.Snapshots.DeleteSnapshotUploads(v.ctx, snapshotID); err != nil { if err := v.Repositories.Snapshots.DeleteSnapshotUploads(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot uploads for %s: %w", snapshotID, err) log.Error("Failed to delete snapshot uploads", "snapshot_id", snapshotID, "error", err)
} }
if err := v.Repositories.Snapshots.Delete(v.ctx, snapshotID); err != nil { if err := v.Repositories.Snapshots.Delete(v.ctx, snapshotID); err != nil {
return fmt.Errorf("deleting snapshot record %s: %w", snapshotID, err) log.Error("Failed to delete snapshot record", "snapshot_id", snapshotID, "error", err)
} }
return nil return nil

View File

@@ -1,23 +0,0 @@
package vaultik
import (
"testing"
)
// TestSnapshotCreateOptions_PruneFlag verifies the Prune field exists on
// SnapshotCreateOptions and can be set.
func TestSnapshotCreateOptions_PruneFlag(t *testing.T) {
opts := &SnapshotCreateOptions{
Prune: true,
}
if !opts.Prune {
t.Error("Expected Prune to be true")
}
opts2 := &SnapshotCreateOptions{
Prune: false,
}
if opts2.Prune {
t.Error("Expected Prune to be false")
}
}

View File

@@ -129,7 +129,7 @@ func (v *Vaultik) GetFilesystem() afero.Fs {
return v.Fs return v.Fs
} }
// printfStdout writes formatted output to stdout. // printfStdout writes formatted output to stdout for user-facing messages.
func (v *Vaultik) printfStdout(format string, args ...any) { func (v *Vaultik) printfStdout(format string, args ...any) {
_, _ = fmt.Fprintf(v.Stdout, format, args...) _, _ = fmt.Fprintf(v.Stdout, format, args...)
} }
@@ -139,11 +139,28 @@ func (v *Vaultik) printlnStdout(args ...any) {
_, _ = fmt.Fprintln(v.Stdout, args...) _, _ = fmt.Fprintln(v.Stdout, args...)
} }
// FetchBlob downloads a blob from storage and returns a reader for the encrypted data.
func (v *Vaultik) FetchBlob(ctx context.Context, blobHash string, expectedSize int64) (io.ReadCloser, int64, error) {
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobHash[:2], blobHash[2:4], blobHash)
reader, err := v.Storage.Get(ctx, blobPath)
if err != nil {
return nil, 0, fmt.Errorf("downloading blob: %w", err)
}
return reader, expectedSize, nil
}
// printfStderr writes formatted output to stderr. // printfStderr writes formatted output to stderr.
func (v *Vaultik) printfStderr(format string, args ...any) { func (v *Vaultik) printfStderr(format string, args ...any) {
_, _ = fmt.Fprintf(v.Stderr, format, args...) _, _ = fmt.Fprintf(v.Stderr, format, args...)
} }
// printlnStderr writes a line to stderr.
func (v *Vaultik) printlnStderr(args ...any) {
_, _ = fmt.Fprintln(v.Stderr, args...)
}
// scanStdin reads a line of input from stdin. // scanStdin reads a line of input from stdin.
func (v *Vaultik) scanStdin(a ...any) (int, error) { func (v *Vaultik) scanStdin(a ...any) (int, error) {
return fmt.Fscanln(v.Stdin, a...) return fmt.Fscanln(v.Stdin, a...)