Add deterministic deduplication, rclone backend, and database purge command
- Implement deterministic blob hashing using double SHA256 of uncompressed plaintext data, enabling deduplication even after the local DB is cleared
- Add Stat() check before blob upload to skip blobs already present in storage
- Add rclone storage backend for additional remote storage options
- Add 'vaultik database purge' command to erase the local state DB
- Add 'vaultik remote check' command to verify remote connectivity
- Show configured snapshots in 'vaultik snapshot list' output
- Skip macOS resource fork files (._*) when listing remote snapshots
- Use multi-threaded zstd compression (CPUs - 2 threads)
- Add writer tests for double-hashing behavior
This commit is contained in:
@@ -39,7 +39,7 @@ type ProgressStats struct {
|
||||
BlobsCreated atomic.Int64
|
||||
BlobsUploaded atomic.Int64
|
||||
BytesUploaded atomic.Int64
|
||||
UploadDurationMs atomic.Int64 // Total milliseconds spent uploading to S3
|
||||
UploadDurationMs atomic.Int64 // Total milliseconds spent uploading
|
||||
CurrentFile atomic.Value // stores string
|
||||
TotalSize atomic.Int64 // Total size to process (set after scan phase)
|
||||
TotalFiles atomic.Int64 // Total files to process in phase 2
|
||||
@@ -273,7 +273,7 @@ func (pr *ProgressReporter) printDetailedStatus() {
|
||||
"created", blobsCreated,
|
||||
"uploaded", blobsUploaded,
|
||||
"pending", blobsCreated-blobsUploaded)
|
||||
log.Info("Total uploaded to S3",
|
||||
log.Info("Total uploaded to remote",
|
||||
"uploaded", humanize.Bytes(uint64(bytesUploaded)),
|
||||
"compression_ratio", formatRatio(bytesUploaded, bytesScanned))
|
||||
if currentFile != "" {
|
||||
@@ -336,7 +336,7 @@ func (pr *ProgressReporter) ReportUploadStart(blobHash string, size int64) {
|
||||
pr.stats.CurrentUpload.Store(info)
|
||||
|
||||
// Log the start of upload
|
||||
log.Info("Starting blob upload to S3",
|
||||
log.Info("Starting blob upload",
|
||||
"hash", blobHash[:8]+"...",
|
||||
"size", humanize.Bytes(uint64(size)))
|
||||
}
|
||||
|
||||
@@ -625,11 +625,21 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
||||
// Update result stats
|
||||
if needsProcessing {
|
||||
result.BytesScanned += info.Size()
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesScanned.Add(info.Size())
|
||||
}
|
||||
} else {
|
||||
result.FilesSkipped++
|
||||
result.BytesSkipped += info.Size()
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().FilesSkipped.Add(1)
|
||||
s.progress.GetStats().BytesSkipped.Add(info.Size())
|
||||
}
|
||||
}
|
||||
result.FilesScanned++
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().FilesScanned.Add(1)
|
||||
}
|
||||
|
||||
// Output periodic status
|
||||
if time.Since(lastStatusTime) >= statusInterval {
|
||||
@@ -921,9 +931,10 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
startTime := time.Now().UTC()
|
||||
finishedBlob := blobWithReader.FinishedBlob
|
||||
|
||||
// Report upload start
|
||||
// Report upload start and increment blobs created
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
|
||||
s.progress.GetStats().BlobsCreated.Add(1)
|
||||
}
|
||||
|
||||
// Upload to storage first (without holding any locks)
|
||||
@@ -964,41 +975,55 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
|
||||
// Create sharded path: blobs/ca/fe/cafebabe...
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
|
||||
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||
return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err)
|
||||
|
||||
// Check if blob already exists in remote storage (deduplication after restart)
|
||||
blobExists := false
|
||||
if _, err := s.storage.Stat(ctx, blobPath); err == nil {
|
||||
blobExists = true
|
||||
log.Info("Blob already exists in storage, skipping upload",
|
||||
"hash", finishedBlob.Hash,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
fmt.Printf("Blob exists: %s (%s, skipped upload)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
}
|
||||
|
||||
uploadDuration := time.Since(startTime)
|
||||
if !blobExists {
|
||||
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||
return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err)
|
||||
}
|
||||
|
||||
// Calculate upload speed
|
||||
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
|
||||
uploadDuration := time.Since(startTime)
|
||||
|
||||
// Print blob stored message
|
||||
fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
humanize.Bytes(uint64(uploadSpeedBps)),
|
||||
uploadDuration.Round(time.Millisecond))
|
||||
// Calculate upload speed
|
||||
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
|
||||
|
||||
// Log upload stats
|
||||
uploadSpeedBits := uploadSpeedBps * 8 // bits per second
|
||||
log.Info("Successfully uploaded blob to storage",
|
||||
"path", blobPath,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
"duration", uploadDuration,
|
||||
"speed", humanize.SI(uploadSpeedBits, "bps"))
|
||||
// Print blob stored message
|
||||
fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
humanize.Bytes(uint64(uploadSpeedBps)),
|
||||
uploadDuration.Round(time.Millisecond))
|
||||
|
||||
// Report upload complete
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
|
||||
}
|
||||
// Log upload stats
|
||||
uploadSpeedBits := uploadSpeedBps * 8 // bits per second
|
||||
log.Info("Successfully uploaded blob to storage",
|
||||
"path", blobPath,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
"duration", uploadDuration,
|
||||
"speed", humanize.SI(uploadSpeedBits, "bps"))
|
||||
|
||||
// Update progress
|
||||
if s.progress != nil {
|
||||
stats := s.progress.GetStats()
|
||||
stats.BlobsUploaded.Add(1)
|
||||
stats.BytesUploaded.Add(finishedBlob.Compressed)
|
||||
stats.BlobsCreated.Add(1)
|
||||
// Report upload complete
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
|
||||
}
|
||||
|
||||
// Update progress after upload completes
|
||||
if s.progress != nil {
|
||||
stats := s.progress.GetStats()
|
||||
stats.BlobsUploaded.Add(1)
|
||||
stats.BytesUploaded.Add(finishedBlob.Compressed)
|
||||
}
|
||||
}
|
||||
|
||||
// Store metadata in database (after upload is complete)
|
||||
@@ -1013,6 +1038,9 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
return fmt.Errorf("parsing finished blob ID: %w", err)
|
||||
}
|
||||
|
||||
// Track upload duration (0 if blob already existed)
|
||||
uploadDuration := time.Since(startTime)
|
||||
|
||||
err = s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error {
|
||||
// Update blob upload timestamp
|
||||
if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil {
|
||||
@@ -1024,16 +1052,18 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
return fmt.Errorf("adding blob to snapshot: %w", err)
|
||||
}
|
||||
|
||||
// Record upload metrics
|
||||
upload := &database.Upload{
|
||||
BlobHash: finishedBlob.Hash,
|
||||
SnapshotID: s.snapshotID,
|
||||
UploadedAt: startTime,
|
||||
Size: finishedBlob.Compressed,
|
||||
DurationMs: uploadDuration.Milliseconds(),
|
||||
}
|
||||
if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
|
||||
return fmt.Errorf("recording upload metrics: %w", err)
|
||||
// Record upload metrics (only for actual uploads, not deduplicated blobs)
|
||||
if !blobExists {
|
||||
upload := &database.Upload{
|
||||
BlobHash: finishedBlob.Hash,
|
||||
SnapshotID: s.snapshotID,
|
||||
UploadedAt: startTime,
|
||||
Size: finishedBlob.Compressed,
|
||||
DurationMs: uploadDuration.Milliseconds(),
|
||||
}
|
||||
if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
|
||||
return fmt.Errorf("recording upload metrics: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -19,24 +19,19 @@ package snapshot
|
||||
// - Blobs not containing any remaining chunks
|
||||
// - All related mapping tables (file_chunks, chunk_files, blob_chunks)
|
||||
// 7. Close the temporary database
|
||||
// 8. Use sqlite3 to dump the cleaned database to SQL
|
||||
// 9. Delete the temporary database file
|
||||
// 10. Compress the SQL dump with zstd
|
||||
// 11. Encrypt the compressed dump with age (if encryption is enabled)
|
||||
// 12. Upload to S3 as: snapshots/{snapshot-id}.sql.zst[.age]
|
||||
// 13. Reopen the main database
|
||||
// 8. VACUUM the database to remove deleted data and compact (security critical)
|
||||
// 9. Compress the binary database with zstd
|
||||
// 10. Encrypt the compressed database with age (if encryption is enabled)
|
||||
// 11. Upload to S3 as: metadata/{snapshot-id}/db.zst.age
|
||||
// 12. Reopen the main database
|
||||
//
|
||||
// Advantages of this approach:
|
||||
// - No custom metadata format needed
|
||||
// - Reuses existing database schema and relationships
|
||||
// - SQL dumps are portable and compress well
|
||||
// - Restore process can simply execute the SQL
|
||||
// - Binary SQLite files are portable and compress well
|
||||
// - Fast restore - just decompress and open (no SQL parsing)
|
||||
// - VACUUM ensures no deleted data leaks
|
||||
// - Atomic and consistent snapshot of all metadata
|
||||
//
|
||||
// TODO: Future improvements:
|
||||
// - Add snapshot-file relationships to track which files belong to which snapshot
|
||||
// - Implement incremental snapshots that reference previous snapshots
|
||||
// - Add snapshot manifest with additional metadata (size, chunk count, etc.)
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@@ -257,20 +252,20 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
"total_uncompressed_size", humanize.Bytes(uint64(stats.UncompressedSize)),
|
||||
"compression_ratio", fmt.Sprintf("%.2fx", float64(stats.UncompressedSize)/float64(stats.CompressedSize)))
|
||||
|
||||
// Step 3: Dump the cleaned database to SQL
|
||||
dumpPath := filepath.Join(tempDir, "snapshot.sql")
|
||||
if err := sm.dumpDatabase(tempDBPath, dumpPath); err != nil {
|
||||
return fmt.Errorf("dumping database: %w", err)
|
||||
// Step 3: VACUUM the database to remove deleted data and compact
|
||||
// This is critical for security - ensures no stale/deleted data is uploaded
|
||||
if err := sm.vacuumDatabase(tempDBPath); err != nil {
|
||||
return fmt.Errorf("vacuuming database: %w", err)
|
||||
}
|
||||
log.Debug("SQL dump complete", "size", humanize.Bytes(uint64(sm.getFileSize(dumpPath))))
|
||||
log.Debug("Database vacuumed", "size", humanize.Bytes(uint64(sm.getFileSize(tempDBPath))))
|
||||
|
||||
// Step 4: Compress and encrypt the SQL dump
|
||||
compressedPath := filepath.Join(tempDir, "snapshot.sql.zst.age")
|
||||
if err := sm.compressDump(dumpPath, compressedPath); err != nil {
|
||||
return fmt.Errorf("compressing dump: %w", err)
|
||||
// Step 4: Compress and encrypt the binary database file
|
||||
compressedPath := filepath.Join(tempDir, "db.zst.age")
|
||||
if err := sm.compressFile(tempDBPath, compressedPath); err != nil {
|
||||
return fmt.Errorf("compressing database: %w", err)
|
||||
}
|
||||
log.Debug("Compression complete",
|
||||
"original_size", humanize.Bytes(uint64(sm.getFileSize(dumpPath))),
|
||||
"original_size", humanize.Bytes(uint64(sm.getFileSize(tempDBPath))),
|
||||
"compressed_size", humanize.Bytes(uint64(sm.getFileSize(compressedPath))))
|
||||
|
||||
// Step 5: Read compressed and encrypted data for upload
|
||||
@@ -295,7 +290,7 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
}
|
||||
dbUploadDuration := time.Since(dbUploadStart)
|
||||
dbUploadSpeed := float64(len(finalData)) * 8 / dbUploadDuration.Seconds() // bits per second
|
||||
log.Info("Uploaded snapshot database to S3",
|
||||
log.Info("Uploaded snapshot database",
|
||||
"path", dbKey,
|
||||
"size", humanize.Bytes(uint64(len(finalData))),
|
||||
"duration", dbUploadDuration,
|
||||
@@ -309,7 +304,7 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
}
|
||||
manifestUploadDuration := time.Since(manifestUploadStart)
|
||||
manifestUploadSpeed := float64(len(blobManifest)) * 8 / manifestUploadDuration.Seconds() // bits per second
|
||||
log.Info("Uploaded blob manifest to S3",
|
||||
log.Info("Uploaded blob manifest",
|
||||
"path", manifestKey,
|
||||
"size", humanize.Bytes(uint64(len(blobManifest))),
|
||||
"duration", manifestUploadDuration,
|
||||
@@ -438,26 +433,21 @@ func (sm *SnapshotManager) cleanSnapshotDB(ctx context.Context, dbPath string, s
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// dumpDatabase creates a SQL dump of the database
|
||||
func (sm *SnapshotManager) dumpDatabase(dbPath, dumpPath string) error {
|
||||
log.Debug("Running sqlite3 dump command", "source", dbPath, "destination", dumpPath)
|
||||
cmd := exec.Command("sqlite3", dbPath, ".dump")
|
||||
// vacuumDatabase runs VACUUM on the database to remove deleted data and compact
|
||||
// This is critical for security - ensures no stale/deleted data pages are uploaded
|
||||
func (sm *SnapshotManager) vacuumDatabase(dbPath string) error {
|
||||
log.Debug("Running VACUUM on database", "path", dbPath)
|
||||
cmd := exec.Command("sqlite3", dbPath, "VACUUM;")
|
||||
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
return fmt.Errorf("running sqlite3 dump: %w", err)
|
||||
}
|
||||
|
||||
log.Debug("SQL dump generated", "size", humanize.Bytes(uint64(len(output))))
|
||||
if err := afero.WriteFile(sm.fs, dumpPath, output, 0644); err != nil {
|
||||
return fmt.Errorf("writing dump file: %w", err)
|
||||
if output, err := cmd.CombinedOutput(); err != nil {
|
||||
return fmt.Errorf("running VACUUM: %w (output: %s)", err, string(output))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// compressDump compresses the SQL dump using zstd
|
||||
func (sm *SnapshotManager) compressDump(inputPath, outputPath string) error {
|
||||
// compressFile compresses a file using zstd and encrypts with age
|
||||
func (sm *SnapshotManager) compressFile(inputPath, outputPath string) error {
|
||||
input, err := sm.fs.Open(inputPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening input file: %w", err)
|
||||
@@ -676,7 +666,7 @@ func (sm *SnapshotManager) CleanupIncompleteSnapshots(ctx context.Context, hostn
|
||||
} else {
|
||||
// Metadata exists - this snapshot was completed but database wasn't updated
|
||||
// This shouldn't happen in normal operation, but mark it complete
|
||||
log.Warn("Found snapshot with S3 metadata but incomplete in database", "snapshot_id", snapshot.ID)
|
||||
log.Warn("Found snapshot with remote metadata but incomplete in database", "snapshot_id", snapshot.ID)
|
||||
if err := sm.repos.Snapshots.MarkComplete(ctx, nil, snapshot.ID.String()); err != nil {
|
||||
log.Error("Failed to mark snapshot as complete in database", "snapshot_id", snapshot.ID, "error", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user