Files
vaultik/internal/vaultik/snapshot.go
sneak 0e9c96c8b5 Add uncompressed-size and new-chunk-size columns to snapshot list
The remote snapshot table now shows the total plaintext size of all
chunks referenced by each snapshot, plus the plaintext size of chunks
newly referenced by that snapshot (chunks not in any earlier completed
snapshot known to the local DB). The latter is the marginal data
introduced by each backup, which is useful for spotting which snapshots
actually added new bytes and which were deduplicated against prior state.

Both new columns are computed from the local database only. Snapshots
that exist in remote storage but not in the local DB show
"<remote only>" in those cells; their COMPRESSED SIZE column still
reflects the value fetched from the remote manifest.
2026-06-17 06:33:59 +02:00

1386 lines
43 KiB
Go

package vaultik
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"
"github.com/dustin/go-humanize"
"golang.org/x/sync/errgroup"
"sneak.berlin/go/vaultik/internal/database"
"sneak.berlin/go/vaultik/internal/log"
"sneak.berlin/go/vaultik/internal/snapshot"
"sneak.berlin/go/vaultik/internal/types"
)
// SnapshotCreateOptions contains options for the snapshot create command.
type SnapshotCreateOptions struct {
	// Cron disables the scanner's progress display (EnableProgress is set to !Cron).
	Cron bool
	// Prune runs runPostBackupPrune for the processed snapshot names once all
	// snapshots have been created.
	Prune         bool
	KeepNewerThan string   // With --prune: keep snapshots newer than this duration (e.g. "4w"); default: keep only latest
	SkipErrors    bool     // Skip file read errors (log them loudly but continue)
	Snapshots     []string // Optional list of snapshot names to process (empty = all)
}
// CreateSnapshot runs a full backup. It first prunes the local database, then
// creates one snapshot per requested name (or per configured name when
// opts.Snapshots is empty). Afterwards it optionally prunes old snapshots and
// orphaned blobs (opts.Prune) and prints a final status line that includes
// the UI warning count.
func (v *Vaultik) CreateSnapshot(opts *SnapshotCreateOptions) error {
	started := time.Now()
	log.Info("Starting snapshot creation",
		"version", v.Globals.Version,
		"commit", v.Globals.Commit,
		"index_path", v.Config.IndexPath,
	)

	host := v.Config.Hostname
	if host == "" {
		// Best effort: if the OS lookup fails the hostname stays empty.
		host, _ = os.Hostname()
	}

	// CRITICAL: PruneDatabase removes incomplete snapshots and orphaned data
	// and MUST succeed before any scanning starts. Otherwise the deduplication
	// logic could treat files from an incomplete snapshot as already backed up
	// and skip them, which would lose data. Locking guarantees that only one
	// vaultik instance touches the DB at a time.
	if _, err := v.PruneDatabase(); err != nil {
		return fmt.Errorf("prune database: %w", err)
	}

	names := opts.Snapshots
	if len(names) > 0 {
		// Every explicitly requested name must exist in the config.
		for _, name := range names {
			if _, ok := v.Config.Snapshots[name]; !ok {
				return fmt.Errorf("snapshot %q not found in config", name)
			}
		}
	} else {
		names = v.Config.SnapshotNames()
	}
	if len(names) == 0 {
		return fmt.Errorf("no snapshots configured")
	}

	count := len(names)
	for i, name := range names {
		if err := v.createNamedSnapshot(opts, host, name, i+1, count); err != nil {
			return err
		}
	}
	if count > 1 {
		v.UI.Complete("All %d snapshots completed in %s.", count, v.UI.Duration(time.Since(started)))
	}

	if opts.Prune {
		if err := v.runPostBackupPrune(names, opts.KeepNewerThan); err != nil {
			return fmt.Errorf("post-backup prune: %w", err)
		}
	}

	if warnings := v.UI.WarningCount(); warnings > 0 {
		v.UI.Complete("Finished (with %d warnings).", warnings)
		return nil
	}
	v.UI.Complete("Finished successfully.")
	return nil
}
// runPostBackupPrune purges older snapshots of the given names without
// prompting, then removes orphaned blobs from remote storage. A non-empty
// keepNewerThan (e.g. "4w") keeps every snapshot newer than that duration.
// An empty keepNewerThan keeps only the latest snapshot of each name.
func (v *Vaultik) runPostBackupPrune(snapshotNames []string, keepNewerThan string) error {
	log.Info("Running post-backup prune", "snapshots", snapshotNames, "keep_newer_than", keepNewerThan)
	v.UI.Begin("Running post-backup prune.")

	// An empty keepNewerThan leaves OlderThan unset and selects KeepLatest instead.
	err := v.PurgeSnapshotsWithOptions(&SnapshotPurgeOptions{
		Force:      true,
		Names:      snapshotNames,
		Quiet:      true,
		OlderThan:  keepNewerThan,
		KeepLatest: keepNewerThan == "",
	})
	if err != nil {
		return fmt.Errorf("purging old snapshots: %w", err)
	}

	if err := v.PruneBlobs(&PruneOptions{Force: true}); err != nil {
		return fmt.Errorf("pruning orphaned blobs: %w", err)
	}
	return nil
}
// snapshotStats tracks aggregate statistics across directory scans.
// The scan counters are summed per directory by scanAllDirectories. The
// upload fields are filled afterwards from the scanner's progress reporter
// by collectUploadStats.
type snapshotStats struct {
	totalFiles         int   // sum of ScanResult.FilesScanned
	totalBytes         int64 // sum of ScanResult.BytesScanned
	totalChunks        int   // sum of ScanResult.ChunksCreated
	totalBlobs         int   // sum of ScanResult.BlobsCreated
	totalBytesSkipped  int64 // sum of ScanResult.BytesSkipped
	totalFilesSkipped  int   // sum of ScanResult.FilesSkipped
	totalFilesDeleted  int   // sum of ScanResult.FilesDeleted
	totalBytesDeleted  int64 // sum of ScanResult.BytesDeleted
	totalBytesUploaded int64 // from progress stats BytesUploaded
	totalBlobsUploaded int   // from progress stats BlobsUploaded
	uploadDuration     time.Duration // from progress stats UploadDurationMs
}
// createNamedSnapshot creates, scans, finalizes and summarizes one snapshot
// for the config entry snapName. idx and total are 1-based progress
// counters; the per-snapshot heading is only printed when total > 1.
func (v *Vaultik) createNamedSnapshot(opts *SnapshotCreateOptions, hostname, snapName string, idx, total int) error {
	began := time.Now()
	if total > 1 {
		v.UI.Info("Snapshot %d/%d: %s.", idx, total, snapName)
	}

	dirs, err := v.resolveSnapshotPaths(snapName)
	if err != nil {
		return err
	}

	params := snapshot.ScannerParams{
		EnableProgress: !opts.Cron,
		UI:             v.UI,
		Fs:             v.Fs,
		Exclude:        v.Config.GetExcludes(snapName),
		SkipErrors:     opts.SkipErrors,
	}
	scanner := v.ScannerFactory(params)

	id, err := v.SnapshotManager.CreateSnapshotWithName(v.ctx, hostname, snapName, v.Globals.Version, v.Globals.Commit)
	if err != nil {
		return fmt.Errorf("creating snapshot: %w", err)
	}
	log.Info("Beginning snapshot", "snapshot_id", id, "name", snapName)
	v.UI.Begin("Creating snapshot %s.", v.UI.Snapshot(id))

	stats, err := v.scanAllDirectories(scanner, dirs, id)
	if err != nil {
		return err
	}
	v.collectUploadStats(scanner, stats)

	if err := v.finalizeSnapshotMetadata(id, stats); err != nil {
		return err
	}

	log.Info("Snapshot complete",
		"snapshot_id", id,
		"name", snapName,
		"files", stats.totalFiles,
		"blobs_uploaded", stats.totalBlobsUploaded,
		"bytes_uploaded", stats.totalBytesUploaded,
		"duration", time.Since(began))
	v.printSnapshotSummary(id, began, stats)
	return nil
}
// resolveSnapshotPaths returns the configured source paths of snapName as
// absolute paths with symlinks evaluated. A path that does not exist yet is
// kept in its absolute, unresolved form. Any other resolution error aborts
// the whole call.
func (v *Vaultik) resolveSnapshotPaths(snapName string) ([]string, error) {
	paths := v.Config.Snapshots[snapName].Paths
	out := make([]string, 0, len(paths))
	for _, p := range paths {
		abs, err := filepath.Abs(p)
		if err != nil {
			return nil, fmt.Errorf("failed to resolve absolute path for %s: %w", p, err)
		}

		target, err := filepath.EvalSymlinks(abs)
		switch {
		case err == nil:
			// resolved normally
		case os.IsNotExist(err):
			target = abs
		default:
			return nil, fmt.Errorf("failed to resolve symlinks for %s: %w", abs, err)
		}
		out = append(out, target)
	}
	return out, nil
}
// scanAllDirectories calls scanner.Scan for each resolved directory in order,
// passing snapshotID, and returns the summed per-directory results. It checks
// v.ctx for cancellation before each directory and stops at the first scan
// error.
func (v *Vaultik) scanAllDirectories(scanner *snapshot.Scanner, resolvedDirs []string, snapshotID string) (*snapshotStats, error) {
	agg := new(snapshotStats)
	count := len(resolvedDirs)
	for i := 0; i < count; i++ {
		dir := resolvedDirs[i]

		// A closed Done channel is equivalent to a non-nil Err.
		if ctxErr := v.ctx.Err(); ctxErr != nil {
			log.Info("Snapshot creation cancelled")
			return nil, ctxErr
		}

		log.Info("Scanning directory", "path", dir)
		v.UI.Begin("Enumerating snapshot source files in %s (%d of %d).", v.UI.Path(dir), i+1, count)

		res, err := scanner.Scan(v.ctx, dir, snapshotID)
		if err != nil {
			return nil, fmt.Errorf("failed to scan %s: %w", dir, err)
		}

		agg.totalFiles += res.FilesScanned
		agg.totalBytes += res.BytesScanned
		agg.totalChunks += res.ChunksCreated
		agg.totalBlobs += res.BlobsCreated
		agg.totalFilesSkipped += res.FilesSkipped
		agg.totalBytesSkipped += res.BytesSkipped
		agg.totalFilesDeleted += res.FilesDeleted
		agg.totalBytesDeleted += res.BytesDeleted

		log.Info("Directory scan complete",
			"path", dir,
			"files", res.FilesScanned,
			"files_skipped", res.FilesSkipped,
			"bytes", res.BytesScanned,
			"bytes_skipped", res.BytesSkipped,
			"chunks", res.ChunksCreated,
			"blobs", res.BlobsCreated,
			"duration", res.EndTime.Sub(res.StartTime))
	}
	return agg, nil
}
// collectUploadStats copies the upload counters (bytes, blobs, duration)
// from the scanner's progress reporter into stats. If the scanner has no
// progress reporter, stats is left unchanged.
func (v *Vaultik) collectUploadStats(scanner *snapshot.Scanner, stats *snapshotStats) {
	progress := scanner.GetProgress()
	if progress == nil {
		return
	}
	counters := progress.GetStats()
	stats.totalBytesUploaded = counters.BytesUploaded.Load()
	stats.totalBlobsUploaded = int(counters.BlobsUploaded.Load())
	stats.uploadDuration = time.Millisecond * time.Duration(counters.UploadDurationMs.Load())
}
// finalizeSnapshotMetadata performs three steps in order: it records the
// aggregated stats on the snapshot, marks the snapshot complete, and exports
// its metadata to v.Config.IndexPath. The first failing step aborts the rest
// and is returned, wrapped with a description of that step.
func (v *Vaultik) finalizeSnapshotMetadata(snapshotID string, stats *snapshotStats) error {
	ext := snapshot.ExtendedBackupStats{
		BackupStats: snapshot.BackupStats{
			FilesScanned:  stats.totalFiles,
			BytesScanned:  stats.totalBytes,
			ChunksCreated: stats.totalChunks,
			BlobsCreated:  stats.totalBlobs,
			BytesUploaded: stats.totalBytesUploaded,
		},
		// NOTE(review): always recorded as 0 here; confirm whether it should be
		// populated (e.g. from the snapshot's blob sizes).
		BlobUncompressedSize: 0,
		CompressionLevel:     v.Config.CompressionLevel,
		UploadDurationMs:     stats.uploadDuration.Milliseconds(),
	}

	steps := []struct {
		what string
		run  func() error
	}{
		{"updating snapshot stats", func() error {
			return v.SnapshotManager.UpdateSnapshotStatsExtended(v.ctx, snapshotID, ext)
		}},
		{"completing snapshot", func() error {
			return v.SnapshotManager.CompleteSnapshot(v.ctx, snapshotID)
		}},
		{"exporting snapshot metadata", func() error {
			return v.SnapshotManager.ExportSnapshotMetadata(v.ctx, v.Config.IndexPath, snapshotID)
		}},
	}
	for _, step := range steps {
		if err := step.run(); err != nil {
			return fmt.Errorf("%s: %w", step.what, err)
		}
	}
	return nil
}
// uploadSpeed formats the average upload rate (bytes per second) via
// v.UI.Speed. When there is no usable data (no bytes uploaded or a
// non-positive duration), it passes 0 to v.UI.Speed.
// NOTE(review): the previous doc said this returns "N/A" in that case; that
// depends on how v.UI.Speed renders 0 — confirm.
func (v *Vaultik) uploadSpeed(bytesUploaded int64, duration time.Duration) string {
	rate := 0.0
	if bytesUploaded > 0 && duration > 0 {
		rate = float64(bytesUploaded) / duration.Seconds()
	}
	return v.UI.Speed(rate)
}
// printSnapshotSummary prints the completion summary for snapshotID:
//   - file counts;
//   - data sizes;
//   - storage/compression and upload figures (only when blobs were uploaded);
//   - the total elapsed time since startTime.
//
// The compression ratio is compressed/uncompressed blob size (1.0 when the
// uncompressed size is unknown), so values below 1 mean the data shrank.
func (v *Vaultik) printSnapshotSummary(snapshotID string, startTime time.Time, stats *snapshotStats) {
	elapsed := time.Since(startTime)
	changedFiles := stats.totalFiles - stats.totalFilesSkipped
	allBytes := stats.totalBytes + stats.totalBytesSkipped

	compressed, uncompressed := v.getSnapshotBlobSizes(snapshotID)
	ratio := 1.0
	if uncompressed > 0 {
		ratio = float64(compressed) / float64(uncompressed)
	}

	v.UI.Complete("Created snapshot %s.", v.UI.Snapshot(snapshotID))

	files := fmt.Sprintf("Files: %s examined, %s backed up, %s unchanged",
		v.UI.Count(stats.totalFiles),
		v.UI.Count(changedFiles),
		v.UI.Count(stats.totalFilesSkipped))
	if stats.totalFilesDeleted > 0 {
		files += fmt.Sprintf(", %s deleted", v.UI.Count(stats.totalFilesDeleted))
	}
	v.UI.Detail("%s.", files)

	data := fmt.Sprintf("Data: %s total (%s backed up)",
		v.UI.Size(allBytes),
		v.UI.Size(stats.totalBytes))
	if stats.totalBytesDeleted > 0 {
		data += fmt.Sprintf(", %s deleted", v.UI.Size(stats.totalBytesDeleted))
	}
	v.UI.Detail("%s.", data)

	if stats.totalBlobsUploaded > 0 {
		v.UI.Detail("Storage: %s compressed from %s (%.2fx ratio).",
			v.UI.Size(compressed),
			v.UI.Size(uncompressed),
			ratio)
		v.UI.Detail("Upload: %d blobs, %s in %s (%s).",
			stats.totalBlobsUploaded,
			v.UI.Size(stats.totalBytesUploaded),
			v.UI.Duration(stats.uploadDuration),
			v.uploadSpeed(stats.totalBytesUploaded, stats.uploadDuration))
	}

	v.UI.Detail("Snapshot create duration: %s.", v.UI.Duration(elapsed))
}
// getSnapshotBlobSizes sums the compressed and uncompressed sizes of every
// blob referenced by snapshotID, according to the local database. It is best
// effort: a failed hash listing yields (0, 0), and blobs that cannot be
// loaded are skipped.
func (v *Vaultik) getSnapshotBlobSizes(snapshotID string) (compressed int64, uncompressed int64) {
	hashes, err := v.Repositories.Snapshots.GetBlobHashes(v.ctx, snapshotID)
	if err != nil {
		return 0, 0
	}
	for _, h := range hashes {
		blob, lookupErr := v.Repositories.Blobs.GetByHash(v.ctx, h)
		if lookupErr != nil || blob == nil {
			continue
		}
		compressed += blob.CompressedSize
		uncompressed += blob.UncompressedSize
	}
	return compressed, uncompressed
}
// ListSnapshots prints every snapshot found in remote storage, newest first.
// With jsonOutput it emits the list as indented JSON. Otherwise it prints a
// human-readable table (preceded by the configured snapshot names) and then
// warns about local database records whose snapshot ID has no metadata in
// remote storage.
//
// Output order is deterministic. Snapshots with identical timestamps are
// ordered by ID, and stale local IDs are listed in lexical order.
func (v *Vaultik) ListSnapshots(jsonOutput bool) error {
	log.Info("Listing snapshots")
	remoteSnapshots, err := v.listRemoteSnapshotIDs()
	if err != nil {
		return err
	}
	localSnapshotMap, err := v.reconcileLocalWithRemote(remoteSnapshots)
	if err != nil {
		return err
	}
	snapshots, err := v.buildSnapshotInfoList(remoteSnapshots, localSnapshotMap)
	if err != nil {
		return err
	}

	// Newest first. The list is built from map iteration, so break timestamp
	// ties by ID to keep the output stable between runs.
	sort.Slice(snapshots, func(i, j int) bool {
		ti, tj := snapshots[i].Timestamp, snapshots[j].Timestamp
		if !ti.Equal(tj) {
			return ti.After(tj)
		}
		return snapshots[i].ID.String() < snapshots[j].ID.String()
	})

	if jsonOutput {
		encoder := json.NewEncoder(v.Stdout)
		encoder.SetIndent("", " ")
		return encoder.Encode(snapshots)
	}

	if err := v.printSnapshotTable(snapshots); err != nil {
		return err
	}

	// Warn about local snapshots that don't exist in remote storage.
	var stale []string
	for id := range localSnapshotMap {
		if !remoteSnapshots[id] {
			stale = append(stale, id)
		}
	}
	if len(stale) == 0 {
		return nil
	}
	// Map iteration order is random; sort for reproducible output.
	sort.Strings(stale)
	v.UI.Warning("%d local snapshot record(s) not found in backup destination store:", len(stale))
	for _, id := range stale {
		v.UI.Info("%s", v.UI.Snapshot(id))
	}
	v.UI.Info("Run 'vaultik snapshot cleanup' to remove stale local records.")
	return nil
}
// listRemoteSnapshotIDs returns the set of snapshot IDs present in remote
// storage. An object key of the form "metadata/<id>/..." contributes <id>.
// Empty IDs and hidden entries (IDs starting with ".", e.g. macOS "._*"
// resource forks) are ignored. The first listing error aborts the scan.
func (v *Vaultik) listRemoteSnapshotIDs() (map[string]bool, error) {
	ids := make(map[string]bool)
	for obj := range v.Storage.ListStream(v.ctx, "metadata/") {
		if obj.Err != nil {
			return nil, fmt.Errorf("listing remote snapshots: %w", obj.Err)
		}
		rest, ok := strings.CutPrefix(obj.Key, "metadata/")
		if !ok {
			continue
		}
		id, _, _ := strings.Cut(rest, "/")
		if id == "" || strings.HasPrefix(id, ".") {
			continue
		}
		ids[id] = true
	}
	return ids, nil
}
// reconcileLocalWithRemote returns the locally recorded snapshots (as
// returned by ListRecent with a limit of 10000), keyed by ID string, so
// callers can cross-reference them with the remote ID set.
//
// NOTE(review): remoteSnapshots is currently unused; the actual
// cross-referencing happens in the callers. It is kept for signature
// compatibility.
func (v *Vaultik) reconcileLocalWithRemote(remoteSnapshots map[string]bool) (map[string]*database.Snapshot, error) {
	recent, err := v.Repositories.Snapshots.ListRecent(v.ctx, 10000)
	if err != nil {
		return nil, fmt.Errorf("listing local snapshots: %w", err)
	}
	byID := make(map[string]*database.Snapshot, len(recent))
	for _, rec := range recent {
		byID[rec.ID.String()] = rec
	}
	return byID, nil
}
// buildSnapshotInfoList constructs one SnapshotInfo per remote snapshot ID.
//
// A snapshot that is present and completed in the local database is marked
// LocallyTracked. Its compressed size, total uncompressed chunk size and
// new-chunk size come from local queries; the compressed size falls back to
// the recorded BlobSize if that query fails.
//
// Every other ID gets its timestamp parsed from the ID itself and its
// compressed size read from the remote manifest. IDs whose timestamp cannot
// be parsed are skipped with a warning.
//
// Per-snapshot lookup failures (local queries or manifest downloads) are
// logged and leave the affected size at zero instead of aborting the whole
// listing. Only cancellation of v.ctx while manifests are being fetched is
// returned as an error. The returned slice is unordered.
func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSnapshotMap map[string]*database.Snapshot) ([]SnapshotInfo, error) {
	snapshots := make([]SnapshotInfo, 0, len(remoteSnapshots))
	// remoteOnly collects snapshot IDs whose size must come from the remote manifest.
	var remoteOnly []string

	for snapshotID := range remoteSnapshots {
		localSnap, exists := localSnapshotMap[snapshotID]
		if exists && localSnap.CompletedAt != nil {
			totalSize, err := v.Repositories.Snapshots.GetSnapshotTotalCompressedSize(v.ctx, snapshotID)
			if err != nil {
				log.Warn("Failed to get total compressed size", "id", snapshotID, "error", err)
				totalSize = localSnap.BlobSize
			}
			uncompressedSize, err := v.Repositories.Snapshots.GetSnapshotUncompressedChunkSize(v.ctx, snapshotID)
			if err != nil {
				log.Warn("Failed to get uncompressed chunk size", "id", snapshotID, "error", err)
			}
			newChunkSize, err := v.Repositories.Snapshots.GetSnapshotNewChunkSize(v.ctx, snapshotID)
			if err != nil {
				log.Warn("Failed to get new chunk size", "id", snapshotID, "error", err)
			}
			snapshots = append(snapshots, SnapshotInfo{
				ID:               localSnap.ID,
				Timestamp:        localSnap.StartedAt,
				CompressedSize:   totalSize,
				UncompressedSize: uncompressedSize,
				NewChunkSize:     newChunkSize,
				LocallyTracked:   true,
			})
			continue
		}

		timestamp, err := parseSnapshotTimestamp(snapshotID)
		if err != nil {
			log.Warn("Failed to parse snapshot timestamp", "id", snapshotID, "error", err)
			continue
		}
		// Pre-add with zero size; patched below from the downloaded manifests.
		snapshots = append(snapshots, SnapshotInfo{
			ID:             types.SnapshotID(snapshotID),
			Timestamp:      timestamp,
			CompressedSize: 0,
			LocallyTracked: false,
		})
		remoteOnly = append(remoteOnly, snapshotID)
	}

	if len(remoteOnly) == 0 {
		return snapshots, nil
	}

	sizes, err := v.fetchManifestCompressedSizes(remoteOnly)
	if err != nil {
		return nil, err
	}
	for i := range snapshots {
		if sz, ok := sizes[string(snapshots[i].ID)]; ok {
			snapshots[i].CompressedSize = sz
		}
	}
	return snapshots, nil
}

