Refactor blob storage to use UUID primary keys and implement streaming chunking

- Changed blob table to use ID (UUID) as primary key instead of hash
- Blob records are now created at packing start, enabling immediate chunk associations
- Implemented streaming chunking to process large files without memory exhaustion
- Fixed blob manifest generation to include all referenced blobs
- Updated all foreign key references from blob_hash to blob_id
- Added progress reporting and improved error handling
- Enforced encryption requirement for all blob packing
- Updated tests to use test encryption keys
- Added Cyrillic transliteration to README
This commit is contained in:
2025-07-22 07:43:39 +02:00
parent 26db096913
commit 86b533d6ee
49 changed files with 5709 additions and 324 deletions

View File

@@ -0,0 +1,524 @@
package backup
import (
"context"
"crypto/sha256"
"database/sql"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"testing"
"testing/fstest"
"time"
"git.eeqj.de/sneak/vaultik/internal/database"
)
// MockS3Client is a mock implementation of S3 operations for testing
type MockS3Client struct {
storage map[string][]byte
}
func NewMockS3Client() *MockS3Client {
return &MockS3Client{
storage: make(map[string][]byte),
}
}
func (m *MockS3Client) PutBlob(ctx context.Context, hash string, data []byte) error {
m.storage[hash] = data
return nil
}
func (m *MockS3Client) GetBlob(ctx context.Context, hash string) ([]byte, error) {
data, ok := m.storage[hash]
if !ok {
return nil, fmt.Errorf("blob not found: %s", hash)
}
return data, nil
}
func (m *MockS3Client) BlobExists(ctx context.Context, hash string) (bool, error) {
_, ok := m.storage[hash]
return ok, nil
}
func (m *MockS3Client) CreateBucket(ctx context.Context, bucket string) error {
return nil
}
func TestBackupWithInMemoryFS(t *testing.T) {
// Create a temporary directory for the database
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "test.db")
// Create test filesystem
testFS := fstest.MapFS{
"file1.txt": &fstest.MapFile{
Data: []byte("Hello, World!"),
Mode: 0644,
ModTime: time.Now(),
},
"dir1/file2.txt": &fstest.MapFile{
Data: []byte("This is a test file with some content."),
Mode: 0755,
ModTime: time.Now(),
},
"dir1/subdir/file3.txt": &fstest.MapFile{
Data: []byte("Another file in a subdirectory."),
Mode: 0600,
ModTime: time.Now(),
},
"largefile.bin": &fstest.MapFile{
Data: generateLargeFileContent(10 * 1024 * 1024), // 10MB file with varied content
Mode: 0644,
ModTime: time.Now(),
},
}
// Initialize the database
ctx := context.Background()
db, err := database.New(ctx, dbPath)
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer func() {
if err := db.Close(); err != nil {
t.Logf("Failed to close database: %v", err)
}
}()
repos := database.NewRepositories(db)
// Create mock S3 client
s3Client := NewMockS3Client()
// Run backup
backupEngine := &BackupEngine{
repos: repos,
s3Client: s3Client,
}
snapshotID, err := backupEngine.Backup(ctx, testFS, ".")
if err != nil {
t.Fatalf("Backup failed: %v", err)
}
// Verify snapshot was created
snapshot, err := repos.Snapshots.GetByID(ctx, snapshotID)
if err != nil {
t.Fatalf("Failed to get snapshot: %v", err)
}
if snapshot == nil {
t.Fatal("Snapshot not found")
}
if snapshot.FileCount == 0 {
t.Error("Expected snapshot to have files")
}
// Verify files in database
files, err := repos.Files.ListByPrefix(ctx, "")
if err != nil {
t.Fatalf("Failed to list files: %v", err)
}
expectedFiles := map[string]bool{
"file1.txt": true,
"dir1/file2.txt": true,
"dir1/subdir/file3.txt": true,
"largefile.bin": true,
}
if len(files) != len(expectedFiles) {
t.Errorf("Expected %d files, got %d", len(expectedFiles), len(files))
}
for _, file := range files {
if !expectedFiles[file.Path] {
t.Errorf("Unexpected file in database: %s", file.Path)
}
delete(expectedFiles, file.Path)
// Verify file metadata
fsFile := testFS[file.Path]
if fsFile == nil {
t.Errorf("File %s not found in test filesystem", file.Path)
continue
}
if file.Size != int64(len(fsFile.Data)) {
t.Errorf("File %s: expected size %d, got %d", file.Path, len(fsFile.Data), file.Size)
}
if file.Mode != uint32(fsFile.Mode) {
t.Errorf("File %s: expected mode %o, got %o", file.Path, fsFile.Mode, file.Mode)
}
}
if len(expectedFiles) > 0 {
t.Errorf("Files not found in database: %v", expectedFiles)
}
// Verify chunks
chunks, err := repos.Chunks.List(ctx)
if err != nil {
t.Fatalf("Failed to list chunks: %v", err)
}
if len(chunks) == 0 {
t.Error("No chunks found in database")
}
// The large file should create 10 chunks (10MB / 1MB chunk size)
// Plus the small files
minExpectedChunks := 10 + 3
if len(chunks) < minExpectedChunks {
t.Errorf("Expected at least %d chunks, got %d", minExpectedChunks, len(chunks))
}
// Verify at least one blob was created and uploaded
// We can't list blobs directly, but we can check via snapshot blobs
blobHashes, err := repos.Snapshots.GetBlobHashes(ctx, snapshotID)
if err != nil {
t.Fatalf("Failed to get blob hashes: %v", err)
}
if len(blobHashes) == 0 {
t.Error("Expected at least one blob to be created")
}
for _, blobHash := range blobHashes {
// Check blob exists in mock S3
exists, err := s3Client.BlobExists(ctx, blobHash)
if err != nil {
t.Errorf("Failed to check blob %s: %v", blobHash, err)
}
if !exists {
t.Errorf("Blob %s not found in S3", blobHash)
}
}
}
func TestBackupDeduplication(t *testing.T) {
// Create a temporary directory for the database
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "test.db")
// Create test filesystem with duplicate content
testFS := fstest.MapFS{
"file1.txt": &fstest.MapFile{
Data: []byte("Duplicate content"),
Mode: 0644,
ModTime: time.Now(),
},
"file2.txt": &fstest.MapFile{
Data: []byte("Duplicate content"),
Mode: 0644,
ModTime: time.Now(),
},
"file3.txt": &fstest.MapFile{
Data: []byte("Unique content"),
Mode: 0644,
ModTime: time.Now(),
},
}
// Initialize the database
ctx := context.Background()
db, err := database.New(ctx, dbPath)
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer func() {
if err := db.Close(); err != nil {
t.Logf("Failed to close database: %v", err)
}
}()
repos := database.NewRepositories(db)
// Create mock S3 client
s3Client := NewMockS3Client()
// Run backup
backupEngine := &BackupEngine{
repos: repos,
s3Client: s3Client,
}
_, err = backupEngine.Backup(ctx, testFS, ".")
if err != nil {
t.Fatalf("Backup failed: %v", err)
}
// Verify deduplication
chunks, err := repos.Chunks.List(ctx)
if err != nil {
t.Fatalf("Failed to list chunks: %v", err)
}
// Should have only 2 unique chunks (duplicate content + unique content)
if len(chunks) != 2 {
t.Errorf("Expected 2 unique chunks, got %d", len(chunks))
}
// Verify chunk references
for _, chunk := range chunks {
files, err := repos.ChunkFiles.GetByChunkHash(ctx, chunk.ChunkHash)
if err != nil {
t.Errorf("Failed to get files for chunk %s: %v", chunk.ChunkHash, err)
}
// The duplicate content chunk should be referenced by 2 files
if chunk.Size == int64(len("Duplicate content")) && len(files) != 2 {
t.Errorf("Expected duplicate chunk to be referenced by 2 files, got %d", len(files))
}
}
}
// BackupEngine performs backup operations
type BackupEngine struct {
repos *database.Repositories
s3Client interface {
PutBlob(ctx context.Context, hash string, data []byte) error
BlobExists(ctx context.Context, hash string) (bool, error)
}
}
// Backup performs a backup of the given filesystem
func (b *BackupEngine) Backup(ctx context.Context, fsys fs.FS, root string) (string, error) {
// Create a new snapshot
hostname, _ := os.Hostname()
snapshotID := time.Now().Format(time.RFC3339)
snapshot := &database.Snapshot{
ID: snapshotID,
Hostname: hostname,
VaultikVersion: "test",
StartedAt: time.Now(),
CompletedAt: nil,
}
// Create initial snapshot record
err := b.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return b.repos.Snapshots.Create(ctx, tx, snapshot)
})
if err != nil {
return "", err
}
// Track counters
var fileCount, chunkCount, blobCount, totalSize, blobSize int64
// Track which chunks we've seen to handle deduplication
processedChunks := make(map[string]bool)
// Scan the filesystem and process files
err = fs.WalkDir(fsys, root, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
// Skip directories
if d.IsDir() {
return nil
}
// Get file info
info, err := d.Info()
if err != nil {
return err
}
// Handle symlinks
if info.Mode()&fs.ModeSymlink != 0 {
// For testing, we'll skip symlinks since fstest doesn't support them well
return nil
}
// Process this file in a transaction
err = b.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
// Create file record
file := &database.File{
Path: path,
Size: info.Size(),
Mode: uint32(info.Mode()),
MTime: info.ModTime(),
CTime: info.ModTime(), // Use mtime as ctime for test
UID: 1000, // Default UID for test
GID: 1000, // Default GID for test
}
if err := b.repos.Files.Create(ctx, tx, file); err != nil {
return err
}
fileCount++
totalSize += info.Size()
// Read and process file in chunks
f, err := fsys.Open(path)
if err != nil {
return err
}
defer func() {
if err := f.Close(); err != nil {
// Log but don't fail since we're already in an error path potentially
fmt.Fprintf(os.Stderr, "Failed to close file: %v\n", err)
}
}()
// Process file in chunks
chunkIndex := 0
buffer := make([]byte, defaultChunkSize)
for {
n, err := f.Read(buffer)
if err != nil && err != io.EOF {
return err
}
if n == 0 {
break
}
chunkData := buffer[:n]
chunkHash := calculateHash(chunkData)
// Check if chunk already exists
existingChunk, _ := b.repos.Chunks.GetByHash(ctx, chunkHash)
if existingChunk == nil {
// Create new chunk
chunk := &database.Chunk{
ChunkHash: chunkHash,
SHA256: chunkHash,
Size: int64(n),
}
if err := b.repos.Chunks.Create(ctx, tx, chunk); err != nil {
return err
}
processedChunks[chunkHash] = true
}
// Create file-chunk mapping
fileChunk := &database.FileChunk{
Path: path,
Idx: chunkIndex,
ChunkHash: chunkHash,
}
if err := b.repos.FileChunks.Create(ctx, tx, fileChunk); err != nil {
return err
}
// Create chunk-file mapping
chunkFile := &database.ChunkFile{
ChunkHash: chunkHash,
FilePath: path,
FileOffset: int64(chunkIndex * defaultChunkSize),
Length: int64(n),
}
if err := b.repos.ChunkFiles.Create(ctx, tx, chunkFile); err != nil {
return err
}
chunkIndex++
}
return nil
})
return err
})
if err != nil {
return "", err
}
// After all files are processed, create blobs for new chunks
err = b.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
for chunkHash := range processedChunks {
// Get chunk data
chunk, err := b.repos.Chunks.GetByHash(ctx, chunkHash)
if err != nil {
return err
}
chunkCount++
// In a real system, blobs would contain multiple chunks and be encrypted
// For testing, we'll create a blob with a "blob-" prefix to differentiate
blobHash := "blob-" + chunkHash
// For the test, we'll create dummy data since we don't have the original
dummyData := []byte(chunkHash)
// Upload to S3 as a blob
if err := b.s3Client.PutBlob(ctx, blobHash, dummyData); err != nil {
return err
}
// Create blob entry
blob := &database.Blob{
ID: "test-blob-" + blobHash[:8],
Hash: blobHash,
CreatedTS: time.Now(),
}
if err := b.repos.Blobs.Create(ctx, tx, blob); err != nil {
return err
}
blobCount++
blobSize += chunk.Size
// Create blob-chunk mapping
blobChunk := &database.BlobChunk{
BlobID: blob.ID,
ChunkHash: chunkHash,
Offset: 0,
Length: chunk.Size,
}
if err := b.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil {
return err
}
// Add blob to snapshot
if err := b.repos.Snapshots.AddBlob(ctx, tx, snapshotID, blob.ID, blob.Hash); err != nil {
return err
}
}
return nil
})
if err != nil {
return "", err
}
// Update snapshot with final counts
err = b.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return b.repos.Snapshots.UpdateCounts(ctx, tx, snapshotID, fileCount, chunkCount, blobCount, totalSize, blobSize)
})
if err != nil {
return "", err
}
return snapshotID, nil
}
func calculateHash(data []byte) string {
h := sha256.New()
h.Write(data)
return fmt.Sprintf("%x", h.Sum(nil))
}
func generateLargeFileContent(size int) []byte {
data := make([]byte, size)
// Fill with pattern that changes every chunk to avoid deduplication
for i := 0; i < size; i++ {
chunkNum := i / defaultChunkSize
data[i] = byte((i + chunkNum) % 256)
}
return data
}
const defaultChunkSize = 1024 * 1024 // 1MB chunks

