The bug: fully-deduplicated snapshots (every chunk already in storage from a prior run) had an empty snapshot_blobs table. The metadata-export pipeline then dropped all blob/blob_chunks rows from the exported database, leaving file_chunks references to chunks whose blobs were no longer recorded. Restore then failed on every file with "chunk X not found in any blob". Fix: at CompleteSnapshot time, run an INSERT OR IGNORE that links every blob holding a chunk referenced by this snapshot's files into snapshot_blobs. New blobs uploaded during the snapshot are already recorded (no-op for them); dedup-referenced blobs are added. The cleanup query in deleteOrphanedBlobs already restricts to snapshot_blobs entries for the current snapshot — so once snapshot_blobs is correctly populated, the exported database contains the full set of blob/blob_chunks rows needed for restore. Regression test: TestDedupOnlySnapshotRestores creates two identical snapshots (the second uploads zero new blobs) and restores the second. Without the fix, restore fails on every file.
927 lines
34 KiB
Go
927 lines
34 KiB
Go
package snapshot
|
|
|
|
// Snapshot Metadata Export Process
|
|
// ================================
|
|
//
|
|
// The snapshot metadata contains all information needed to restore a snapshot.
|
|
// Instead of creating a custom format, we use a trimmed copy of the SQLite
|
|
// database containing only data relevant to the current snapshot.
|
|
//
|
|
// Process Overview:
|
|
// 1. After all files/chunks/blobs are backed up, create a snapshot record
|
|
// 2. Close the main database to ensure consistency
|
|
// 3. Copy the entire database to a temporary file
|
|
// 4. Open the temporary database
|
|
// 5. Delete all snapshots except the current one
|
|
// 6. Delete all orphaned records:
|
|
// - Files not referenced by any remaining snapshot
|
|
// - Chunks not referenced by any remaining files
|
|
// - Blobs not recorded in snapshot_blobs for the current snapshot
|
|
// - All related mapping tables (file_chunks, chunk_files, blob_chunks)
|
|
// 7. Close the temporary database
|
|
// 8. VACUUM the database to remove deleted data and compact (security critical)
|
|
// 9. Compress the binary database with zstd
|
|
// 10. Encrypt the compressed database with age (if encryption is enabled)
|
|
// 11. Upload to S3 as: metadata/{snapshot-id}/db.zst.age, together with an
//     unencrypted blob manifest at metadata/{snapshot-id}/manifest.json.zst
|
|
// 12. Reopen the main database
|
|
//
|
|
// Advantages of this approach:
|
|
// - No custom metadata format needed
|
|
// - Reuses existing database schema and relationships
|
|
// - Binary SQLite files are portable and compress well
|
|
// - Fast restore - just decompress and open (no SQL parsing)
|
|
// - VACUUM ensures no deleted data leaks
|
|
// - Atomic and consistent snapshot of all metadata
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"io"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/spf13/afero"
|
|
"go.uber.org/fx"
|
|
"sneak.berlin/go/vaultik/internal/blobgen"
|
|
"sneak.berlin/go/vaultik/internal/config"
|
|
"sneak.berlin/go/vaultik/internal/database"
|
|
"sneak.berlin/go/vaultik/internal/log"
|
|
"sneak.berlin/go/vaultik/internal/storage"
|
|
"sneak.berlin/go/vaultik/internal/types"
|
|
)
|
|
|
|
// SnapshotManager handles snapshot creation and metadata export
|
|
type SnapshotManager struct {
|
|
repos *database.Repositories
|
|
storage storage.Storer
|
|
config *config.Config
|
|
fs afero.Fs
|
|
}
|
|
|
|
// SnapshotManagerParams holds dependencies for NewSnapshotManager
|
|
type SnapshotManagerParams struct {
|
|
fx.In
|
|
|
|
Repos *database.Repositories
|
|
Storage storage.Storer
|
|
Config *config.Config
|
|
}
|
|
|
|
// NewSnapshotManager creates a new snapshot manager for dependency injection
|
|
func NewSnapshotManager(params SnapshotManagerParams) *SnapshotManager {
|
|
return &SnapshotManager{
|
|
repos: params.Repos,
|
|
storage: params.Storage,
|
|
config: params.Config,
|
|
}
|
|
}
|
|
|
|
// SetFilesystem sets the filesystem to use for all file operations
|
|
func (sm *SnapshotManager) SetFilesystem(fs afero.Fs) {
|
|
sm.fs = fs
|
|
}
|
|
|
|
// CreateSnapshot creates a new snapshot record in the database at the start of a backup.
|
|
// Deprecated: Use CreateSnapshotWithName instead for multi-snapshot support.
|
|
func (sm *SnapshotManager) CreateSnapshot(ctx context.Context, hostname, version, gitRevision string) (string, error) {
|
|
return sm.CreateSnapshotWithName(ctx, hostname, "", version, gitRevision)
|
|
}
|
|
|
|
// CreateSnapshotWithName creates a new snapshot record with an optional snapshot name.
|
|
// The snapshot ID format is: hostname_name_timestamp or hostname_timestamp if name is empty.
|
|
func (sm *SnapshotManager) CreateSnapshotWithName(ctx context.Context, hostname, name, version, gitRevision string) (string, error) {
|
|
// Use short hostname (strip domain if present)
|
|
shortHostname := hostname
|
|
if idx := strings.Index(hostname, "."); idx != -1 {
|
|
shortHostname = hostname[:idx]
|
|
}
|
|
|
|
// Build snapshot ID with optional name
|
|
timestamp := time.Now().UTC().Format("2006-01-02T15:04:05Z")
|
|
var snapshotID string
|
|
if name != "" {
|
|
snapshotID = fmt.Sprintf("%s_%s_%s", shortHostname, name, timestamp)
|
|
} else {
|
|
snapshotID = fmt.Sprintf("%s_%s", shortHostname, timestamp)
|
|
}
|
|
|
|
snapshot := &database.Snapshot{
|
|
ID: types.SnapshotID(snapshotID),
|
|
Hostname: types.Hostname(hostname),
|
|
VaultikVersion: types.Version(version),
|
|
VaultikGitRevision: types.GitRevision(gitRevision),
|
|
StartedAt: time.Now().UTC(),
|
|
CompletedAt: nil, // Not completed yet
|
|
FileCount: 0,
|
|
ChunkCount: 0,
|
|
BlobCount: 0,
|
|
TotalSize: 0,
|
|
BlobSize: 0,
|
|
CompressionRatio: 1.0,
|
|
}
|
|
|
|
err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
|
return sm.repos.Snapshots.Create(ctx, tx, snapshot)
|
|
})
|
|
|
|
if err != nil {
|
|
return "", fmt.Errorf("creating snapshot: %w", err)
|
|
}
|
|
|
|
log.Info("Created snapshot", "snapshot_id", snapshotID)
|
|
return snapshotID, nil
|
|
}
|
|
|
|
// UpdateSnapshotStats updates the statistics for a snapshot during backup
|
|
func (sm *SnapshotManager) UpdateSnapshotStats(ctx context.Context, snapshotID string, stats BackupStats) error {
|
|
err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
|
return sm.repos.Snapshots.UpdateCounts(ctx, tx, snapshotID,
|
|
int64(stats.FilesScanned),
|
|
int64(stats.ChunksCreated),
|
|
int64(stats.BlobsCreated),
|
|
stats.BytesScanned,
|
|
stats.BytesUploaded,
|
|
)
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("updating snapshot stats: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateSnapshotStatsExtended updates snapshot statistics with extended metrics.
|
|
// This includes compression level, uncompressed blob size, and upload duration.
|
|
func (sm *SnapshotManager) UpdateSnapshotStatsExtended(ctx context.Context, snapshotID string, stats ExtendedBackupStats) error {
|
|
return sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
|
// First update basic stats
|
|
if err := sm.repos.Snapshots.UpdateCounts(ctx, tx, snapshotID,
|
|
int64(stats.FilesScanned),
|
|
int64(stats.ChunksCreated),
|
|
int64(stats.BlobsCreated),
|
|
stats.BytesScanned,
|
|
stats.BytesUploaded,
|
|
); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Then update extended stats
|
|
return sm.repos.Snapshots.UpdateExtendedStats(ctx, tx, snapshotID,
|
|
stats.BlobUncompressedSize,
|
|
stats.CompressionLevel,
|
|
stats.UploadDurationMs,
|
|
)
|
|
})
|
|
}
|
|
|
|
// CompleteSnapshot marks a snapshot as completed and ensures snapshot_blobs
|
|
// is populated with every blob holding any chunk referenced by the
|
|
// snapshot's files (including deduplicated blobs uploaded by prior
|
|
// snapshots). Without this, fully-deduplicated snapshots are unrestorable.
|
|
func (sm *SnapshotManager) CompleteSnapshot(ctx context.Context, snapshotID string) error {
|
|
err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
|
added, err := sm.repos.Snapshots.PopulateReferencedBlobs(ctx, tx, snapshotID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if added > 0 {
|
|
log.Info("Populated snapshot_blobs with dedup-referenced blobs",
|
|
"snapshot_id", snapshotID, "added", added)
|
|
}
|
|
return sm.repos.Snapshots.MarkComplete(ctx, tx, snapshotID)
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("marking snapshot complete: %w", err)
|
|
}
|
|
|
|
log.Info("Completed snapshot", "snapshot_id", snapshotID)
|
|
return nil
|
|
}
|
|
|
|
// ExportSnapshotMetadata exports snapshot metadata to S3
|
|
//
|
|
// This method executes the complete snapshot metadata export process:
|
|
// 1. Creates a temporary directory for working files
|
|
// 2. Copies the main database to preserve its state
|
|
// 3. Cleans the copy to contain only current snapshot data
|
|
// 4. Dumps the cleaned database to SQL
|
|
// 5. Compresses the SQL dump with zstd
|
|
// 6. Encrypts the compressed data (if encryption is enabled)
|
|
// 7. Uploads to S3 at: snapshots/{snapshot-id}.sql.zst[.age]
|
|
//
|
|
// The caller is responsible for:
|
|
// - Ensuring the main database is closed before calling this method
|
|
// - Reopening the main database after this method returns
|
|
//
|
|
// This ensures database consistency during the copy operation.
|
|
func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath string, snapshotID string) error {
|
|
log.Info("Phase 3/3: Exporting snapshot metadata", "snapshot_id", snapshotID, "source_db", dbPath)
|
|
|
|
// Create temp directory for all temporary files
|
|
tempDir, err := afero.TempDir(sm.fs, "", "vaultik-snapshot-*")
|
|
if err != nil {
|
|
return fmt.Errorf("creating temp dir: %w", err)
|
|
}
|
|
log.Debug("Created temporary directory", "path", tempDir)
|
|
defer func() {
|
|
log.Debug("Cleaning up temporary directory", "path", tempDir)
|
|
if err := sm.fs.RemoveAll(tempDir); err != nil {
|
|
log.Debug("Failed to remove temp dir", "path", tempDir, "error", err)
|
|
}
|
|
}()
|
|
|
|
// Steps 1-5: Copy, clean, vacuum, compress, and read the database
|
|
finalData, tempDBPath, err := sm.prepareExportDB(ctx, dbPath, snapshotID, tempDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Step 6: Generate blob manifest (before closing temp DB)
|
|
blobManifest, err := sm.generateBlobManifest(ctx, tempDBPath, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("generating blob manifest: %w", err)
|
|
}
|
|
|
|
// Step 7: Upload to S3 in snapshot subdirectory
|
|
if err := sm.uploadSnapshotArtifacts(ctx, snapshotID, finalData, blobManifest); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Info("Uploaded snapshot metadata",
|
|
"snapshot_id", snapshotID,
|
|
"db_size", len(finalData),
|
|
"manifest_size", len(blobManifest))
|
|
return nil
|
|
}
|
|
|
|
// prepareExportDB copies, cleans, vacuums, and compresses the snapshot database for export.
|
|
// Returns the compressed data and the path to the temporary database (needed for manifest generation).
|
|
func (sm *SnapshotManager) prepareExportDB(ctx context.Context, dbPath, snapshotID, tempDir string) ([]byte, string, error) {
|
|
// Step 1: Copy database to temp file
|
|
// The main database should be closed at this point
|
|
tempDBPath := filepath.Join(tempDir, "snapshot.db")
|
|
log.Debug("Copying database to temporary location", "source", dbPath, "destination", tempDBPath)
|
|
if err := sm.copyFile(dbPath, tempDBPath); err != nil {
|
|
return nil, "", fmt.Errorf("copying database: %w", err)
|
|
}
|
|
log.Debug("Database copy complete", "size", sm.getFileSize(tempDBPath))
|
|
|
|
// Step 2: Clean the temp database to only contain current snapshot data
|
|
log.Debug("Cleaning temporary database", "snapshot_id", snapshotID)
|
|
stats, err := sm.cleanSnapshotDB(ctx, tempDBPath, snapshotID)
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("cleaning snapshot database: %w", err)
|
|
}
|
|
log.Info("Temporary database cleanup complete",
|
|
"db_path", tempDBPath,
|
|
"size_after_clean", humanize.Bytes(uint64(sm.getFileSize(tempDBPath))),
|
|
"files", stats.FileCount,
|
|
"chunks", stats.ChunkCount,
|
|
"blobs", stats.BlobCount,
|
|
"total_compressed_size", humanize.Bytes(uint64(stats.CompressedSize)),
|
|
"total_uncompressed_size", humanize.Bytes(uint64(stats.UncompressedSize)),
|
|
"compression_ratio", fmt.Sprintf("%.2fx", float64(stats.UncompressedSize)/float64(stats.CompressedSize)))
|
|
|
|
// Step 3: VACUUM the database to remove deleted data and compact
|
|
// This is critical for security - ensures no stale/deleted data is uploaded
|
|
if err := sm.vacuumDatabase(tempDBPath); err != nil {
|
|
return nil, "", fmt.Errorf("vacuuming database: %w", err)
|
|
}
|
|
log.Debug("Database vacuumed", "size", humanize.Bytes(uint64(sm.getFileSize(tempDBPath))))
|
|
|
|
// Step 4: Compress and encrypt the binary database file
|
|
compressedPath := filepath.Join(tempDir, "db.zst.age")
|
|
if err := sm.compressFile(tempDBPath, compressedPath); err != nil {
|
|
return nil, "", fmt.Errorf("compressing database: %w", err)
|
|
}
|
|
log.Debug("Compression complete",
|
|
"original_size", humanize.Bytes(uint64(sm.getFileSize(tempDBPath))),
|
|
"compressed_size", humanize.Bytes(uint64(sm.getFileSize(compressedPath))))
|
|
|
|
// Step 5: Read compressed and encrypted data for upload
|
|
finalData, err := afero.ReadFile(sm.fs, compressedPath)
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("reading compressed dump: %w", err)
|
|
}
|
|
|
|
return finalData, tempDBPath, nil
|
|
}
|
|
|
|
// uploadSnapshotArtifacts uploads the database backup and blob manifest to S3
|
|
func (sm *SnapshotManager) uploadSnapshotArtifacts(ctx context.Context, snapshotID string, dbData, manifestData []byte) error {
|
|
// Upload database backup (compressed and encrypted)
|
|
dbKey := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
|
|
|
|
dbUploadStart := time.Now()
|
|
if err := sm.storage.Put(ctx, dbKey, bytes.NewReader(dbData)); err != nil {
|
|
return fmt.Errorf("uploading snapshot database: %w", err)
|
|
}
|
|
dbUploadDuration := time.Since(dbUploadStart)
|
|
dbUploadSpeed := float64(len(dbData)) * 8 / dbUploadDuration.Seconds() // bits per second
|
|
log.Info("Uploaded snapshot database",
|
|
"path", dbKey,
|
|
"size", humanize.Bytes(uint64(len(dbData))),
|
|
"duration", dbUploadDuration,
|
|
"speed", humanize.SI(dbUploadSpeed, "bps"))
|
|
|
|
// Upload blob manifest (compressed only, not encrypted)
|
|
manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
|
manifestUploadStart := time.Now()
|
|
if err := sm.storage.Put(ctx, manifestKey, bytes.NewReader(manifestData)); err != nil {
|
|
return fmt.Errorf("uploading blob manifest: %w", err)
|
|
}
|
|
manifestUploadDuration := time.Since(manifestUploadStart)
|
|
manifestUploadSpeed := float64(len(manifestData)) * 8 / manifestUploadDuration.Seconds() // bits per second
|
|
log.Info("Uploaded blob manifest",
|
|
"path", manifestKey,
|
|
"size", humanize.Bytes(uint64(len(manifestData))),
|
|
"duration", manifestUploadDuration,
|
|
"speed", humanize.SI(manifestUploadSpeed, "bps"))
|
|
|
|
return nil
|
|
}
|
|
|
|
// CleanupStats summarises a snapshot database after cleanSnapshotDB has
// trimmed it: row counts plus the summed blob sizes for the kept snapshot.
type CleanupStats struct {
	FileCount, ChunkCount, BlobCount int   // rows remaining in files, chunks, and the snapshot's blobs
	CompressedSize, UncompressedSize int64 // summed sizes of the snapshot's blobs
}
|
|
|
|
// cleanSnapshotDB removes all data except for the specified snapshot
|
|
//
|
|
// The cleanup is performed in a specific order to maintain referential integrity:
|
|
// 1. Delete other snapshots
|
|
// 2. Delete orphaned snapshot associations (snapshot_files, snapshot_blobs) for deleted snapshots
|
|
// 3. Delete orphaned files (not in the current snapshot)
|
|
// 4. Delete orphaned chunk-to-file mappings (references to deleted files)
|
|
// 5. Delete orphaned blobs (not in the current snapshot)
|
|
// 6. Delete orphaned blob-to-chunk mappings (references to deleted chunks)
|
|
// 7. Delete orphaned chunks (not referenced by any file)
|
|
//
|
|
// Each step is implemented as a separate method for clarity and maintainability.
|
|
func (sm *SnapshotManager) cleanSnapshotDB(ctx context.Context, dbPath string, snapshotID string) (*CleanupStats, error) {
|
|
// Open the temp database
|
|
db, err := database.New(ctx, dbPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("opening temp database: %w", err)
|
|
}
|
|
defer func() {
|
|
if err := db.Close(); err != nil {
|
|
log.Debug("Failed to close temp database", "error", err)
|
|
}
|
|
}()
|
|
|
|
// Start a transaction
|
|
tx, err := db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("beginning transaction: %w", err)
|
|
}
|
|
defer func() {
|
|
if rbErr := tx.Rollback(); rbErr != nil && rbErr != sql.ErrTxDone {
|
|
log.Debug("Failed to rollback transaction", "error", rbErr)
|
|
}
|
|
}()
|
|
|
|
// Execute cleanup steps in order
|
|
if err := sm.deleteOtherSnapshots(ctx, tx, snapshotID); err != nil {
|
|
return nil, fmt.Errorf("step 1 - delete other snapshots: %w", err)
|
|
}
|
|
|
|
if err := sm.deleteOrphanedSnapshotAssociations(ctx, tx, snapshotID); err != nil {
|
|
return nil, fmt.Errorf("step 2 - delete orphaned snapshot associations: %w", err)
|
|
}
|
|
|
|
if err := sm.deleteOrphanedFiles(ctx, tx, snapshotID); err != nil {
|
|
return nil, fmt.Errorf("step 3 - delete orphaned files: %w", err)
|
|
}
|
|
|
|
if err := sm.deleteOrphanedChunkToFileMappings(ctx, tx); err != nil {
|
|
return nil, fmt.Errorf("step 4 - delete orphaned chunk-to-file mappings: %w", err)
|
|
}
|
|
|
|
if err := sm.deleteOrphanedBlobs(ctx, tx, snapshotID); err != nil {
|
|
return nil, fmt.Errorf("step 5 - delete orphaned blobs: %w", err)
|
|
}
|
|
|
|
if err := sm.deleteOrphanedBlobToChunkMappings(ctx, tx); err != nil {
|
|
return nil, fmt.Errorf("step 6 - delete orphaned blob-to-chunk mappings: %w", err)
|
|
}
|
|
|
|
if err := sm.deleteOrphanedChunks(ctx, tx); err != nil {
|
|
return nil, fmt.Errorf("step 7 - delete orphaned chunks: %w", err)
|
|
}
|
|
|
|
// Commit transaction
|
|
log.Debug("[Temp DB Cleanup] Committing cleanup transaction")
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, fmt.Errorf("committing transaction: %w", err)
|
|
}
|
|
|
|
// Collect statistics about the cleaned database
|
|
stats := &CleanupStats{}
|
|
|
|
// Count files
|
|
var fileCount int
|
|
err = db.QueryRowWithLog(ctx, "SELECT COUNT(*) FROM files").Scan(&fileCount)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("counting files: %w", err)
|
|
}
|
|
stats.FileCount = fileCount
|
|
|
|
// Count chunks
|
|
var chunkCount int
|
|
err = db.QueryRowWithLog(ctx, "SELECT COUNT(*) FROM chunks").Scan(&chunkCount)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("counting chunks: %w", err)
|
|
}
|
|
stats.ChunkCount = chunkCount
|
|
|
|
// Count blobs and get sizes
|
|
var blobCount int
|
|
var compressedSize, uncompressedSize sql.NullInt64
|
|
err = db.QueryRowWithLog(ctx, `
|
|
SELECT COUNT(*), COALESCE(SUM(compressed_size), 0), COALESCE(SUM(uncompressed_size), 0)
|
|
FROM blobs
|
|
WHERE blob_hash IN (SELECT blob_hash FROM snapshot_blobs WHERE snapshot_id = ?)
|
|
`, snapshotID).Scan(&blobCount, &compressedSize, &uncompressedSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("counting blobs and sizes: %w", err)
|
|
}
|
|
stats.BlobCount = blobCount
|
|
stats.CompressedSize = compressedSize.Int64
|
|
stats.UncompressedSize = uncompressedSize.Int64
|
|
|
|
return stats, nil
|
|
}
|
|
|
|
// vacuumDatabase runs VACUUM on the database to remove deleted data and compact
|
|
// This is critical for security - ensures no stale/deleted data pages are uploaded
|
|
func (sm *SnapshotManager) vacuumDatabase(dbPath string) error {
|
|
log.Debug("Running VACUUM on database", "path", dbPath)
|
|
cmd := exec.Command("sqlite3", dbPath, "VACUUM;")
|
|
|
|
if output, err := cmd.CombinedOutput(); err != nil {
|
|
return fmt.Errorf("running VACUUM: %w (output: %s)", err, string(output))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// compressFile compresses a file using zstd and encrypts with age
|
|
func (sm *SnapshotManager) compressFile(inputPath, outputPath string) error {
|
|
input, err := sm.fs.Open(inputPath)
|
|
if err != nil {
|
|
return fmt.Errorf("opening input file: %w", err)
|
|
}
|
|
defer func() {
|
|
if err := input.Close(); err != nil {
|
|
log.Debug("Failed to close input file", "path", inputPath, "error", err)
|
|
}
|
|
}()
|
|
|
|
output, err := sm.fs.Create(outputPath)
|
|
if err != nil {
|
|
return fmt.Errorf("creating output file: %w", err)
|
|
}
|
|
defer func() {
|
|
if err := output.Close(); err != nil {
|
|
log.Debug("Failed to close output file", "path", outputPath, "error", err)
|
|
}
|
|
}()
|
|
|
|
// Use blobgen for compression and encryption
|
|
log.Debug("Compressing and encrypting data")
|
|
writer, err := blobgen.NewWriter(output, sm.config.CompressionLevel, sm.config.AgeRecipients)
|
|
if err != nil {
|
|
return fmt.Errorf("creating blobgen writer: %w", err)
|
|
}
|
|
|
|
// Track if writer has been closed to avoid double-close
|
|
writerClosed := false
|
|
defer func() {
|
|
if !writerClosed {
|
|
if err := writer.Close(); err != nil {
|
|
log.Debug("Failed to close writer", "error", err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
if _, err := io.Copy(writer, input); err != nil {
|
|
return fmt.Errorf("compressing data: %w", err)
|
|
}
|
|
|
|
// Close writer to flush all data
|
|
if err := writer.Close(); err != nil {
|
|
return fmt.Errorf("closing writer: %w", err)
|
|
}
|
|
writerClosed = true
|
|
|
|
log.Debug("Compression complete", "hash", fmt.Sprintf("%x", writer.Sum256()))
|
|
|
|
return nil
|
|
}
|
|
|
|
// copyFile copies a file from src to dst
|
|
func (sm *SnapshotManager) copyFile(src, dst string) error {
|
|
log.Debug("Opening source file for copy", "path", src)
|
|
sourceFile, err := sm.fs.Open(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
log.Debug("Closing source file", "path", src)
|
|
if err := sourceFile.Close(); err != nil {
|
|
log.Debug("Failed to close source file", "path", src, "error", err)
|
|
}
|
|
}()
|
|
|
|
log.Debug("Creating destination file", "path", dst)
|
|
destFile, err := sm.fs.Create(dst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
log.Debug("Closing destination file", "path", dst)
|
|
if err := destFile.Close(); err != nil {
|
|
log.Debug("Failed to close destination file", "path", dst, "error", err)
|
|
}
|
|
}()
|
|
|
|
log.Debug("Copying file data")
|
|
n, err := io.Copy(destFile, sourceFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Debug("File copy complete", "bytes_copied", n)
|
|
|
|
return nil
|
|
}
|
|
|
|
// generateBlobManifest creates a compressed JSON list of all blobs in the snapshot
|
|
func (sm *SnapshotManager) generateBlobManifest(ctx context.Context, dbPath string, snapshotID string) ([]byte, error) {
|
|
|
|
// Open the cleaned database using the database package
|
|
db, err := database.New(ctx, dbPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("opening database: %w", err)
|
|
}
|
|
defer func() { _ = db.Close() }()
|
|
|
|
// Create repositories to access the data
|
|
repos := database.NewRepositories(db)
|
|
|
|
// Get all blobs for this snapshot
|
|
log.Debug("Querying blobs for snapshot", "snapshot_id", snapshotID)
|
|
blobHashes, err := repos.Snapshots.GetBlobHashes(ctx, snapshotID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting snapshot blobs: %w", err)
|
|
}
|
|
log.Debug("Found blobs", "count", len(blobHashes))
|
|
|
|
// Get blob details including sizes
|
|
blobs := make([]BlobInfo, 0, len(blobHashes))
|
|
totalCompressedSize := int64(0)
|
|
|
|
for _, hash := range blobHashes {
|
|
blob, err := repos.Blobs.GetByHash(ctx, hash)
|
|
if err != nil {
|
|
log.Warn("Failed to get blob details", "hash", hash, "error", err)
|
|
continue
|
|
}
|
|
if blob != nil {
|
|
blobs = append(blobs, BlobInfo{
|
|
Hash: hash,
|
|
CompressedSize: blob.CompressedSize,
|
|
})
|
|
totalCompressedSize += blob.CompressedSize
|
|
}
|
|
}
|
|
|
|
// Create manifest
|
|
manifest := &Manifest{
|
|
SnapshotID: snapshotID,
|
|
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
|
BlobCount: len(blobs),
|
|
TotalCompressedSize: totalCompressedSize,
|
|
Blobs: blobs,
|
|
}
|
|
|
|
// Encode manifest
|
|
compressedData, err := EncodeManifest(manifest, sm.config.CompressionLevel)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("encoding manifest: %w", err)
|
|
}
|
|
|
|
log.Info("Generated blob manifest",
|
|
"snapshot_id", snapshotID,
|
|
"blob_count", len(blobs),
|
|
"total_compressed_size", totalCompressedSize,
|
|
"manifest_size", len(compressedData))
|
|
|
|
return compressedData, nil
|
|
}
|
|
|
|
// compressData compresses data using zstd
|
|
|
|
// getFileSize returns the size of a file in bytes, or -1 if error
|
|
func (sm *SnapshotManager) getFileSize(path string) int64 {
|
|
info, err := sm.fs.Stat(path)
|
|
if err != nil {
|
|
return -1
|
|
}
|
|
return info.Size()
|
|
}
|
|
|
|
// BackupStats holds the running counters of a backup operation, as passed
// to UpdateSnapshotStats.
type BackupStats struct {
	FilesScanned                int   // files examined
	BytesScanned                int64 // bytes read from those files
	ChunksCreated, BlobsCreated int   // new chunks and blobs produced
	BytesUploaded               int64 // bytes sent to storage
}
|
|
|
|
// ExtendedBackupStats contains additional statistics for comprehensive tracking
|
|
type ExtendedBackupStats struct {
|
|
BackupStats
|
|
BlobUncompressedSize int64 // Total uncompressed size of all referenced blobs
|
|
CompressionLevel int // Compression level used for this snapshot
|
|
UploadDurationMs int64 // Total milliseconds spent uploading to S3
|
|
}
|
|
|
|
// CleanupIncompleteSnapshots removes incomplete snapshots that don't have metadata in S3.
|
|
// This is critical for data safety: incomplete snapshots can cause deduplication to skip
|
|
// files that were never successfully backed up, resulting in data loss.
|
|
func (sm *SnapshotManager) CleanupIncompleteSnapshots(ctx context.Context, hostname string) error {
|
|
log.Info("Checking for incomplete snapshots", "hostname", hostname)
|
|
|
|
// Get all incomplete snapshots for this hostname
|
|
incompleteSnapshots, err := sm.repos.Snapshots.GetIncompleteByHostname(ctx, hostname)
|
|
if err != nil {
|
|
return fmt.Errorf("getting incomplete snapshots: %w", err)
|
|
}
|
|
|
|
if len(incompleteSnapshots) == 0 {
|
|
log.Debug("No incomplete snapshots found")
|
|
return nil
|
|
}
|
|
|
|
log.Info("Found incomplete snapshots", "count", len(incompleteSnapshots))
|
|
|
|
// Check each incomplete snapshot for metadata in storage
|
|
for _, snapshot := range incompleteSnapshots {
|
|
// Check if metadata exists in storage
|
|
metadataKey := fmt.Sprintf("metadata/%s/db.zst", snapshot.ID)
|
|
_, err := sm.storage.Stat(ctx, metadataKey)
|
|
|
|
if err != nil {
|
|
// Metadata doesn't exist in S3 - this is an incomplete snapshot
|
|
log.Info("Cleaning up incomplete snapshot record", "snapshot_id", snapshot.ID, "started_at", snapshot.StartedAt)
|
|
|
|
// Delete the snapshot and all its associations
|
|
if err := sm.deleteSnapshot(ctx, snapshot.ID.String()); err != nil {
|
|
return fmt.Errorf("deleting incomplete snapshot %s: %w", snapshot.ID, err)
|
|
}
|
|
|
|
log.Info("Deleted incomplete snapshot record and associated data", "snapshot_id", snapshot.ID)
|
|
} else {
|
|
// Metadata exists - this snapshot was completed but database wasn't updated
|
|
// This shouldn't happen in normal operation, but mark it complete
|
|
log.Warn("Found snapshot with remote metadata but incomplete in database", "snapshot_id", snapshot.ID)
|
|
if err := sm.repos.Snapshots.MarkComplete(ctx, nil, snapshot.ID.String()); err != nil {
|
|
log.Error("Failed to mark snapshot as complete in database", "snapshot_id", snapshot.ID, "error", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// deleteSnapshot removes a snapshot and all its associations from the database
|
|
func (sm *SnapshotManager) deleteSnapshot(ctx context.Context, snapshotID string) error {
|
|
// Delete snapshot_files entries
|
|
if err := sm.repos.Snapshots.DeleteSnapshotFiles(ctx, snapshotID); err != nil {
|
|
return fmt.Errorf("deleting snapshot files: %w", err)
|
|
}
|
|
|
|
// Delete snapshot_blobs entries
|
|
if err := sm.repos.Snapshots.DeleteSnapshotBlobs(ctx, snapshotID); err != nil {
|
|
return fmt.Errorf("deleting snapshot blobs: %w", err)
|
|
}
|
|
|
|
// Delete uploads entries (has foreign key to snapshots without CASCADE)
|
|
if err := sm.repos.Snapshots.DeleteSnapshotUploads(ctx, snapshotID); err != nil {
|
|
return fmt.Errorf("deleting snapshot uploads: %w", err)
|
|
}
|
|
|
|
// Delete the snapshot itself
|
|
if err := sm.repos.Snapshots.Delete(ctx, snapshotID); err != nil {
|
|
return fmt.Errorf("deleting snapshot: %w", err)
|
|
}
|
|
|
|
// Clean up orphaned data
|
|
log.Debug("Cleaning up orphaned records in main database")
|
|
if err := sm.CleanupOrphanedData(ctx); err != nil {
|
|
return fmt.Errorf("cleaning up orphaned data: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CleanupOrphanedData removes files, chunks, and blobs that are no longer referenced by any snapshot.
|
|
// This should be called periodically to clean up data from deleted or incomplete snapshots.
|
|
func (sm *SnapshotManager) CleanupOrphanedData(ctx context.Context) error {
|
|
// Order is important to respect foreign key constraints:
|
|
// 1. Delete orphaned files (will cascade delete file_chunks)
|
|
// 2. Delete orphaned blobs (will cascade delete blob_chunks for deleted blobs)
|
|
// 3. Delete orphaned blob_chunks (where blob exists but chunk doesn't)
|
|
// 4. Delete orphaned chunks (now safe after all blob_chunks are gone)
|
|
|
|
// Delete orphaned files (files not in any snapshot)
|
|
log.Debug("Deleting orphaned file records from database")
|
|
if err := sm.repos.Files.DeleteOrphaned(ctx); err != nil {
|
|
return fmt.Errorf("deleting orphaned files: %w", err)
|
|
}
|
|
|
|
// Delete orphaned blobs (blobs not in any snapshot)
|
|
// This will cascade delete blob_chunks for deleted blobs
|
|
log.Debug("Deleting orphaned blob records from database")
|
|
if err := sm.repos.Blobs.DeleteOrphaned(ctx); err != nil {
|
|
return fmt.Errorf("deleting orphaned blobs: %w", err)
|
|
}
|
|
|
|
// Delete orphaned blob_chunks entries
|
|
// This handles cases where the blob still exists but chunks were deleted
|
|
log.Debug("Deleting orphaned blob_chunks associations from database")
|
|
if err := sm.repos.BlobChunks.DeleteOrphaned(ctx); err != nil {
|
|
return fmt.Errorf("deleting orphaned blob_chunks: %w", err)
|
|
}
|
|
|
|
// Delete orphaned chunks (chunks not referenced by any file)
|
|
// This must come after cleaning up blob_chunks to avoid foreign key violations
|
|
log.Debug("Deleting orphaned chunk records from database")
|
|
if err := sm.repos.Chunks.DeleteOrphaned(ctx); err != nil {
|
|
return fmt.Errorf("deleting orphaned chunks: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// deleteOtherSnapshots deletes all snapshots except the current one
|
|
func (sm *SnapshotManager) deleteOtherSnapshots(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
|
log.Debug("[Temp DB Cleanup] Deleting all snapshot records except current", "keeping", currentSnapshotID)
|
|
|
|
// First delete uploads that reference other snapshots (no CASCADE DELETE on this FK)
|
|
database.LogSQL("Execute", "DELETE FROM uploads WHERE snapshot_id != ?", currentSnapshotID)
|
|
uploadResult, err := tx.ExecContext(ctx, "DELETE FROM uploads WHERE snapshot_id != ?", currentSnapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting uploads for other snapshots: %w", err)
|
|
}
|
|
uploadsDeleted, _ := uploadResult.RowsAffected()
|
|
log.Debug("[Temp DB Cleanup] Deleted upload records", "count", uploadsDeleted)
|
|
|
|
// Now we can safely delete the snapshots
|
|
database.LogSQL("Execute", "DELETE FROM snapshots WHERE id != ?", currentSnapshotID)
|
|
result, err := tx.ExecContext(ctx, "DELETE FROM snapshots WHERE id != ?", currentSnapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting other snapshots: %w", err)
|
|
}
|
|
rowsAffected, _ := result.RowsAffected()
|
|
log.Debug("[Temp DB Cleanup] Deleted snapshot records from database", "count", rowsAffected)
|
|
return nil
|
|
}
|
|
|
|
// deleteOrphanedSnapshotAssociations removes snapshot_files and then
// snapshot_blobs rows whose snapshot_id is not currentSnapshotID, inside the
// supplied transaction.
//
// The two tables are processed in that fixed order. The first failing DELETE
// stops the loop and its error is returned wrapped. Affected-row counts are
// used only for debug logging.
func (sm *SnapshotManager) deleteOrphanedSnapshotAssociations(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
	// Each step carries its own literal messages so log and error text stay
	// greppable and exactly as before.
	steps := []struct {
		startMsg string
		query    string
		errMsg   string
		doneMsg  string
	}{
		{
			startMsg: "[Temp DB Cleanup] Deleting orphaned snapshot_files associations",
			query:    "DELETE FROM snapshot_files WHERE snapshot_id != ?",
			errMsg:   "deleting orphaned snapshot_files",
			doneMsg:  "[Temp DB Cleanup] Deleted snapshot_files associations",
		},
		{
			startMsg: "[Temp DB Cleanup] Deleting orphaned snapshot_blobs associations",
			query:    "DELETE FROM snapshot_blobs WHERE snapshot_id != ?",
			errMsg:   "deleting orphaned snapshot_blobs",
			doneMsg:  "[Temp DB Cleanup] Deleted snapshot_blobs associations",
		},
	}

	for _, step := range steps {
		log.Debug(step.startMsg)
		database.LogSQL("Execute", step.query, currentSnapshotID)
		res, err := tx.ExecContext(ctx, step.query, currentSnapshotID)
		if err != nil {
			return fmt.Errorf("%s: %w", step.errMsg, err)
		}
		// The RowsAffected error is ignored on purpose: the count is only logged.
		n, _ := res.RowsAffected()
		log.Debug(step.doneMsg, "count", n)
	}
	return nil
}
|
|
|
|
// deleteOrphanedFiles deletes files not in the current snapshot
|
|
func (sm *SnapshotManager) deleteOrphanedFiles(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
|
log.Debug("[Temp DB Cleanup] Deleting file records not referenced by current snapshot")
|
|
database.LogSQL("Execute", `DELETE FROM files WHERE NOT EXISTS (SELECT 1 FROM snapshot_files WHERE snapshot_files.file_id = files.id AND snapshot_files.snapshot_id = ?)`, currentSnapshotID)
|
|
result, err := tx.ExecContext(ctx, `
|
|
DELETE FROM files
|
|
WHERE NOT EXISTS (
|
|
SELECT 1 FROM snapshot_files
|
|
WHERE snapshot_files.file_id = files.id
|
|
AND snapshot_files.snapshot_id = ?
|
|
)`, currentSnapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting orphaned files: %w", err)
|
|
}
|
|
rowsAffected, _ := result.RowsAffected()
|
|
log.Debug("[Temp DB Cleanup] Deleted file records from database", "count", rowsAffected)
|
|
|
|
// Note: file_chunks will be deleted via CASCADE
|
|
log.Debug("[Temp DB Cleanup] file_chunks associations deleted via CASCADE")
|
|
return nil
|
|
}
|
|
|
|
// deleteOrphanedChunkToFileMappings deletes chunk_files entries for deleted files
|
|
func (sm *SnapshotManager) deleteOrphanedChunkToFileMappings(ctx context.Context, tx *sql.Tx) error {
|
|
log.Debug("[Temp DB Cleanup] Deleting orphaned chunk_files associations")
|
|
database.LogSQL("Execute", `DELETE FROM chunk_files WHERE NOT EXISTS (SELECT 1 FROM files WHERE files.id = chunk_files.file_id)`)
|
|
result, err := tx.ExecContext(ctx, `
|
|
DELETE FROM chunk_files
|
|
WHERE NOT EXISTS (
|
|
SELECT 1 FROM files
|
|
WHERE files.id = chunk_files.file_id
|
|
)`)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting orphaned chunk_files: %w", err)
|
|
}
|
|
rowsAffected, _ := result.RowsAffected()
|
|
log.Debug("[Temp DB Cleanup] Deleted chunk_files associations", "count", rowsAffected)
|
|
return nil
|
|
}
|
|
|
|
// deleteOrphanedBlobs deletes blobs not in the current snapshot
|
|
func (sm *SnapshotManager) deleteOrphanedBlobs(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
|
|
log.Debug("[Temp DB Cleanup] Deleting blob records not referenced by current snapshot")
|
|
database.LogSQL("Execute", `DELETE FROM blobs WHERE NOT EXISTS (SELECT 1 FROM snapshot_blobs WHERE snapshot_blobs.blob_hash = blobs.blob_hash AND snapshot_blobs.snapshot_id = ?)`, currentSnapshotID)
|
|
result, err := tx.ExecContext(ctx, `
|
|
DELETE FROM blobs
|
|
WHERE NOT EXISTS (
|
|
SELECT 1 FROM snapshot_blobs
|
|
WHERE snapshot_blobs.blob_hash = blobs.blob_hash
|
|
AND snapshot_blobs.snapshot_id = ?
|
|
)`, currentSnapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting orphaned blobs: %w", err)
|
|
}
|
|
rowsAffected, _ := result.RowsAffected()
|
|
log.Debug("[Temp DB Cleanup] Deleted blob records from database", "count", rowsAffected)
|
|
return nil
|
|
}
|
|
|
|
// deleteOrphanedBlobToChunkMappings deletes blob_chunks entries for deleted blobs
|
|
func (sm *SnapshotManager) deleteOrphanedBlobToChunkMappings(ctx context.Context, tx *sql.Tx) error {
|
|
log.Debug("[Temp DB Cleanup] Deleting orphaned blob_chunks associations")
|
|
database.LogSQL("Execute", `DELETE FROM blob_chunks WHERE NOT EXISTS (SELECT 1 FROM blobs WHERE blobs.id = blob_chunks.blob_id)`)
|
|
result, err := tx.ExecContext(ctx, `
|
|
DELETE FROM blob_chunks
|
|
WHERE NOT EXISTS (
|
|
SELECT 1 FROM blobs
|
|
WHERE blobs.id = blob_chunks.blob_id
|
|
)`)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting orphaned blob_chunks: %w", err)
|
|
}
|
|
rowsAffected, _ := result.RowsAffected()
|
|
log.Debug("[Temp DB Cleanup] Deleted blob_chunks associations", "count", rowsAffected)
|
|
return nil
|
|
}
|
|
|
|
// deleteOrphanedChunks deletes chunks not referenced by any file or blob
|
|
func (sm *SnapshotManager) deleteOrphanedChunks(ctx context.Context, tx *sql.Tx) error {
|
|
log.Debug("[Temp DB Cleanup] Deleting orphaned chunk records")
|
|
query := `
|
|
DELETE FROM chunks
|
|
WHERE NOT EXISTS (
|
|
SELECT 1 FROM file_chunks
|
|
WHERE file_chunks.chunk_hash = chunks.chunk_hash
|
|
)
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM blob_chunks
|
|
WHERE blob_chunks.chunk_hash = chunks.chunk_hash
|
|
)`
|
|
database.LogSQL("Execute", query)
|
|
result, err := tx.ExecContext(ctx, query)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting orphaned chunks: %w", err)
|
|
}
|
|
rowsAffected, _ := result.RowsAffected()
|
|
log.Debug("[Temp DB Cleanup] Deleted chunk records from database", "count", rowsAffected)
|
|
return nil
|
|
}
|