- Add afero.Fs field to Vaultik struct for filesystem operations
- Vaultik now owns and manages the filesystem instance
- SnapshotManager receives filesystem via SetFilesystem() setter
- Update blob packer to use afero for temporary files
- Convert all filesystem operations to use afero abstraction
- Remove filesystem module - Vaultik manages filesystem directly
- Update tests: remove symlink test (unsupported by afero memfs)
- Fix TestMultipleFileChanges to handle scanner examining directories

This enables full end-to-end testing without touching disk by using
memory-backed filesystems. Database operations continue using the real
filesystem, as SQLite requires actual files.

package snapshot

// Snapshot Metadata Export Process
// ================================
//
// The snapshot metadata contains all information needed to restore a snapshot.
// Instead of creating a custom format, we use a trimmed copy of the SQLite
// database containing only data relevant to the current snapshot.
//
// Process Overview:
// 1. After all files/chunks/blobs are backed up, create a snapshot record
// 2. Close the main database to ensure consistency
// 3. Copy the entire database to a temporary file
// 4. Open the temporary database
// 5. Delete all snapshots except the current one
// 6. Delete all orphaned records:
//    - Files not referenced by any remaining snapshot
//    - Chunks not referenced by any remaining files
//    - Blobs not containing any remaining chunks
//    - All related mapping tables (file_chunks, chunk_files, blob_chunks)
// 7. Close the temporary database
// 8. Use sqlite3 to dump the cleaned database to SQL
// 9. Delete the temporary database file
// 10. Compress the SQL dump with zstd
// 11. Encrypt the compressed dump with age (if encryption is enabled)
// 12. Upload to S3 as: metadata/{snapshot-id}/db.zst.age
// 13. Reopen the main database
//
// Advantages of this approach:
// - No custom metadata format needed
// - Reuses existing database schema and relationships
// - SQL dumps are portable and compress well
// - Restore process can simply execute the SQL
// - Atomic and consistent snapshot of all metadata
//
// TODO: Future improvements:
// - Add snapshot-file relationships to track which files belong to which snapshot
// - Implement incremental snapshots that reference previous snapshots
// - Add snapshot manifest with additional metadata (size, chunk count, etc.)

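// A hedged restore sketch (bucket name, key file, and exact CLI flags are
// illustrative; it assumes the uploaded object is age-encrypted zstd, per the
// process overview above):
//
//	aws s3 cp s3://my-bucket/metadata/<snapshot-id>/db.zst.age - \
//	    | age --decrypt -i key.txt \
//	    | zstd -d \
//	    | sqlite3 restored.db
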
import (
	"bytes"
	"context"
	"database/sql"
	"fmt"
	"io"
	"os/exec"
	"path/filepath"
	"time"

	"git.eeqj.de/sneak/vaultik/internal/blobgen"
	"git.eeqj.de/sneak/vaultik/internal/config"
	"git.eeqj.de/sneak/vaultik/internal/database"
	"git.eeqj.de/sneak/vaultik/internal/log"
	"git.eeqj.de/sneak/vaultik/internal/s3"
	"github.com/dustin/go-humanize"
	"github.com/spf13/afero"
	"go.uber.org/fx"
)

// SnapshotManager handles snapshot creation and metadata export
type SnapshotManager struct {
	repos    *database.Repositories
	s3Client S3Client
	config   *config.Config
	fs       afero.Fs
}

// SnapshotManagerParams holds dependencies for NewSnapshotManager
type SnapshotManagerParams struct {
	fx.In

	Repos    *database.Repositories
	S3Client *s3.Client
	Config   *config.Config
}

// NewSnapshotManager creates a new snapshot manager for dependency injection
func NewSnapshotManager(params SnapshotManagerParams) *SnapshotManager {
	return &SnapshotManager{
		repos:    params.Repos,
		s3Client: params.S3Client,
		config:   params.Config,
	}
}

// SetFilesystem sets the filesystem to use for all file operations
func (sm *SnapshotManager) SetFilesystem(fs afero.Fs) {
	sm.fs = fs
}

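// A minimal test-wiring sketch (constructing the manager directly here is
// illustrative; production code wires it through fx and injects an OS-backed
// filesystem):
//
//	sm := &SnapshotManager{}
//	sm.SetFilesystem(afero.NewMemMapFs()) // in-memory; no disk I/O, symlinks unsupported
//	// production equivalent: sm.SetFilesystem(afero.NewOsFs())
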
// CreateSnapshot creates a new snapshot record in the database at the start of a backup
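//
// The returned ID combines the hostname with a UTC timestamp, e.g.
// "web1-20240102-150405Z" (hostname illustrative).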
func (sm *SnapshotManager) CreateSnapshot(ctx context.Context, hostname, version, gitRevision string) (string, error) {
	snapshotID := fmt.Sprintf("%s-%s", hostname, time.Now().UTC().Format("20060102-150405Z"))

	snapshot := &database.Snapshot{
		ID:                 snapshotID,
		Hostname:           hostname,
		VaultikVersion:     version,
		VaultikGitRevision: gitRevision,
		StartedAt:          time.Now().UTC(),
		CompletedAt:        nil, // Not completed yet
		FileCount:          0,
		ChunkCount:         0,
		BlobCount:          0,
		TotalSize:          0,
		BlobSize:           0,
		CompressionRatio:   1.0,
	}

	err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
		return sm.repos.Snapshots.Create(ctx, tx, snapshot)
	})
	if err != nil {
		return "", fmt.Errorf("creating snapshot: %w", err)
	}

	log.Info("Created snapshot", "snapshot_id", snapshotID)
	return snapshotID, nil
}

// UpdateSnapshotStats updates the statistics for a snapshot during backup
func (sm *SnapshotManager) UpdateSnapshotStats(ctx context.Context, snapshotID string, stats BackupStats) error {
	err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
		return sm.repos.Snapshots.UpdateCounts(ctx, tx, snapshotID,
			int64(stats.FilesScanned),
			int64(stats.ChunksCreated),
			int64(stats.BlobsCreated),
			stats.BytesScanned,
			stats.BytesUploaded,
		)
	})
	if err != nil {
		return fmt.Errorf("updating snapshot stats: %w", err)
	}

	return nil
}

// UpdateSnapshotStatsExtended updates snapshot statistics with extended metrics.
// This includes compression level, uncompressed blob size, and upload duration.
func (sm *SnapshotManager) UpdateSnapshotStatsExtended(ctx context.Context, snapshotID string, stats ExtendedBackupStats) error {
	return sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
		// First update basic stats
		if err := sm.repos.Snapshots.UpdateCounts(ctx, tx, snapshotID,
			int64(stats.FilesScanned),
			int64(stats.ChunksCreated),
			int64(stats.BlobsCreated),
			stats.BytesScanned,
			stats.BytesUploaded,
		); err != nil {
			return err
		}

		// Then update extended stats
		return sm.repos.Snapshots.UpdateExtendedStats(ctx, tx, snapshotID,
			stats.BlobUncompressedSize,
			stats.CompressionLevel,
			stats.UploadDurationMs,
		)
	})
}

// CompleteSnapshot marks a snapshot as completed in the database.
// Metadata export is performed separately by ExportSnapshotMetadata.
func (sm *SnapshotManager) CompleteSnapshot(ctx context.Context, snapshotID string) error {
	// Mark the snapshot as completed
	err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
		return sm.repos.Snapshots.MarkComplete(ctx, tx, snapshotID)
	})
	if err != nil {
		return fmt.Errorf("marking snapshot complete: %w", err)
	}

	log.Info("Completed snapshot", "snapshot_id", snapshotID)
	return nil
}

// ExportSnapshotMetadata exports snapshot metadata to S3
//
// This method executes the complete snapshot metadata export process:
// 1. Creates a temporary directory for working files
// 2. Copies the main database to preserve its state
// 3. Cleans the copy to contain only current snapshot data
// 4. Dumps the cleaned database to SQL
// 5. Compresses the SQL dump with zstd
// 6. Encrypts the compressed data (if encryption is enabled)
// 7. Uploads to S3 at: metadata/{snapshot-id}/db.zst.age, plus a compressed
//    blob manifest at metadata/{snapshot-id}/manifest.json.zst
//
// The caller is responsible for:
// - Ensuring the main database is closed before calling this method
// - Reopening the main database after this method returns
//
// This ensures database consistency during the copy operation.
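//
// A hedged caller sketch (the Close call and reopen step are assumptions
// about the surrounding database API, not verified signatures):
//
//	_ = db.Close() // main DB must be closed first
//	if err := sm.ExportSnapshotMetadata(ctx, dbPath, snapshotID); err != nil {
//		return err
//	}
//	// ...reopen the main database here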
func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath string, snapshotID string) error {
	log.Info("Phase 3/3: Exporting snapshot metadata", "snapshot_id", snapshotID, "source_db", dbPath)

	// Create temp directory for all temporary files
	tempDir, err := afero.TempDir(sm.fs, "", "vaultik-snapshot-*")
	if err != nil {
		return fmt.Errorf("creating temp dir: %w", err)
	}
	log.Debug("Created temporary directory", "path", tempDir)
	defer func() {
		log.Debug("Cleaning up temporary directory", "path", tempDir)
		if err := sm.fs.RemoveAll(tempDir); err != nil {
			log.Debug("Failed to remove temp dir", "path", tempDir, "error", err)
		}
	}()

	// Step 1: Copy database to temp file
	// The main database should be closed at this point
	tempDBPath := filepath.Join(tempDir, "snapshot.db")
	log.Debug("Copying database to temporary location", "source", dbPath, "destination", tempDBPath)
	if err := sm.copyFile(dbPath, tempDBPath); err != nil {
		return fmt.Errorf("copying database: %w", err)
	}
	log.Debug("Database copy complete", "size", sm.getFileSize(tempDBPath))

	// Step 2: Clean the temp database to only contain current snapshot data
	log.Debug("Cleaning temporary database", "snapshot_id", snapshotID)
	stats, err := sm.cleanSnapshotDB(ctx, tempDBPath, snapshotID)
	if err != nil {
		return fmt.Errorf("cleaning snapshot database: %w", err)
	}
	log.Info("Temporary database cleanup complete",
		"db_path", tempDBPath,
		"size_after_clean", humanize.Bytes(uint64(sm.getFileSize(tempDBPath))),
		"files", stats.FileCount,
		"chunks", stats.ChunkCount,
		"blobs", stats.BlobCount,
		"total_compressed_size", humanize.Bytes(uint64(stats.CompressedSize)),
		"total_uncompressed_size", humanize.Bytes(uint64(stats.UncompressedSize)),
		"compression_ratio", fmt.Sprintf("%.2fx", float64(stats.UncompressedSize)/float64(stats.CompressedSize)))

	// Step 3: Dump the cleaned database to SQL
	dumpPath := filepath.Join(tempDir, "snapshot.sql")
	if err := sm.dumpDatabase(tempDBPath, dumpPath); err != nil {
		return fmt.Errorf("dumping database: %w", err)
	}
	log.Debug("SQL dump complete", "size", humanize.Bytes(uint64(sm.getFileSize(dumpPath))))

	// Step 4: Compress and encrypt the SQL dump
	compressedPath := filepath.Join(tempDir, "snapshot.sql.zst.age")
	if err := sm.compressDump(dumpPath, compressedPath); err != nil {
		return fmt.Errorf("compressing dump: %w", err)
	}
	log.Debug("Compression complete",
		"original_size", humanize.Bytes(uint64(sm.getFileSize(dumpPath))),
		"compressed_size", humanize.Bytes(uint64(sm.getFileSize(compressedPath))))

	// Step 5: Read compressed and encrypted data for upload
	finalData, err := afero.ReadFile(sm.fs, compressedPath)
	if err != nil {
		return fmt.Errorf("reading compressed dump: %w", err)
	}

	// Step 6: Generate blob manifest (before closing temp DB)
	blobManifest, err := sm.generateBlobManifest(ctx, tempDBPath, snapshotID)
	if err != nil {
		return fmt.Errorf("generating blob manifest: %w", err)
	}

	// Step 7: Upload to S3 in snapshot subdirectory
	// Upload database backup (compressed and encrypted)
	dbKey := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)

	dbUploadStart := time.Now()
	if err := sm.s3Client.PutObject(ctx, dbKey, bytes.NewReader(finalData)); err != nil {
		return fmt.Errorf("uploading snapshot database: %w", err)
	}
	dbUploadDuration := time.Since(dbUploadStart)
	dbUploadSpeed := float64(len(finalData)) * 8 / dbUploadDuration.Seconds() // bits per second
	log.Info("Uploaded snapshot database to S3",
		"path", dbKey,
		"size", humanize.Bytes(uint64(len(finalData))),
		"duration", dbUploadDuration,
		"speed", humanize.SI(dbUploadSpeed, "bps"))

	// Upload blob manifest (compressed only, not encrypted)
	manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
	manifestUploadStart := time.Now()
	if err := sm.s3Client.PutObject(ctx, manifestKey, bytes.NewReader(blobManifest)); err != nil {
		return fmt.Errorf("uploading blob manifest: %w", err)
	}
	manifestUploadDuration := time.Since(manifestUploadStart)
	manifestUploadSpeed := float64(len(blobManifest)) * 8 / manifestUploadDuration.Seconds() // bits per second
	log.Info("Uploaded blob manifest to S3",
		"path", manifestKey,
		"size", humanize.Bytes(uint64(len(blobManifest))),
		"duration", manifestUploadDuration,
		"speed", humanize.SI(manifestUploadSpeed, "bps"))

	log.Info("Uploaded snapshot metadata",
		"snapshot_id", snapshotID,
		"db_size", len(finalData),
		"manifest_size", len(blobManifest))
	return nil
}

// CleanupStats contains statistics about the cleaned snapshot database
type CleanupStats struct {
	FileCount        int
	ChunkCount       int
	BlobCount        int
	CompressedSize   int64
	UncompressedSize int64
}

// cleanSnapshotDB removes all data except for the specified snapshot
//
// The cleanup is performed in a specific order to maintain referential integrity:
// 1. Delete other snapshots
// 2. Delete orphaned snapshot associations (snapshot_files, snapshot_blobs) for deleted snapshots
// 3. Delete orphaned files (not in the current snapshot)
// 4. Delete orphaned chunk-to-file mappings (references to deleted files)
// 5. Delete orphaned blobs (not in the current snapshot)
// 6. Delete orphaned blob-to-chunk mappings (references to deleted blobs)
// 7. Delete orphaned chunks (not referenced by any file or blob)
//
// Each step is implemented as a separate method for clarity and maintainability.
func (sm *SnapshotManager) cleanSnapshotDB(ctx context.Context, dbPath string, snapshotID string) (*CleanupStats, error) {
	// Open the temp database
	db, err := database.New(ctx, dbPath)
	if err != nil {
		return nil, fmt.Errorf("opening temp database: %w", err)
	}
	defer func() {
		if err := db.Close(); err != nil {
			log.Debug("Failed to close temp database", "error", err)
		}
	}()

	// Start a transaction
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return nil, fmt.Errorf("beginning transaction: %w", err)
	}
	defer func() {
		if rbErr := tx.Rollback(); rbErr != nil && rbErr != sql.ErrTxDone {
			log.Debug("Failed to rollback transaction", "error", rbErr)
		}
	}()

	// Execute cleanup steps in order
	if err := sm.deleteOtherSnapshots(ctx, tx, snapshotID); err != nil {
		return nil, fmt.Errorf("step 1 - delete other snapshots: %w", err)
	}

	if err := sm.deleteOrphanedSnapshotAssociations(ctx, tx, snapshotID); err != nil {
		return nil, fmt.Errorf("step 2 - delete orphaned snapshot associations: %w", err)
	}

	if err := sm.deleteOrphanedFiles(ctx, tx, snapshotID); err != nil {
		return nil, fmt.Errorf("step 3 - delete orphaned files: %w", err)
	}

	if err := sm.deleteOrphanedChunkToFileMappings(ctx, tx); err != nil {
		return nil, fmt.Errorf("step 4 - delete orphaned chunk-to-file mappings: %w", err)
	}

	if err := sm.deleteOrphanedBlobs(ctx, tx, snapshotID); err != nil {
		return nil, fmt.Errorf("step 5 - delete orphaned blobs: %w", err)
	}

	if err := sm.deleteOrphanedBlobToChunkMappings(ctx, tx); err != nil {
		return nil, fmt.Errorf("step 6 - delete orphaned blob-to-chunk mappings: %w", err)
	}

	if err := sm.deleteOrphanedChunks(ctx, tx); err != nil {
		return nil, fmt.Errorf("step 7 - delete orphaned chunks: %w", err)
	}

	// Commit transaction
	log.Debug("[Temp DB Cleanup] Committing cleanup transaction")
	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf("committing transaction: %w", err)
	}

	// Collect statistics about the cleaned database
	stats := &CleanupStats{}

	// Count files
	var fileCount int
	err = db.QueryRowWithLog(ctx, "SELECT COUNT(*) FROM files").Scan(&fileCount)
	if err != nil {
		return nil, fmt.Errorf("counting files: %w", err)
	}
	stats.FileCount = fileCount

	// Count chunks
	var chunkCount int
	err = db.QueryRowWithLog(ctx, "SELECT COUNT(*) FROM chunks").Scan(&chunkCount)
	if err != nil {
		return nil, fmt.Errorf("counting chunks: %w", err)
	}
	stats.ChunkCount = chunkCount

	// Count blobs and get sizes
	var blobCount int
	var compressedSize, uncompressedSize sql.NullInt64
	err = db.QueryRowWithLog(ctx, `
		SELECT COUNT(*), COALESCE(SUM(compressed_size), 0), COALESCE(SUM(uncompressed_size), 0)
		FROM blobs
		WHERE blob_hash IN (SELECT blob_hash FROM snapshot_blobs WHERE snapshot_id = ?)
	`, snapshotID).Scan(&blobCount, &compressedSize, &uncompressedSize)
	if err != nil {
		return nil, fmt.Errorf("counting blobs and sizes: %w", err)
	}
	stats.BlobCount = blobCount
	stats.CompressedSize = compressedSize.Int64
	stats.UncompressedSize = uncompressedSize.Int64

	return stats, nil
}

// dumpDatabase creates a SQL dump of the database
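//
// Note: the sqlite3 CLI reads dbPath from the real OS filesystem, so the
// temporary database must live on disk even when sm.fs is memory-backed
// (SQLite requires actual files).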
func (sm *SnapshotManager) dumpDatabase(dbPath, dumpPath string) error {
	log.Debug("Running sqlite3 dump command", "source", dbPath, "destination", dumpPath)
	cmd := exec.Command("sqlite3", dbPath, ".dump")

	output, err := cmd.Output()
	if err != nil {
		return fmt.Errorf("running sqlite3 dump: %w", err)
	}

	log.Debug("SQL dump generated", "size", humanize.Bytes(uint64(len(output))))
	if err := afero.WriteFile(sm.fs, dumpPath, output, 0644); err != nil {
		return fmt.Errorf("writing dump file: %w", err)
	}

	return nil
}

// compressDump compresses the SQL dump with zstd and encrypts it with age,
// using a blobgen writer for both steps
func (sm *SnapshotManager) compressDump(inputPath, outputPath string) error {
	input, err := sm.fs.Open(inputPath)
	if err != nil {
		return fmt.Errorf("opening input file: %w", err)
	}
	defer func() {
		if err := input.Close(); err != nil {
			log.Debug("Failed to close input file", "path", inputPath, "error", err)
		}
	}()

	output, err := sm.fs.Create(outputPath)
	if err != nil {
		return fmt.Errorf("creating output file: %w", err)
	}
	defer func() {
		if err := output.Close(); err != nil {
			log.Debug("Failed to close output file", "path", outputPath, "error", err)
		}
	}()

	// Use blobgen for compression and encryption
	log.Debug("Compressing and encrypting data")
	writer, err := blobgen.NewWriter(output, sm.config.CompressionLevel, sm.config.AgeRecipients)
	if err != nil {
		return fmt.Errorf("creating blobgen writer: %w", err)
	}

	// Track if writer has been closed to avoid double-close
	writerClosed := false
	defer func() {
		if !writerClosed {
			if err := writer.Close(); err != nil {
				log.Debug("Failed to close writer", "error", err)
			}
		}
	}()

	if _, err := io.Copy(writer, input); err != nil {
		return fmt.Errorf("compressing data: %w", err)
	}

	// Close writer to flush all data
	if err := writer.Close(); err != nil {
		return fmt.Errorf("closing writer: %w", err)
	}
	writerClosed = true

	log.Debug("Compression complete", "hash", fmt.Sprintf("%x", writer.Sum256()))

	return nil
}

// copyFile copies a file from src to dst
func (sm *SnapshotManager) copyFile(src, dst string) error {
	log.Debug("Opening source file for copy", "path", src)
	sourceFile, err := sm.fs.Open(src)
	if err != nil {
		return err
	}
	defer func() {
		log.Debug("Closing source file", "path", src)
		if err := sourceFile.Close(); err != nil {
			log.Debug("Failed to close source file", "path", src, "error", err)
		}
	}()

	log.Debug("Creating destination file", "path", dst)
	destFile, err := sm.fs.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		log.Debug("Closing destination file", "path", dst)
		if err := destFile.Close(); err != nil {
			log.Debug("Failed to close destination file", "path", dst, "error", err)
		}
	}()

	log.Debug("Copying file data")
	n, err := io.Copy(destFile, sourceFile)
	if err != nil {
		return err
	}
	log.Debug("File copy complete", "bytes_copied", n)

	return nil
}

// generateBlobManifest creates a compressed JSON list of all blobs in the snapshot
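//
// Illustrative manifest shape (field names follow the Manifest struct; the
// exact JSON keys depend on its struct tags, which are not shown here):
//
//	{
//	  "SnapshotID": "web1-20240102-150405Z",
//	  "Timestamp": "2024-01-02T15:04:05Z",
//	  "BlobCount": 2,
//	  "TotalCompressedSize": 1048576,
//	  "Blobs": [{"Hash": "ab12...", "CompressedSize": 524288}, ...]
//	}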
func (sm *SnapshotManager) generateBlobManifest(ctx context.Context, dbPath string, snapshotID string) ([]byte, error) {
	// Open the cleaned database using the database package
	db, err := database.New(ctx, dbPath)
	if err != nil {
		return nil, fmt.Errorf("opening database: %w", err)
	}
	defer func() { _ = db.Close() }()

	// Create repositories to access the data
	repos := database.NewRepositories(db)

	// Get all blobs for this snapshot
	log.Debug("Querying blobs for snapshot", "snapshot_id", snapshotID)
	blobHashes, err := repos.Snapshots.GetBlobHashes(ctx, snapshotID)
	if err != nil {
		return nil, fmt.Errorf("getting snapshot blobs: %w", err)
	}
	log.Debug("Found blobs", "count", len(blobHashes))

	// Get blob details including sizes
	blobs := make([]BlobInfo, 0, len(blobHashes))
	totalCompressedSize := int64(0)

	for _, hash := range blobHashes {
		blob, err := repos.Blobs.GetByHash(ctx, hash)
		if err != nil {
			log.Warn("Failed to get blob details", "hash", hash, "error", err)
			continue
		}
		if blob != nil {
			blobs = append(blobs, BlobInfo{
				Hash:           hash,
				CompressedSize: blob.CompressedSize,
			})
			totalCompressedSize += blob.CompressedSize
		}
	}

	// Create manifest
	manifest := &Manifest{
		SnapshotID:          snapshotID,
		Timestamp:           time.Now().UTC().Format(time.RFC3339),
		BlobCount:           len(blobs),
		TotalCompressedSize: totalCompressedSize,
		Blobs:               blobs,
	}

	// Encode manifest
	compressedData, err := EncodeManifest(manifest, sm.config.CompressionLevel)
	if err != nil {
		return nil, fmt.Errorf("encoding manifest: %w", err)
	}

	log.Info("Generated blob manifest",
		"snapshot_id", snapshotID,
		"blob_count", len(blobs),
		"total_compressed_size", totalCompressedSize,
		"manifest_size", len(compressedData))

	return compressedData, nil
}

// getFileSize returns the size of a file in bytes, or -1 on error
func (sm *SnapshotManager) getFileSize(path string) int64 {
	info, err := sm.fs.Stat(path)
	if err != nil {
		return -1
	}
	return info.Size()
}

// BackupStats contains statistics from a backup operation
type BackupStats struct {
	FilesScanned  int
	BytesScanned  int64
	ChunksCreated int
	BlobsCreated  int
	BytesUploaded int64
}

// ExtendedBackupStats contains additional statistics for comprehensive tracking
type ExtendedBackupStats struct {
	BackupStats
	BlobUncompressedSize int64 // Total uncompressed size of all referenced blobs
	CompressionLevel     int   // Compression level used for this snapshot
	UploadDurationMs     int64 // Total milliseconds spent uploading to S3
}

// CleanupIncompleteSnapshots removes incomplete snapshots that don't have metadata in S3.
// This is critical for data safety: incomplete snapshots can cause deduplication to skip
// files that were never successfully backed up, resulting in data loss.
func (sm *SnapshotManager) CleanupIncompleteSnapshots(ctx context.Context, hostname string) error {
	log.Info("Checking for incomplete snapshots", "hostname", hostname)

	// Get all incomplete snapshots for this hostname
	incompleteSnapshots, err := sm.repos.Snapshots.GetIncompleteByHostname(ctx, hostname)
	if err != nil {
		return fmt.Errorf("getting incomplete snapshots: %w", err)
	}

	if len(incompleteSnapshots) == 0 {
		log.Debug("No incomplete snapshots found")
		return nil
	}

	log.Info("Found incomplete snapshots", "count", len(incompleteSnapshots))

	// Check each incomplete snapshot for metadata in S3
	for _, snapshot := range incompleteSnapshots {
		// Check if metadata exists in S3, using the same key that
		// ExportSnapshotMetadata uploads
		metadataKey := fmt.Sprintf("metadata/%s/db.zst.age", snapshot.ID)
		_, err := sm.s3Client.StatObject(ctx, metadataKey)

		if err != nil {
			// Metadata doesn't exist in S3 - this is an incomplete snapshot
			log.Info("Cleaning up incomplete snapshot record", "snapshot_id", snapshot.ID, "started_at", snapshot.StartedAt)

			// Delete the snapshot and all its associations
			if err := sm.deleteSnapshot(ctx, snapshot.ID); err != nil {
				return fmt.Errorf("deleting incomplete snapshot %s: %w", snapshot.ID, err)
			}

			log.Info("Deleted incomplete snapshot record and associated data", "snapshot_id", snapshot.ID)
		} else {
			// Metadata exists - this snapshot was completed but the database wasn't
			// updated. This shouldn't happen in normal operation, but mark it complete.
			log.Warn("Found snapshot with S3 metadata but incomplete in database", "snapshot_id", snapshot.ID)
			if err := sm.repos.Snapshots.MarkComplete(ctx, nil, snapshot.ID); err != nil {
				log.Error("Failed to mark snapshot as complete in database", "snapshot_id", snapshot.ID, "error", err)
			}
		}
	}

	return nil
}

// deleteSnapshot removes a snapshot and all its associations from the database
func (sm *SnapshotManager) deleteSnapshot(ctx context.Context, snapshotID string) error {
	// Delete snapshot_files entries
	if err := sm.repos.Snapshots.DeleteSnapshotFiles(ctx, snapshotID); err != nil {
		return fmt.Errorf("deleting snapshot files: %w", err)
	}

	// Delete snapshot_blobs entries
	if err := sm.repos.Snapshots.DeleteSnapshotBlobs(ctx, snapshotID); err != nil {
		return fmt.Errorf("deleting snapshot blobs: %w", err)
	}

	// Delete the snapshot itself
	if err := sm.repos.Snapshots.Delete(ctx, snapshotID); err != nil {
		return fmt.Errorf("deleting snapshot: %w", err)
	}

	// Clean up orphaned data
	log.Debug("Cleaning up orphaned records in main database")
	if err := sm.cleanupOrphanedData(ctx); err != nil {
		return fmt.Errorf("cleaning up orphaned data: %w", err)
	}

	return nil
}

// cleanupOrphanedData removes files, chunks, and blobs that are no longer referenced by any snapshot
func (sm *SnapshotManager) cleanupOrphanedData(ctx context.Context) error {
	// Order is important to respect foreign key constraints:
	// 1. Delete orphaned files (will cascade delete file_chunks)
	// 2. Delete orphaned blobs (will cascade delete blob_chunks for deleted blobs)
	// 3. Delete orphaned blob_chunks (where blob exists but chunk doesn't)
	// 4. Delete orphaned chunks (now safe after all blob_chunks are gone)

	// Delete orphaned files (files not in any snapshot)
	log.Debug("Deleting orphaned file records from database")
	if err := sm.repos.Files.DeleteOrphaned(ctx); err != nil {
		return fmt.Errorf("deleting orphaned files: %w", err)
	}

	// Delete orphaned blobs (blobs not in any snapshot)
	// This will cascade delete blob_chunks for deleted blobs
	log.Debug("Deleting orphaned blob records from database")
	if err := sm.repos.Blobs.DeleteOrphaned(ctx); err != nil {
		return fmt.Errorf("deleting orphaned blobs: %w", err)
	}

	// Delete orphaned blob_chunks entries
	// This handles cases where the blob still exists but chunks were deleted
	log.Debug("Deleting orphaned blob_chunks associations from database")
	if err := sm.repos.BlobChunks.DeleteOrphaned(ctx); err != nil {
		return fmt.Errorf("deleting orphaned blob_chunks: %w", err)
	}

	// Delete orphaned chunks (chunks not referenced by any file)
	// This must come after cleaning up blob_chunks to avoid foreign key violations
	log.Debug("Deleting orphaned chunk records from database")
	if err := sm.repos.Chunks.DeleteOrphaned(ctx); err != nil {
		return fmt.Errorf("deleting orphaned chunks: %w", err)
	}

	return nil
}

// deleteOtherSnapshots deletes all snapshots except the current one
func (sm *SnapshotManager) deleteOtherSnapshots(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
	log.Debug("[Temp DB Cleanup] Deleting all snapshot records except current", "keeping", currentSnapshotID)

	// First delete uploads that reference other snapshots (no CASCADE DELETE on this FK)
	database.LogSQL("Execute", "DELETE FROM uploads WHERE snapshot_id != ?", currentSnapshotID)
	uploadResult, err := tx.ExecContext(ctx, "DELETE FROM uploads WHERE snapshot_id != ?", currentSnapshotID)
	if err != nil {
		return fmt.Errorf("deleting uploads for other snapshots: %w", err)
	}
	uploadsDeleted, _ := uploadResult.RowsAffected()
	log.Debug("[Temp DB Cleanup] Deleted upload records", "count", uploadsDeleted)

	// Now we can safely delete the snapshots
	database.LogSQL("Execute", "DELETE FROM snapshots WHERE id != ?", currentSnapshotID)
	result, err := tx.ExecContext(ctx, "DELETE FROM snapshots WHERE id != ?", currentSnapshotID)
	if err != nil {
		return fmt.Errorf("deleting other snapshots: %w", err)
	}
	rowsAffected, _ := result.RowsAffected()
	log.Debug("[Temp DB Cleanup] Deleted snapshot records from database", "count", rowsAffected)
	return nil
}

// deleteOrphanedSnapshotAssociations deletes snapshot_files and snapshot_blobs for deleted snapshots
func (sm *SnapshotManager) deleteOrphanedSnapshotAssociations(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
	// Delete orphaned snapshot_files
	log.Debug("[Temp DB Cleanup] Deleting orphaned snapshot_files associations")
	database.LogSQL("Execute", "DELETE FROM snapshot_files WHERE snapshot_id != ?", currentSnapshotID)
	result, err := tx.ExecContext(ctx, "DELETE FROM snapshot_files WHERE snapshot_id != ?", currentSnapshotID)
	if err != nil {
		return fmt.Errorf("deleting orphaned snapshot_files: %w", err)
	}
	rowsAffected, _ := result.RowsAffected()
	log.Debug("[Temp DB Cleanup] Deleted snapshot_files associations", "count", rowsAffected)

	// Delete orphaned snapshot_blobs
	log.Debug("[Temp DB Cleanup] Deleting orphaned snapshot_blobs associations")
	database.LogSQL("Execute", "DELETE FROM snapshot_blobs WHERE snapshot_id != ?", currentSnapshotID)
	result, err = tx.ExecContext(ctx, "DELETE FROM snapshot_blobs WHERE snapshot_id != ?", currentSnapshotID)
	if err != nil {
		return fmt.Errorf("deleting orphaned snapshot_blobs: %w", err)
	}
	rowsAffected, _ = result.RowsAffected()
	log.Debug("[Temp DB Cleanup] Deleted snapshot_blobs associations", "count", rowsAffected)
	return nil
}

// deleteOrphanedFiles deletes files not in the current snapshot
func (sm *SnapshotManager) deleteOrphanedFiles(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
	log.Debug("[Temp DB Cleanup] Deleting file records not referenced by current snapshot")
	database.LogSQL("Execute", `DELETE FROM files WHERE NOT EXISTS (SELECT 1 FROM snapshot_files WHERE snapshot_files.file_id = files.id AND snapshot_files.snapshot_id = ?)`, currentSnapshotID)
	result, err := tx.ExecContext(ctx, `
		DELETE FROM files
		WHERE NOT EXISTS (
			SELECT 1 FROM snapshot_files
			WHERE snapshot_files.file_id = files.id
			AND snapshot_files.snapshot_id = ?
		)`, currentSnapshotID)
	if err != nil {
		return fmt.Errorf("deleting orphaned files: %w", err)
	}
	rowsAffected, _ := result.RowsAffected()
	log.Debug("[Temp DB Cleanup] Deleted file records from database", "count", rowsAffected)

	// Note: file_chunks will be deleted via CASCADE
	log.Debug("[Temp DB Cleanup] file_chunks associations deleted via CASCADE")
	return nil
}

// deleteOrphanedChunkToFileMappings deletes chunk_files entries for deleted files
func (sm *SnapshotManager) deleteOrphanedChunkToFileMappings(ctx context.Context, tx *sql.Tx) error {
	log.Debug("[Temp DB Cleanup] Deleting orphaned chunk_files associations")
	database.LogSQL("Execute", `DELETE FROM chunk_files WHERE NOT EXISTS (SELECT 1 FROM files WHERE files.id = chunk_files.file_id)`)
	result, err := tx.ExecContext(ctx, `
		DELETE FROM chunk_files
		WHERE NOT EXISTS (
			SELECT 1 FROM files
			WHERE files.id = chunk_files.file_id
		)`)
	if err != nil {
		return fmt.Errorf("deleting orphaned chunk_files: %w", err)
	}
	rowsAffected, _ := result.RowsAffected()
	log.Debug("[Temp DB Cleanup] Deleted chunk_files associations", "count", rowsAffected)
	return nil
}

// deleteOrphanedBlobs deletes blobs not in the current snapshot
func (sm *SnapshotManager) deleteOrphanedBlobs(ctx context.Context, tx *sql.Tx, currentSnapshotID string) error {
	log.Debug("[Temp DB Cleanup] Deleting blob records not referenced by current snapshot")
	database.LogSQL("Execute", `DELETE FROM blobs WHERE NOT EXISTS (SELECT 1 FROM snapshot_blobs WHERE snapshot_blobs.blob_hash = blobs.blob_hash AND snapshot_blobs.snapshot_id = ?)`, currentSnapshotID)
	result, err := tx.ExecContext(ctx, `
		DELETE FROM blobs
		WHERE NOT EXISTS (
			SELECT 1 FROM snapshot_blobs
			WHERE snapshot_blobs.blob_hash = blobs.blob_hash
			AND snapshot_blobs.snapshot_id = ?
		)`, currentSnapshotID)
	if err != nil {
		return fmt.Errorf("deleting orphaned blobs: %w", err)
	}
	rowsAffected, _ := result.RowsAffected()
	log.Debug("[Temp DB Cleanup] Deleted blob records from database", "count", rowsAffected)
	return nil
}

// deleteOrphanedBlobToChunkMappings deletes blob_chunks entries for deleted blobs
func (sm *SnapshotManager) deleteOrphanedBlobToChunkMappings(ctx context.Context, tx *sql.Tx) error {
	log.Debug("[Temp DB Cleanup] Deleting orphaned blob_chunks associations")
	database.LogSQL("Execute", `DELETE FROM blob_chunks WHERE NOT EXISTS (SELECT 1 FROM blobs WHERE blobs.id = blob_chunks.blob_id)`)
	result, err := tx.ExecContext(ctx, `
		DELETE FROM blob_chunks
		WHERE NOT EXISTS (
			SELECT 1 FROM blobs
			WHERE blobs.id = blob_chunks.blob_id
		)`)
	if err != nil {
		return fmt.Errorf("deleting orphaned blob_chunks: %w", err)
	}
	rowsAffected, _ := result.RowsAffected()
	log.Debug("[Temp DB Cleanup] Deleted blob_chunks associations", "count", rowsAffected)
	return nil
}

// deleteOrphanedChunks deletes chunks not referenced by any file or blob
func (sm *SnapshotManager) deleteOrphanedChunks(ctx context.Context, tx *sql.Tx) error {
	log.Debug("[Temp DB Cleanup] Deleting orphaned chunk records")
	query := `
		DELETE FROM chunks
		WHERE NOT EXISTS (
			SELECT 1 FROM file_chunks
			WHERE file_chunks.chunk_hash = chunks.chunk_hash
		)
		AND NOT EXISTS (
			SELECT 1 FROM blob_chunks
			WHERE blob_chunks.chunk_hash = chunks.chunk_hash
		)`
	database.LogSQL("Execute", query)
	result, err := tx.ExecContext(ctx, query)
	if err != nil {
		return fmt.Errorf("deleting orphaned chunks: %w", err)
	}
	rowsAffected, _ := result.RowsAffected()
	log.Debug("[Temp DB Cleanup] Deleted chunk records from database", "count", rowsAffected)
	return nil
}