View File

@@ -1,6 +1,39 @@
package backup
import "go.uber.org/fx"
import (
"git.eeqj.de/sneak/vaultik/internal/config"
"git.eeqj.de/sneak/vaultik/internal/database"
"git.eeqj.de/sneak/vaultik/internal/s3"
"github.com/spf13/afero"
"go.uber.org/fx"
)
// ScannerParams holds parameters for scanner creation
type ScannerParams struct {
EnableProgress bool
}
// Module exports backup functionality
var Module = fx.Module("backup")
var Module = fx.Module("backup",
fx.Provide(
provideScannerFactory,
),
)
// ScannerFactory creates scanners with custom parameters
type ScannerFactory func(params ScannerParams) *Scanner
func provideScannerFactory(cfg *config.Config, repos *database.Repositories, s3Client *s3.Client) ScannerFactory {
return func(params ScannerParams) *Scanner {
return NewScanner(ScannerConfig{
FS: afero.NewOsFs(),
ChunkSize: cfg.ChunkSize.Int64(),
Repositories: repos,
S3Client: s3Client,
MaxBlobSize: cfg.BlobSizeLimit.Int64(),
CompressionLevel: cfg.CompressionLevel,
AgeRecipients: cfg.AgeRecipients,
EnableProgress: params.EnableProgress,
})
}
}

389
internal/backup/progress.go Normal file
View File