// fetchManifestCompressedSizes downloads the manifests of the given snapshot
// IDs with bounded concurrency and returns each manifest's
// TotalCompressedSize keyed by ID. An ID whose manifest cannot be downloaded
// or decoded is logged and omitted, so one bad manifest does not break the
// listing. Only cancellation of v.ctx is returned as an error.
func (v *Vaultik) fetchManifestCompressedSizes(snapshotIDs []string) (map[string]int64, error) {
	// maxConcurrentManifestDownloads bounds parallel manifest fetches to
	// avoid overwhelming the S3 endpoint while still being much faster
	// than serial downloads.
	const maxConcurrentManifestDownloads = 10

	var mu sync.Mutex
	sizes := make(map[string]int64, len(snapshotIDs))

	var g errgroup.Group
	g.SetLimit(maxConcurrentManifestDownloads)
	for _, sid := range snapshotIDs {
		g.Go(func() error {
			// Don't start new downloads once the command has been cancelled.
			if err := v.ctx.Err(); err != nil {
				return err
			}
			manifest, err := v.downloadManifest(sid)
			if err != nil {
				// A failure caused by cancellation aborts the listing;
				// anything else only affects this one row.
				if ctxErr := v.ctx.Err(); ctxErr != nil {
					return ctxErr
				}
				log.Warn("Failed to fetch remote manifest", "id", sid, "error", err)
				return nil
			}
			mu.Lock()
			sizes[sid] = manifest.TotalCompressedSize
			mu.Unlock()
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, fmt.Errorf("fetching manifest sizes: %w", err)
	}
	return sizes, nil
}
// printSnapshotTable writes two tab-aligned tables to v.Stdout. The first
// lists the configured snapshot names with their source paths. The second
// has one row per entry in snapshots: ID, timestamp, compressed size,
// uncompressed size and new-chunk size. The last two columns come from the
// local database only, so rows that are not LocallyTracked show
// "<remote only>" there. Rendering stops at the first write error, which is
// returned.
func (v *Vaultik) printSnapshotTable(snapshots []SnapshotInfo) error {
	const remoteOnlyCell = "<remote only>"

	lines := []string{
		"CONFIGURED SNAPSHOTS:",
		"NAME\tPATHS",
		"────\t─────",
	}
	for _, name := range v.Config.SnapshotNames() {
		lines = append(lines, fmt.Sprintf("%s\t%s", name, strings.Join(v.Config.Snapshots[name].Paths, ", ")))
	}

	lines = append(lines,
		"",
		"REMOTE SNAPSHOTS:",
		"SNAPSHOT ID\tTIMESTAMP\tCOMPRESSED SIZE\tUNCOMPRESSED SIZE\tNEW CHUNK SIZE",
		"───────────\t─────────\t───────────────\t─────────────────\t──────────────",
	)
	for _, snap := range snapshots {
		uncompressed, newChunks := remoteOnlyCell, remoteOnlyCell
		if snap.LocallyTracked {
			uncompressed = formatBytes(snap.UncompressedSize)
			newChunks = formatBytes(snap.NewChunkSize)
		}
		lines = append(lines, fmt.Sprintf("%s\t%s\t%s\t%s\t%s",
			snap.ID,
			snap.Timestamp.Format("2006-01-02 15:04:05"),
			formatBytes(snap.CompressedSize),
			uncompressed,
			newChunks))
	}

	tw := tabwriter.NewWriter(v.Stdout, 0, 0, 3, ' ', 0)
	for _, line := range lines {
		if _, err := fmt.Fprintln(tw, line); err != nil {
			return err
		}
	}
	return tw.Flush()
}
// SnapshotPurgeOptions contains options for the snapshot purge command.
// If both KeepLatest and OlderThan are set, KeepLatest wins. If neither is
// set, nothing is selected for deletion.
type SnapshotPurgeOptions struct {
	KeepLatest bool     // Keep only the most recent snapshot per name
	OlderThan  string   // Drop snapshots older than this duration (e.g. "30d", "6m", "1y")
	Force      bool     // Skip confirmation prompt
	Names      []string // If non-empty, only operate on snapshots with one of these names
	Quiet      bool     // Suppress informational output (used by --prune flag); the prompt is still shown unless Force is set
}
// PurgeSnapshotsWithOptions removes old snapshots based on criteria.
// Retention is per-snapshot-name: KeepLatest keeps the latest of EACH configured
// snapshot name, not the latest globally. This prevents `home` and `system`
// snapshots from cannibalizing each other.
//
// The local database is first synced with remote storage (local records
// without remote metadata are dropped). Only completed snapshots, optionally
// filtered by opts.Names, are candidates. KeepLatest takes precedence over
// OlderThan. The selected snapshots are handed to confirmAndExecutePurge.
//
// OlderThan is validated before anything is modified, so an invalid duration
// fails without touching the local database.
func (v *Vaultik) PurgeSnapshotsWithOptions(opts *SnapshotPurgeOptions) error {
	// Parse the retention window up front: syncWithRemote deletes local
	// records, and a typo in --older-than must not get that far. It is only
	// parsed when it will actually be used (KeepLatest takes precedence).
	var olderThan time.Duration
	if !opts.KeepLatest && opts.OlderThan != "" {
		d, err := parseDuration(opts.OlderThan)
		if err != nil {
			return fmt.Errorf("invalid duration: %w", err)
		}
		olderThan = d
	}

	// Sync with remote first
	if err := v.syncWithRemote(); err != nil {
		return fmt.Errorf("syncing with remote: %w", err)
	}

	dbSnapshots, err := v.Repositories.Snapshots.ListRecent(v.ctx, 10000)
	if err != nil {
		return fmt.Errorf("listing snapshots: %w", err)
	}

	// Build name filter set if --snapshot was specified.
	nameFilter := make(map[string]struct{}, len(opts.Names))
	for _, n := range opts.Names {
		nameFilter[n] = struct{}{}
	}

	// Collect completed snapshots, applying the name filter.
	snapshots := make([]SnapshotInfo, 0, len(dbSnapshots))
	for _, s := range dbSnapshots {
		if s.CompletedAt == nil {
			continue
		}
		if len(nameFilter) > 0 {
			if _, ok := nameFilter[parseSnapshotName(s.ID.String())]; !ok {
				continue
			}
		}
		snapshots = append(snapshots, SnapshotInfo{
			ID:             s.ID,
			Timestamp:      s.StartedAt,
			CompressedSize: s.BlobSize,
		})
	}

	// Sort by timestamp (newest first)
	sort.Slice(snapshots, func(i, j int) bool {
		return snapshots[i].Timestamp.After(snapshots[j].Timestamp)
	})

	var toDelete []SnapshotInfo
	switch {
	case opts.KeepLatest:
		// Snapshots are sorted newest-first, so the first occurrence of each
		// name is the one kept.
		seen := make(map[string]bool)
		for _, snap := range snapshots {
			name := parseSnapshotName(snap.ID.String())
			if seen[name] {
				toDelete = append(toDelete, snap)
				continue
			}
			seen[name] = true
		}
	case opts.OlderThan != "":
		cutoff := time.Now().UTC().Add(-olderThan)
		for _, snap := range snapshots {
			if snap.Timestamp.Before(cutoff) {
				toDelete = append(toDelete, snap)
			}
		}
	}

	if len(toDelete) == 0 {
		if !opts.Quiet {
			v.printlnStdout("No snapshots to delete")
		}
		return nil
	}
	return v.confirmAndExecutePurge(toDelete, opts.Force, opts.Quiet)
}
// confirmAndExecutePurge lists the deletion candidates (unless quiet) and,
// unless force is set, asks for a "y" confirmation on stdin. A read error or
// any other answer cancels without error. It then deletes each snapshot from
// the local database and from remote storage.
//
// A local-DB deletion failure is only logged. A remote deletion failure
// stops the loop and is returned.
func (v *Vaultik) confirmAndExecutePurge(toDelete []SnapshotInfo, force, quiet bool) error {
	count := len(toDelete)

	if !quiet {
		v.printfStdout("The following snapshots will be deleted:\n\n")
		for _, s := range toDelete {
			v.printfStdout(" %s (%s, %s)\n",
				s.ID,
				s.Timestamp.Format("2006-01-02 15:04:05"),
				formatBytes(s.CompressedSize))
		}
	}

	switch {
	case !force:
		v.printfStdout("\nDelete %d snapshot(s)? [y/N] ", count)
		var answer string
		// EOF or any read error counts as "no".
		if _, err := v.scanStdin(&answer); err != nil || strings.ToLower(answer) != "y" {
			v.printlnStdout("Cancelled")
			return nil
		}
	case !quiet:
		v.printfStdout("\nDeleting %d snapshot(s) (--force specified)\n", count)
	}

	for _, s := range toDelete {
		id := s.ID.String()
		log.Info("Deleting snapshot", "id", id)
		if err := v.deleteSnapshotFromLocalDB(id); err != nil {
			log.Error("Failed to delete from local database", "snapshot_id", id, "error", err)
		}
		if err := v.deleteSnapshotFromRemote(id); err != nil {
			return fmt.Errorf("deleting snapshot %s from remote: %w", id, err)
		}
	}

	if !quiet {
		v.printfStdout("Deleted %d snapshot(s)\n", count)
		v.printlnStdout("\nNote: Run 'vaultik prune' to clean up unreferenced blobs.")
	}
	return nil
}
// VerifySnapshot checks the integrity of snapshotID. A shallow check
// (deep == false) confirms that every blob listed in the manifest exists in
// storage. A deep check is delegated to RunDeepVerify.
func (v *Vaultik) VerifySnapshot(snapshotID string, deep bool) error {
	// VerifySnapshotWithOptions itself routes Deep requests to RunDeepVerify.
	return v.VerifySnapshotWithOptions(snapshotID, &VerifyOptions{Deep: deep})
}
// VerifySnapshotWithOptions checks snapshot integrity. Deep verification is
// delegated to RunDeepVerify; this function implements only the shallow
// path, which downloads the manifest and checks that each listed blob exists
// in storage.
//
// With opts.JSON the result is emitted as JSON, and a manifest download
// failure is reported inside that JSON (and returned as an error by
// outputVerifyJSON). Without JSON, progress is printed as human-readable text.
func (v *Vaultik) VerifySnapshotWithOptions(snapshotID string, opts *VerifyOptions) error {
	if opts.Deep {
		return v.RunDeepVerify(snapshotID, opts)
	}

	res := &VerifyResult{SnapshotID: snapshotID, Mode: "shallow"}
	v.printVerifyHeader(snapshotID, opts)

	manifest, err := v.downloadManifest(snapshotID)
	if err != nil {
		if !opts.JSON {
			return fmt.Errorf("downloading manifest: %w", err)
		}
		res.Status = "failed"
		res.ErrorMessage = fmt.Sprintf("downloading manifest: %v", err)
		return v.outputVerifyJSON(res)
	}

	res.BlobCount = manifest.BlobCount
	res.TotalSize = manifest.TotalCompressedSize

	if !opts.JSON {
		v.printfStdout("Snapshot information:\n")
		v.printfStdout(" Blob count: %d\n", manifest.BlobCount)
		v.printfStdout(" Total size: %s\n", humanize.Bytes(uint64(manifest.TotalCompressedSize)))
		// An unparsable manifest timestamp is silently omitted.
		if manifest.Timestamp != "" {
			if created, parseErr := time.Parse(time.RFC3339, manifest.Timestamp); parseErr == nil {
				v.printfStdout(" Created: %s\n", created.Format("2006-01-02 15:04:05 MST"))
			}
		}
		v.printlnStdout()
		v.printfStdout("Checking blob existence...\n")
	}

	res.Verified, res.Missing, res.MissingSize = v.verifyManifestBlobsExist(manifest, opts)
	return v.formatVerifyResult(res, manifest, opts)
}
// printVerifyHeader prints the snapshot ID and, when it can be parsed from
// the ID, the snapshot time, followed by a blank line. It prints nothing in
// JSON mode.
// Snapshot ID format: hostname[_name]_<RFC3339>
func (v *Vaultik) printVerifyHeader(snapshotID string, opts *VerifyOptions) {
	if opts.JSON {
		return
	}
	v.printfStdout("Verifying snapshot %s\n", snapshotID)
	if ts, err := parseSnapshotTimestamp(snapshotID); err == nil && !ts.IsZero() {
		v.printfStdout("Snapshot time: %s\n", ts.Format("2006-01-02 15:04:05 MST"))
	}
	v.printlnStdout()
}
// verifyManifestBlobsExist checks that each blob in the manifest exists in
// storage. The blob at hash H is expected at blobs/H[0:2]/H[2:4]/H.
//
// It returns:
//   - verified: how many blobs were found;
//   - missing: how many were not found (any Stat error counts as missing);
//   - missingSize: the summed CompressedSize of the missing blobs.
//
// A hash shorter than four characters cannot map to a storage key. Slicing
// it would panic, so such entries are counted as missing instead.
// Outside JSON mode, each missing or invalid blob is printed.
func (v *Vaultik) verifyManifestBlobsExist(manifest *snapshot.Manifest, opts *VerifyOptions) (verified, missing int, missingSize int64) {
	for _, blob := range manifest.Blobs {
		// Guard against malformed manifests before slicing the hash.
		if len(blob.Hash) < 4 {
			if !opts.JSON {
				v.printfStdout(" Invalid blob hash: %q (%s)\n", blob.Hash, humanize.Bytes(uint64(blob.CompressedSize)))
			}
			missing++
			missingSize += blob.CompressedSize
			continue
		}

		blobPath := fmt.Sprintf("blobs/%s/%s/%s", blob.Hash[:2], blob.Hash[2:4], blob.Hash)
		// Shallow: just check existence (deep verification is handled by RunDeepVerify)
		if _, err := v.Storage.Stat(v.ctx, blobPath); err != nil {
			if !opts.JSON {
				v.printfStdout(" Missing: %s (%s)\n", blob.Hash, humanize.Bytes(uint64(blob.CompressedSize)))
			}
			missing++
			missingSize += blob.CompressedSize
			continue
		}
		verified++
	}
	return verified, missing, missingSize
}
// formatVerifyResult reports the shallow verification outcome. In JSON mode
// it sets result.Status ("ok" or "failed" with an ErrorMessage) and emits the
// result via outputVerifyJSON. Otherwise it prints a text summary. In both
// modes, any missing blob makes the call return an error.
func (v *Vaultik) formatVerifyResult(result *VerifyResult, manifest *snapshot.Manifest, opts *VerifyOptions) error {
	failed := result.Missing > 0

	if opts.JSON {
		result.Status = "ok"
		if failed {
			result.Status = "failed"
			result.ErrorMessage = fmt.Sprintf("%d blobs are missing", result.Missing)
		}
		return v.outputVerifyJSON(result)
	}

	verifiedSize := manifest.TotalCompressedSize - result.MissingSize
	v.printfStdout("\nVerification complete:\n")
	v.printfStdout(" Verified: %d blobs (%s)\n", result.Verified, humanize.Bytes(uint64(verifiedSize)))
	if failed {
		v.printfStdout(" Missing: %d blobs (%s)\n", result.Missing, humanize.Bytes(uint64(result.MissingSize)))
	} else {
		v.printfStdout(" Missing: 0 blobs\n")
	}

	v.printfStdout(" Status: ")
	if !failed {
		v.printfStdout("OK - All blobs verified\n")
		return nil
	}
	v.printfStdout("FAILED - %d blobs are missing\n", result.Missing)
	return fmt.Errorf("%d blobs are missing", result.Missing)
}
// outputVerifyJSON writes result to v.Stdout as indented JSON. It returns an
// error if encoding fails, or if the result's Status is "failed" (carrying
// result.ErrorMessage), so callers still see a non-zero outcome in JSON mode.
func (v *Vaultik) outputVerifyJSON(result *VerifyResult) error {
	enc := json.NewEncoder(v.Stdout)
	enc.SetIndent("", " ")
	if err := enc.Encode(result); err != nil {
		return fmt.Errorf("encoding JSON: %w", err)
	}
	if result.Status != "failed" {
		return nil
	}
	return fmt.Errorf("verification failed: %s", result.ErrorMessage)
}
// CleanupLocalSnapshots removes local snapshot records that have no
// corresponding metadata in remote storage. These are typically left behind
// by incomplete or interrupted backups. A record that fails to delete is
// logged and skipped. The function ends by printing how many records it
// removed.
func (v *Vaultik) CleanupLocalSnapshots() error {
	remote, err := v.listRemoteSnapshotIDs()
	if err != nil {
		return err
	}
	local, err := v.Repositories.Snapshots.ListRecent(v.ctx, 10000)
	if err != nil {
		return fmt.Errorf("listing local snapshots: %w", err)
	}

	removed := 0
	for _, rec := range local {
		id := rec.ID.String()
		if remote[id] {
			continue
		}
		v.printfStdout("Removing stale local record: %s\n", id)
		if delErr := v.deleteSnapshotFromLocalDB(id); delErr != nil {
			log.Error("Failed to delete local snapshot", "snapshot_id", id, "error", delErr)
			continue
		}
		removed++
	}

	if removed > 0 {
		v.printfStdout("Removed %d stale local snapshot record(s).\n", removed)
		return nil
	}
	v.printlnStdout("No stale local snapshots found.")
	return nil
}
// downloadManifest fetches metadata/<snapshotID>/manifest.json.zst from
// remote storage using v.ctx and decodes it. Storage errors are returned
// unwrapped; decode errors are wrapped with "decoding manifest".
func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error) {
	rc, err := v.Storage.Get(v.ctx, "metadata/"+snapshotID+"/manifest.json.zst")
	if err != nil {
		return nil, err
	}
	// Close errors on a read-only stream are deliberately ignored.
	defer func() { _ = rc.Close() }()

	m, err := snapshot.DecodeManifest(rc)
	if err != nil {
		return nil, fmt.Errorf("decoding manifest: %w", err)
	}
	return m, nil
}
// syncWithRemote deletes local snapshot records whose IDs have no metadata in
// remote storage, so that later local decisions (e.g. purge retention) only
// consider snapshots that actually exist remotely.
//
// Remote IDs come from listRemoteSnapshotIDs, which shares the key parsing
// and hidden-file filtering with ListSnapshots and CleanupLocalSnapshots.
// Failing to delete an individual local record is logged and skipped.
// Failing to list remote or local snapshots is returned.
func (v *Vaultik) syncWithRemote() error {
	log.Info("Syncing with remote snapshots")

	remoteSnapshots, err := v.listRemoteSnapshotIDs()
	if err != nil {
		return err
	}
	log.Debug("Found remote snapshots", "count", len(remoteSnapshots))

	// Get all local snapshots (use a high limit to get all)
	localSnapshots, err := v.Repositories.Snapshots.ListRecent(v.ctx, 10000)
	if err != nil {
		return fmt.Errorf("listing local snapshots: %w", err)
	}

	// Remove local snapshots that don't exist remotely. The loop variable is
	// named snap so it does not shadow the imported snapshot package.
	removedCount := 0
	for _, snap := range localSnapshots {
		id := snap.ID.String()
		if remoteSnapshots[id] {
			continue
		}
		log.Info("Removing local snapshot not found in remote", "snapshot_id", snap.ID)
		if err := v.deleteSnapshotFromLocalDB(id); err != nil {
			log.Error("Failed to delete local snapshot", "snapshot_id", snap.ID, "error", err)
			continue
		}
		removedCount++
	}

	if removedCount > 0 {
		log.Info("Removed local snapshots not found in remote", "count", removedCount)
	}
	return nil
}
// RemoveOptions contains options for the snapshot remove command.
type RemoveOptions struct {
	Force  bool // Skip the interactive confirmation prompt
	DryRun bool // Report what would be removed without changing anything
	JSON   bool // Emit the result as JSON; RemoveSnapshot also skips the prompt in this mode
	Remote bool // Also remove metadata from remote storage
	All    bool // Remove all snapshots (requires Force)
}
// RemoveResult contains the result of a snapshot removal.
// Field order determines the JSON key order of the emitted result.
type RemoveResult struct {
	SnapshotID       string   `json:"snapshot_id,omitempty"`       // single snapshot targeted by RemoveSnapshot
	SnapshotsRemoved []string `json:"snapshots_removed,omitempty"` // IDs removed in a bulk removal
	RemoteRemoved    bool     `json:"remote_removed,omitempty"`    // remote metadata was deleted as well
	DryRun           bool     `json:"dry_run,omitempty"`           // no changes were made
}
// RemoveSnapshot deletes one snapshot's record from the local database and,
// when opts.Remote is set, its metadata from remote storage.
// Note: This does NOT remove blobs. Use 'vaultik prune' to remove orphaned blobs.
//
// With opts.DryRun nothing is changed; the intended actions are printed, or
// the result is emitted as JSON. Without opts.Force the user must confirm
// with "y" on stdin. In JSON mode no prompt is shown.
// NOTE(review): an earlier comment said JSON mode should "require --force",
// but this function does not enforce that. Presumably the CLI layer does;
// confirm.
func (v *Vaultik) RemoveSnapshot(snapshotID string, opts *RemoveOptions) (*RemoveResult, error) {
	res := &RemoveResult{SnapshotID: snapshotID}

	if opts.DryRun {
		res.DryRun = true
		if opts.JSON {
			return res, v.outputRemoveJSON(res)
		}
		v.printfStdout("Would remove snapshot: %s\n", snapshotID)
		if opts.Remote {
			v.printlnStdout("Would also remove from remote storage")
		}
		v.printlnStdout("[Dry run - no changes made]")
		return res, nil
	}

	if !opts.Force && !opts.JSON {
		prompt := "Remove snapshot '%s' from local database? [y/N] "
		if opts.Remote {
			prompt = "Remove snapshot '%s' from local database and remote storage? [y/N] "
		}
		v.printfStdout(prompt, snapshotID)
		var answer string
		// A read error (e.g. EOF) counts as "no".
		if _, err := v.scanStdin(&answer); err != nil || strings.ToLower(answer) != "y" {
			v.printlnStdout("Cancelled")
			return res, nil
		}
	}

	log.Info("Removing snapshot from local database", "snapshot_id", snapshotID)
	if err := v.deleteSnapshotFromLocalDB(snapshotID); err != nil {
		return res, fmt.Errorf("removing from local database: %w", err)
	}

	if opts.Remote {
		log.Info("Removing snapshot metadata from remote storage", "snapshot_id", snapshotID)
		if err := v.deleteSnapshotFromRemote(snapshotID); err != nil {
			return res, fmt.Errorf("removing from remote storage: %w", err)
		}
		res.RemoteRemoved = true
	}

	if opts.JSON {
		return res, v.outputRemoveJSON(res)
	}

	v.printfStdout("Removed snapshot '%s' from local database\n", snapshotID)
	if opts.Remote {
		v.printlnStdout("Removed snapshot metadata from remote storage")
		v.printlnStdout("\nNote: Blobs were not removed. Run 'vaultik prune' to remove orphaned blobs.")
	}
	return res, nil
}
// RemoveAllSnapshots removes every snapshot found in remote storage from the
// local database and, when opts.Remote is set, from remote storage too.
// The snapshot IDs come from listAllRemoteSnapshotIDs, so only snapshots with
// metadata under "metadata/" in remote storage are considered.
//
// With opts.DryRun the snapshots are only listed. Otherwise opts.Force is
// required (enforced by executeRemoveAll). In JSON mode a JSON document is
// always written, even when no snapshots exist.
func (v *Vaultik) RemoveAllSnapshots(opts *RemoveOptions) (*RemoveResult, error) {
	snapshotIDs, err := v.listAllRemoteSnapshotIDs()
	if err != nil {
		return nil, err
	}

	if len(snapshotIDs) == 0 {
		result := &RemoveResult{DryRun: opts.DryRun}
		if opts.JSON {
			// Emit a (possibly empty) JSON object so JSON consumers never see
			// empty output.
			return result, v.outputRemoveJSON(result)
		}
		v.printlnStdout("No snapshots found")
		return result, nil
	}

	if opts.DryRun {
		return v.handleRemoveAllDryRun(snapshotIDs, opts)
	}
	return v.executeRemoveAll(snapshotIDs, opts)
}
// listAllRemoteSnapshotIDs scans the "metadata/" prefix of remote storage and
// returns each distinct snapshot ID once, in the order the listing yields it.
//
// A key contributes its second path segment as a snapshot ID when the key is
// a directory marker (ends in "/") or contains "/manifest.json.zst". IDs
// starting with "." (macOS "._*" resource forks and other hidden entries) are
// ignored. The first listing error aborts the scan.
func (v *Vaultik) listAllRemoteSnapshotIDs() ([]string, error) {
	log.Info("Listing all snapshots")

	var ids []string
	known := make(map[string]bool)

	for obj := range v.Storage.ListStream(v.ctx, "metadata/") {
		if obj.Err != nil {
			return nil, fmt.Errorf("listing remote snapshots: %w", obj.Err)
		}

		segments := strings.SplitN(obj.Key, "/", 3)
		if len(segments) < 2 || segments[0] != "metadata" {
			continue
		}
		id := segments[1]
		if id == "" || strings.HasPrefix(id, ".") {
			continue
		}

		isDirMarker := strings.HasSuffix(obj.Key, "/")
		hasManifest := strings.Contains(obj.Key, "/manifest.json.zst")
		if !isDirMarker && !hasManifest {
			continue
		}
		if known[id] {
			continue
		}
		known[id] = true
		ids = append(ids, id)
	}

	return ids, nil
}
// handleRemoveAllDryRun reports which snapshots a remove-all would delete
// without changing anything. The result lists snapshotIDs with DryRun set; it
// is written as JSON in JSON mode, otherwise as a human-readable list.
func (v *Vaultik) handleRemoveAllDryRun(snapshotIDs []string, opts *RemoveOptions) (*RemoveResult, error) {
	result := &RemoveResult{SnapshotsRemoved: snapshotIDs, DryRun: true}

	if opts.JSON {
		return result, v.outputRemoveJSON(result)
	}

	v.printfStdout("Would remove %d snapshot(s):\n", len(snapshotIDs))
	for i := range snapshotIDs {
		v.printfStdout(" %s\n", snapshotIDs[i])
	}
	if opts.Remote {
		v.printlnStdout("Would also remove from remote storage")
	}
	v.printlnStdout("[Dry run - no changes made]")

	return result, nil
}
// executeRemoveAll removes every snapshot in snapshotIDs from the local
// database and, when opts.Remote is set, deletes its metadata from remote
// storage. There is no interactive prompt, so opts.Force is required.
//
// Removal is best-effort: a failure for one snapshot is logged and the loop
// continues with the next. SnapshotsRemoved lists only snapshots for which
// every requested step succeeded. A snapshot whose local removal succeeded
// but whose remote removal failed is gone locally yet not listed.
// RemoteRemoved is true only if remote metadata was actually deleted for at
// least one snapshot. The number of failures is reported in text output.
func (v *Vaultik) executeRemoveAll(snapshotIDs []string, opts *RemoveOptions) (*RemoveResult, error) {
	// --all requires --force
	if !opts.Force {
		return nil, fmt.Errorf("--all requires --force")
	}

	log.Info("Removing all snapshots", "count", len(snapshotIDs))
	result := &RemoveResult{}
	failed := 0

	for _, snapshotID := range snapshotIDs {
		log.Info("Removing snapshot", "snapshot_id", snapshotID)
		if err := v.deleteSnapshotFromLocalDB(snapshotID); err != nil {
			log.Error("Failed to remove from local database", "snapshot_id", snapshotID, "error", err)
			failed++
			continue
		}
		if opts.Remote {
			if err := v.deleteSnapshotFromRemote(snapshotID); err != nil {
				log.Error("Failed to remove from remote", "snapshot_id", snapshotID, "error", err)
				failed++
				continue
			}
		}
		result.SnapshotsRemoved = append(result.SnapshotsRemoved, snapshotID)
	}

	// With opts.Remote, a snapshot is only listed after its remote metadata
	// was deleted, so a non-empty list means remote removal really happened.
	result.RemoteRemoved = opts.Remote && len(result.SnapshotsRemoved) > 0

	if opts.JSON {
		return result, v.outputRemoveJSON(result)
	}

	v.printfStdout("Removed %d snapshot(s)\n", len(result.SnapshotsRemoved))
	if failed > 0 {
		v.printfStdout("Failed to remove %d snapshot(s); see log for details\n", failed)
	}
	if result.RemoteRemoved {
		v.printlnStdout("Removed snapshot metadata from remote storage")
		v.printlnStdout("\nNote: Blobs were not removed. Run 'vaultik prune' to remove orphaned blobs.")
	}
	return result, nil
}
// deleteSnapshotFromLocalDB deletes one snapshot from the local database only;
// remote storage is untouched. When no local database is configured
// (v.Repositories is nil), it does nothing and returns nil.
//
// The snapshot's file, blob, and upload associations are deleted before the
// snapshot record itself, so foreign-key constraints are not violated. The
// first failing step aborts the deletion and its error is returned wrapped.
func (v *Vaultik) deleteSnapshotFromLocalDB(snapshotID string) error {
	if v.Repositories == nil {
		return nil
	}

	steps := []struct {
		errFormat string
		run       func() error
	}{
		{"deleting snapshot files for %s: %w", func() error {
			return v.Repositories.Snapshots.DeleteSnapshotFiles(v.ctx, snapshotID)
		}},
		{"deleting snapshot blobs for %s: %w", func() error {
			return v.Repositories.Snapshots.DeleteSnapshotBlobs(v.ctx, snapshotID)
		}},
		{"deleting snapshot uploads for %s: %w", func() error {
			return v.Repositories.Snapshots.DeleteSnapshotUploads(v.ctx, snapshotID)
		}},
		{"deleting snapshot record %s: %w", func() error {
			return v.Repositories.Snapshots.Delete(v.ctx, snapshotID)
		}},
	}

	for _, step := range steps {
		if err := step.run(); err != nil {
			return fmt.Errorf(step.errFormat, snapshotID, err)
		}
	}
	return nil
}
// deleteSnapshotFromRemote deletes every remote object under
// "metadata/<snapshotID>/". The listing is drained completely before any
// delete is issued. It stops at the first listing or delete error. A snapshot
// with no remote objects is not an error.
func (v *Vaultik) deleteSnapshotFromRemote(snapshotID string) error {
	prefix := "metadata/" + snapshotID + "/"

	var keys []string
	for obj := range v.Storage.ListStream(v.ctx, prefix) {
		if obj.Err != nil {
			return fmt.Errorf("listing objects: %w", obj.Err)
		}
		keys = append(keys, obj.Key)
	}

	for i := range keys {
		key := keys[i]
		if err := v.Storage.Delete(v.ctx, key); err != nil {
			return fmt.Errorf("removing %s: %w", key, err)
		}
		log.Debug("Deleted remote object", "key", key)
	}

	return nil
}
// outputRemoveJSON writes result to v.Stdout as indented JSON terminated by a
// newline, returning any encoding or write error.
func (v *Vaultik) outputRemoveJSON(result *RemoveResult) error {
	enc := json.NewEncoder(v.Stdout)
	enc.SetIndent("", " ")
	if err := enc.Encode(result); err != nil {
		return err
	}
	return nil
}
// PruneResult contains statistics about the prune operation.
type PruneResult struct {
	SnapshotsDeleted int64 // Incomplete snapshots successfully deleted
	FilesDeleted     int64 // Drop in "files" row count across orphan cleanup
	ChunksDeleted    int64 // Drop in "chunks" row count across orphan cleanup
	BlobsDeleted     int64 // Drop in "blobs" row count across orphan cleanup
}
// PruneDatabase restores consistency of the local index database. It runs
// before a new backup starts and on demand via the prune command.
//
// It first deletes every snapshot reported as incomplete, along with that
// snapshot's file, blob, and upload associations. It then asks the snapshot
// manager to clean up orphaned files, chunks, and blobs.
//
// Failures while deleting an individual incomplete snapshot are logged and do
// not abort the prune. Failing to list incomplete snapshots, or failing the
// orphan cleanup, returns an error. The returned PruneResult holds the number
// of incomplete snapshots deleted and, for files/chunks/blobs, the drop in row
// count across the orphan cleanup.
func (v *Vaultik) PruneDatabase() (*PruneResult, error) {
	log.Info("Pruning local database: removing incomplete snapshots and orphaned data")
	v.UI.Begin("Pruning local index database (removing incomplete snapshots and orphaned data).")

	result := &PruneResult{}

	// Row counts feed only the summary. getTableCount yields 0 on failure,
	// so its error is deliberately ignored here and below.
	snapshotsBefore, _ := v.getTableCount("snapshots")

	incomplete, err := v.Repositories.Snapshots.GetIncompleteSnapshots(v.ctx)
	if err != nil {
		return nil, fmt.Errorf("getting incomplete snapshots: %w", err)
	}

	for _, snap := range incomplete {
		id := snap.ID.String()
		log.Info("Deleting incomplete snapshot", "snapshot_id", snap.ID)

		// Dependent records go first. Each step is attempted even if an
		// earlier one failed.
		dependents := []struct {
			failMsg string
			remove  func() error
		}{
			{"Failed to delete snapshot files", func() error {
				return v.Repositories.Snapshots.DeleteSnapshotFiles(v.ctx, id)
			}},
			{"Failed to delete snapshot blobs", func() error {
				return v.Repositories.Snapshots.DeleteSnapshotBlobs(v.ctx, id)
			}},
			{"Failed to delete snapshot uploads", func() error {
				return v.Repositories.Snapshots.DeleteSnapshotUploads(v.ctx, id)
			}},
		}
		for _, dep := range dependents {
			if err := dep.remove(); err != nil {
				log.Error(dep.failMsg, "snapshot_id", snap.ID, "error", err)
			}
		}

		if err := v.Repositories.Snapshots.Delete(v.ctx, id); err != nil {
			log.Error("Failed to delete snapshot", "snapshot_id", snap.ID, "error", err)
			continue
		}
		result.SnapshotsDeleted++
	}

	filesBefore, _ := v.getTableCount("files")
	chunksBefore, _ := v.getTableCount("chunks")
	blobsBefore, _ := v.getTableCount("blobs")

	if err := v.SnapshotManager.CleanupOrphanedData(v.ctx); err != nil {
		return nil, fmt.Errorf("cleanup orphaned data: %w", err)
	}

	filesAfter, _ := v.getTableCount("files")
	chunksAfter, _ := v.getTableCount("chunks")
	blobsAfter, _ := v.getTableCount("blobs")

	result.FilesDeleted = filesBefore - filesAfter
	result.ChunksDeleted = chunksBefore - chunksAfter
	result.BlobsDeleted = blobsBefore - blobsAfter

	log.Info("Local database prune complete",
		"incomplete_snapshots", result.SnapshotsDeleted,
		"orphaned_files", result.FilesDeleted,
		"orphaned_chunks", result.ChunksDeleted,
		"orphaned_blobs", result.BlobsDeleted,
	)

	// The remaining snapshot count is derived rather than re-queried.
	snapshotsAfter := snapshotsBefore - result.SnapshotsDeleted

	v.UI.Complete("Pruned local index database.")
	v.UI.Detail("Incomplete snapshots: %d removed (%d remain).", result.SnapshotsDeleted, snapshotsAfter)
	v.UI.Detail("Orphaned files: %d removed (%d remain).", result.FilesDeleted, filesAfter)
	v.UI.Detail("Orphaned chunks: %d removed (%d remain).", result.ChunksDeleted, chunksAfter)
	v.UI.Detail("Orphaned blobs: %d removed (%d remain).", result.BlobsDeleted, blobsAfter)

	return result, nil
}
// validTableNameRe accepts only non-empty names built from lowercase ASCII
// letters, digits, and underscores. Anything that could carry SQL syntax
// (spaces, quotes, semicolons, dots) is rejected.
var validTableNameRe = regexp.MustCompile("^[a-z0-9_]+$")
// getTableCount returns the number of rows in tableName. When no database is
// open (v.DB is nil) it returns 0 and a nil error.
//
// A table name cannot be bound as a SQL parameter, so it is spliced into the
// query text. It must first match validTableNameRe ([a-z0-9_]+), so only a
// bare identifier can reach the SQL; anything else yields an error.
func (v *Vaultik) getTableCount(tableName string) (int64, error) {
	switch {
	case v.DB == nil:
		return 0, nil
	case !validTableNameRe.MatchString(tableName):
		return 0, fmt.Errorf("invalid table name: %q", tableName)
	}

	var rows int64
	row := v.DB.Conn().QueryRowContext(v.ctx, "SELECT COUNT(*) FROM "+tableName)
	if err := row.Scan(&rows); err != nil {
		return 0, err
	}
	return rows, nil
}