- Created manifest.go with a proper Manifest structure that records blob sizes (sketched below)
- Updated manifest generation to include the compressed size of each blob
- Added a TotalCompressedSize field to the manifest for quick access
- Renamed the backup package to snapshot for clarity
- Updated snapshot list to show all remote snapshots
- Remote snapshots not in the local DB now fetch their manifest to get their size
- Local snapshots no longer present in the remote are automatically deleted
- Removed backwards compatibility code (pre-1.0, no users)
- Fixed the prune command to use the new manifest format
- Updated all imports and references from backup to snapshot
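For reference, the reworked manifest described above can be sketched roughly as follows. This is an illustrative sketch only: `Blobs`, `Hash`, and `TotalCompressedSize` appear in the commit description and in the prune code below, while the `BlobRef` type name, the `CompressedSize` field name, and the JSON tags are assumptions; the actual definitions live in `manifest.go` in the snapshot package.

```go
// Illustrative sketch of the manifest layout; not the canonical definition.
package snapshot

// BlobRef records one blob referenced by a snapshot (type and field names
// other than Hash are assumptions for illustration).
type BlobRef struct {
	Hash           string `json:"hash"`            // content hash; also the final S3 key component
	CompressedSize int64  `json:"compressed_size"` // stored (compressed) size of the blob
}

// Manifest lists every blob a snapshot references, plus the total
// compressed size so callers can report it without summing.
type Manifest struct {
	Blobs               []BlobRef `json:"blobs"`
	TotalCompressedSize int64     `json:"total_compressed_size"`
}
```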
package cli

import (
	"context"
	"errors"
	"fmt"
	"strings"

	"git.eeqj.de/sneak/vaultik/internal/config"
	"git.eeqj.de/sneak/vaultik/internal/database"
	"git.eeqj.de/sneak/vaultik/internal/globals"
	"git.eeqj.de/sneak/vaultik/internal/log"
	"git.eeqj.de/sneak/vaultik/internal/s3"
	"git.eeqj.de/sneak/vaultik/internal/snapshot"
	"github.com/dustin/go-humanize"
	"github.com/spf13/cobra"
	"go.uber.org/fx"
)

// PruneOptions contains options for the prune command
type PruneOptions struct {
	DryRun bool
}

// PruneApp contains all dependencies needed for pruning
type PruneApp struct {
	Globals      *globals.Globals
	Config       *config.Config
	Repositories *database.Repositories
	S3Client     *s3.Client
	DB           *database.DB
	Shutdowner   fx.Shutdowner
}

// NewPruneCommand creates the prune command
func NewPruneCommand() *cobra.Command {
	opts := &PruneOptions{}

	cmd := &cobra.Command{
		Use:   "prune",
		Short: "Remove unreferenced blobs",
		Long: `Delete blobs that are no longer referenced by any snapshot.

This command will:
1. Download the manifest from the last successful snapshot
2. List all blobs in S3
3. Delete any blobs not referenced in the manifest

Config is located at /etc/vaultik/config.yml by default, but can be overridden by
specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
		Args: cobra.NoArgs,
		RunE: func(cmd *cobra.Command, args []string) error {
			// Use unified config resolution
			configPath, err := ResolveConfigPath()
			if err != nil {
				return err
			}

			// Use the app framework like other commands
			rootFlags := GetRootFlags()
			return RunWithApp(cmd.Context(), AppOptions{
				ConfigPath: configPath,
				LogOptions: log.LogOptions{
					Verbose: rootFlags.Verbose,
					Debug:   rootFlags.Debug,
				},
				Modules: []fx.Option{
					snapshot.Module,
					s3.Module,
					fx.Provide(fx.Annotate(
						func(g *globals.Globals, cfg *config.Config, repos *database.Repositories,
							s3Client *s3.Client, db *database.DB, shutdowner fx.Shutdowner) *PruneApp {
							return &PruneApp{
								Globals:      g,
								Config:       cfg,
								Repositories: repos,
								S3Client:     s3Client,
								DB:           db,
								Shutdowner:   shutdowner,
							}
						},
					)),
				},
				Invokes: []fx.Option{
					fx.Invoke(func(app *PruneApp, lc fx.Lifecycle) {
						lc.Append(fx.Hook{
							OnStart: func(ctx context.Context) error {
								// Start the prune operation in a goroutine
								go func() {
									// Run the prune operation
									if err := app.runPrune(ctx, opts); err != nil {
										if !errors.Is(err, context.Canceled) {
											log.Error("Prune operation failed", "error", err)
										}
									}

									// Shutdown the app when prune completes
									if err := app.Shutdowner.Shutdown(); err != nil {
										log.Error("Failed to shutdown", "error", err)
									}
								}()
								return nil
							},
							OnStop: func(ctx context.Context) error {
								log.Debug("Stopping prune operation")
								return nil
							},
						})
					}),
				},
			})
		},
	}

	cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "Show what would be deleted without actually deleting")

	return cmd
}

// runPrune executes the prune operation
func (app *PruneApp) runPrune(ctx context.Context, opts *PruneOptions) error {
	log.Info("Starting prune operation",
		"bucket", app.Config.S3.Bucket,
		"prefix", app.Config.S3.Prefix,
		"dry_run", opts.DryRun,
	)

	// Step 1: Get the latest complete snapshot from the database
	log.Info("Getting latest snapshot from database")
	snapshots, err := app.Repositories.Snapshots.ListRecent(ctx, 1)
	if err != nil {
		return fmt.Errorf("listing snapshots: %w", err)
	}

	if len(snapshots) == 0 {
		return fmt.Errorf("no snapshots found in database")
	}

	latestSnapshot := snapshots[0]
	if latestSnapshot.CompletedAt == nil {
		return fmt.Errorf("latest snapshot %s is incomplete", latestSnapshot.ID)
	}

	log.Info("Found latest snapshot",
		"id", latestSnapshot.ID,
		"completed_at", latestSnapshot.CompletedAt.Format("2006-01-02 15:04:05"))

	// Step 2: Find and download the manifest from the last successful snapshot in S3
	log.Info("Finding last successful snapshot in S3")
	metadataPrefix := "metadata/"

	// List all snapshots in S3
	var s3Snapshots []string
	objectCh := app.S3Client.ListObjectsStream(ctx, metadataPrefix, false)
	for obj := range objectCh {
		if obj.Err != nil {
			return fmt.Errorf("listing metadata objects: %w", obj.Err)
		}
		// Extract snapshot ID from path like "metadata/hostname-20240115-143052Z/manifest.json.zst"
		parts := strings.Split(obj.Key, "/")
		if len(parts) >= 2 && strings.HasSuffix(obj.Key, "/manifest.json.zst") {
			s3Snapshots = append(s3Snapshots, parts[1])
		}
	}

	if len(s3Snapshots) == 0 {
		return fmt.Errorf("no snapshot manifests found in S3")
	}

	// Find the most recent snapshot (they're named with timestamps)
	var lastS3Snapshot string
	for _, snap := range s3Snapshots {
		if lastS3Snapshot == "" || snap > lastS3Snapshot {
			lastS3Snapshot = snap
		}
	}

	log.Info("Found last S3 snapshot", "id", lastS3Snapshot)

	// Step 3: Verify the last S3 snapshot matches the latest DB snapshot
	if lastS3Snapshot != latestSnapshot.ID {
		return fmt.Errorf("latest snapshot in database (%s) does not match last successful snapshot in S3 (%s)",
			latestSnapshot.ID, lastS3Snapshot)
	}

	// Step 4: Download and parse the manifest
	log.Info("Downloading manifest", "snapshot_id", lastS3Snapshot)
	manifest, err := app.downloadManifest(ctx, lastS3Snapshot)
	if err != nil {
		return fmt.Errorf("downloading manifest: %w", err)
	}

	log.Info("Manifest loaded", "blob_count", len(manifest.Blobs))

	// Step 5: Build set of referenced blobs
	referencedBlobs := make(map[string]bool)
	for _, blob := range manifest.Blobs {
		referencedBlobs[blob.Hash] = true
	}

	// Step 6: List all blobs in S3
	log.Info("Listing all blobs in S3")
	blobPrefix := "blobs/"
	var totalBlobs int
	var unreferencedBlobs []s3.ObjectInfo
	var unreferencedSize int64

	objectCh = app.S3Client.ListObjectsStream(ctx, blobPrefix, true)
	for obj := range objectCh {
		if obj.Err != nil {
			return fmt.Errorf("listing blobs: %w", obj.Err)
		}

		totalBlobs++

		// Extract blob hash from path like "blobs/ca/fe/cafebabe..."
		parts := strings.Split(obj.Key, "/")
		if len(parts) == 4 {
			blobHash := parts[3]
			if !referencedBlobs[blobHash] {
				unreferencedBlobs = append(unreferencedBlobs, obj)
				unreferencedSize += obj.Size
			}
		}
	}

	log.Info("Blob scan complete",
		"total_blobs", totalBlobs,
		"referenced_blobs", len(referencedBlobs),
		"unreferenced_blobs", len(unreferencedBlobs),
		"unreferenced_size", humanize.Bytes(uint64(unreferencedSize)))

	// Step 7: Delete or report unreferenced blobs
	if opts.DryRun {
		fmt.Printf("\nDry run mode - would delete %d unreferenced blobs\n", len(unreferencedBlobs))
		fmt.Printf("Total size of blobs to delete: %s\n", humanize.Bytes(uint64(unreferencedSize)))

		if len(unreferencedBlobs) > 0 {
			log.Debug("Unreferenced blobs found", "count", len(unreferencedBlobs))
			for _, obj := range unreferencedBlobs {
				log.Debug("Would delete blob", "key", obj.Key, "size", humanize.Bytes(uint64(obj.Size)))
			}
		}
	} else {
		if len(unreferencedBlobs) == 0 {
			fmt.Println("No unreferenced blobs to delete")
			return nil
		}

		fmt.Printf("\nDeleting %d unreferenced blobs (%s)...\n",
			len(unreferencedBlobs), humanize.Bytes(uint64(unreferencedSize)))

		deletedCount := 0
		deletedSize := int64(0)

		for _, obj := range unreferencedBlobs {
			if err := app.S3Client.RemoveObject(ctx, obj.Key); err != nil {
				log.Error("Failed to delete blob", "key", obj.Key, "error", err)
				continue
			}
			deletedCount++
			deletedSize += obj.Size

			// Show progress every 100 blobs
			if deletedCount%100 == 0 {
				fmt.Printf(" Deleted %d/%d blobs (%s)...\n",
					deletedCount, len(unreferencedBlobs),
					humanize.Bytes(uint64(deletedSize)))
			}
		}

		fmt.Printf("\nDeleted %d blobs (%s)\n", deletedCount, humanize.Bytes(uint64(deletedSize)))
	}

	log.Info("Prune operation completed successfully")
	return nil
}

// downloadManifest downloads and decompresses a snapshot manifest
func (app *PruneApp) downloadManifest(ctx context.Context, snapshotID string) (*snapshot.Manifest, error) {
	manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)

	// Download the compressed manifest
	reader, err := app.S3Client.GetObject(ctx, manifestPath)
	if err != nil {
		return nil, fmt.Errorf("downloading manifest: %w", err)
	}
	defer func() { _ = reader.Close() }()

	// Decode manifest
	manifest, err := snapshot.DecodeManifest(reader)
	if err != nil {
		return nil, fmt.Errorf("decoding manifest: %w", err)
	}

	return manifest, nil
}