@@ -0,0 +1,389 @@
package backup
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
"git.eeqj.de/sneak/vaultik/internal/log"
"github.com/dustin/go-humanize"
)
const (
// Progress reporting intervals
SummaryInterval = 10 * time.Second // One-line status updates
DetailInterval = 60 * time.Second // Multi-line detailed status
)
// ProgressStats holds atomic counters for progress tracking
type ProgressStats struct {
FilesScanned atomic.Int64 // Total files seen during scan (includes skipped)
FilesProcessed atomic.Int64 // Files actually processed in phase 2
FilesSkipped atomic.Int64 // Files skipped due to no changes
BytesScanned atomic.Int64 // Bytes from new/changed files only
BytesSkipped atomic.Int64 // Bytes from unchanged files
BytesProcessed atomic.Int64 // Actual bytes processed (for ETA calculation)
ChunksCreated atomic.Int64
BlobsCreated atomic.Int64
BlobsUploaded atomic.Int64
BytesUploaded atomic.Int64
CurrentFile atomic.Value // stores string
TotalSize atomic.Int64 // Total size to process (set after scan phase)
TotalFiles atomic.Int64 // Total files to process in phase 2
ProcessStartTime atomic.Value // stores time.Time when processing starts
StartTime time.Time
mu sync.RWMutex
lastDetailTime time.Time
// Upload tracking
CurrentUpload atomic.Value // stores *UploadInfo
lastChunkingTime time.Time // Track when we last showed chunking progress
}
// UploadInfo tracks current upload progress
type UploadInfo struct {
BlobHash string
Size int64
StartTime time.Time
}
// ProgressReporter handles periodic progress reporting
type ProgressReporter struct {
stats *ProgressStats
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
detailTicker *time.Ticker
summaryTicker *time.Ticker
sigChan chan os.Signal
}
// NewProgressReporter creates a new progress reporter
func NewProgressReporter() *ProgressReporter {
stats := &ProgressStats{
StartTime: time.Now(),
lastDetailTime: time.Now(),
}
stats.CurrentFile.Store("")
ctx, cancel := context.WithCancel(context.Background())
pr := &ProgressReporter{
stats: stats,
ctx: ctx,
cancel: cancel,
summaryTicker: time.NewTicker(SummaryInterval),
detailTicker: time.NewTicker(DetailInterval),
sigChan: make(chan os.Signal, 1),
}
// Register for SIGUSR1
signal.Notify(pr.sigChan, syscall.SIGUSR1)
return pr
}
// Start begins the progress reporting
func (pr *ProgressReporter) Start() {
pr.wg.Add(1)
go pr.run()
// Print initial multi-line status
pr.printDetailedStatus()
}
// Stop stops the progress reporting
func (pr *ProgressReporter) Stop() {
pr.cancel()
pr.summaryTicker.Stop()
pr.detailTicker.Stop()
signal.Stop(pr.sigChan)
close(pr.sigChan)
pr.wg.Wait()
}
// GetStats returns the progress stats for updating
func (pr *ProgressReporter) GetStats() *ProgressStats {
return pr.stats
}
// SetTotalSize sets the total size to process (after scan phase)
func (pr *ProgressReporter) SetTotalSize(size int64) {
pr.stats.TotalSize.Store(size)
pr.stats.ProcessStartTime.Store(time.Now())
}
// run is the main progress reporting loop
func (pr *ProgressReporter) run() {
defer pr.wg.Done()
for {
select {
case <-pr.ctx.Done():
return
case <-pr.summaryTicker.C:
pr.printSummaryStatus()
case <-pr.detailTicker.C:
pr.printDetailedStatus()
case <-pr.sigChan:
// SIGUSR1 received, print detailed status
log.Info("SIGUSR1 received, printing detailed status")
pr.printDetailedStatus()
}
}
}
// printSummaryStatus prints a one-line status update
func (pr *ProgressReporter) printSummaryStatus() {
// Check if we're currently uploading
if uploadInfo, ok := pr.stats.CurrentUpload.Load().(*UploadInfo); ok && uploadInfo != nil {
// Show upload progress instead
pr.printUploadProgress(uploadInfo)
return
}
// Only show chunking progress if we've done chunking recently
pr.stats.mu.RLock()
timeSinceLastChunk := time.Since(pr.stats.lastChunkingTime)
pr.stats.mu.RUnlock()
if timeSinceLastChunk > SummaryInterval*2 {
// No recent chunking activity, don't show progress
return
}
elapsed := time.Since(pr.stats.StartTime)
bytesScanned := pr.stats.BytesScanned.Load()
bytesSkipped := pr.stats.BytesSkipped.Load()
bytesProcessed := pr.stats.BytesProcessed.Load()
totalSize := pr.stats.TotalSize.Load()
currentFile := pr.stats.CurrentFile.Load().(string)
// Calculate ETA if we have total size and are processing
etaStr := ""
if totalSize > 0 && bytesProcessed > 0 {
processStart, ok := pr.stats.ProcessStartTime.Load().(time.Time)
if ok && !processStart.IsZero() {
processElapsed := time.Since(processStart)
rate := float64(bytesProcessed) / processElapsed.Seconds()
if rate > 0 {
remainingBytes := totalSize - bytesProcessed
remainingSeconds := float64(remainingBytes) / rate
eta := time.Duration(remainingSeconds * float64(time.Second))
etaStr = fmt.Sprintf(" | ETA: %s", formatDuration(eta))
}
}
}
rate := float64(bytesScanned+bytesSkipped) / elapsed.Seconds()
// Show files processed / total files to process
filesProcessed := pr.stats.FilesProcessed.Load()
totalFiles := pr.stats.TotalFiles.Load()
status := fmt.Sprintf("Progress: %d/%d files, %s/%s (%.1f%%), %s/s%s",
filesProcessed,
totalFiles,
humanize.Bytes(uint64(bytesProcessed)),
humanize.Bytes(uint64(totalSize)),
float64(bytesProcessed)/float64(totalSize)*100,
humanize.Bytes(uint64(rate)),
etaStr,
)
if currentFile != "" {
status += fmt.Sprintf(" | Current: %s", truncatePath(currentFile, 40))
}
log.Info(status)
}
// printDetailedStatus prints a multi-line detailed status
func (pr *ProgressReporter) printDetailedStatus() {
pr.stats.mu.Lock()
pr.stats.lastDetailTime = time.Now()
pr.stats.mu.Unlock()
elapsed := time.Since(pr.stats.StartTime)
filesScanned := pr.stats.FilesScanned.Load()
filesSkipped := pr.stats.FilesSkipped.Load()
bytesScanned := pr.stats.BytesScanned.Load()
bytesSkipped := pr.stats.BytesSkipped.Load()
bytesProcessed := pr.stats.BytesProcessed.Load()
totalSize := pr.stats.TotalSize.Load()
chunksCreated := pr.stats.ChunksCreated.Load()
blobsCreated := pr.stats.BlobsCreated.Load()
blobsUploaded := pr.stats.BlobsUploaded.Load()
bytesUploaded := pr.stats.BytesUploaded.Load()
currentFile := pr.stats.CurrentFile.Load().(string)
totalBytes := bytesScanned + bytesSkipped
rate := float64(totalBytes) / elapsed.Seconds()
log.Notice("=== Backup Progress Report ===")
log.Info("Elapsed time", "duration", formatDuration(elapsed))
// Calculate and show ETA if we have data
if totalSize > 0 && bytesProcessed > 0 {
processStart, ok := pr.stats.ProcessStartTime.Load().(time.Time)
if ok && !processStart.IsZero() {
processElapsed := time.Since(processStart)
processRate := float64(bytesProcessed) / processElapsed.Seconds()
if processRate > 0 {
remainingBytes := totalSize - bytesProcessed
remainingSeconds := float64(remainingBytes) / processRate
eta := time.Duration(remainingSeconds * float64(time.Second))
percentComplete := float64(bytesProcessed) / float64(totalSize) * 100
log.Info("Overall progress",
"percent", fmt.Sprintf("%.1f%%", percentComplete),
"processed", humanize.Bytes(uint64(bytesProcessed)),
"total", humanize.Bytes(uint64(totalSize)),
"rate", humanize.Bytes(uint64(processRate))+"/s",
"eta", formatDuration(eta))
}
}
}
log.Info("Files processed",
"scanned", filesScanned,
"skipped", filesSkipped,
"total", filesScanned,
"skip_rate", formatPercent(filesSkipped, filesScanned))
log.Info("Data scanned",
"new", humanize.Bytes(uint64(bytesScanned)),
"skipped", humanize.Bytes(uint64(bytesSkipped)),
"total", humanize.Bytes(uint64(totalBytes)),
"scan_rate", humanize.Bytes(uint64(rate))+"/s")
log.Info("Chunks created", "count", chunksCreated)
log.Info("Blobs status",
"created", blobsCreated,
"uploaded", blobsUploaded,
"pending", blobsCreated-blobsUploaded)
log.Info("Upload progress",
"uploaded", humanize.Bytes(uint64(bytesUploaded)),
"compression_ratio", formatRatio(bytesUploaded, bytesScanned))
if currentFile != "" {
log.Info("Current file", "path", currentFile)
}
log.Notice("=============================")
}
// Helper functions
func formatDuration(d time.Duration) string {
if d < 0 {
return "unknown"
}
if d < time.Minute {
return fmt.Sprintf("%ds", int(d.Seconds()))
}
if d < time.Hour {
return fmt.Sprintf("%dm%ds", int(d.Minutes()), int(d.Seconds())%60)
}
return fmt.Sprintf("%dh%dm", int(d.Hours()), int(d.Minutes())%60)
}
func formatPercent(numerator, denominator int64) string {
if denominator == 0 {
return "0.0%"
}
return fmt.Sprintf("%.1f%%", float64(numerator)/float64(denominator)*100)
}
func formatRatio(compressed, uncompressed int64) string {
if uncompressed == 0 {
return "1.00"
}
ratio := float64(compressed) / float64(uncompressed)
return fmt.Sprintf("%.2f", ratio)
}
func truncatePath(path string, maxLen int) string {
if len(path) <= maxLen {
return path
}
// Keep the last maxLen-3 characters and prepend "..."
return "..." + path[len(path)-(maxLen-3):]
}
// printUploadProgress prints upload progress
func (pr *ProgressReporter) printUploadProgress(info *UploadInfo) {
elapsed := time.Since(info.StartTime)
if elapsed < time.Millisecond {
elapsed = time.Millisecond // Avoid division by zero
}
bytesPerSec := float64(info.Size) / elapsed.Seconds()
bitsPerSec := bytesPerSec * 8
// Format speed in bits/second
var speedStr string
if bitsPerSec >= 1e9 {
speedStr = fmt.Sprintf("%.1fGbit/sec", bitsPerSec/1e9)
} else if bitsPerSec >= 1e6 {
speedStr = fmt.Sprintf("%.0fMbit/sec", bitsPerSec/1e6)
} else if bitsPerSec >= 1e3 {
speedStr = fmt.Sprintf("%.0fKbit/sec", bitsPerSec/1e3)
} else {
speedStr = fmt.Sprintf("%.0fbit/sec", bitsPerSec)
}
log.Info("Uploading blob",
"hash", info.BlobHash[:8]+"...",
"size", humanize.Bytes(uint64(info.Size)),
"elapsed", formatDuration(elapsed),
"speed", speedStr)
}
// ReportUploadStart marks the beginning of a blob upload
func (pr *ProgressReporter) ReportUploadStart(blobHash string, size int64) {
info := &UploadInfo{
BlobHash: blobHash,
Size: size,
StartTime: time.Now(),
}
pr.stats.CurrentUpload.Store(info)
}
// ReportUploadComplete marks the completion of a blob upload
func (pr *ProgressReporter) ReportUploadComplete(blobHash string, size int64, duration time.Duration) {
// Clear current upload
pr.stats.CurrentUpload.Store((*UploadInfo)(nil))
// Calculate speed
if duration < time.Millisecond {
duration = time.Millisecond
}
bytesPerSec := float64(size) / duration.Seconds()
bitsPerSec := bytesPerSec * 8
// Format speed
var speedStr string
if bitsPerSec >= 1e9 {
speedStr = fmt.Sprintf("%.1fGbit/sec", bitsPerSec/1e9)
} else if bitsPerSec >= 1e6 {
speedStr = fmt.Sprintf("%.0fMbit/sec", bitsPerSec/1e6)
} else if bitsPerSec >= 1e3 {
speedStr = fmt.Sprintf("%.0fKbit/sec", bitsPerSec/1e3)
} else {
speedStr = fmt.Sprintf("%.0fbit/sec", bitsPerSec)
}
log.Info("Blob uploaded",
"hash", blobHash[:8]+"...",
"size", humanize.Bytes(uint64(size)),
"duration", formatDuration(duration),
"speed", speedStr)
}
// UpdateChunkingActivity updates the last chunking time
func (pr *ProgressReporter) UpdateChunkingActivity() {
pr.stats.mu.Lock()
pr.stats.lastChunkingTime = time.Now()
pr.stats.mu.Unlock()
}

View File

