vaultik/internal/snapshot/scanner.go
sneak 417b25a5f5 Add custom types, version command, and restore --verify flag
- Add internal/types package with type-safe wrappers for IDs, hashes,
  paths, and credentials (FileID, BlobID, ChunkHash, etc.)
- Implement driver.Valuer and sql.Scanner for UUID-based types
- Add `vaultik version` command showing version, commit, go version
- Add `--verify` flag to restore command that checksums all restored
  files against expected chunk hashes with progress bar
- Remove fetch.go (dead code, functionality in restore)
- Clean up TODO.md, remove completed items
- Update all database and snapshot code to use new custom types
2026-01-14 17:11:52 -08:00

package snapshot
import (
"context"
"database/sql"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"git.eeqj.de/sneak/vaultik/internal/blob"
"git.eeqj.de/sneak/vaultik/internal/chunker"
"git.eeqj.de/sneak/vaultik/internal/database"
"git.eeqj.de/sneak/vaultik/internal/log"
"git.eeqj.de/sneak/vaultik/internal/storage"
"git.eeqj.de/sneak/vaultik/internal/types"
"github.com/dustin/go-humanize"
"github.com/gobwas/glob"
"github.com/spf13/afero"
)
// FileToProcess holds information about a file that needs processing
type FileToProcess struct {
Path string
FileInfo os.FileInfo
File *database.File
}
// pendingFileData holds all data needed to commit a file to the database
type pendingFileData struct {
file *database.File
fileChunks []database.FileChunk
chunkFiles []database.ChunkFile
}
// compiledPattern holds a compiled glob pattern and whether it's anchored
type compiledPattern struct {
pattern glob.Glob
anchored bool // If true, only matches from root of source dir
original string
}
// Scanner scans directories and populates the database with file and chunk information
type Scanner struct {
fs afero.Fs
chunker *chunker.Chunker
packer *blob.Packer
repos *database.Repositories
storage storage.Storer
maxBlobSize int64
compressionLevel int
ageRecipient string
snapshotID string // Current snapshot being processed
currentSourcePath string // Current source directory being scanned (for restore path stripping)
exclude []string // Glob patterns for files/directories to exclude
compiledExclude []compiledPattern // Compiled glob patterns
progress *ProgressReporter
skipErrors bool // Skip file read errors (log loudly but continue)
// In-memory cache of known chunk hashes for fast existence checks
knownChunks map[string]struct{}
knownChunksMu sync.RWMutex
// Pending chunk hashes - chunks that have been added to packer but not yet committed to DB
// When a blob finalizes, the committed chunks are removed from this set
pendingChunkHashes map[string]struct{}
pendingChunkHashesMu sync.Mutex
// Pending file data buffer for batch insertion
// Files are flushed when all their chunks have been committed to DB
pendingFiles []pendingFileData
pendingFilesMu sync.Mutex
// Mutex for coordinating blob creation
packerMu sync.Mutex // Blocks chunk production during blob creation
// Context for cancellation
scanCtx context.Context
}
// ScannerConfig contains configuration for the scanner
type ScannerConfig struct {
FS afero.Fs
ChunkSize int64
Repositories *database.Repositories
Storage storage.Storer
MaxBlobSize int64
CompressionLevel int
AgeRecipients []string // Required: at least one age recipient; encryption is mandatory and NewScanner fails without one
EnableProgress bool // Enable progress reporting
Exclude []string // Glob patterns for files/directories to exclude
SkipErrors bool // Skip file read errors (log loudly but continue)
}
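// Illustrative configuration sketch (hypothetical values, not part of the build):
// repos and store are assumed to come from the caller's wiring; the sizes, patterns,
// and age recipient shown are examples only.
//
//	cfg := ScannerConfig{
//		FS:               afero.NewOsFs(),
//		ChunkSize:        4 * 1024 * 1024,
//		Repositories:     repos, // *database.Repositories
//		Storage:          store, // a storage.Storer implementation
//		MaxBlobSize:      256 * 1024 * 1024,
//		CompressionLevel: 3,
//		AgeRecipients:    []string{"age1..."}, // required; encryption is mandatory
//		EnableProgress:   true,
//		Exclude:          []string{".git", "*.tmp", "/var/cache/"},
//		SkipErrors:       true,
//	}
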
// ScanResult contains the results of a scan operation
type ScanResult struct {
FilesScanned int
FilesSkipped int
FilesDeleted int
BytesScanned int64
BytesSkipped int64
BytesDeleted int64
ChunksCreated int
BlobsCreated int
StartTime time.Time
EndTime time.Time
}
// NewScanner creates a new scanner instance
func NewScanner(cfg ScannerConfig) *Scanner {
// Encryption is required for blob packing; refuse to construct a scanner without age recipients
if len(cfg.AgeRecipients) == 0 {
log.Error("No age recipients configured - encryption is required")
return nil
}
// Create blob packer with encryption
packerCfg := blob.PackerConfig{
MaxBlobSize: cfg.MaxBlobSize,
CompressionLevel: cfg.CompressionLevel,
Recipients: cfg.AgeRecipients,
Repositories: cfg.Repositories,
Fs: cfg.FS,
}
packer, err := blob.NewPacker(packerCfg)
if err != nil {
log.Error("Failed to create packer", "error", err)
return nil
}
var progress *ProgressReporter
if cfg.EnableProgress {
progress = NewProgressReporter()
}
// Compile exclude patterns
compiledExclude := compileExcludePatterns(cfg.Exclude)
return &Scanner{
fs: cfg.FS,
chunker: chunker.NewChunker(cfg.ChunkSize),
packer: packer,
repos: cfg.Repositories,
storage: cfg.Storage,
maxBlobSize: cfg.MaxBlobSize,
compressionLevel: cfg.CompressionLevel,
ageRecipient: strings.Join(cfg.AgeRecipients, ","),
exclude: cfg.Exclude,
compiledExclude: compiledExclude,
progress: progress,
skipErrors: cfg.SkipErrors,
pendingChunkHashes: make(map[string]struct{}),
}
}
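// Illustrative usage sketch (hypothetical caller code): NewScanner logs the reason and
// returns nil if recipients are missing or the packer cannot be created, so callers
// should check for nil before scanning.
//
//	scanner := NewScanner(cfg)
//	if scanner == nil {
//		return fmt.Errorf("scanner construction failed; see log output")
//	}
//	res, err := scanner.Scan(ctx, "/data", snapshotID)
//	if err != nil {
//		return err
//	}
//	log.Info("scan finished",
//		"files", res.FilesScanned,
//		"blobs", res.BlobsCreated,
//		"elapsed", res.EndTime.Sub(res.StartTime))
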
// Scan scans a directory and populates the database
func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*ScanResult, error) {
s.snapshotID = snapshotID
s.currentSourcePath = path // Store source path for file records (used during restore)
s.scanCtx = ctx
result := &ScanResult{
StartTime: time.Now().UTC(),
}
// Set blob handler for concurrent upload
if s.storage != nil {
log.Debug("Setting blob handler for storage uploads")
s.packer.SetBlobHandler(s.handleBlobReady)
} else {
log.Debug("No storage configured, blobs will not be uploaded")
}
// Start progress reporting if enabled
if s.progress != nil {
s.progress.Start()
defer s.progress.Stop()
}
// Phase 0: Load known files and chunks from database into memory for fast lookup
fmt.Println("Loading known files from database...")
knownFiles, err := s.loadKnownFiles(ctx, path)
if err != nil {
return nil, fmt.Errorf("loading known files: %w", err)
}
fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
fmt.Println("Loading known chunks from database...")
if err := s.loadKnownChunks(ctx); err != nil {
return nil, fmt.Errorf("loading known chunks: %w", err)
}
fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
// Phase 1: Scan directory, collect files to process, and track existing files
// (builds existingFiles map during walk to avoid double traversal)
log.Info("Phase 1/3: Scanning directory structure")
existingFiles := make(map[string]struct{})
scanResult, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles)
if err != nil {
return nil, fmt.Errorf("scan phase failed: %w", err)
}
filesToProcess := scanResult.FilesToProcess
// Phase 1b: Detect deleted files by comparing DB against scanned files
if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil {
return nil, fmt.Errorf("detecting deleted files: %w", err)
}
// Phase 1c: Associate unchanged files with this snapshot (no new records needed)
if len(scanResult.UnchangedFileIDs) > 0 {
fmt.Printf("Associating %s unchanged files with snapshot...\n", formatNumber(len(scanResult.UnchangedFileIDs)))
if err := s.batchAddFilesToSnapshot(ctx, scanResult.UnchangedFileIDs); err != nil {
return nil, fmt.Errorf("associating unchanged files: %w", err)
}
}
// Calculate total size to process
var totalSizeToProcess int64
for _, file := range filesToProcess {
totalSizeToProcess += file.FileInfo.Size()
}
// Update progress with total size and file count
if s.progress != nil {
s.progress.SetTotalSize(totalSizeToProcess)
s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
}
log.Info("Phase 1 complete",
"total_files", len(filesToProcess),
"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
"files_skipped", result.FilesSkipped,
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
// Print scan summary
fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)",
formatNumber(result.FilesScanned),
humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)),
formatNumber(len(filesToProcess)),
humanize.Bytes(uint64(totalSizeToProcess)))
if result.FilesDeleted > 0 {
fmt.Printf(", %s deleted (%s)",
formatNumber(result.FilesDeleted),
humanize.Bytes(uint64(result.BytesDeleted)))
}
fmt.Println()
// Phase 2: Process files and create chunks
if len(filesToProcess) > 0 {
fmt.Printf("Processing %s files...\n", formatNumber(len(filesToProcess)))
log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)")
if err := s.processPhase(ctx, filesToProcess, result); err != nil {
return nil, fmt.Errorf("process phase failed: %w", err)
}
} else {
fmt.Printf("No files need processing. Creating metadata-only snapshot.\n")
log.Info("Phase 2/3: Skipping (no files need processing, metadata-only snapshot)")
}
// Get final stats from packer
blobs := s.packer.GetFinishedBlobs()
result.BlobsCreated += len(blobs)
// Query database for actual blob count created during this snapshot
// The database is authoritative, especially for concurrent blob uploads
// We count uploads rather than all snapshot_blobs to get only NEW blobs
if s.snapshotID != "" {
uploadCount, err := s.repos.Uploads.GetCountBySnapshot(ctx, s.snapshotID)
if err != nil {
log.Warn("Failed to query upload count from database", "error", err)
} else {
result.BlobsCreated = int(uploadCount)
}
}
result.EndTime = time.Now().UTC()
return result, nil
}
// loadKnownFiles loads all known files from the database into a map for fast lookup
// This avoids per-file database queries during the scan phase
func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*database.File, error) {
files, err := s.repos.Files.ListByPrefix(ctx, path)
if err != nil {
return nil, fmt.Errorf("listing files by prefix: %w", err)
}
result := make(map[string]*database.File, len(files))
for _, f := range files {
result[f.Path.String()] = f
}
return result, nil
}
// loadKnownChunks loads all known chunk hashes from the database into a map for fast lookup
// This avoids per-chunk database queries during file processing
func (s *Scanner) loadKnownChunks(ctx context.Context) error {
chunks, err := s.repos.Chunks.List(ctx)
if err != nil {
return fmt.Errorf("listing chunks: %w", err)
}
s.knownChunksMu.Lock()
s.knownChunks = make(map[string]struct{}, len(chunks))
for _, c := range chunks {
s.knownChunks[c.ChunkHash.String()] = struct{}{}
}
s.knownChunksMu.Unlock()
return nil
}
// chunkExists checks if a chunk hash exists in the in-memory cache
func (s *Scanner) chunkExists(hash string) bool {
s.knownChunksMu.RLock()
_, exists := s.knownChunks[hash]
s.knownChunksMu.RUnlock()
return exists
}
// addKnownChunk adds a chunk hash to the in-memory cache
func (s *Scanner) addKnownChunk(hash string) {
s.knownChunksMu.Lock()
s.knownChunks[hash] = struct{}{}
s.knownChunksMu.Unlock()
}
// addPendingChunkHash marks a chunk as pending (not yet committed to DB)
func (s *Scanner) addPendingChunkHash(hash string) {
s.pendingChunkHashesMu.Lock()
s.pendingChunkHashes[hash] = struct{}{}
s.pendingChunkHashesMu.Unlock()
}
// removePendingChunkHashes removes committed chunk hashes from the pending set
func (s *Scanner) removePendingChunkHashes(hashes []string) {
log.Debug("removePendingChunkHashes: starting", "count", len(hashes))
start := time.Now()
s.pendingChunkHashesMu.Lock()
for _, hash := range hashes {
delete(s.pendingChunkHashes, hash)
}
s.pendingChunkHashesMu.Unlock()
log.Debug("removePendingChunkHashes: done", "count", len(hashes), "duration", time.Since(start))
}
// isChunkPending returns true if the chunk is still pending (not yet committed to DB)
func (s *Scanner) isChunkPending(hash string) bool {
s.pendingChunkHashesMu.Lock()
_, pending := s.pendingChunkHashes[hash]
s.pendingChunkHashesMu.Unlock()
return pending
}
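// Chunk bookkeeping note: a chunk hash can appear in two in-memory sets during a scan.
// knownChunks answers "has this chunk ever been seen?" and drives deduplication;
// pendingChunkHashes answers "has this chunk been committed to the database yet?" and
// gates when a file's metadata may be flushed. A new chunk is added to both sets, then
// removed from the pending set once its blob finalizes and the chunk rows are committed
// (see handleBlobReady and removePendingChunkHashes).
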
// addPendingFile adds a file to the pending buffer
// Files are NOT auto-flushed here - they are flushed when their chunks are committed
// (in handleBlobReady after blob finalize)
func (s *Scanner) addPendingFile(_ context.Context, data pendingFileData) {
s.pendingFilesMu.Lock()
s.pendingFiles = append(s.pendingFiles, data)
s.pendingFilesMu.Unlock()
}
// flushPendingFiles writes all pending files to the database in a single transaction
func (s *Scanner) flushPendingFiles(ctx context.Context) error {
s.pendingFilesMu.Lock()
files := s.pendingFiles
s.pendingFiles = nil
s.pendingFilesMu.Unlock()
if len(files) == 0 {
return nil
}
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
for _, data := range files {
// Create or update the file record
if err := s.repos.Files.Create(txCtx, tx, data.file); err != nil {
return fmt.Errorf("creating file record: %w", err)
}
// Delete any existing file_chunks and chunk_files for this file
if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
return fmt.Errorf("deleting old file chunks: %w", err)
}
if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
return fmt.Errorf("deleting old chunk files: %w", err)
}
// Create file-chunk mappings
for i := range data.fileChunks {
if err := s.repos.FileChunks.Create(txCtx, tx, &data.fileChunks[i]); err != nil {
return fmt.Errorf("creating file chunk: %w", err)
}
}
// Create chunk-file mappings
for i := range data.chunkFiles {
if err := s.repos.ChunkFiles.Create(txCtx, tx, &data.chunkFiles[i]); err != nil {
return fmt.Errorf("creating chunk file: %w", err)
}
}
// Add file to snapshot
if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID); err != nil {
return fmt.Errorf("adding file to snapshot: %w", err)
}
}
return nil
})
}
// flushAllPending flushes all pending files to the database
func (s *Scanner) flushAllPending(ctx context.Context) error {
return s.flushPendingFiles(ctx)
}
// flushCompletedPendingFiles flushes only files whose chunks are all committed to DB
// Files with pending chunks are kept in the queue for later flushing
func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
flushStart := time.Now()
log.Debug("flushCompletedPendingFiles: starting")
log.Debug("flushCompletedPendingFiles: acquiring pendingFilesMu lock")
s.pendingFilesMu.Lock()
log.Debug("flushCompletedPendingFiles: acquired lock", "pending_files", len(s.pendingFiles))
// Separate files into complete (can flush) and incomplete (keep pending)
var canFlush []pendingFileData
var stillPending []pendingFileData
log.Debug("flushCompletedPendingFiles: checking which files can flush")
checkStart := time.Now()
for _, data := range s.pendingFiles {
allChunksCommitted := true
for _, fc := range data.fileChunks {
if s.isChunkPending(fc.ChunkHash.String()) {
allChunksCommitted = false
break
}
}
if allChunksCommitted {
canFlush = append(canFlush, data)
} else {
stillPending = append(stillPending, data)
}
}
log.Debug("flushCompletedPendingFiles: check done", "duration", time.Since(checkStart), "can_flush", len(canFlush), "still_pending", len(stillPending))
s.pendingFiles = stillPending
s.pendingFilesMu.Unlock()
log.Debug("flushCompletedPendingFiles: released lock")
if len(canFlush) == 0 {
log.Debug("flushCompletedPendingFiles: nothing to flush")
return nil
}
log.Debug("Flushing completed files after blob finalize",
"files_to_flush", len(canFlush),
"files_still_pending", len(stillPending))
// Collect all data for batch operations
log.Debug("flushCompletedPendingFiles: collecting data for batch ops")
collectStart := time.Now()
var allFileChunks []database.FileChunk
var allChunkFiles []database.ChunkFile
var allFileIDs []types.FileID
var allFiles []*database.File
for _, data := range canFlush {
allFileChunks = append(allFileChunks, data.fileChunks...)
allChunkFiles = append(allChunkFiles, data.chunkFiles...)
allFileIDs = append(allFileIDs, data.file.ID)
allFiles = append(allFiles, data.file)
}
log.Debug("flushCompletedPendingFiles: collected data",
"duration", time.Since(collectStart),
"file_chunks", len(allFileChunks),
"chunk_files", len(allChunkFiles),
"files", len(allFiles))
// Flush the complete files using batch operations
log.Debug("flushCompletedPendingFiles: starting transaction")
txStart := time.Now()
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
log.Debug("flushCompletedPendingFiles: inside transaction")
// Batch delete old file_chunks and chunk_files
log.Debug("flushCompletedPendingFiles: deleting old file_chunks")
opStart := time.Now()
if err := s.repos.FileChunks.DeleteByFileIDs(txCtx, tx, allFileIDs); err != nil {
return fmt.Errorf("batch deleting old file chunks: %w", err)
}
log.Debug("flushCompletedPendingFiles: deleted file_chunks", "duration", time.Since(opStart))
log.Debug("flushCompletedPendingFiles: deleting old chunk_files")
opStart = time.Now()
if err := s.repos.ChunkFiles.DeleteByFileIDs(txCtx, tx, allFileIDs); err != nil {
return fmt.Errorf("batch deleting old chunk files: %w", err)
}
log.Debug("flushCompletedPendingFiles: deleted chunk_files", "duration", time.Since(opStart))
// Batch create/update file records
log.Debug("flushCompletedPendingFiles: creating files")
opStart = time.Now()
if err := s.repos.Files.CreateBatch(txCtx, tx, allFiles); err != nil {
return fmt.Errorf("batch creating file records: %w", err)
}
log.Debug("flushCompletedPendingFiles: created files", "duration", time.Since(opStart))
// Batch insert file_chunks
log.Debug("flushCompletedPendingFiles: inserting file_chunks")
opStart = time.Now()
if err := s.repos.FileChunks.CreateBatch(txCtx, tx, allFileChunks); err != nil {
return fmt.Errorf("batch creating file chunks: %w", err)
}
log.Debug("flushCompletedPendingFiles: inserted file_chunks", "duration", time.Since(opStart))
// Batch insert chunk_files
log.Debug("flushCompletedPendingFiles: inserting chunk_files")
opStart = time.Now()
if err := s.repos.ChunkFiles.CreateBatch(txCtx, tx, allChunkFiles); err != nil {
return fmt.Errorf("batch creating chunk files: %w", err)
}
log.Debug("flushCompletedPendingFiles: inserted chunk_files", "duration", time.Since(opStart))
// Batch add files to snapshot
log.Debug("flushCompletedPendingFiles: adding files to snapshot")
opStart = time.Now()
if err := s.repos.Snapshots.AddFilesByIDBatch(txCtx, tx, s.snapshotID, allFileIDs); err != nil {
return fmt.Errorf("batch adding files to snapshot: %w", err)
}
log.Debug("flushCompletedPendingFiles: added files to snapshot", "duration", time.Since(opStart))
log.Debug("flushCompletedPendingFiles: transaction complete")
return nil
})
log.Debug("flushCompletedPendingFiles: transaction done", "duration", time.Since(txStart))
log.Debug("flushCompletedPendingFiles: total duration", "duration", time.Since(flushStart))
return err
}
// ScanPhaseResult contains the results of the scan phase
type ScanPhaseResult struct {
FilesToProcess []*FileToProcess
UnchangedFileIDs []types.FileID // IDs of unchanged files to associate with snapshot
}
// scanPhase performs the initial directory scan to identify files to process
// It uses the pre-loaded knownFiles map for fast change detection without DB queries
// It also populates existingFiles map for deletion detection
// Returns files needing processing and IDs of unchanged files for snapshot association
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) (*ScanPhaseResult, error) {
// Use known file count as estimate for progress (accurate for subsequent backups)
estimatedTotal := int64(len(knownFiles))
var filesToProcess []*FileToProcess
var unchangedFileIDs []types.FileID // Just IDs - no new records needed
var mu sync.Mutex
// Set up periodic status output
startTime := time.Now()
lastStatusTime := time.Now()
statusInterval := 15 * time.Second
var filesScanned int64
log.Debug("Starting directory walk", "path", path)
err := afero.Walk(s.fs, path, func(filePath string, info os.FileInfo, err error) error {
if err != nil {
if s.skipErrors {
log.Error("ERROR: Failed to access file (skipping due to --skip-errors)", "path", filePath, "error", err)
fmt.Printf("ERROR: Failed to access %s: %v (skipping)\n", filePath, err)
return nil // Continue scanning
}
log.Debug("Error accessing filesystem entry", "path", filePath, "error", err)
return err
}
// Check context cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Check exclude patterns - for directories, skip the entire subtree
if s.shouldExclude(filePath, path) {
if info.IsDir() {
return filepath.SkipDir
}
return nil
}
// Skip non-regular files (directories, symlinks, devices, etc.); they are neither tracked nor counted
if !info.Mode().IsRegular() {
return nil
}
// Track this file as existing (for deletion detection)
existingFiles[filePath] = struct{}{}
// Check file against in-memory map (no DB query!)
file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles)
mu.Lock()
if needsProcessing {
// New or changed file - will create record after processing
filesToProcess = append(filesToProcess, &FileToProcess{
Path: filePath,
FileInfo: info,
File: file,
})
} else if !file.ID.IsZero() {
// Unchanged file with existing ID - just need snapshot association
unchangedFileIDs = append(unchangedFileIDs, file.ID)
}
filesScanned++
changedCount := len(filesToProcess)
mu.Unlock()
// Update result stats
if needsProcessing {
result.BytesScanned += info.Size()
} else {
result.FilesSkipped++
result.BytesSkipped += info.Size()
}
result.FilesScanned++
// Output periodic status
if time.Since(lastStatusTime) >= statusInterval {
elapsed := time.Since(startTime)
rate := float64(filesScanned) / elapsed.Seconds()
// Build status line - use estimate if available (not first backup)
if estimatedTotal > 0 {
// Show actual scanned vs estimate (may exceed estimate if files were added)
pct := float64(filesScanned) / float64(estimatedTotal) * 100
if pct > 100 {
pct = 100 // Cap at 100% for display
}
remaining := estimatedTotal - filesScanned
if remaining < 0 {
remaining = 0
}
var eta time.Duration
if rate > 0 && remaining > 0 {
eta = time.Duration(float64(remaining)/rate) * time.Second
}
fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
formatNumber(int(filesScanned)),
pct,
formatNumber(changedCount),
rate,
elapsed.Round(time.Second))
if eta > 0 {
fmt.Printf(", ETA %s", eta.Round(time.Second))
}
fmt.Println()
} else {
// First backup - no estimate available
fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
formatNumber(int(filesScanned)),
formatNumber(changedCount),
rate,
elapsed.Round(time.Second))
}
lastStatusTime = time.Now()
}
return nil
})
if err != nil {
return nil, err
}
return &ScanPhaseResult{
FilesToProcess: filesToProcess,
UnchangedFileIDs: unchangedFileIDs,
}, nil
}
// checkFileInMemory checks if a file needs processing using the in-memory map
// No database access is performed - this is purely CPU/memory work
func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
// Get file stats
stat, ok := info.Sys().(interface {
Uid() uint32
Gid() uint32
})
var uid, gid uint32
if ok {
uid = stat.Uid()
gid = stat.Gid()
}
// Check against in-memory map first to get existing ID if available
existingFile, exists := knownFiles[path]
// Create file record with ID set upfront
// For new files, generate UUID immediately so it's available for chunk associations
// For existing files, reuse the existing ID
var fileID types.FileID
if exists {
fileID = existingFile.ID
} else {
fileID = types.NewFileID()
}
file := &database.File{
ID: fileID,
Path: types.FilePath(path),
SourcePath: types.SourcePath(s.currentSourcePath), // Store source directory for restore path stripping
MTime: info.ModTime(),
CTime: info.ModTime(), // afero doesn't provide ctime
Size: info.Size(),
Mode: uint32(info.Mode()),
UID: uid,
GID: gid,
}
// New file - needs processing
if !exists {
return file, true
}
// Check if file has changed
if existingFile.Size != file.Size ||
existingFile.MTime.Unix() != file.MTime.Unix() ||
existingFile.Mode != file.Mode ||
existingFile.UID != file.UID ||
existingFile.GID != file.GID {
return file, true
}
// File unchanged
return file, false
}
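// Change-detection note: a file is treated as unchanged only when size, mtime (compared
// at one-second granularity via Unix()), mode, UID, and GID all match the stored record.
// File content is not hashed in this fast path, so a modification that preserves all of
// these attributes is not detected here.
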
// batchAddFilesToSnapshot adds existing file IDs to the snapshot association table
// This is used for unchanged files that already have records in the database
func (s *Scanner) batchAddFilesToSnapshot(ctx context.Context, fileIDs []types.FileID) error {
const batchSize = 1000
startTime := time.Now()
lastStatusTime := time.Now()
statusInterval := 5 * time.Second
for i := 0; i < len(fileIDs); i += batchSize {
// Check context cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
end := i + batchSize
if end > len(fileIDs) {
end = len(fileIDs)
}
batch := fileIDs[i:end]
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
for _, fileID := range batch {
if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, fileID); err != nil {
return fmt.Errorf("adding file to snapshot: %w", err)
}
}
return nil
})
if err != nil {
return err
}
// Periodic status
if time.Since(lastStatusTime) >= statusInterval {
elapsed := time.Since(startTime)
rate := float64(end) / elapsed.Seconds()
pct := float64(end) / float64(len(fileIDs)) * 100
fmt.Printf("Associating files: %s/%s (%.1f%%), %.0f files/sec\n",
formatNumber(end), formatNumber(len(fileIDs)), pct, rate)
lastStatusTime = time.Now()
}
}
elapsed := time.Since(startTime)
rate := float64(len(fileIDs)) / elapsed.Seconds()
fmt.Printf("Associated %s unchanged files in %s (%.0f files/sec)\n",
formatNumber(len(fileIDs)), elapsed.Round(time.Second), rate)
return nil
}
// processPhase processes the files that need backing up
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
// Calculate total bytes to process
var totalBytes int64
for _, f := range filesToProcess {
totalBytes += f.FileInfo.Size()
}
// Set up periodic status output
lastStatusTime := time.Now()
statusInterval := 15 * time.Second
startTime := time.Now()
filesProcessed := 0
var bytesProcessed int64
totalFiles := len(filesToProcess)
// Process each file
for _, fileToProcess := range filesToProcess {
// Update progress
if s.progress != nil {
s.progress.GetStats().CurrentFile.Store(fileToProcess.Path)
}
// Process file in streaming fashion
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
// Handle files that were deleted between scan and process phases
if errors.Is(err, os.ErrNotExist) {
log.Warn("File was deleted during backup, skipping", "path", fileToProcess.Path)
result.FilesSkipped++
continue
}
// Skip file read errors if --skip-errors is enabled
if s.skipErrors {
log.Error("ERROR: Failed to process file (skipping due to --skip-errors)", "path", fileToProcess.Path, "error", err)
fmt.Printf("ERROR: Failed to process %s: %v (skipping)\n", fileToProcess.Path, err)
result.FilesSkipped++
continue
}
return fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
}
// Update files processed counter
if s.progress != nil {
s.progress.GetStats().FilesProcessed.Add(1)
}
filesProcessed++
bytesProcessed += fileToProcess.FileInfo.Size()
// Output periodic status
if time.Since(lastStatusTime) >= statusInterval {
elapsed := time.Since(startTime)
pct := float64(bytesProcessed) / float64(totalBytes) * 100
byteRate := float64(bytesProcessed) / elapsed.Seconds()
fileRate := float64(filesProcessed) / elapsed.Seconds()
// Calculate ETA based on bytes (more accurate than files)
remainingBytes := totalBytes - bytesProcessed
var eta time.Duration
if byteRate > 0 {
eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second
}
// Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s
fmt.Printf("Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s",
formatCompact(filesProcessed),
formatCompact(totalFiles),
humanize.Bytes(uint64(bytesProcessed)),
humanize.Bytes(uint64(totalBytes)),
pct,
humanize.Bytes(uint64(byteRate)),
fileRate,
elapsed.Round(time.Second))
if eta > 0 {
fmt.Printf(", ETA: %s", eta.Round(time.Second))
}
fmt.Println()
lastStatusTime = time.Now()
}
}
// Final packer flush first - this commits remaining chunks to DB
// and handleBlobReady will flush files whose chunks are now committed
s.packerMu.Lock()
if err := s.packer.Flush(); err != nil {
s.packerMu.Unlock()
return fmt.Errorf("flushing packer: %w", err)
}
s.packerMu.Unlock()
// Flush any remaining pending files (e.g., files with only pre-existing chunks
// that didn't trigger a blob finalize)
if err := s.flushAllPending(ctx); err != nil {
return fmt.Errorf("flushing remaining pending files: %w", err)
}
// If no storage is configured, record any remaining finished blobs in the snapshot (metadata only)
if s.storage == nil {
blobs := s.packer.GetFinishedBlobs()
for _, b := range blobs {
// Blob metadata is already stored incrementally during packing
// Just add the blob to the snapshot
blobID, err := types.ParseBlobID(b.ID)
if err != nil {
return fmt.Errorf("parsing blob ID: %w", err)
}
err = s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobID, types.BlobHash(b.Hash))
})
if err != nil {
return fmt.Errorf("storing blob metadata: %w", err)
}
}
result.BlobsCreated += len(blobs)
}
return nil
}
// handleBlobReady is called by the packer when a blob is finalized
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
startTime := time.Now().UTC()
finishedBlob := blobWithReader.FinishedBlob
// Report upload start
if s.progress != nil {
s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
}
// Upload to storage first (without holding any locks)
// Use scan context for cancellation support
ctx := s.scanCtx
if ctx == nil {
ctx = context.Background()
}
// Track bytes uploaded for accurate speed calculation
lastProgressTime := time.Now()
lastProgressBytes := int64(0)
progressCallback := func(uploaded int64) error {
// Calculate instantaneous speed
now := time.Now()
elapsed := now.Sub(lastProgressTime).Seconds()
if elapsed > 0.5 { // Update speed every 0.5 seconds
bytesSinceLastUpdate := uploaded - lastProgressBytes
speed := float64(bytesSinceLastUpdate) / elapsed
if s.progress != nil {
s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed)
}
lastProgressTime = now
lastProgressBytes = uploaded
}
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
// Create sharded path: blobs/ca/fe/cafebabe...
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err)
}
uploadDuration := time.Since(startTime)
// Calculate upload speed
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
// Print blob stored message
fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
finishedBlob.Hash[:12]+"...",
humanize.Bytes(uint64(finishedBlob.Compressed)),
humanize.Bytes(uint64(uploadSpeedBps)),
uploadDuration.Round(time.Millisecond))
// Log upload stats
uploadSpeedBits := uploadSpeedBps * 8 // bits per second
log.Info("Successfully uploaded blob to storage",
"path", blobPath,
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
"duration", uploadDuration,
"speed", humanize.SI(uploadSpeedBits, "bps"))
// Report upload complete
if s.progress != nil {
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
}
// Update progress
if s.progress != nil {
stats := s.progress.GetStats()
stats.BlobsUploaded.Add(1)
stats.BytesUploaded.Add(finishedBlob.Compressed)
stats.BlobsCreated.Add(1)
}
// Store metadata in database (after upload is complete)
dbCtx := s.scanCtx
if dbCtx == nil {
dbCtx = context.Background()
}
// Parse blob ID for typed operations
finishedBlobID, err := types.ParseBlobID(finishedBlob.ID)
if err != nil {
return fmt.Errorf("parsing finished blob ID: %w", err)
}
err = s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error {
// Update blob upload timestamp
if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil {
return fmt.Errorf("updating blob upload timestamp: %w", err)
}
// Add the blob to the snapshot
if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil {
return fmt.Errorf("adding blob to snapshot: %w", err)
}
// Record upload metrics
upload := &database.Upload{
BlobHash: finishedBlob.Hash,
SnapshotID: s.snapshotID,
UploadedAt: startTime,
Size: finishedBlob.Compressed,
DurationMs: uploadDuration.Milliseconds(),
}
if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
return fmt.Errorf("recording upload metrics: %w", err)
}
return nil
})
// Cleanup temp file if needed
if blobWithReader.TempFile != nil {
tempName := blobWithReader.TempFile.Name()
if err := blobWithReader.TempFile.Close(); err != nil {
log.Fatal("Failed to close temp file", "file", tempName, "error", err)
}
if err := s.fs.Remove(tempName); err != nil {
log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
}
}
if err != nil {
return err
}
// Chunks from this blob are now committed to DB - remove from pending set
log.Debug("handleBlobReady: removing pending chunk hashes")
s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)
log.Debug("handleBlobReady: removed pending chunk hashes")
// Flush files whose chunks are now all committed
// This maintains database consistency after each blob
log.Debug("handleBlobReady: calling flushCompletedPendingFiles")
if err := s.flushCompletedPendingFiles(dbCtx); err != nil {
return fmt.Errorf("flushing completed files: %w", err)
}
log.Debug("handleBlobReady: flushCompletedPendingFiles returned")
log.Debug("handleBlobReady: complete")
return nil
}
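// Ordering note: handleBlobReady (1) uploads the blob, (2) marks the blob uploaded,
// associates it with the snapshot, and records upload metrics in one transaction,
// (3) drops the blob's chunk hashes from the pending set, and (4) flushes buffered files
// whose chunks are now all committed. Steps 3 and 4 run only after the transaction
// succeeds, so flushed file records never reference chunks missing from the database.
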
// processFileStreaming processes a file by streaming chunks directly to the packer
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
// Open the file
file, err := s.fs.Open(fileToProcess.Path)
if err != nil {
return fmt.Errorf("opening file: %w", err)
}
defer func() { _ = file.Close() }()
// We'll collect file chunks for database storage
// but process them for packing as we go
type chunkInfo struct {
fileChunk database.FileChunk
offset int64
size int64
}
var chunks []chunkInfo
chunkIndex := 0
// Process chunks in streaming fashion and get full file hash
fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
log.Debug("Processing content-defined chunk from file",
"file", fileToProcess.Path,
"chunk_index", chunkIndex,
"hash", chunk.Hash,
"size", chunk.Size)
// Check if chunk already exists (fast in-memory lookup)
chunkExists := s.chunkExists(chunk.Hash)
// Queue new chunks for batch insert when blob finalizes
// This dramatically reduces transaction overhead
if !chunkExists {
s.packer.AddPendingChunk(chunk.Hash, chunk.Size)
// Add to in-memory cache immediately for fast duplicate detection
s.addKnownChunk(chunk.Hash)
// Track as pending until blob finalizes and commits to DB
s.addPendingChunkHash(chunk.Hash)
}
// Track file chunk association for later storage
chunks = append(chunks, chunkInfo{
fileChunk: database.FileChunk{
FileID: fileToProcess.File.ID,
Idx: chunkIndex,
ChunkHash: types.ChunkHash(chunk.Hash),
},
offset: chunk.Offset,
size: chunk.Size,
})
// Update stats
if chunkExists {
result.FilesSkipped++ // Deduplicated chunk; note this increments once per existing chunk, not per file
result.BytesSkipped += chunk.Size
if s.progress != nil {
s.progress.GetStats().BytesSkipped.Add(chunk.Size)
}
} else {
result.ChunksCreated++
result.BytesScanned += chunk.Size
if s.progress != nil {
s.progress.GetStats().ChunksCreated.Add(1)
s.progress.GetStats().BytesProcessed.Add(chunk.Size)
s.progress.UpdateChunkingActivity()
}
}
// Add chunk to packer immediately (streaming)
// This happens outside the database transaction
if !chunkExists {
s.packerMu.Lock()
err := s.packer.AddChunk(&blob.ChunkRef{
Hash: chunk.Hash,
Data: chunk.Data,
})
if err == blob.ErrBlobSizeLimitExceeded {
// Finalize current blob and retry
if err := s.packer.FinalizeBlob(); err != nil {
s.packerMu.Unlock()
return fmt.Errorf("finalizing blob: %w", err)
}
// Retry adding the chunk
if err := s.packer.AddChunk(&blob.ChunkRef{
Hash: chunk.Hash,
Data: chunk.Data,
}); err != nil {
s.packerMu.Unlock()
return fmt.Errorf("adding chunk after finalize: %w", err)
}
} else if err != nil {
s.packerMu.Unlock()
return fmt.Errorf("adding chunk to packer: %w", err)
}
s.packerMu.Unlock()
}
// Clear chunk data from memory immediately after use
chunk.Data = nil
chunkIndex++
return nil
})
if err != nil {
return fmt.Errorf("chunking file: %w", err)
}
log.Debug("Completed snapshotting file",
"path", fileToProcess.Path,
"file_hash", fileHash,
"chunks", len(chunks))
// Build file data for batch insertion
// Update chunk associations with the file ID
fileChunks := make([]database.FileChunk, len(chunks))
chunkFiles := make([]database.ChunkFile, len(chunks))
for i, ci := range chunks {
fileChunks[i] = database.FileChunk{
FileID: fileToProcess.File.ID,
Idx: ci.fileChunk.Idx,
ChunkHash: ci.fileChunk.ChunkHash,
}
chunkFiles[i] = database.ChunkFile{
ChunkHash: ci.fileChunk.ChunkHash,
FileID: fileToProcess.File.ID,
FileOffset: ci.offset,
Length: ci.size,
}
}
// Queue file for batch insertion
// Files will be flushed when their chunks are committed (after blob finalize)
s.addPendingFile(ctx, pendingFileData{
file: fileToProcess.File,
fileChunks: fileChunks,
chunkFiles: chunkFiles,
})
return nil
}
// GetProgress returns the progress reporter for this scanner
func (s *Scanner) GetProgress() *ProgressReporter {
return s.progress
}
// detectDeletedFilesFromMap finds files that existed in previous snapshots but no longer exist
// Uses pre-loaded maps to avoid any filesystem or database access
func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[string]*database.File, existingFiles map[string]struct{}, result *ScanResult) error {
if len(knownFiles) == 0 {
return nil
}
// Check each known file against the enumerated set (no filesystem access needed)
for path, file := range knownFiles {
// Check context cancellation periodically
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Check if the file exists in our enumerated set
if _, exists := existingFiles[path]; !exists {
// File has been deleted
result.FilesDeleted++
result.BytesDeleted += file.Size
log.Debug("Detected deleted file", "path", path, "size", file.Size)
}
}
if result.FilesDeleted > 0 {
fmt.Printf("Found %s deleted files\n", formatNumber(result.FilesDeleted))
}
return nil
}
// compileExcludePatterns compiles the exclude patterns into glob matchers
func compileExcludePatterns(patterns []string) []compiledPattern {
var compiled []compiledPattern
for _, p := range patterns {
if p == "" {
continue
}
// Check if pattern is anchored (starts with /)
anchored := strings.HasPrefix(p, "/")
pattern := p
if anchored {
pattern = p[1:] // Remove leading /
}
// Remove trailing slash if present (directory indicator)
pattern = strings.TrimSuffix(pattern, "/")
// Compile the glob pattern
// For patterns without path separators, we need to match them as components
// e.g., ".git" should match ".git" anywhere in the path
g, err := glob.Compile(pattern, '/')
if err != nil {
log.Warn("Invalid exclude pattern, skipping", "pattern", p, "error", err)
continue
}
compiled = append(compiled, compiledPattern{
pattern: g,
anchored: anchored,
original: p,
})
}
return compiled
}
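// Illustrative pattern compilation examples (hypothetical inputs):
//
//	"/var/cache/" -> anchored, compiled as "var/cache" (matches only at the source root)
//	".git"        -> unanchored, compiled as ".git"    (matches a .git path component anywhere)
//	"*.tmp"       -> unanchored, compiled as "*.tmp"   (matches any name ending in .tmp)
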
// shouldExclude checks if a path should be excluded based on exclude patterns
// filePath is the full path to the file
// rootPath is the root of the backup source directory
func (s *Scanner) shouldExclude(filePath, rootPath string) bool {
if len(s.compiledExclude) == 0 {
return false
}
// Get the relative path from root
relPath, err := filepath.Rel(rootPath, filePath)
if err != nil {
return false
}
// Never exclude the root directory itself
if relPath == "." {
return false
}
// Normalize path separators
relPath = filepath.ToSlash(relPath)
// Check each pattern
for _, cp := range s.compiledExclude {
if cp.anchored {
// Anchored pattern: must match from the root
// Match the relative path directly
if cp.pattern.Match(relPath) {
return true
}
// Also check if any prefix of the path matches (for directory patterns)
parts := strings.Split(relPath, "/")
for i := 1; i <= len(parts); i++ {
prefix := strings.Join(parts[:i], "/")
if cp.pattern.Match(prefix) {
return true
}
}
} else {
// Unanchored pattern: can match anywhere in path
// Check the full relative path
if cp.pattern.Match(relPath) {
return true
}
// Check each path component and subpath
parts := strings.Split(relPath, "/")
for i := range parts {
// Match individual component (e.g., ".git" matches ".git" directory)
if cp.pattern.Match(parts[i]) {
return true
}
// Match subpath from this component onwards
subpath := strings.Join(parts[i:], "/")
if cp.pattern.Match(subpath) {
return true
}
}
}
}
return false
}
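// Illustrative exclusion examples (hypothetical paths, shown relative to the source root):
//
//	".git" (unanchored)   excludes "project/.git" and, via SkipDir, everything beneath it
//	"/build" (anchored)   excludes "build" at the root but not "src/build"
//	"*.log" (unanchored)  excludes "app.log" and "logs/app.log"
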
// formatNumber formats a number with comma separators
func formatNumber(n int) string {
if n < 1000 {
return fmt.Sprintf("%d", n)
}
return humanize.Comma(int64(n))
}
// formatCompact formats a number compactly with k/M suffixes (e.g., 5.7k, 1.2M)
func formatCompact(n int) string {
if n < 1000 {
return fmt.Sprintf("%d", n)
}
if n < 10000 {
return fmt.Sprintf("%.1fk", float64(n)/1000)
}
if n < 1000000 {
return fmt.Sprintf("%.0fk", float64(n)/1000)
}
return fmt.Sprintf("%.1fM", float64(n)/1000000)
}
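// Example outputs: 999 -> "999", 5700 -> "5.7k", 61000 -> "61k", 1200000 -> "1.2M".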