Compare commits
No commits in common. "a544fa80f2222462a170fe7adc258bea4384f51e" and "0cbb5aa0a64e8e76b6360c1f4ba096053591a736" have entirely different histories.
a544fa80f2
...
0cbb5aa0a6
@ -1,4 +1,4 @@
|
|||||||
package snapshot
|
package backup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -1,4 +1,4 @@
|
|||||||
package snapshot_test
|
package backup_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -6,9 +6,9 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.eeqj.de/sneak/vaultik/internal/backup"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/database"
|
"git.eeqj.de/sneak/vaultik/internal/database"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/snapshot"
|
|
||||||
"github.com/spf13/afero"
|
"github.com/spf13/afero"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -39,7 +39,7 @@ func TestFileContentChange(t *testing.T) {
|
|||||||
repos := database.NewRepositories(db)
|
repos := database.NewRepositories(db)
|
||||||
|
|
||||||
// Create scanner
|
// Create scanner
|
||||||
scanner := snapshot.NewScanner(snapshot.ScannerConfig{
|
scanner := backup.NewScanner(backup.ScannerConfig{
|
||||||
FS: fs,
|
FS: fs,
|
||||||
ChunkSize: int64(1024 * 16), // 16KB chunks for testing
|
ChunkSize: int64(1024 * 16), // 16KB chunks for testing
|
||||||
Repositories: repos,
|
Repositories: repos,
|
||||||
@ -168,7 +168,7 @@ func TestMultipleFileChanges(t *testing.T) {
|
|||||||
repos := database.NewRepositories(db)
|
repos := database.NewRepositories(db)
|
||||||
|
|
||||||
// Create scanner
|
// Create scanner
|
||||||
scanner := snapshot.NewScanner(snapshot.ScannerConfig{
|
scanner := backup.NewScanner(backup.ScannerConfig{
|
||||||
FS: fs,
|
FS: fs,
|
||||||
ChunkSize: int64(1024 * 16), // 16KB chunks for testing
|
ChunkSize: int64(1024 * 16), // 16KB chunks for testing
|
||||||
Repositories: repos,
|
Repositories: repos,
|
@ -1,4 +1,4 @@
|
|||||||
package snapshot
|
package backup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"git.eeqj.de/sneak/vaultik/internal/config"
|
"git.eeqj.de/sneak/vaultik/internal/config"
|
@ -1,4 +1,4 @@
|
|||||||
package snapshot
|
package backup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -1,4 +1,4 @@
|
|||||||
package snapshot
|
package backup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -1,4 +1,4 @@
|
|||||||
package snapshot_test
|
package backup_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -7,9 +7,9 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.eeqj.de/sneak/vaultik/internal/backup"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/database"
|
"git.eeqj.de/sneak/vaultik/internal/database"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/snapshot"
|
|
||||||
"github.com/spf13/afero"
|
"github.com/spf13/afero"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -60,7 +60,7 @@ func TestScannerSimpleDirectory(t *testing.T) {
|
|||||||
repos := database.NewRepositories(db)
|
repos := database.NewRepositories(db)
|
||||||
|
|
||||||
// Create scanner
|
// Create scanner
|
||||||
scanner := snapshot.NewScanner(snapshot.ScannerConfig{
|
scanner := backup.NewScanner(backup.ScannerConfig{
|
||||||
FS: fs,
|
FS: fs,
|
||||||
ChunkSize: int64(1024 * 16), // 16KB chunks for testing
|
ChunkSize: int64(1024 * 16), // 16KB chunks for testing
|
||||||
Repositories: repos,
|
Repositories: repos,
|
||||||
@ -93,7 +93,7 @@ func TestScannerSimpleDirectory(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Scan the directory
|
// Scan the directory
|
||||||
var result *snapshot.ScanResult
|
var result *backup.ScanResult
|
||||||
result, err = scanner.Scan(ctx, "/source", snapshotID)
|
result, err = scanner.Scan(ctx, "/source", snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("scan failed: %v", err)
|
t.Fatalf("scan failed: %v", err)
|
||||||
@ -207,7 +207,7 @@ func TestScannerWithSymlinks(t *testing.T) {
|
|||||||
repos := database.NewRepositories(db)
|
repos := database.NewRepositories(db)
|
||||||
|
|
||||||
// Create scanner
|
// Create scanner
|
||||||
scanner := snapshot.NewScanner(snapshot.ScannerConfig{
|
scanner := backup.NewScanner(backup.ScannerConfig{
|
||||||
FS: fs,
|
FS: fs,
|
||||||
ChunkSize: 1024 * 16,
|
ChunkSize: 1024 * 16,
|
||||||
Repositories: repos,
|
Repositories: repos,
|
||||||
@ -240,7 +240,7 @@ func TestScannerWithSymlinks(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Scan the directory
|
// Scan the directory
|
||||||
var result *snapshot.ScanResult
|
var result *backup.ScanResult
|
||||||
result, err = scanner.Scan(ctx, "/source", snapshotID)
|
result, err = scanner.Scan(ctx, "/source", snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("scan failed: %v", err)
|
t.Fatalf("scan failed: %v", err)
|
||||||
@ -308,7 +308,7 @@ func TestScannerLargeFile(t *testing.T) {
|
|||||||
repos := database.NewRepositories(db)
|
repos := database.NewRepositories(db)
|
||||||
|
|
||||||
// Create scanner with 64KB average chunk size
|
// Create scanner with 64KB average chunk size
|
||||||
scanner := snapshot.NewScanner(snapshot.ScannerConfig{
|
scanner := backup.NewScanner(backup.ScannerConfig{
|
||||||
FS: fs,
|
FS: fs,
|
||||||
ChunkSize: int64(1024 * 64), // 64KB average chunks
|
ChunkSize: int64(1024 * 64), // 64KB average chunks
|
||||||
Repositories: repos,
|
Repositories: repos,
|
||||||
@ -341,7 +341,7 @@ func TestScannerLargeFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Scan the directory
|
// Scan the directory
|
||||||
var result *snapshot.ScanResult
|
var result *backup.ScanResult
|
||||||
result, err = scanner.Scan(ctx, "/source", snapshotID)
|
result, err = scanner.Scan(ctx, "/source", snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("scan failed: %v", err)
|
t.Fatalf("scan failed: %v", err)
|
@ -1,9 +1,9 @@
|
|||||||
package snapshot
|
package backup
|
||||||
|
|
||||||
// Snapshot Metadata Export Process
|
// Snapshot Metadata Export Process
|
||||||
// ================================
|
// ================================
|
||||||
//
|
//
|
||||||
// The snapshot metadata contains all information needed to restore a snapshot.
|
// The snapshot metadata contains all information needed to restore a backup.
|
||||||
// Instead of creating a custom format, we use a trimmed copy of the SQLite
|
// Instead of creating a custom format, we use a trimmed copy of the SQLite
|
||||||
// database containing only data relevant to the current snapshot.
|
// database containing only data relevant to the current snapshot.
|
||||||
//
|
//
|
||||||
@ -42,6 +42,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
@ -55,6 +56,7 @@ import (
|
|||||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/s3"
|
"git.eeqj.de/sneak/vaultik/internal/s3"
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -538,54 +540,60 @@ func (sm *SnapshotManager) generateBlobManifest(ctx context.Context, dbPath stri
|
|||||||
|
|
||||||
// Get all blobs for this snapshot
|
// Get all blobs for this snapshot
|
||||||
log.Debug("Querying blobs for snapshot", "snapshot_id", snapshotID)
|
log.Debug("Querying blobs for snapshot", "snapshot_id", snapshotID)
|
||||||
blobHashes, err := repos.Snapshots.GetBlobHashes(ctx, snapshotID)
|
blobs, err := repos.Snapshots.GetBlobHashes(ctx, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("getting snapshot blobs: %w", err)
|
return nil, fmt.Errorf("getting snapshot blobs: %w", err)
|
||||||
}
|
}
|
||||||
log.Debug("Found blobs", "count", len(blobHashes))
|
log.Debug("Found blobs", "count", len(blobs))
|
||||||
|
|
||||||
// Get blob details including sizes
|
// Create manifest structure
|
||||||
blobs := make([]BlobInfo, 0, len(blobHashes))
|
manifest := struct {
|
||||||
totalCompressedSize := int64(0)
|
SnapshotID string `json:"snapshot_id"`
|
||||||
|
Timestamp string `json:"timestamp"`
|
||||||
for _, hash := range blobHashes {
|
BlobCount int `json:"blob_count"`
|
||||||
blob, err := repos.Blobs.GetByHash(ctx, hash)
|
Blobs []string `json:"blobs"`
|
||||||
if err != nil {
|
}{
|
||||||
log.Warn("Failed to get blob details", "hash", hash, "error", err)
|
SnapshotID: snapshotID,
|
||||||
continue
|
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
||||||
}
|
BlobCount: len(blobs),
|
||||||
if blob != nil {
|
Blobs: blobs,
|
||||||
blobs = append(blobs, BlobInfo{
|
|
||||||
Hash: hash,
|
|
||||||
CompressedSize: blob.CompressedSize,
|
|
||||||
})
|
|
||||||
totalCompressedSize += blob.CompressedSize
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create manifest
|
// Marshal to JSON
|
||||||
manifest := &Manifest{
|
log.Debug("Marshaling manifest to JSON")
|
||||||
SnapshotID: snapshotID,
|
jsonData, err := json.MarshalIndent(manifest, "", " ")
|
||||||
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
|
||||||
BlobCount: len(blobs),
|
|
||||||
TotalCompressedSize: totalCompressedSize,
|
|
||||||
Blobs: blobs,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode manifest
|
|
||||||
log.Debug("Encoding manifest")
|
|
||||||
compressedData, err := EncodeManifest(manifest, sm.config.CompressionLevel)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("encoding manifest: %w", err)
|
return nil, fmt.Errorf("marshaling manifest: %w", err)
|
||||||
}
|
}
|
||||||
|
log.Debug("JSON manifest created", "size", len(jsonData))
|
||||||
|
|
||||||
|
// Compress only (no encryption) - manifests must be readable without private keys for pruning
|
||||||
|
log.Debug("Compressing manifest")
|
||||||
|
|
||||||
|
var compressedBuf bytes.Buffer
|
||||||
|
writer, err := zstd.NewWriter(&compressedBuf, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(sm.config.CompressionLevel)))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating zstd writer: %w", err)
|
||||||
|
}
|
||||||
|
if _, err := writer.Write(jsonData); err != nil {
|
||||||
|
_ = writer.Close()
|
||||||
|
return nil, fmt.Errorf("writing compressed data: %w", err)
|
||||||
|
}
|
||||||
|
if err := writer.Close(); err != nil {
|
||||||
|
return nil, fmt.Errorf("closing zstd writer: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug("Manifest compressed",
|
||||||
|
"original_size", len(jsonData),
|
||||||
|
"compressed_size", compressedBuf.Len())
|
||||||
|
|
||||||
log.Info("Generated blob manifest",
|
log.Info("Generated blob manifest",
|
||||||
"snapshot_id", snapshotID,
|
"snapshot_id", snapshotID,
|
||||||
"blob_count", len(blobs),
|
"blob_count", len(blobs),
|
||||||
"total_compressed_size", totalCompressedSize,
|
"json_size", len(jsonData),
|
||||||
"manifest_size", len(compressedData))
|
"compressed_size", compressedBuf.Len())
|
||||||
|
|
||||||
return compressedData, nil
|
return compressedBuf.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// compressData compresses data using zstd
|
// compressData compresses data using zstd
|
@ -1,4 +1,4 @@
|
|||||||
package snapshot
|
package backup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -2,16 +2,18 @@ package cli
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"git.eeqj.de/sneak/vaultik/internal/backup"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/config"
|
"git.eeqj.de/sneak/vaultik/internal/config"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/database"
|
"git.eeqj.de/sneak/vaultik/internal/database"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/globals"
|
"git.eeqj.de/sneak/vaultik/internal/globals"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/s3"
|
"git.eeqj.de/sneak/vaultik/internal/s3"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/snapshot"
|
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
)
|
)
|
||||||
@ -64,7 +66,7 @@ specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
|
|||||||
Debug: rootFlags.Debug,
|
Debug: rootFlags.Debug,
|
||||||
},
|
},
|
||||||
Modules: []fx.Option{
|
Modules: []fx.Option{
|
||||||
snapshot.Module,
|
backup.Module,
|
||||||
s3.Module,
|
s3.Module,
|
||||||
fx.Provide(fx.Annotate(
|
fx.Provide(fx.Annotate(
|
||||||
func(g *globals.Globals, cfg *config.Config, repos *database.Repositories,
|
func(g *globals.Globals, cfg *config.Config, repos *database.Repositories,
|
||||||
@ -193,8 +195,8 @@ func (app *PruneApp) runPrune(ctx context.Context, opts *PruneOptions) error {
|
|||||||
|
|
||||||
// Step 5: Build set of referenced blobs
|
// Step 5: Build set of referenced blobs
|
||||||
referencedBlobs := make(map[string]bool)
|
referencedBlobs := make(map[string]bool)
|
||||||
for _, blob := range manifest.Blobs {
|
for _, blobHash := range manifest.Blobs {
|
||||||
referencedBlobs[blob.Hash] = true
|
referencedBlobs[blobHash] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 6: List all blobs in S3
|
// Step 6: List all blobs in S3
|
||||||
@ -275,8 +277,16 @@ func (app *PruneApp) runPrune(ctx context.Context, opts *PruneOptions) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BlobManifest represents the structure of a snapshot's blob manifest
|
||||||
|
type BlobManifest struct {
|
||||||
|
SnapshotID string `json:"snapshot_id"`
|
||||||
|
Timestamp string `json:"timestamp"`
|
||||||
|
BlobCount int `json:"blob_count"`
|
||||||
|
Blobs []string `json:"blobs"`
|
||||||
|
}
|
||||||
|
|
||||||
// downloadManifest downloads and decompresses a snapshot manifest
|
// downloadManifest downloads and decompresses a snapshot manifest
|
||||||
func (app *PruneApp) downloadManifest(ctx context.Context, snapshotID string) (*snapshot.Manifest, error) {
|
func (app *PruneApp) downloadManifest(ctx context.Context, snapshotID string) (*BlobManifest, error) {
|
||||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||||
|
|
||||||
// Download the compressed manifest
|
// Download the compressed manifest
|
||||||
@ -286,11 +296,18 @@ func (app *PruneApp) downloadManifest(ctx context.Context, snapshotID string) (*
|
|||||||
}
|
}
|
||||||
defer func() { _ = reader.Close() }()
|
defer func() { _ = reader.Close() }()
|
||||||
|
|
||||||
// Decode manifest
|
// Decompress using zstd
|
||||||
manifest, err := snapshot.DecodeManifest(reader)
|
zr, err := zstd.NewReader(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating zstd reader: %w", err)
|
||||||
|
}
|
||||||
|
defer zr.Close()
|
||||||
|
|
||||||
|
// Decode JSON manifest
|
||||||
|
var manifest BlobManifest
|
||||||
|
if err := json.NewDecoder(zr).Decode(&manifest); err != nil {
|
||||||
return nil, fmt.Errorf("decoding manifest: %w", err)
|
return nil, fmt.Errorf("decoding manifest: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return manifest, nil
|
return &manifest, nil
|
||||||
}
|
}
|
||||||
|
@ -12,13 +12,14 @@ import (
|
|||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.eeqj.de/sneak/vaultik/internal/backup"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/config"
|
"git.eeqj.de/sneak/vaultik/internal/config"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/database"
|
"git.eeqj.de/sneak/vaultik/internal/database"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/globals"
|
"git.eeqj.de/sneak/vaultik/internal/globals"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/s3"
|
"git.eeqj.de/sneak/vaultik/internal/s3"
|
||||||
"git.eeqj.de/sneak/vaultik/internal/snapshot"
|
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
)
|
)
|
||||||
@ -35,8 +36,8 @@ type SnapshotCreateApp struct {
|
|||||||
Globals *globals.Globals
|
Globals *globals.Globals
|
||||||
Config *config.Config
|
Config *config.Config
|
||||||
Repositories *database.Repositories
|
Repositories *database.Repositories
|
||||||
ScannerFactory snapshot.ScannerFactory
|
ScannerFactory backup.ScannerFactory
|
||||||
SnapshotManager *snapshot.SnapshotManager
|
SnapshotManager *backup.SnapshotManager
|
||||||
S3Client *s3.Client
|
S3Client *s3.Client
|
||||||
DB *database.DB
|
DB *database.DB
|
||||||
Lifecycle fx.Lifecycle
|
Lifecycle fx.Lifecycle
|
||||||
@ -105,11 +106,11 @@ specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
|
|||||||
Cron: opts.Cron,
|
Cron: opts.Cron,
|
||||||
},
|
},
|
||||||
Modules: []fx.Option{
|
Modules: []fx.Option{
|
||||||
snapshot.Module,
|
backup.Module,
|
||||||
s3.Module,
|
s3.Module,
|
||||||
fx.Provide(fx.Annotate(
|
fx.Provide(fx.Annotate(
|
||||||
func(g *globals.Globals, cfg *config.Config, repos *database.Repositories,
|
func(g *globals.Globals, cfg *config.Config, repos *database.Repositories,
|
||||||
scannerFactory snapshot.ScannerFactory, snapshotManager *snapshot.SnapshotManager,
|
scannerFactory backup.ScannerFactory, snapshotManager *backup.SnapshotManager,
|
||||||
s3Client *s3.Client, db *database.DB,
|
s3Client *s3.Client, db *database.DB,
|
||||||
lc fx.Lifecycle, shutdowner fx.Shutdowner) *SnapshotCreateApp {
|
lc fx.Lifecycle, shutdowner fx.Shutdowner) *SnapshotCreateApp {
|
||||||
return &SnapshotCreateApp{
|
return &SnapshotCreateApp{
|
||||||
@ -225,7 +226,7 @@ func (app *SnapshotCreateApp) runSnapshot(ctx context.Context, opts *SnapshotCre
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create scanner with progress enabled (unless in cron mode)
|
// Create scanner with progress enabled (unless in cron mode)
|
||||||
scanner := app.ScannerFactory(snapshot.ScannerParams{
|
scanner := app.ScannerFactory(backup.ScannerParams{
|
||||||
EnableProgress: !opts.Cron,
|
EnableProgress: !opts.Cron,
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -305,8 +306,8 @@ func (app *SnapshotCreateApp) runSnapshot(ctx context.Context, opts *SnapshotCre
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update snapshot statistics with extended fields
|
// Update snapshot statistics with extended fields
|
||||||
extStats := snapshot.ExtendedBackupStats{
|
extStats := backup.ExtendedBackupStats{
|
||||||
BackupStats: snapshot.BackupStats{
|
BackupStats: backup.BackupStats{
|
||||||
FilesScanned: totalFiles,
|
FilesScanned: totalFiles,
|
||||||
BytesScanned: totalBytes,
|
BytesScanned: totalBytes,
|
||||||
ChunksCreated: totalChunks,
|
ChunksCreated: totalChunks,
|
||||||
@ -486,78 +487,15 @@ func newSnapshotVerifyCommand() *cobra.Command {
|
|||||||
|
|
||||||
// List lists all snapshots
|
// List lists all snapshots
|
||||||
func (app *SnapshotApp) List(ctx context.Context, jsonOutput bool) error {
|
func (app *SnapshotApp) List(ctx context.Context, jsonOutput bool) error {
|
||||||
// Get all remote snapshots
|
// First, sync with remote snapshots
|
||||||
remoteSnapshots := make(map[string]bool)
|
if err := app.syncWithRemote(ctx); err != nil {
|
||||||
objectCh := app.S3Client.ListObjectsStream(ctx, "metadata/", false)
|
return fmt.Errorf("syncing with remote: %w", err)
|
||||||
|
|
||||||
for object := range objectCh {
|
|
||||||
if object.Err != nil {
|
|
||||||
return fmt.Errorf("listing remote snapshots: %w", object.Err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract snapshot ID from paths like metadata/hostname-20240115-143052Z/
|
|
||||||
parts := strings.Split(object.Key, "/")
|
|
||||||
if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" {
|
|
||||||
remoteSnapshots[parts[1]] = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get all local snapshots
|
// Now get snapshots from S3
|
||||||
localSnapshots, err := app.Repositories.Snapshots.ListRecent(ctx, 10000)
|
snapshots, err := app.getSnapshots(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("listing local snapshots: %w", err)
|
return err
|
||||||
}
|
|
||||||
|
|
||||||
// Build a map of local snapshots for quick lookup
|
|
||||||
localSnapshotMap := make(map[string]*database.Snapshot)
|
|
||||||
for _, s := range localSnapshots {
|
|
||||||
localSnapshotMap[s.ID] = s
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove local snapshots that don't exist remotely
|
|
||||||
for _, snapshot := range localSnapshots {
|
|
||||||
if !remoteSnapshots[snapshot.ID] {
|
|
||||||
log.Info("Removing local snapshot not found in remote", "snapshot_id", snapshot.ID)
|
|
||||||
if err := app.Repositories.Snapshots.Delete(ctx, snapshot.ID); err != nil {
|
|
||||||
log.Error("Failed to delete local snapshot", "snapshot_id", snapshot.ID, "error", err)
|
|
||||||
}
|
|
||||||
delete(localSnapshotMap, snapshot.ID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build final snapshot list
|
|
||||||
snapshots := make([]SnapshotInfo, 0, len(remoteSnapshots))
|
|
||||||
|
|
||||||
for snapshotID := range remoteSnapshots {
|
|
||||||
// Check if we have this snapshot locally
|
|
||||||
if localSnap, exists := localSnapshotMap[snapshotID]; exists && localSnap.CompletedAt != nil {
|
|
||||||
// Use local data
|
|
||||||
snapshots = append(snapshots, SnapshotInfo{
|
|
||||||
ID: localSnap.ID,
|
|
||||||
Timestamp: localSnap.StartedAt,
|
|
||||||
CompressedSize: localSnap.BlobSize,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
// Remote snapshot not in local DB - fetch manifest to get size
|
|
||||||
timestamp, err := parseSnapshotTimestamp(snapshotID)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Failed to parse snapshot timestamp", "id", snapshotID, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to download manifest to get size
|
|
||||||
totalSize, err := app.getManifestSize(ctx, snapshotID)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Failed to get manifest size", "id", snapshotID, "error", err)
|
|
||||||
totalSize = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshots = append(snapshots, SnapshotInfo{
|
|
||||||
ID: snapshotID,
|
|
||||||
Timestamp: timestamp,
|
|
||||||
CompressedSize: totalSize,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sort by timestamp (newest first)
|
// Sort by timestamp (newest first)
|
||||||
@ -595,27 +533,9 @@ func (app *SnapshotApp) List(ctx context.Context, jsonOutput bool) error {
|
|||||||
|
|
||||||
// Purge removes old snapshots based on criteria
|
// Purge removes old snapshots based on criteria
|
||||||
func (app *SnapshotApp) Purge(ctx context.Context, keepLatest bool, olderThan string, force bool) error {
|
func (app *SnapshotApp) Purge(ctx context.Context, keepLatest bool, olderThan string, force bool) error {
|
||||||
// Sync with remote first
|
snapshots, err := app.getSnapshots(ctx)
|
||||||
if err := app.syncWithRemote(ctx); err != nil {
|
|
||||||
return fmt.Errorf("syncing with remote: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get snapshots from local database
|
|
||||||
dbSnapshots, err := app.Repositories.Snapshots.ListRecent(ctx, 10000)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("listing snapshots: %w", err)
|
return err
|
||||||
}
|
|
||||||
|
|
||||||
// Convert to SnapshotInfo format, only including completed snapshots
|
|
||||||
snapshots := make([]SnapshotInfo, 0, len(dbSnapshots))
|
|
||||||
for _, s := range dbSnapshots {
|
|
||||||
if s.CompletedAt != nil {
|
|
||||||
snapshots = append(snapshots, SnapshotInfo{
|
|
||||||
ID: s.ID,
|
|
||||||
Timestamp: s.StartedAt,
|
|
||||||
CompressedSize: s.BlobSize,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sort by timestamp (newest first)
|
// Sort by timestamp (newest first)
|
||||||
@ -738,25 +658,74 @@ func (app *SnapshotApp) Verify(ctx context.Context, snapshotID string, deep bool
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getManifestSize downloads a manifest and returns the total compressed size
|
// getSnapshots retrieves all snapshots from S3
|
||||||
func (app *SnapshotApp) getManifestSize(ctx context.Context, snapshotID string) (int64, error) {
|
func (app *SnapshotApp) getSnapshots(ctx context.Context) ([]SnapshotInfo, error) {
|
||||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
var snapshots []SnapshotInfo
|
||||||
|
|
||||||
reader, err := app.S3Client.GetObject(ctx, manifestPath)
|
// List all objects under metadata/
|
||||||
if err != nil {
|
objectCh := app.S3Client.ListObjectsStream(ctx, "metadata/", true)
|
||||||
return 0, fmt.Errorf("downloading manifest: %w", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = reader.Close() }()
|
|
||||||
|
|
||||||
manifest, err := snapshot.DecodeManifest(reader)
|
// Track unique snapshots
|
||||||
if err != nil {
|
snapshotMap := make(map[string]*SnapshotInfo)
|
||||||
return 0, fmt.Errorf("decoding manifest: %w", err)
|
|
||||||
|
for object := range objectCh {
|
||||||
|
if object.Err != nil {
|
||||||
|
return nil, fmt.Errorf("listing objects: %w", object.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract snapshot ID from paths like metadata/2024-01-15-143052-hostname/manifest.json.zst
|
||||||
|
parts := strings.Split(object.Key, "/")
|
||||||
|
if len(parts) < 3 || parts[0] != "metadata" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshotID := parts[1]
|
||||||
|
if snapshotID == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize snapshot info if not seen
|
||||||
|
if _, exists := snapshotMap[snapshotID]; !exists {
|
||||||
|
timestamp, err := parseSnapshotTimestamp(snapshotID)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Failed to parse snapshot timestamp", "id", snapshotID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshotMap[snapshotID] = &SnapshotInfo{
|
||||||
|
ID: snapshotID,
|
||||||
|
Timestamp: timestamp,
|
||||||
|
CompressedSize: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return manifest.TotalCompressedSize, nil
|
// For each snapshot, download manifest and calculate total blob size
|
||||||
|
for _, snap := range snapshotMap {
|
||||||
|
manifest, err := app.downloadManifest(ctx, snap.ID)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Failed to download manifest", "id", snap.ID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate total size of referenced blobs
|
||||||
|
for _, blobHash := range manifest {
|
||||||
|
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobHash[:2], blobHash[2:4], blobHash)
|
||||||
|
info, err := app.S3Client.StatObject(ctx, blobPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Failed to stat blob", "blob", blobHash, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
snap.CompressedSize += info.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshots = append(snapshots, *snap)
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshots, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// downloadManifest downloads and parses a snapshot manifest (for verify command)
|
// downloadManifest downloads and parses a snapshot manifest
|
||||||
func (app *SnapshotApp) downloadManifest(ctx context.Context, snapshotID string) ([]string, error) {
|
func (app *SnapshotApp) downloadManifest(ctx context.Context, snapshotID string) ([]string, error) {
|
||||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||||
|
|
||||||
@ -766,17 +735,25 @@ func (app *SnapshotApp) downloadManifest(ctx context.Context, snapshotID string)
|
|||||||
}
|
}
|
||||||
defer func() { _ = reader.Close() }()
|
defer func() { _ = reader.Close() }()
|
||||||
|
|
||||||
manifest, err := snapshot.DecodeManifest(reader)
|
// Decompress
|
||||||
|
zr, err := zstd.NewReader(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating zstd reader: %w", err)
|
||||||
|
}
|
||||||
|
defer zr.Close()
|
||||||
|
|
||||||
|
// Decode JSON - manifest is an object with a "blobs" field
|
||||||
|
var manifest struct {
|
||||||
|
SnapshotID string `json:"snapshot_id"`
|
||||||
|
Timestamp string `json:"timestamp"`
|
||||||
|
BlobCount int `json:"blob_count"`
|
||||||
|
Blobs []string `json:"blobs"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(zr).Decode(&manifest); err != nil {
|
||||||
return nil, fmt.Errorf("decoding manifest: %w", err)
|
return nil, fmt.Errorf("decoding manifest: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract blob hashes
|
return manifest.Blobs, nil
|
||||||
hashes := make([]string, len(manifest.Blobs))
|
|
||||||
for i, blob := range manifest.Blobs {
|
|
||||||
hashes[i] = blob.Hash
|
|
||||||
}
|
|
||||||
return hashes, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteSnapshot removes a snapshot and its metadata
|
// deleteSnapshot removes a snapshot and its metadata
|
||||||
|
@ -1,70 +0,0 @@
|
|||||||
package snapshot
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
"github.com/klauspost/compress/zstd"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Manifest represents the structure of a snapshot's blob manifest
|
|
||||||
type Manifest struct {
|
|
||||||
SnapshotID string `json:"snapshot_id"`
|
|
||||||
Timestamp string `json:"timestamp"`
|
|
||||||
BlobCount int `json:"blob_count"`
|
|
||||||
TotalCompressedSize int64 `json:"total_compressed_size"`
|
|
||||||
Blobs []BlobInfo `json:"blobs"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// BlobInfo represents information about a single blob in the manifest
|
|
||||||
type BlobInfo struct {
|
|
||||||
Hash string `json:"hash"`
|
|
||||||
CompressedSize int64 `json:"compressed_size"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// DecodeManifest decodes a manifest from a reader containing compressed JSON
|
|
||||||
func DecodeManifest(r io.Reader) (*Manifest, error) {
|
|
||||||
// Decompress using zstd
|
|
||||||
zr, err := zstd.NewReader(r)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("creating zstd reader: %w", err)
|
|
||||||
}
|
|
||||||
defer zr.Close()
|
|
||||||
|
|
||||||
// Decode JSON manifest
|
|
||||||
var manifest Manifest
|
|
||||||
if err := json.NewDecoder(zr).Decode(&manifest); err != nil {
|
|
||||||
return nil, fmt.Errorf("decoding manifest: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &manifest, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// EncodeManifest encodes a manifest to compressed JSON
|
|
||||||
func EncodeManifest(manifest *Manifest, compressionLevel int) ([]byte, error) {
|
|
||||||
// Marshal to JSON
|
|
||||||
jsonData, err := json.MarshalIndent(manifest, "", " ")
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("marshaling manifest: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compress using zstd
|
|
||||||
var compressedBuf bytes.Buffer
|
|
||||||
writer, err := zstd.NewWriter(&compressedBuf, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(compressionLevel)))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("creating zstd writer: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := writer.Write(jsonData); err != nil {
|
|
||||||
_ = writer.Close()
|
|
||||||
return nil, fmt.Errorf("writing compressed data: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := writer.Close(); err != nil {
|
|
||||||
return nil, fmt.Errorf("closing zstd writer: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return compressedBuf.Bytes(), nil
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user