@@ -2,71 +2,197 @@ package backup
import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"fmt"
"io"
"os"
"strings"
"sync"
"time"
"git.eeqj.de/sneak/vaultik/internal/blob"
"git.eeqj.de/sneak/vaultik/internal/chunker"
"git.eeqj.de/sneak/vaultik/internal/crypto"
"git.eeqj.de/sneak/vaultik/internal/database"
"git.eeqj.de/sneak/vaultik/internal/log"
"github.com/dustin/go-humanize"
"github.com/spf13/afero"
)
// FileToProcess holds information about a file that needs processing
type FileToProcess struct {
Path string
FileInfo os.FileInfo
File *database.File
}
// Scanner scans directories and populates the database with file and chunk information
type Scanner struct {
fs afero.Fs
chunkSize int
repos *database.Repositories
fs afero.Fs
chunker *chunker.Chunker
packer *blob.Packer
repos *database.Repositories
s3Client S3Client
maxBlobSize int64
compressionLevel int
ageRecipient string
snapshotID string // Current snapshot being processed
progress *ProgressReporter
// Mutex for coordinating blob creation
packerMu sync.Mutex // Blocks chunk production during blob creation
// Context for cancellation
scanCtx context.Context
}
// S3Client interface for blob storage operations
type S3Client interface {
PutObject(ctx context.Context, key string, data io.Reader) error
}
// ScannerConfig contains configuration for the scanner
type ScannerConfig struct {
FS afero.Fs
ChunkSize int
Repositories *database.Repositories
FS afero.Fs
ChunkSize int64
Repositories *database.Repositories
S3Client S3Client
MaxBlobSize int64
CompressionLevel int
AgeRecipients []string // Optional, empty means no encryption
EnableProgress bool // Enable progress reporting
}
// ScanResult contains the results of a scan operation
type ScanResult struct {
FilesScanned int
BytesScanned int64
StartTime time.Time
EndTime time.Time
FilesScanned int
FilesSkipped int
BytesScanned int64
BytesSkipped int64
ChunksCreated int
BlobsCreated int
StartTime time.Time
EndTime time.Time
}
// NewScanner creates a new scanner instance
func NewScanner(cfg ScannerConfig) *Scanner {
// Create encryptor (required for blob packing)
if len(cfg.AgeRecipients) == 0 {
log.Error("No age recipients configured - encryption is required")
return nil
}
enc, err := crypto.NewEncryptor(cfg.AgeRecipients)
if err != nil {
log.Error("Failed to create encryptor", "error", err)
return nil
}
// Create blob packer with encryption
packerCfg := blob.PackerConfig{
MaxBlobSize: cfg.MaxBlobSize,
CompressionLevel: cfg.CompressionLevel,
Encryptor: enc,
Repositories: cfg.Repositories,
}
packer, err := blob.NewPacker(packerCfg)
if err != nil {
log.Error("Failed to create packer", "error", err)
return nil
}
var progress *ProgressReporter
if cfg.EnableProgress {
progress = NewProgressReporter()
}
return &Scanner{
fs: cfg.FS,
chunkSize: cfg.ChunkSize,
repos: cfg.Repositories,
fs: cfg.FS,
chunker: chunker.NewChunker(cfg.ChunkSize),
packer: packer,
repos: cfg.Repositories,
s3Client: cfg.S3Client,
maxBlobSize: cfg.MaxBlobSize,
compressionLevel: cfg.CompressionLevel,
ageRecipient: strings.Join(cfg.AgeRecipients, ","),
progress: progress,
}
}
// Scan scans a directory and populates the database
func (s *Scanner) Scan(ctx context.Context, path string) (*ScanResult, error) {
func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*ScanResult, error) {
s.snapshotID = snapshotID
s.scanCtx = ctx
result := &ScanResult{
StartTime: time.Now(),
}
// Start a transaction
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return s.scanDirectory(ctx, tx, path, result)
})
if err != nil {
return nil, fmt.Errorf("scan failed: %w", err)
// Set blob handler for concurrent upload
if s.s3Client != nil {
log.Debug("Setting blob handler for S3 uploads")
s.packer.SetBlobHandler(s.handleBlobReady)
} else {
log.Debug("No S3 client configured, blobs will not be uploaded")
}
// Start progress reporting if enabled
if s.progress != nil {
s.progress.Start()
defer s.progress.Stop()
}
// Phase 1: Scan directory and collect files to process
log.Info("Phase 1: Scanning directory structure")
filesToProcess, err := s.scanPhase(ctx, path, result)
if err != nil {
return nil, fmt.Errorf("scan phase failed: %w", err)
}
// Calculate total size to process
var totalSizeToProcess int64
for _, file := range filesToProcess {
totalSizeToProcess += file.FileInfo.Size()
}
// Update progress with total size and file count
if s.progress != nil {
s.progress.SetTotalSize(totalSizeToProcess)
s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
}
log.Info("Phase 1 complete",
"total_files", len(filesToProcess),
"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
"files_skipped", result.FilesSkipped,
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
// Phase 2: Process files and create chunks
if len(filesToProcess) > 0 {
log.Info("Phase 2: Processing files and creating chunks")
if err := s.processPhase(ctx, filesToProcess, result); err != nil {
return nil, fmt.Errorf("process phase failed: %w", err)
}
}
// Get final stats from packer
blobs := s.packer.GetFinishedBlobs()
result.BlobsCreated += len(blobs)
result.EndTime = time.Now()
return result, nil
}
func (s *Scanner) scanDirectory(ctx context.Context, tx *sql.Tx, path string, result *ScanResult) error {
return afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error {
// scanPhase performs the initial directory scan to identify files to process
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult) ([]*FileToProcess, error) {
var filesToProcess []*FileToProcess
var mu sync.Mutex
log.Debug("Starting directory walk", "path", path)
err := afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error {
log.Debug("Walking file", "path", path)
if err != nil {
log.Debug("Error walking path", "path", path, "error", err)
return err
}
@@ -77,21 +203,108 @@ func (s *Scanner) scanDirectory(ctx context.Context, tx *sql.Tx, path string, re
default:
}
// Skip directories
if info.IsDir() {
return nil
// Check file and update metadata
file, needsProcessing, err := s.checkFileAndUpdateMetadata(ctx, path, info, result)
if err != nil {
// Don't log context cancellation as an error
if err == context.Canceled {
return err
}
return fmt.Errorf("failed to check %s: %w", path, err)
}
// Process the file
if err := s.processFile(ctx, tx, path, info, result); err != nil {
return fmt.Errorf("failed to process %s: %w", path, err)
// If file needs processing, add to list
if needsProcessing && info.Mode().IsRegular() && info.Size() > 0 {
mu.Lock()
filesToProcess = append(filesToProcess, &FileToProcess{
Path: path,
FileInfo: info,
File: file,
})
mu.Unlock()
}
return nil
})
if err != nil {
return nil, err
}
return filesToProcess, nil
}
func (s *Scanner) processFile(ctx context.Context, tx *sql.Tx, path string, info os.FileInfo, result *ScanResult) error {
// processPhase processes the files that need backing up
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
// Process each file
for _, fileToProcess := range filesToProcess {
// Update progress
if s.progress != nil {
s.progress.GetStats().CurrentFile.Store(fileToProcess.Path)
}
// Process file in streaming fashion
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
return fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
}
// Update files processed counter
if s.progress != nil {
s.progress.GetStats().FilesProcessed.Add(1)
}
}
// Final flush (outside any transaction)
s.packerMu.Lock()
if err := s.packer.Flush(); err != nil {
s.packerMu.Unlock()
return fmt.Errorf("flushing packer: %w", err)
}
s.packerMu.Unlock()
// If no S3 client, store any remaining blobs
if s.s3Client == nil {
blobs := s.packer.GetFinishedBlobs()
for _, b := range blobs {
// Blob metadata is already stored incrementally during packing
// Just add the blob to the snapshot
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, b.ID, b.Hash)
})
if err != nil {
return fmt.Errorf("storing blob metadata: %w", err)
}
}
result.BlobsCreated += len(blobs)
}
return nil
}
// checkFileAndUpdateMetadata checks if a file needs processing and updates metadata
func (s *Scanner) checkFileAndUpdateMetadata(ctx context.Context, path string, info os.FileInfo, result *ScanResult) (*database.File, bool, error) {
// Check context cancellation
select {
case <-ctx.Done():
return nil, false, ctx.Err()
default:
}
var file *database.File
var needsProcessing bool
// Use a short transaction just for the database operations
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
var err error
file, needsProcessing, err = s.checkFile(txCtx, tx, path, info, result)
return err
})
return file, needsProcessing, err
}
// checkFile checks if a file needs processing and updates metadata within a transaction
func (s *Scanner) checkFile(ctx context.Context, tx *sql.Tx, path string, info os.FileInfo, result *ScanResult) (*database.File, bool, error) {
// Get file stats
stat, ok := info.Sys().(interface {
Uid() uint32
@@ -125,92 +338,378 @@ func (s *Scanner) processFile(ctx context.Context, tx *sql.Tx, path string, info
LinkTarget: linkTarget,
}
// Insert file
if err := s.repos.Files.Create(ctx, tx, file); err != nil {
return err
// Check if file has changed since last backup
log.Debug("Checking if file exists in database", "path", path)
existingFile, err := s.repos.Files.GetByPathTx(ctx, tx, path)
if err != nil {
return nil, false, fmt.Errorf("checking existing file: %w", err)
}
fileChanged := existingFile == nil || s.hasFileChanged(existingFile, file)
// Always update file metadata
log.Debug("Updating file metadata", "path", path, "changed", fileChanged)
if err := s.repos.Files.Create(ctx, tx, file); err != nil {
return nil, false, err
}
log.Debug("File metadata updated", "path", path)
// Add file to snapshot
log.Debug("Adding file to snapshot", "path", path, "snapshot", s.snapshotID)
if err := s.repos.Snapshots.AddFile(ctx, tx, s.snapshotID, path); err != nil {
return nil, false, fmt.Errorf("adding file to snapshot: %w", err)
}
log.Debug("File added to snapshot", "path", path)
result.FilesScanned++
result.BytesScanned += info.Size()
// Process chunks only for regular files
if info.Mode().IsRegular() && info.Size() > 0 {
if err := s.processFileChunks(ctx, tx, path, result); err != nil {
return err
// Update progress
if s.progress != nil {
stats := s.progress.GetStats()
stats.FilesScanned.Add(1)
stats.CurrentFile.Store(path)
}
// Track skipped files
if info.Mode().IsRegular() && info.Size() > 0 && !fileChanged {
result.FilesSkipped++
result.BytesSkipped += info.Size()
if s.progress != nil {
stats := s.progress.GetStats()
stats.FilesSkipped.Add(1)
stats.BytesSkipped.Add(info.Size())
}
// File hasn't changed, but we still need to associate existing chunks with this snapshot
log.Debug("File hasn't changed, associating existing chunks", "path", path)
if err := s.associateExistingChunks(ctx, tx, path); err != nil {
return nil, false, fmt.Errorf("associating existing chunks: %w", err)
}
log.Debug("Existing chunks associated", "path", path)
} else {
// File changed or is not a regular file
result.BytesScanned += info.Size()
if s.progress != nil {
s.progress.GetStats().BytesScanned.Add(info.Size())
}
}
return nil
return file, fileChanged, nil
}
func (s *Scanner) processFileChunks(ctx context.Context, tx *sql.Tx, path string, result *ScanResult) error {
file, err := s.fs.Open(path)
// hasFileChanged determines if a file has changed since last backup
func (s *Scanner) hasFileChanged(existingFile, newFile *database.File) bool {
// Check if any metadata has changed
if existingFile.Size != newFile.Size {
return true
}
if existingFile.MTime.Unix() != newFile.MTime.Unix() {
return true
}
if existingFile.Mode != newFile.Mode {
return true
}
if existingFile.UID != newFile.UID {
return true
}
if existingFile.GID != newFile.GID {
return true
}
if existingFile.LinkTarget != newFile.LinkTarget {
return true
}
return false
}
// associateExistingChunks links existing chunks from an unchanged file to the current snapshot
func (s *Scanner) associateExistingChunks(ctx context.Context, tx *sql.Tx, path string) error {
log.Debug("associateExistingChunks start", "path", path)
// Get existing file chunks
log.Debug("Getting existing file chunks", "path", path)
fileChunks, err := s.repos.FileChunks.GetByFileTx(ctx, tx, path)
if err != nil {
return err
return fmt.Errorf("getting existing file chunks: %w", err)
}
defer func() {
if err := file.Close(); err != nil {
database.Fatal("failed to close file %s: %v", path, err)
log.Debug("Got file chunks", "path", path, "count", len(fileChunks))
// For each chunk, find its blob and associate with current snapshot
processedBlobs := make(map[string]bool)
for i, fc := range fileChunks {
log.Debug("Processing chunk", "path", path, "chunk_index", i, "chunk_hash", fc.ChunkHash)
// Find which blob contains this chunk
log.Debug("Finding blob for chunk", "chunk_hash", fc.ChunkHash)
blobChunk, err := s.repos.BlobChunks.GetByChunkHashTx(ctx, tx, fc.ChunkHash)
if err != nil {
return fmt.Errorf("finding blob for chunk %s: %w", fc.ChunkHash, err)
}
}()
if blobChunk == nil {
log.Warn("Chunk exists but not in any blob", "chunk", fc.ChunkHash, "file", path)
continue
}
log.Debug("Found blob for chunk", "chunk_hash", fc.ChunkHash, "blob_id", blobChunk.BlobID)
sequence := 0
buffer := make([]byte, s.chunkSize)
for {
n, err := io.ReadFull(file, buffer)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return err
// Get blob to find its hash
blob, err := s.repos.Blobs.GetByID(ctx, blobChunk.BlobID)
if err != nil {
return fmt.Errorf("getting blob %s: %w", blobChunk.BlobID, err)
}
if blob == nil {
log.Warn("Blob record not found", "blob_id", blobChunk.BlobID)
continue
}
if n == 0 {
break
}
// Calculate chunk hash
h := sha256.New()
h.Write(buffer[:n])
hash := hex.EncodeToString(h.Sum(nil))
// Create chunk if it doesn't exist
chunk := &database.Chunk{
ChunkHash: hash,
SHA256: hash, // Using same hash for now
Size: int64(n),
}
// Try to insert chunk (ignore duplicate errors)
_ = s.repos.Chunks.Create(ctx, tx, chunk)
// Create file-chunk mapping
fileChunk := &database.FileChunk{
Path: path,
ChunkHash: hash,
Idx: sequence,
}
if err := s.repos.FileChunks.Create(ctx, tx, fileChunk); err != nil {
return err
}
// Create chunk-file mapping
chunkFile := &database.ChunkFile{
ChunkHash: hash,
FilePath: path,
FileOffset: int64(sequence * s.chunkSize),
Length: int64(n),
}
if err := s.repos.ChunkFiles.Create(ctx, tx, chunkFile); err != nil {
return err
}
sequence++
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
// Add blob to snapshot if not already processed
if !processedBlobs[blobChunk.BlobID] {
log.Debug("Adding blob to snapshot", "blob_id", blobChunk.BlobID, "blob_hash", blob.Hash, "snapshot", s.snapshotID)
if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobChunk.BlobID, blob.Hash); err != nil {
return fmt.Errorf("adding existing blob to snapshot: %w", err)
}
log.Debug("Added blob to snapshot", "blob_id", blobChunk.BlobID)
processedBlobs[blobChunk.BlobID] = true
}
}
log.Debug("associateExistingChunks complete", "path", path, "blobs_processed", len(processedBlobs))
return nil
}
// handleBlobReady is called by the packer when a blob is finalized
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
log.Debug("Blob handler called", "blob_hash", blobWithReader.Hash[:8]+"...")
startTime := time.Now()
finishedBlob := blobWithReader.FinishedBlob
// Report upload start
if s.progress != nil {
s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
}
// Upload to S3 first (without holding any locks)
// Use scan context for cancellation support
ctx := s.scanCtx
if ctx == nil {
ctx = context.Background()
}
if err := s.s3Client.PutObject(ctx, "blobs/"+finishedBlob.Hash, blobWithReader.Reader); err != nil {
return fmt.Errorf("uploading blob %s to S3: %w", finishedBlob.Hash, err)
}
uploadDuration := time.Since(startTime)
// Report upload complete
if s.progress != nil {
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
}
// Update progress
if s.progress != nil {
stats := s.progress.GetStats()
stats.BlobsUploaded.Add(1)
stats.BytesUploaded.Add(finishedBlob.Compressed)
stats.BlobsCreated.Add(1)
}
// Store metadata in database (after upload is complete)
dbCtx := s.scanCtx
if dbCtx == nil {
dbCtx = context.Background()
}
err := s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error {
// Update blob upload timestamp
if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil {
return fmt.Errorf("updating blob upload timestamp: %w", err)
}
// Add the blob to the snapshot
if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, finishedBlob.ID, finishedBlob.Hash); err != nil {
return fmt.Errorf("adding blob to snapshot: %w", err)
}
// Record upload metrics
upload := &database.Upload{
BlobHash: finishedBlob.Hash,
UploadedAt: startTime,
Size: finishedBlob.Compressed,
DurationMs: uploadDuration.Milliseconds(),
}
if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
return fmt.Errorf("recording upload metrics: %w", err)
}
return nil
})
// Cleanup temp file if needed
if blobWithReader.TempFile != nil {
tempName := blobWithReader.TempFile.Name()
if err := blobWithReader.TempFile.Close(); err != nil {
log.Fatal("Failed to close temp file", "file", tempName, "error", err)
}
if err := os.Remove(tempName); err != nil {
log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
}
}
return err
}
// processFileStreaming processes a file by streaming chunks directly to the packer
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
// Open the file
file, err := s.fs.Open(fileToProcess.Path)
if err != nil {
return fmt.Errorf("opening file: %w", err)
}
defer func() { _ = file.Close() }()
// We'll collect file chunks for database storage
// but process them for packing as we go
type chunkInfo struct {
fileChunk database.FileChunk
offset int64
size int64
}
var chunks []chunkInfo
chunkIndex := 0
// Process chunks in streaming fashion
err = s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
log.Debug("Processing chunk",
"file", fileToProcess.Path,
"chunk", chunkIndex,
"hash", chunk.Hash,
"size", chunk.Size)
// Check if chunk already exists
chunkExists := false
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
existing, err := s.repos.Chunks.GetByHash(txCtx, chunk.Hash)
if err != nil {
return err
}
chunkExists = (existing != nil)
// Store chunk if new
if !chunkExists {
dbChunk := &database.Chunk{
ChunkHash: chunk.Hash,
SHA256: chunk.Hash,
Size: chunk.Size,
}
if err := s.repos.Chunks.Create(txCtx, tx, dbChunk); err != nil {
return fmt.Errorf("creating chunk: %w", err)
}
}
return nil
})
if err != nil {
return fmt.Errorf("checking/storing chunk: %w", err)
}
// Track file chunk association for later storage
chunks = append(chunks, chunkInfo{
fileChunk: database.FileChunk{
Path: fileToProcess.Path,
Idx: chunkIndex,
ChunkHash: chunk.Hash,
},
offset: chunk.Offset,
size: chunk.Size,
})
// Update stats
if chunkExists {
result.FilesSkipped++ // Track as skipped for now
result.BytesSkipped += chunk.Size
if s.progress != nil {
s.progress.GetStats().BytesSkipped.Add(chunk.Size)
}
} else {
result.ChunksCreated++
result.BytesScanned += chunk.Size
if s.progress != nil {
s.progress.GetStats().ChunksCreated.Add(1)
s.progress.GetStats().BytesProcessed.Add(chunk.Size)
s.progress.UpdateChunkingActivity()
}
}
// Add chunk to packer immediately (streaming)
// This happens outside the database transaction
if !chunkExists {
s.packerMu.Lock()
err := s.packer.AddChunk(&blob.ChunkRef{
Hash: chunk.Hash,
Data: chunk.Data,
})
if err == blob.ErrBlobSizeLimitExceeded {
// Finalize current blob and retry
if err := s.packer.FinalizeBlob(); err != nil {
s.packerMu.Unlock()
return fmt.Errorf("finalizing blob: %w", err)
}
// Retry adding the chunk
if err := s.packer.AddChunk(&blob.ChunkRef{
Hash: chunk.Hash,
Data: chunk.Data,
}); err != nil {
s.packerMu.Unlock()
return fmt.Errorf("adding chunk after finalize: %w", err)
}
} else if err != nil {
s.packerMu.Unlock()
return fmt.Errorf("adding chunk to packer: %w", err)
}
s.packerMu.Unlock()
}
// Clear chunk data from memory immediately after use
chunk.Data = nil
chunkIndex++
return nil
})
if err != nil {
return fmt.Errorf("chunking file: %w", err)
}
// Store file-chunk associations and chunk-file mappings in database
err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
for _, ci := range chunks {
// Create file-chunk mapping
if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil {
return fmt.Errorf("creating file chunk: %w", err)
}
// Create chunk-file mapping
chunkFile := &database.ChunkFile{
ChunkHash: ci.fileChunk.ChunkHash,
FilePath: fileToProcess.Path,
FileOffset: ci.offset,
Length: ci.size,
}
if err := s.repos.ChunkFiles.Create(txCtx, tx, chunkFile); err != nil {
return fmt.Errorf("creating chunk file: %w", err)
}
}
// Add file to snapshot
if err := s.repos.Snapshots.AddFile(txCtx, tx, s.snapshotID, fileToProcess.Path); err != nil {
return fmt.Errorf("adding file to snapshot: %w", err)
}
return nil
})
return err
}

View File

@@ -2,16 +2,21 @@ package backup_test
import (
"context"
"database/sql"
"path/filepath"
"testing"
"time"
"git.eeqj.de/sneak/vaultik/internal/backup"
"git.eeqj.de/sneak/vaultik/internal/database"
"git.eeqj.de/sneak/vaultik/internal/log"
"github.com/spf13/afero"
)
func TestScannerSimpleDirectory(t *testing.T) {
// Initialize logger for tests
log.Initialize(log.Config{})
// Create in-memory filesystem
fs := afero.NewMemMapFs()
@@ -56,25 +61,53 @@ func TestScannerSimpleDirectory(t *testing.T) {
// Create scanner
scanner := backup.NewScanner(backup.ScannerConfig{
FS: fs,
ChunkSize: 1024 * 16, // 16KB chunks for testing
Repositories: repos,
FS: fs,
ChunkSize: int64(1024 * 16), // 16KB chunks for testing
Repositories: repos,
MaxBlobSize: int64(1024 * 1024), // 1MB blobs
CompressionLevel: 3,
AgeRecipients: []string{"age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"}, // Test public key
})
// Scan the directory
// Create a snapshot record for testing
ctx := context.Background()
result, err := scanner.Scan(ctx, "/source")
snapshotID := "test-snapshot-001"
err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
snapshot := &database.Snapshot{
ID: snapshotID,
Hostname: "test-host",
VaultikVersion: "test",
StartedAt: time.Now(),
CompletedAt: nil,
FileCount: 0,
ChunkCount: 0,
BlobCount: 0,
TotalSize: 0,
BlobSize: 0,
CompressionRatio: 1.0,
}
return repos.Snapshots.Create(ctx, tx, snapshot)
})
if err != nil {
t.Fatalf("failed to create snapshot: %v", err)
}
// Scan the directory
var result *backup.ScanResult
result, err = scanner.Scan(ctx, "/source", snapshotID)
if err != nil {
t.Fatalf("scan failed: %v", err)
}
// Verify results
if result.FilesScanned != 6 {
t.Errorf("expected 6 files scanned, got %d", result.FilesScanned)
// We now scan 6 files + 3 directories (source, subdir, subdir2) = 9 entries
if result.FilesScanned != 9 {
t.Errorf("expected 9 entries scanned, got %d", result.FilesScanned)
}
if result.BytesScanned != 97 { // Total size of all test files: 13 + 20 + 20 + 28 + 0 + 16 = 97
t.Errorf("expected 97 bytes scanned, got %d", result.BytesScanned)
// Directories have their own sizes, so the total will be more than just file content
if result.BytesScanned < 97 { // At minimum we have 97 bytes of file content
t.Errorf("expected at least 97 bytes scanned, got %d", result.BytesScanned)
}
// Verify files in database
@@ -83,8 +116,9 @@ func TestScannerSimpleDirectory(t *testing.T) {
t.Fatalf("failed to list files: %v", err)
}
if len(files) != 6 {
t.Errorf("expected 6 files in database, got %d", len(files))
// We should have 6 files + 3 directories = 9 entries
if len(files) != 9 {
t.Errorf("expected 9 entries in database, got %d", len(files))
}
// Verify specific file
@@ -126,6 +160,9 @@ func TestScannerSimpleDirectory(t *testing.T) {
}
func TestScannerWithSymlinks(t *testing.T) {
// Initialize logger for tests
log.Initialize(log.Config{})
// Create in-memory filesystem
fs := afero.NewMemMapFs()
@@ -171,14 +208,40 @@ func TestScannerWithSymlinks(t *testing.T) {
// Create scanner
scanner := backup.NewScanner(backup.ScannerConfig{
FS: fs,
ChunkSize: 1024 * 16,
Repositories: repos,
FS: fs,
ChunkSize: 1024 * 16,
Repositories: repos,
MaxBlobSize: int64(1024 * 1024),
CompressionLevel: 3,
AgeRecipients: []string{},
})
// Scan the directory
// Create a snapshot record for testing
ctx := context.Background()
result, err := scanner.Scan(ctx, "/source")
snapshotID := "test-snapshot-001"
err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
snapshot := &database.Snapshot{
ID: snapshotID,
Hostname: "test-host",
VaultikVersion: "test",
StartedAt: time.Now(),
CompletedAt: nil,
FileCount: 0,
ChunkCount: 0,
BlobCount: 0,
TotalSize: 0,
BlobSize: 0,
CompressionRatio: 1.0,
}
return repos.Snapshots.Create(ctx, tx, snapshot)
})
if err != nil {
t.Fatalf("failed to create snapshot: %v", err)
}
// Scan the directory
var result *backup.ScanResult
result, err = scanner.Scan(ctx, "/source", snapshotID)
if err != nil {
t.Fatalf("scan failed: %v", err)
}
@@ -209,13 +272,19 @@ func TestScannerWithSymlinks(t *testing.T) {
}
func TestScannerLargeFile(t *testing.T) {
// Initialize logger for tests
log.Initialize(log.Config{})
// Create in-memory filesystem
fs := afero.NewMemMapFs()
// Create a large file that will require multiple chunks
// Use random content to ensure good chunk boundaries
largeContent := make([]byte, 1024*1024) // 1MB
for i := range largeContent {
largeContent[i] = byte(i % 256)
// Fill with pseudo-random data to ensure chunk boundaries
for i := 0; i < len(largeContent); i++ {
// Simple pseudo-random generator for deterministic tests
largeContent[i] = byte((i * 7919) ^ (i >> 3))
}
if err := fs.MkdirAll("/source", 0755); err != nil {
@@ -238,22 +307,54 @@ func TestScannerLargeFile(t *testing.T) {
repos := database.NewRepositories(db)
// Create scanner with 64KB chunks
// Create scanner with 64KB average chunk size
scanner := backup.NewScanner(backup.ScannerConfig{
FS: fs,
ChunkSize: 1024 * 64, // 64KB chunks
Repositories: repos,
FS: fs,
ChunkSize: int64(1024 * 64), // 64KB average chunks
Repositories: repos,
MaxBlobSize: int64(1024 * 1024),
CompressionLevel: 3,
AgeRecipients: []string{},
})
// Scan the directory
// Create a snapshot record for testing
ctx := context.Background()
result, err := scanner.Scan(ctx, "/source")
snapshotID := "test-snapshot-001"
err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
snapshot := &database.Snapshot{
ID: snapshotID,
Hostname: "test-host",
VaultikVersion: "test",
StartedAt: time.Now(),
CompletedAt: nil,
FileCount: 0,
ChunkCount: 0,
BlobCount: 0,
TotalSize: 0,
BlobSize: 0,
CompressionRatio: 1.0,
}
return repos.Snapshots.Create(ctx, tx, snapshot)
})
if err != nil {
t.Fatalf("failed to create snapshot: %v", err)
}
// Scan the directory
var result *backup.ScanResult
result, err = scanner.Scan(ctx, "/source", snapshotID)
if err != nil {
t.Fatalf("scan failed: %v", err)
}
if result.BytesScanned != 1024*1024 {
t.Errorf("expected %d bytes scanned, got %d", 1024*1024, result.BytesScanned)
// We scan 1 file + 1 directory = 2 entries
if result.FilesScanned != 2 {
t.Errorf("expected 2 entries scanned, got %d", result.FilesScanned)
}
// The file size should be at least 1MB
if result.BytesScanned < 1024*1024 {
t.Errorf("expected at least %d bytes scanned, got %d", 1024*1024, result.BytesScanned)
}
// Verify chunks
@@ -262,11 +363,15 @@ func TestScannerLargeFile(t *testing.T) {
t.Fatalf("failed to get chunks: %v", err)
}
expectedChunks := 16 // 1MB / 64KB
if len(chunks) != expectedChunks {
t.Errorf("expected %d chunks, got %d", expectedChunks, len(chunks))
// With content-defined chunking, the number of chunks depends on content
// For a 1MB file, we should get at least 1 chunk
if len(chunks) < 1 {
t.Errorf("expected at least 1 chunk, got %d", len(chunks))
}
// Log the actual number of chunks for debugging
t.Logf("1MB file produced %d chunks with 64KB average chunk size", len(chunks))
// Verify chunk sequence
for i, fc := range chunks {
if fc.Idx != i {

542
internal/backup/snapshot.go Normal file
View File

@@ -0,0 +1,542 @@
package backup
// Snapshot Metadata Export Process
// ================================
//
// The snapshot metadata contains all information needed to restore a backup.
// Instead of creating a custom format, we use a trimmed copy of the SQLite
// database containing only data relevant to the current snapshot.
//
// Process Overview:
// 1. After all files/chunks/blobs are backed up, create a snapshot record
// 2. Close the main database to ensure consistency
// 3. Copy the entire database to a temporary file
// 4. Open the temporary database
// 5. Delete all snapshots except the current one
// 6. Delete all orphaned records:
// - Files not referenced by any remaining snapshot
// - Chunks not referenced by any remaining files
// - Blobs not containing any remaining chunks
// - All related mapping tables (file_chunks, chunk_files, blob_chunks)
// 7. Close the temporary database
// 8. Use sqlite3 to dump the cleaned database to SQL
// 9. Delete the temporary database file
// 10. Compress the SQL dump with zstd
// 11. Encrypt the compressed dump with age (if encryption is enabled)
// 12. Upload to S3 as: snapshots/{snapshot-id}.sql.zst[.age]
// 13. Reopen the main database
//
// Advantages of this approach:
// - No custom metadata format needed
// - Reuses existing database schema and relationships
// - SQL dumps are portable and compress well
// - Restore process can simply execute the SQL
// - Atomic and consistent snapshot of all metadata
//
// TODO: Future improvements:
// - Add snapshot-file relationships to track which files belong to which snapshot
// - Implement incremental snapshots that reference previous snapshots
// - Add snapshot manifest with additional metadata (size, chunk count, etc.)
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"time"
"git.eeqj.de/sneak/vaultik/internal/database"
"git.eeqj.de/sneak/vaultik/internal/log"
"github.com/klauspost/compress/zstd"
)
// SnapshotManager handles snapshot creation and metadata export
type SnapshotManager struct {
repos *database.Repositories
s3Client S3Client
encryptor Encryptor
}
// Encryptor interface for snapshot encryption
type Encryptor interface {
Encrypt(data []byte) ([]byte, error)
}
// NewSnapshotManager creates a new snapshot manager
func NewSnapshotManager(repos *database.Repositories, s3Client S3Client, encryptor Encryptor) *SnapshotManager {
return &SnapshotManager{
repos: repos,
s3Client: s3Client,
encryptor: encryptor,
}
}
// CreateSnapshot creates a new snapshot record in the database at the start of a backup
func (sm *SnapshotManager) CreateSnapshot(ctx context.Context, hostname, version string) (string, error) {
snapshotID := fmt.Sprintf("%s-%s", hostname, time.Now().Format("20060102-150405"))
snapshot := &database.Snapshot{
ID: snapshotID,
Hostname: hostname,
VaultikVersion: version,
StartedAt: time.Now(),
CompletedAt: nil, // Not completed yet
FileCount: 0,
ChunkCount: 0,
BlobCount: 0,
TotalSize: 0,
BlobSize: 0,
CompressionRatio: 1.0,
}
err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return sm.repos.Snapshots.Create(ctx, tx, snapshot)
})
if err != nil {
return "", fmt.Errorf("creating snapshot: %w", err)
}
log.Info("Created snapshot", "snapshot_id", snapshotID)
return snapshotID, nil
}
// UpdateSnapshotStats updates the statistics for a snapshot during backup
func (sm *SnapshotManager) UpdateSnapshotStats(ctx context.Context, snapshotID string, stats BackupStats) error {
err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return sm.repos.Snapshots.UpdateCounts(ctx, tx, snapshotID,
int64(stats.FilesScanned),
int64(stats.ChunksCreated),
int64(stats.BlobsCreated),
stats.BytesScanned,
stats.BytesUploaded,
)
})
if err != nil {
return fmt.Errorf("updating snapshot stats: %w", err)
}
return nil
}
// CompleteSnapshot marks a snapshot as completed and exports its metadata
func (sm *SnapshotManager) CompleteSnapshot(ctx context.Context, snapshotID string) error {
// Mark the snapshot as completed
err := sm.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return sm.repos.Snapshots.MarkComplete(ctx, tx, snapshotID)
})
if err != nil {
return fmt.Errorf("marking snapshot complete: %w", err)
}
log.Info("Completed snapshot", "snapshot_id", snapshotID)
return nil
}
// ExportSnapshotMetadata exports snapshot metadata to S3
//
// This method executes the complete snapshot metadata export process:
// 1. Creates a temporary directory for working files
// 2. Copies the main database to preserve its state
// 3. Cleans the copy to contain only current snapshot data
// 4. Dumps the cleaned database to SQL
// 5. Compresses the SQL dump with zstd
// 6. Encrypts the compressed data (if encryption is enabled)
// 7. Uploads to S3 at: snapshots/{snapshot-id}.sql.zst[.age]
//
// The caller is responsible for:
// - Ensuring the main database is closed before calling this method
// - Reopening the main database after this method returns
//
// This ensures database consistency during the copy operation.
func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath string, snapshotID string) error {
log.Info("Exporting snapshot metadata", "snapshot_id", snapshotID)
// Create temp directory for all temporary files
tempDir, err := os.MkdirTemp("", "vaultik-snapshot-*")
if err != nil {
return fmt.Errorf("creating temp dir: %w", err)
}
defer func() {
if err := os.RemoveAll(tempDir); err != nil {
log.Debug("Failed to remove temp dir", "path", tempDir, "error", err)
}
}()
// Step 1: Copy database to temp file
// The main database should be closed at this point
tempDBPath := filepath.Join(tempDir, "snapshot.db")
if err := copyFile(dbPath, tempDBPath); err != nil {
return fmt.Errorf("copying database: %w", err)
}
// Step 2: Clean the temp database to only contain current snapshot data
if err := sm.cleanSnapshotDB(ctx, tempDBPath, snapshotID); err != nil {
return fmt.Errorf("cleaning snapshot database: %w", err)
}
// Step 3: Dump the cleaned database to SQL
dumpPath := filepath.Join(tempDir, "snapshot.sql")
if err := sm.dumpDatabase(tempDBPath, dumpPath); err != nil {
return fmt.Errorf("dumping database: %w", err)
}
// Step 4: Compress the SQL dump
compressedPath := filepath.Join(tempDir, "snapshot.sql.zst")
if err := sm.compressDump(dumpPath, compressedPath); err != nil {
return fmt.Errorf("compressing dump: %w", err)
}
// Step 5: Read compressed data for encryption/upload
compressedData, err := os.ReadFile(compressedPath)
if err != nil {
return fmt.Errorf("reading compressed dump: %w", err)
}
// Step 6: Encrypt if encryptor is available
finalData := compressedData
if sm.encryptor != nil {
encrypted, err := sm.encryptor.Encrypt(compressedData)
if err != nil {
return fmt.Errorf("encrypting snapshot: %w", err)
}
finalData = encrypted
}
// Step 7: Generate blob manifest (before closing temp DB)
blobManifest, err := sm.generateBlobManifest(ctx, tempDBPath, snapshotID)
if err != nil {
return fmt.Errorf("generating blob manifest: %w", err)
}
// Step 8: Upload to S3 in snapshot subdirectory
// Upload database backup (encrypted)
dbKey := fmt.Sprintf("metadata/%s/db.zst", snapshotID)
if sm.encryptor != nil {
dbKey += ".age"
}
if err := sm.s3Client.PutObject(ctx, dbKey, bytes.NewReader(finalData)); err != nil {
return fmt.Errorf("uploading snapshot database: %w", err)
}
// Upload blob manifest (unencrypted, compressed)
manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
if err := sm.s3Client.PutObject(ctx, manifestKey, bytes.NewReader(blobManifest)); err != nil {
return fmt.Errorf("uploading blob manifest: %w", err)
}
log.Info("Uploaded snapshot metadata",
"snapshot_id", snapshotID,
"db_size", len(finalData),
"manifest_size", len(blobManifest))
return nil
}
// cleanSnapshotDB removes all data except for the specified snapshot
//
// Current implementation:
// Since we don't yet have snapshot-file relationships, this currently only
// removes other snapshots. In a complete implementation, it would:
//
// 1. Delete all snapshots except the current one
// 2. Delete files not belonging to the current snapshot
// 3. Delete file_chunks for deleted files (CASCADE)
// 4. Delete chunk_files for deleted files
// 5. Delete chunks with no remaining file references
// 6. Delete blob_chunks for deleted chunks
// 7. Delete blobs with no remaining chunks
//
// The order is important to maintain referential integrity.
//
// Future implementation when we have snapshot_files table:
//
// DELETE FROM snapshots WHERE id != ?;
// DELETE FROM files WHERE path NOT IN (
// SELECT file_path FROM snapshot_files WHERE snapshot_id = ?
// );
// DELETE FROM chunks WHERE chunk_hash NOT IN (
// SELECT DISTINCT chunk_hash FROM file_chunks
// );
// DELETE FROM blobs WHERE blob_hash NOT IN (
// SELECT DISTINCT blob_hash FROM blob_chunks
// );
func (sm *SnapshotManager) cleanSnapshotDB(ctx context.Context, dbPath string, snapshotID string) error {
// Open the temp database
db, err := database.New(ctx, dbPath)
if err != nil {
return fmt.Errorf("opening temp database: %w", err)
}
defer func() {
if err := db.Close(); err != nil {
log.Debug("Failed to close temp database", "error", err)
}
}()
// Start a transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("beginning transaction: %w", err)
}
defer func() {
if rbErr := tx.Rollback(); rbErr != nil && rbErr != sql.ErrTxDone {
log.Debug("Failed to rollback transaction", "error", rbErr)
}
}()
// Step 1: Delete all other snapshots
_, err = tx.ExecContext(ctx, "DELETE FROM snapshots WHERE id != ?", snapshotID)
if err != nil {
return fmt.Errorf("deleting other snapshots: %w", err)
}
// Step 2: Delete files not in this snapshot
_, err = tx.ExecContext(ctx, `
DELETE FROM files
WHERE path NOT IN (
SELECT file_path FROM snapshot_files WHERE snapshot_id = ?
)`, snapshotID)
if err != nil {
return fmt.Errorf("deleting orphaned files: %w", err)
}
// Step 3: file_chunks will be deleted via CASCADE from files
// Step 4: Delete chunk_files for deleted files
_, err = tx.ExecContext(ctx, `
DELETE FROM chunk_files
WHERE file_path NOT IN (
SELECT path FROM files
)`)
if err != nil {
return fmt.Errorf("deleting orphaned chunk_files: %w", err)
}
// Step 5: Delete chunks with no remaining file references
_, err = tx.ExecContext(ctx, `
DELETE FROM chunks
WHERE chunk_hash NOT IN (
SELECT DISTINCT chunk_hash FROM file_chunks
)`)
if err != nil {
return fmt.Errorf("deleting orphaned chunks: %w", err)
}
// Step 6: Delete blob_chunks for deleted chunks
_, err = tx.ExecContext(ctx, `
DELETE FROM blob_chunks
WHERE chunk_hash NOT IN (
SELECT chunk_hash FROM chunks
)`)
if err != nil {
return fmt.Errorf("deleting orphaned blob_chunks: %w", err)
}
// Step 7: Delete blobs not in this snapshot
_, err = tx.ExecContext(ctx, `
DELETE FROM blobs
WHERE blob_hash NOT IN (
SELECT blob_hash FROM snapshot_blobs WHERE snapshot_id = ?
)`, snapshotID)
if err != nil {
return fmt.Errorf("deleting orphaned blobs: %w", err)
}
// Step 8: Delete orphaned snapshot_files and snapshot_blobs
_, err = tx.ExecContext(ctx, "DELETE FROM snapshot_files WHERE snapshot_id != ?", snapshotID)
if err != nil {
return fmt.Errorf("deleting orphaned snapshot_files: %w", err)
}
_, err = tx.ExecContext(ctx, "DELETE FROM snapshot_blobs WHERE snapshot_id != ?", snapshotID)
if err != nil {
return fmt.Errorf("deleting orphaned snapshot_blobs: %w", err)
}
// Commit transaction
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing transaction: %w", err)
}
return nil
}
// dumpDatabase creates a SQL dump of the database
func (sm *SnapshotManager) dumpDatabase(dbPath, dumpPath string) error {
cmd := exec.Command("sqlite3", dbPath, ".dump")
output, err := cmd.Output()
if err != nil {
return fmt.Errorf("running sqlite3 dump: %w", err)
}
if err := os.WriteFile(dumpPath, output, 0644); err != nil {
return fmt.Errorf("writing dump file: %w", err)
}
return nil
}
// compressDump compresses the SQL dump using zstd
func (sm *SnapshotManager) compressDump(inputPath, outputPath string) error {
input, err := os.Open(inputPath)
if err != nil {
return fmt.Errorf("opening input file: %w", err)
}
defer func() {
if err := input.Close(); err != nil {
log.Debug("Failed to close input file", "error", err)
}
}()
output, err := os.Create(outputPath)
if err != nil {
return fmt.Errorf("creating output file: %w", err)
}
defer func() {
if err := output.Close(); err != nil {
log.Debug("Failed to close output file", "error", err)
}
}()
// Create zstd encoder with good compression and multithreading
zstdWriter, err := zstd.NewWriter(output,
zstd.WithEncoderLevel(zstd.SpeedBetterCompression),
zstd.WithEncoderConcurrency(runtime.NumCPU()),
zstd.WithWindowSize(4<<20), // 4MB window for metadata files
)
if err != nil {
return fmt.Errorf("creating zstd writer: %w", err)
}
defer func() {
if err := zstdWriter.Close(); err != nil {
log.Debug("Failed to close zstd writer", "error", err)
}
}()
if _, err := io.Copy(zstdWriter, input); err != nil {
return fmt.Errorf("compressing data: %w", err)
}
return nil
}
// copyFile copies a file from src to dst
func copyFile(src, dst string) error {
sourceFile, err := os.Open(src)
if err != nil {
return err
}
defer func() {
if err := sourceFile.Close(); err != nil {
log.Debug("Failed to close source file", "error", err)
}
}()
destFile, err := os.Create(dst)
if err != nil {
return err
}
defer func() {
if err := destFile.Close(); err != nil {
log.Debug("Failed to close destination file", "error", err)
}
}()
if _, err := io.Copy(destFile, sourceFile); err != nil {
return err
}
return nil
}
// generateBlobManifest creates a compressed JSON list of all blobs in the snapshot
func (sm *SnapshotManager) generateBlobManifest(ctx context.Context, dbPath string, snapshotID string) ([]byte, error) {
// Open the cleaned database using the database package
db, err := database.New(ctx, dbPath)
if err != nil {
return nil, fmt.Errorf("opening database: %w", err)
}
defer func() { _ = db.Close() }()
// Create repositories to access the data
repos := database.NewRepositories(db)
// Get all blobs for this snapshot
blobs, err := repos.Snapshots.GetBlobHashes(ctx, snapshotID)
if err != nil {
return nil, fmt.Errorf("getting snapshot blobs: %w", err)
}
// Create manifest structure
manifest := struct {
SnapshotID string `json:"snapshot_id"`
Timestamp string `json:"timestamp"`
BlobCount int `json:"blob_count"`
Blobs []string `json:"blobs"`
}{
SnapshotID: snapshotID,
Timestamp: time.Now().UTC().Format(time.RFC3339),
BlobCount: len(blobs),
Blobs: blobs,
}
// Marshal to JSON
jsonData, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return nil, fmt.Errorf("marshaling manifest: %w", err)
}
// Compress with zstd
compressed, err := compressData(jsonData)
if err != nil {
return nil, fmt.Errorf("compressing manifest: %w", err)
}
log.Info("Generated blob manifest",
"snapshot_id", snapshotID,
"blob_count", len(blobs),
"json_size", len(jsonData),
"compressed_size", len(compressed))
return compressed, nil
}
// compressData compresses data using zstd
func compressData(data []byte) ([]byte, error) {
var buf bytes.Buffer
w, err := zstd.NewWriter(&buf,
zstd.WithEncoderLevel(zstd.SpeedBetterCompression),
)
if err != nil {
return nil, err
}
if _, err := w.Write(data); err != nil {
_ = w.Close()
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// BackupStats contains statistics from a backup operation
type BackupStats struct {
FilesScanned int
BytesScanned int64
ChunksCreated int
BlobsCreated int
BytesUploaded int64
}

View File

@@ -0,0 +1,147 @@
package backup
import (
"context"
"database/sql"
"path/filepath"
"testing"
"git.eeqj.de/sneak/vaultik/internal/database"
"git.eeqj.de/sneak/vaultik/internal/log"
)
func TestCleanSnapshotDBEmptySnapshot(t *testing.T) {
// Initialize logger
log.Initialize(log.Config{})
ctx := context.Background()
// Create a test database
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "test.db")
db, err := database.New(ctx, dbPath)
if err != nil {
t.Fatalf("failed to create database: %v", err)
}
repos := database.NewRepositories(db)
// Create an empty snapshot
snapshot := &database.Snapshot{
ID: "empty-snapshot",
Hostname: "test-host",
}
err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return repos.Snapshots.Create(ctx, tx, snapshot)
})
if err != nil {
t.Fatalf("failed to create snapshot: %v", err)
}
// Create some files and chunks not associated with any snapshot
file := &database.File{Path: "/orphan/file.txt", Size: 1000}
chunk := &database.Chunk{ChunkHash: "orphan-chunk", SHA256: "orphan-chunk", Size: 500}
err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
if err := repos.Files.Create(ctx, tx, file); err != nil {
return err
}
return repos.Chunks.Create(ctx, tx, chunk)
})
if err != nil {
t.Fatalf("failed to create orphan data: %v", err)
}
// Close the database
if err := db.Close(); err != nil {
t.Fatalf("failed to close database: %v", err)
}
// Copy database
tempDBPath := filepath.Join(tempDir, "temp.db")
if err := copyFile(dbPath, tempDBPath); err != nil {
t.Fatalf("failed to copy database: %v", err)
}
// Clean the database
sm := &SnapshotManager{}
if err := sm.cleanSnapshotDB(ctx, tempDBPath, snapshot.ID); err != nil {
t.Fatalf("failed to clean snapshot database: %v", err)
}
// Verify the cleaned database
cleanedDB, err := database.New(ctx, tempDBPath)
if err != nil {
t.Fatalf("failed to open cleaned database: %v", err)
}
defer func() {
if err := cleanedDB.Close(); err != nil {
t.Errorf("failed to close database: %v", err)
}
}()
cleanedRepos := database.NewRepositories(cleanedDB)
// Verify snapshot exists
verifySnapshot, err := cleanedRepos.Snapshots.GetByID(ctx, snapshot.ID)
if err != nil {
t.Fatalf("failed to get snapshot: %v", err)
}
if verifySnapshot == nil {
t.Error("snapshot should exist")
}
// Verify orphan file is gone
f, err := cleanedRepos.Files.GetByPath(ctx, file.Path)
if err != nil {
t.Fatalf("failed to check file: %v", err)
}
if f != nil {
t.Error("orphan file should not exist")
}
// Verify orphan chunk is gone
c, err := cleanedRepos.Chunks.GetByHash(ctx, chunk.ChunkHash)
if err != nil {
t.Fatalf("failed to check chunk: %v", err)
}
if c != nil {
t.Error("orphan chunk should not exist")
}
}
func TestCleanSnapshotDBNonExistentSnapshot(t *testing.T) {
// Initialize logger
log.Initialize(log.Config{})
ctx := context.Background()
// Create a test database
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "test.db")
db, err := database.New(ctx, dbPath)
if err != nil {
t.Fatalf("failed to create database: %v", err)
}
// Close immediately
if err := db.Close(); err != nil {
t.Fatalf("failed to close database: %v", err)
}
// Copy database
tempDBPath := filepath.Join(tempDir, "temp.db")
if err := copyFile(dbPath, tempDBPath); err != nil {
t.Fatalf("failed to copy database: %v", err)
}
// Try to clean with non-existent snapshot
sm := &SnapshotManager{}
err = sm.cleanSnapshotDB(ctx, tempDBPath, "non-existent-snapshot")
// Should not error - it will just delete everything
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}