All user-facing output now goes through a single ui.Writer with a
uniform style:
- 》 (white) for begin / info / notice
- 》 (green) for complete / success
- "Warning:" (orange) for warnings
- "ERROR:" (red) for errors
- 》 (indented) for progress heartbeats
Color is enabled when stdout is a TTY and NO_COLOR is unset.
Standards:
- Complete-sentence messages with fully qualified terms ("backup
destination store", "local index database", "snapshot source
files enumeration").
- Every Complete has a matching Begin.
- Natural verb tense conveys state ("Uploading" -> "Uploaded"). The
words "begin"/"complete" never appear in message bodies; the marker
color carries that information.
- ETA means a wall-clock finish time, not a duration. Progress lines say
"estimated remaining time (<dur>), finish at <time>", labeling both values.
Adds globals.CommitDate (populated by Makefile/Dockerfile/goreleaser
via ldflags from `git show -s --format=%cI HEAD`) and a startup banner
printed once per invocation.
Strips fx call-chain noise from startup errors so users see the actual
underlying error (e.g. "creating base path: mkdir /Volumes/BACKUPS:
permission denied" instead of three layers of "could not build
arguments for function ...").
README documents the output style and the ui package conventions.
1613 lines
53 KiB
Go
1613 lines
53 KiB
Go
package snapshot
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/gobwas/glob"
|
|
"github.com/spf13/afero"
|
|
"sneak.berlin/go/vaultik/internal/blob"
|
|
"sneak.berlin/go/vaultik/internal/chunker"
|
|
"sneak.berlin/go/vaultik/internal/database"
|
|
"sneak.berlin/go/vaultik/internal/log"
|
|
"sneak.berlin/go/vaultik/internal/storage"
|
|
"sneak.berlin/go/vaultik/internal/types"
|
|
"sneak.berlin/go/vaultik/internal/ui"
|
|
)
|
|
|
|
// FileToProcess holds information about a file that needs processing
|
|
type FileToProcess struct {
|
|
Path string
|
|
FileInfo os.FileInfo
|
|
File *database.File
|
|
}
|
|
|
|
// pendingFileData holds all data needed to commit a file to the database
|
|
type pendingFileData struct {
|
|
file *database.File
|
|
fileChunks []database.FileChunk
|
|
chunkFiles []database.ChunkFile
|
|
}
|
|
|
|
// compiledPattern holds a compiled glob pattern and whether it's anchored
|
|
type compiledPattern struct {
|
|
pattern glob.Glob
|
|
anchored bool // If true, only matches from root of source dir
|
|
original string
|
|
}
|
|
|
|
// Scanner scans directories and populates the database with file and chunk information
|
|
type Scanner struct {
|
|
fs afero.Fs
|
|
chunker *chunker.Chunker
|
|
packer *blob.Packer
|
|
repos *database.Repositories
|
|
storage storage.Storer
|
|
maxBlobSize int64
|
|
compressionLevel int
|
|
ageRecipient string
|
|
snapshotID string // Current snapshot being processed
|
|
currentSourcePath string // Current source directory being scanned (for restore path stripping)
|
|
exclude []string // Glob patterns for files/directories to exclude
|
|
compiledExclude []compiledPattern // Compiled glob patterns
|
|
progress *ProgressReporter
|
|
skipErrors bool // Skip file read errors (log loudly but continue)
|
|
ui *ui.Writer // User-facing output; never nil (defaults to a discarding writer)
|
|
|
|
// In-memory cache of known chunk hashes for fast existence checks
|
|
knownChunks map[string]struct{}
|
|
knownChunksMu sync.RWMutex
|
|
|
|
// Pending chunk hashes - chunks that have been added to packer but not yet committed to DB
|
|
// When a blob finalizes, the committed chunks are removed from this set
|
|
pendingChunkHashes map[string]struct{}
|
|
pendingChunkHashesMu sync.Mutex
|
|
|
|
// Pending file data buffer for batch insertion
|
|
// Files are flushed when all their chunks have been committed to DB
|
|
pendingFiles []pendingFileData
|
|
pendingFilesMu sync.Mutex
|
|
|
|
// Mutex for coordinating blob creation
|
|
packerMu sync.Mutex // Blocks chunk production during blob creation
|
|
|
|
// Context for cancellation
|
|
scanCtx context.Context
|
|
}
|
|
|
|
// ScannerConfig contains configuration for the scanner
|
|
type ScannerConfig struct {
|
|
FS afero.Fs
|
|
ChunkSize int64
|
|
Repositories *database.Repositories
|
|
Storage storage.Storer
|
|
MaxBlobSize int64
|
|
CompressionLevel int
|
|
AgeRecipients []string // Optional, empty means no encryption
|
|
EnableProgress bool // Enable the live progress reporter (ETAs, throughput)
|
|
UI *ui.Writer // Where user-facing scanner messages go; nil = discard
|
|
Exclude []string // Glob patterns for files/directories to exclude
|
|
SkipErrors bool // Skip file read errors (log loudly but continue)
|
|
}
|
|
|
|
// ScanResult summarizes a Scan: counts and byte totals for scanned, skipped
// (unchanged) and deleted files, the number of chunks and blobs created, and
// the UTC start and end timestamps of the run.
type ScanResult struct {
	FilesScanned int
	FilesSkipped int
	FilesDeleted int

	BytesScanned int64
	BytesSkipped int64
	BytesDeleted int64

	ChunksCreated int
	BlobsCreated  int

	StartTime time.Time
	EndTime   time.Time
}
|
|
|
|
// NewScanner creates a new scanner instance
|
|
func NewScanner(cfg ScannerConfig) *Scanner {
|
|
// Create encryptor (required for blob packing)
|
|
if len(cfg.AgeRecipients) == 0 {
|
|
log.Error("No age recipients configured - encryption is required")
|
|
return nil
|
|
}
|
|
|
|
// Create blob packer with encryption
|
|
packerCfg := blob.PackerConfig{
|
|
MaxBlobSize: cfg.MaxBlobSize,
|
|
CompressionLevel: cfg.CompressionLevel,
|
|
Recipients: cfg.AgeRecipients,
|
|
Repositories: cfg.Repositories,
|
|
Fs: cfg.FS,
|
|
}
|
|
packer, err := blob.NewPacker(packerCfg)
|
|
if err != nil {
|
|
log.Error("Failed to create packer", "error", err)
|
|
return nil
|
|
}
|
|
|
|
var progress *ProgressReporter
|
|
if cfg.EnableProgress {
|
|
progress = NewProgressReporter()
|
|
}
|
|
|
|
// Compile exclude patterns
|
|
compiledExclude := compileExcludePatterns(cfg.Exclude)
|
|
|
|
uiw := cfg.UI
|
|
if uiw == nil {
|
|
uiw = ui.NewWithColor(io.Discard, false)
|
|
}
|
|
|
|
return &Scanner{
|
|
fs: cfg.FS,
|
|
chunker: chunker.NewChunker(cfg.ChunkSize),
|
|
packer: packer,
|
|
repos: cfg.Repositories,
|
|
storage: cfg.Storage,
|
|
maxBlobSize: cfg.MaxBlobSize,
|
|
compressionLevel: cfg.CompressionLevel,
|
|
ageRecipient: strings.Join(cfg.AgeRecipients, ","),
|
|
exclude: cfg.Exclude,
|
|
compiledExclude: compiledExclude,
|
|
progress: progress,
|
|
skipErrors: cfg.SkipErrors,
|
|
ui: uiw,
|
|
pendingChunkHashes: make(map[string]struct{}),
|
|
}
|
|
}
|
|
|
|
// Scan scans a directory and populates the database
|
|
func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*ScanResult, error) {
|
|
s.snapshotID = snapshotID
|
|
s.currentSourcePath = path // Store source path for file records (used during restore)
|
|
s.scanCtx = ctx
|
|
result := &ScanResult{
|
|
StartTime: time.Now().UTC(),
|
|
}
|
|
|
|
// Set blob handler for concurrent upload
|
|
if s.storage != nil {
|
|
log.Debug("Setting blob handler for storage uploads")
|
|
s.packer.SetBlobHandler(s.handleBlobReady)
|
|
} else {
|
|
log.Debug("No storage configured, blobs will not be uploaded")
|
|
}
|
|
|
|
// Start progress reporting if enabled
|
|
if s.progress != nil {
|
|
s.progress.Start()
|
|
defer s.progress.Stop()
|
|
}
|
|
|
|
// Phase 0: Load known files and chunks from database into memory for fast lookup
|
|
knownFiles, err := s.loadDatabaseState(ctx, path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Phase 1: Scan directory, collect files to process, and track existing files
|
|
// (builds existingFiles map during walk to avoid double traversal)
|
|
log.Info("Phase 1/3: Scanning directory structure")
|
|
existingFiles := make(map[string]struct{})
|
|
scanResult, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scan phase failed: %w", err)
|
|
}
|
|
filesToProcess := scanResult.FilesToProcess
|
|
|
|
// Phase 1b: Detect deleted files by comparing DB against scanned files
|
|
if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil {
|
|
return nil, fmt.Errorf("detecting deleted files: %w", err)
|
|
}
|
|
|
|
// Phase 1c: Associate unchanged files with this snapshot (no new records needed)
|
|
if len(scanResult.UnchangedFileIDs) > 0 {
|
|
s.ui.Begin("Associating %s unchanged files with the snapshot.", s.ui.Count(len(scanResult.UnchangedFileIDs)))
|
|
if err := s.batchAddFilesToSnapshot(ctx, scanResult.UnchangedFileIDs); err != nil {
|
|
return nil, fmt.Errorf("associating unchanged files: %w", err)
|
|
}
|
|
}
|
|
|
|
// Summarize scan phase results and update progress
|
|
s.summarizeScanPhase(result, filesToProcess)
|
|
|
|
// Phase 2: Process files and create chunks
|
|
if len(filesToProcess) > 0 {
|
|
s.ui.Begin("Processing %s snapshot source files (chunking, compressing, encrypting, uploading).", s.ui.Count(len(filesToProcess)))
|
|
log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)")
|
|
if err := s.processPhase(ctx, filesToProcess, result); err != nil {
|
|
return nil, fmt.Errorf("process phase failed: %w", err)
|
|
}
|
|
} else {
|
|
s.ui.Info("Snapshot file processing skipped: no changed files (creating metadata-only snapshot).")
|
|
log.Info("Phase 2/3: Skipping (no files need processing, metadata-only snapshot)")
|
|
}
|
|
|
|
// Finalize result with blob statistics
|
|
s.finalizeScanResult(ctx, result)
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// loadDatabaseState loads known files and chunks from the database into memory for fast lookup
|
|
// This avoids per-file and per-chunk database queries during the scan and process phases
|
|
func (s *Scanner) loadDatabaseState(ctx context.Context, path string) (map[string]*database.File, error) {
|
|
s.ui.Begin("Loading known files from local index database.")
|
|
knownFiles, err := s.loadKnownFiles(ctx, path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("loading known files: %w", err)
|
|
}
|
|
s.ui.Complete("Loaded %s known files from local index database.", s.ui.Count(len(knownFiles)))
|
|
|
|
s.ui.Begin("Loading known chunks from local index database.")
|
|
if err := s.loadKnownChunks(ctx); err != nil {
|
|
return nil, fmt.Errorf("loading known chunks: %w", err)
|
|
}
|
|
s.ui.Complete("Loaded %s known chunks from local index database.", s.ui.Count(len(s.knownChunks)))
|
|
|
|
return knownFiles, nil
|
|
}
|
|
|
|
// summarizeScanPhase calculates total size to process, updates progress tracking,
|
|
// and prints the scan phase summary with file counts and sizes
|
|
func (s *Scanner) summarizeScanPhase(result *ScanResult, filesToProcess []*FileToProcess) {
|
|
var totalSizeToProcess int64
|
|
for _, file := range filesToProcess {
|
|
totalSizeToProcess += file.FileInfo.Size()
|
|
}
|
|
|
|
if s.progress != nil {
|
|
s.progress.SetTotalSize(totalSizeToProcess)
|
|
s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
|
|
}
|
|
|
|
log.Info("Phase 1 complete",
|
|
"total_files", len(filesToProcess),
|
|
"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
|
|
"files_skipped", result.FilesSkipped,
|
|
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
|
|
|
|
msg := fmt.Sprintf("Enumerated %s snapshot source files (%s total), %s to process (%s)",
|
|
s.ui.Count(result.FilesScanned),
|
|
s.ui.Size(totalSizeToProcess+result.BytesSkipped),
|
|
s.ui.Count(len(filesToProcess)),
|
|
s.ui.Size(totalSizeToProcess))
|
|
if result.FilesDeleted > 0 {
|
|
msg += fmt.Sprintf(", %s deleted (%s)",
|
|
s.ui.Count(result.FilesDeleted),
|
|
s.ui.Size(result.BytesDeleted))
|
|
}
|
|
s.ui.Complete("%s.", msg)
|
|
}
|
|
|
|
// finalizeScanResult populates final blob statistics in the scan result
|
|
// by querying the packer and database for blob/upload counts
|
|
func (s *Scanner) finalizeScanResult(ctx context.Context, result *ScanResult) {
|
|
blobs := s.packer.GetFinishedBlobs()
|
|
result.BlobsCreated += len(blobs)
|
|
|
|
// Query database for actual blob count created during this snapshot
|
|
// The database is authoritative, especially for concurrent blob uploads
|
|
// We count uploads rather than all snapshot_blobs to get only NEW blobs
|
|
if s.snapshotID != "" {
|
|
uploadCount, err := s.repos.Uploads.GetCountBySnapshot(ctx, s.snapshotID)
|
|
if err != nil {
|
|
log.Warn("Failed to query upload count from database", "error", err)
|
|
} else {
|
|
result.BlobsCreated = int(uploadCount)
|
|
}
|
|
}
|
|
|
|
result.EndTime = time.Now().UTC()
|
|
}
|
|
|
|
// loadKnownFiles loads all known files from the database into a map for fast lookup
|
|
// This avoids per-file database queries during the scan phase
|
|
func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*database.File, error) {
|
|
files, err := s.repos.Files.ListByPrefix(ctx, path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing files by prefix: %w", err)
|
|
}
|
|
|
|
result := make(map[string]*database.File, len(files))
|
|
for _, f := range files {
|
|
result[f.Path.String()] = f
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// loadKnownChunks loads all known chunk hashes from the database into a map for fast lookup
|
|
// This avoids per-chunk database queries during file processing
|
|
func (s *Scanner) loadKnownChunks(ctx context.Context) error {
|
|
chunks, err := s.repos.Chunks.List(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("listing chunks: %w", err)
|
|
}
|
|
|
|
s.knownChunksMu.Lock()
|
|
s.knownChunks = make(map[string]struct{}, len(chunks))
|
|
for _, c := range chunks {
|
|
s.knownChunks[c.ChunkHash.String()] = struct{}{}
|
|
}
|
|
s.knownChunksMu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// chunkExists checks if a chunk hash exists in the in-memory cache
|
|
func (s *Scanner) chunkExists(hash string) bool {
|
|
s.knownChunksMu.RLock()
|
|
_, exists := s.knownChunks[hash]
|
|
s.knownChunksMu.RUnlock()
|
|
return exists
|
|
}
|
|
|
|
// addKnownChunk adds a chunk hash to the in-memory cache
|
|
func (s *Scanner) addKnownChunk(hash string) {
|
|
s.knownChunksMu.Lock()
|
|
s.knownChunks[hash] = struct{}{}
|
|
s.knownChunksMu.Unlock()
|
|
}
|
|
|
|
// addPendingChunkHash marks a chunk as pending (not yet committed to DB)
|
|
func (s *Scanner) addPendingChunkHash(hash string) {
|
|
s.pendingChunkHashesMu.Lock()
|
|
s.pendingChunkHashes[hash] = struct{}{}
|
|
s.pendingChunkHashesMu.Unlock()
|
|
}
|
|
|
|
// removePendingChunkHashes removes committed chunk hashes from the pending set
|
|
func (s *Scanner) removePendingChunkHashes(hashes []string) {
|
|
log.Debug("removePendingChunkHashes: starting", "count", len(hashes))
|
|
start := time.Now()
|
|
s.pendingChunkHashesMu.Lock()
|
|
for _, hash := range hashes {
|
|
delete(s.pendingChunkHashes, hash)
|
|
}
|
|
s.pendingChunkHashesMu.Unlock()
|
|
log.Debug("removePendingChunkHashes: done", "count", len(hashes), "duration", time.Since(start))
|
|
}
|
|
|
|
// isChunkPending returns true if the chunk is still pending (not yet committed to DB)
|
|
func (s *Scanner) isChunkPending(hash string) bool {
|
|
s.pendingChunkHashesMu.Lock()
|
|
_, pending := s.pendingChunkHashes[hash]
|
|
s.pendingChunkHashesMu.Unlock()
|
|
return pending
|
|
}
|
|
|
|
// addPendingFile adds a file to the pending buffer
|
|
// Files are NOT auto-flushed here - they are flushed when their chunks are committed
|
|
// (in handleBlobReady after blob finalize)
|
|
func (s *Scanner) addPendingFile(_ context.Context, data pendingFileData) {
|
|
s.pendingFilesMu.Lock()
|
|
s.pendingFiles = append(s.pendingFiles, data)
|
|
s.pendingFilesMu.Unlock()
|
|
}
|
|
|
|
// flushPendingFiles writes all pending files to the database in a single transaction
|
|
func (s *Scanner) flushPendingFiles(ctx context.Context) error {
|
|
s.pendingFilesMu.Lock()
|
|
files := s.pendingFiles
|
|
s.pendingFiles = nil
|
|
s.pendingFilesMu.Unlock()
|
|
|
|
if len(files) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
|
for _, data := range files {
|
|
// Create or update the file record
|
|
if err := s.repos.Files.Create(txCtx, tx, data.file); err != nil {
|
|
return fmt.Errorf("creating file record: %w", err)
|
|
}
|
|
|
|
// Delete any existing file_chunks and chunk_files for this file
|
|
if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
|
|
return fmt.Errorf("deleting old file chunks: %w", err)
|
|
}
|
|
if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
|
|
return fmt.Errorf("deleting old chunk files: %w", err)
|
|
}
|
|
|
|
// Create file-chunk mappings
|
|
for i := range data.fileChunks {
|
|
if err := s.repos.FileChunks.Create(txCtx, tx, &data.fileChunks[i]); err != nil {
|
|
return fmt.Errorf("creating file chunk: %w", err)
|
|
}
|
|
}
|
|
|
|
// Create chunk-file mappings
|
|
for i := range data.chunkFiles {
|
|
if err := s.repos.ChunkFiles.Create(txCtx, tx, &data.chunkFiles[i]); err != nil {
|
|
return fmt.Errorf("creating chunk file: %w", err)
|
|
}
|
|
}
|
|
|
|
// Add file to snapshot
|
|
if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID); err != nil {
|
|
return fmt.Errorf("adding file to snapshot: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// flushAllPending flushes all pending files to the database
|
|
func (s *Scanner) flushAllPending(ctx context.Context) error {
|
|
return s.flushPendingFiles(ctx)
|
|
}
|
|
|
|
// flushCompletedPendingFiles flushes only files whose chunks are all committed to DB
|
|
// Files with pending chunks are kept in the queue for later flushing
|
|
func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
|
|
flushStart := time.Now()
|
|
log.Debug("flushCompletedPendingFiles: starting")
|
|
|
|
// Partition pending files into those ready to flush and those still waiting
|
|
canFlush, stillPendingCount := s.partitionPendingByChunkStatus()
|
|
|
|
if len(canFlush) == 0 {
|
|
log.Debug("flushCompletedPendingFiles: nothing to flush")
|
|
return nil
|
|
}
|
|
|
|
log.Debug("Flushing completed files after blob finalize",
|
|
"files_to_flush", len(canFlush),
|
|
"files_still_pending", stillPendingCount)
|
|
|
|
// Collect all data for batch operations
|
|
allFiles, allFileIDs, allFileChunks, allChunkFiles := s.collectBatchFlushData(canFlush)
|
|
|
|
// Execute the batch flush in a single transaction
|
|
log.Debug("flushCompletedPendingFiles: starting transaction")
|
|
txStart := time.Now()
|
|
err := s.executeBatchFileFlush(ctx, allFiles, allFileIDs, allFileChunks, allChunkFiles)
|
|
log.Debug("flushCompletedPendingFiles: transaction done", "duration", time.Since(txStart))
|
|
log.Debug("flushCompletedPendingFiles: total duration", "duration", time.Since(flushStart))
|
|
return err
|
|
}
|
|
|
|
// partitionPendingByChunkStatus separates pending files into those whose chunks
|
|
// are all committed to DB (ready to flush) and those still waiting on pending chunks.
|
|
// Updates s.pendingFiles to contain only the still-pending files.
|
|
func (s *Scanner) partitionPendingByChunkStatus() (canFlush []pendingFileData, stillPendingCount int) {
|
|
log.Debug("flushCompletedPendingFiles: acquiring pendingFilesMu lock")
|
|
s.pendingFilesMu.Lock()
|
|
log.Debug("flushCompletedPendingFiles: acquired lock", "pending_files", len(s.pendingFiles))
|
|
|
|
var stillPending []pendingFileData
|
|
|
|
log.Debug("flushCompletedPendingFiles: checking which files can flush")
|
|
checkStart := time.Now()
|
|
for _, data := range s.pendingFiles {
|
|
allChunksCommitted := true
|
|
for _, fc := range data.fileChunks {
|
|
if s.isChunkPending(fc.ChunkHash.String()) {
|
|
allChunksCommitted = false
|
|
break
|
|
}
|
|
}
|
|
if allChunksCommitted {
|
|
canFlush = append(canFlush, data)
|
|
} else {
|
|
stillPending = append(stillPending, data)
|
|
}
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: check done", "duration", time.Since(checkStart), "can_flush", len(canFlush), "still_pending", len(stillPending))
|
|
|
|
s.pendingFiles = stillPending
|
|
s.pendingFilesMu.Unlock()
|
|
log.Debug("flushCompletedPendingFiles: released lock")
|
|
|
|
return canFlush, len(stillPending)
|
|
}
|
|
|
|
// collectBatchFlushData aggregates file records, IDs, file-chunk mappings, and chunk-file
|
|
// mappings from the given pending file data for efficient batch database operations
|
|
func (s *Scanner) collectBatchFlushData(canFlush []pendingFileData) ([]*database.File, []types.FileID, []database.FileChunk, []database.ChunkFile) {
|
|
log.Debug("flushCompletedPendingFiles: collecting data for batch ops")
|
|
collectStart := time.Now()
|
|
|
|
var allFileChunks []database.FileChunk
|
|
var allChunkFiles []database.ChunkFile
|
|
var allFileIDs []types.FileID
|
|
var allFiles []*database.File
|
|
|
|
for _, data := range canFlush {
|
|
allFileChunks = append(allFileChunks, data.fileChunks...)
|
|
allChunkFiles = append(allChunkFiles, data.chunkFiles...)
|
|
allFileIDs = append(allFileIDs, data.file.ID)
|
|
allFiles = append(allFiles, data.file)
|
|
}
|
|
|
|
log.Debug("flushCompletedPendingFiles: collected data",
|
|
"duration", time.Since(collectStart),
|
|
"file_chunks", len(allFileChunks),
|
|
"chunk_files", len(allChunkFiles),
|
|
"files", len(allFiles))
|
|
|
|
return allFiles, allFileIDs, allFileChunks, allChunkFiles
|
|
}
|
|
|
|
// executeBatchFileFlush writes all collected file data to the database in a single transaction,
|
|
// including deleting old mappings, creating file records, and adding snapshot associations
|
|
func (s *Scanner) executeBatchFileFlush(ctx context.Context, allFiles []*database.File, allFileIDs []types.FileID, allFileChunks []database.FileChunk, allChunkFiles []database.ChunkFile) error {
|
|
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
|
log.Debug("flushCompletedPendingFiles: inside transaction")
|
|
|
|
// Batch delete old file_chunks and chunk_files
|
|
log.Debug("flushCompletedPendingFiles: deleting old file_chunks")
|
|
opStart := time.Now()
|
|
if err := s.repos.FileChunks.DeleteByFileIDs(txCtx, tx, allFileIDs); err != nil {
|
|
return fmt.Errorf("batch deleting old file chunks: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: deleted file_chunks", "duration", time.Since(opStart))
|
|
|
|
log.Debug("flushCompletedPendingFiles: deleting old chunk_files")
|
|
opStart = time.Now()
|
|
if err := s.repos.ChunkFiles.DeleteByFileIDs(txCtx, tx, allFileIDs); err != nil {
|
|
return fmt.Errorf("batch deleting old chunk files: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: deleted chunk_files", "duration", time.Since(opStart))
|
|
|
|
// Batch create/update file records
|
|
log.Debug("flushCompletedPendingFiles: creating files")
|
|
opStart = time.Now()
|
|
if err := s.repos.Files.CreateBatch(txCtx, tx, allFiles); err != nil {
|
|
return fmt.Errorf("batch creating file records: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: created files", "duration", time.Since(opStart))
|
|
|
|
// Batch insert file_chunks
|
|
log.Debug("flushCompletedPendingFiles: inserting file_chunks")
|
|
opStart = time.Now()
|
|
if err := s.repos.FileChunks.CreateBatch(txCtx, tx, allFileChunks); err != nil {
|
|
return fmt.Errorf("batch creating file chunks: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: inserted file_chunks", "duration", time.Since(opStart))
|
|
|
|
// Batch insert chunk_files
|
|
log.Debug("flushCompletedPendingFiles: inserting chunk_files")
|
|
opStart = time.Now()
|
|
if err := s.repos.ChunkFiles.CreateBatch(txCtx, tx, allChunkFiles); err != nil {
|
|
return fmt.Errorf("batch creating chunk files: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: inserted chunk_files", "duration", time.Since(opStart))
|
|
|
|
// Batch add files to snapshot
|
|
log.Debug("flushCompletedPendingFiles: adding files to snapshot")
|
|
opStart = time.Now()
|
|
if err := s.repos.Snapshots.AddFilesByIDBatch(txCtx, tx, s.snapshotID, allFileIDs); err != nil {
|
|
return fmt.Errorf("batch adding files to snapshot: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: added files to snapshot", "duration", time.Since(opStart))
|
|
|
|
log.Debug("flushCompletedPendingFiles: transaction complete")
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ScanPhaseResult contains the results of the scan phase
|
|
type ScanPhaseResult struct {
|
|
FilesToProcess []*FileToProcess
|
|
UnchangedFileIDs []types.FileID // IDs of unchanged files to associate with snapshot
|
|
}
|
|
|
|
// scanPhase performs the initial directory scan to identify files to process
|
|
// It uses the pre-loaded knownFiles map for fast change detection without DB queries
|
|
// It also populates existingFiles map for deletion detection
|
|
// Returns files needing processing and IDs of unchanged files for snapshot association
|
|
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) (*ScanPhaseResult, error) {
|
|
// Use known file count as estimate for progress (accurate for subsequent backups)
|
|
estimatedTotal := int64(len(knownFiles))
|
|
|
|
var filesToProcess []*FileToProcess
|
|
var unchangedFileIDs []types.FileID // Just IDs - no new records needed
|
|
var mu sync.Mutex
|
|
|
|
// Set up periodic status output
|
|
startTime := time.Now()
|
|
lastStatusTime := time.Now()
|
|
statusInterval := 15 * time.Second
|
|
var filesScanned int64
|
|
|
|
log.Debug("Starting directory walk", "path", path)
|
|
err := afero.Walk(s.fs, path, func(filePath string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
if s.skipErrors {
|
|
log.Error("Failed to access file (skipping due to --skip-errors)", "path", filePath, "error", err)
|
|
s.ui.Error("Failed to access %s: %v. Skipping (--skip-errors).", s.ui.Path(filePath), err)
|
|
return nil // Continue scanning
|
|
}
|
|
log.Debug("Error accessing filesystem entry", "path", filePath, "error", err)
|
|
return wrapPermissionError(filePath, err)
|
|
}
|
|
|
|
// Check context cancellation
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
// Check exclude patterns - for directories, skip the entire subtree
|
|
if s.shouldExclude(filePath, path) {
|
|
if info.IsDir() {
|
|
return filepath.SkipDir
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Handle symlinks
|
|
if info.Mode()&os.ModeSymlink != 0 {
|
|
file := s.buildSymlinkEntry(filePath, info)
|
|
if file != nil {
|
|
existingFiles[filePath] = struct{}{}
|
|
mu.Lock()
|
|
filesToProcess = append(filesToProcess, &FileToProcess{
|
|
Path: filePath,
|
|
FileInfo: info,
|
|
File: file,
|
|
})
|
|
filesScanned++
|
|
mu.Unlock()
|
|
s.updateScanEntryStats(result, true, info)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Handle directories (record for permission/ownership preservation and empty-dir support)
|
|
if info.IsDir() {
|
|
file := s.buildDirectoryEntry(filePath, info)
|
|
existingFiles[filePath] = struct{}{}
|
|
mu.Lock()
|
|
filesToProcess = append(filesToProcess, &FileToProcess{
|
|
Path: filePath,
|
|
FileInfo: info,
|
|
File: file,
|
|
})
|
|
filesScanned++
|
|
mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Skip other non-regular files (devices, sockets, etc.)
|
|
if !info.Mode().IsRegular() {
|
|
return nil
|
|
}
|
|
|
|
// Track this file as existing (for deletion detection)
|
|
existingFiles[filePath] = struct{}{}
|
|
|
|
// Check file against in-memory map (no DB query!)
|
|
file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles)
|
|
|
|
mu.Lock()
|
|
if needsProcessing {
|
|
// New or changed file - will create record after processing
|
|
filesToProcess = append(filesToProcess, &FileToProcess{
|
|
Path: filePath,
|
|
FileInfo: info,
|
|
File: file,
|
|
})
|
|
} else if !file.ID.IsZero() {
|
|
// Unchanged file with existing ID - just need snapshot association
|
|
unchangedFileIDs = append(unchangedFileIDs, file.ID)
|
|
}
|
|
filesScanned++
|
|
changedCount := len(filesToProcess)
|
|
mu.Unlock()
|
|
|
|
// Update result stats
|
|
s.updateScanEntryStats(result, needsProcessing, info)
|
|
|
|
// Output periodic status
|
|
if time.Since(lastStatusTime) >= statusInterval {
|
|
s.printScanProgressLine(filesScanned, changedCount, estimatedTotal, startTime)
|
|
lastStatusTime = time.Now()
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &ScanPhaseResult{
|
|
FilesToProcess: filesToProcess,
|
|
UnchangedFileIDs: unchangedFileIDs,
|
|
}, nil
|
|
}
|
|
|
|
// updateScanEntryStats updates the scan result and progress reporter statistics
|
|
// for a single scanned file entry based on whether it needs processing
|
|
func (s *Scanner) updateScanEntryStats(result *ScanResult, needsProcessing bool, info os.FileInfo) {
|
|
if needsProcessing {
|
|
result.BytesScanned += info.Size()
|
|
if s.progress != nil {
|
|
s.progress.GetStats().BytesScanned.Add(info.Size())
|
|
}
|
|
} else {
|
|
result.FilesSkipped++
|
|
result.BytesSkipped += info.Size()
|
|
if s.progress != nil {
|
|
s.progress.GetStats().FilesSkipped.Add(1)
|
|
s.progress.GetStats().BytesSkipped.Add(info.Size())
|
|
}
|
|
}
|
|
result.FilesScanned++
|
|
if s.progress != nil {
|
|
s.progress.GetStats().FilesScanned.Add(1)
|
|
}
|
|
}
|
|
|
|
// printScanProgressLine prints a periodic progress line during the scan phase,
|
|
// showing files scanned, percentage complete (if estimate available), and ETA
|
|
func (s *Scanner) printScanProgressLine(filesScanned int64, changedCount int, estimatedTotal int64, startTime time.Time) {
|
|
elapsed := time.Since(startTime)
|
|
rate := float64(filesScanned) / elapsed.Seconds()
|
|
|
|
if estimatedTotal > 0 {
|
|
// Show actual scanned vs estimate (may exceed estimate if files were added)
|
|
pct := float64(filesScanned) / float64(estimatedTotal) * 100
|
|
if pct > 100 {
|
|
pct = 100 // Cap at 100% for display
|
|
}
|
|
remaining := estimatedTotal - filesScanned
|
|
if remaining < 0 {
|
|
remaining = 0
|
|
}
|
|
var eta time.Duration
|
|
if rate > 0 && remaining > 0 {
|
|
eta = time.Duration(float64(remaining)/rate) * time.Second
|
|
}
|
|
if eta > 0 {
|
|
s.ui.Progress("Snapshot source files enumeration: %s files (~%s), %s changed or new, %.0f files/sec, enumeration elapsed %s, enumeration estimated remaining time (%s), finish at %s.",
|
|
s.ui.Count(int(filesScanned)),
|
|
s.ui.Percent(pct),
|
|
s.ui.Count(changedCount),
|
|
rate,
|
|
s.ui.Duration(elapsed),
|
|
s.ui.Duration(eta),
|
|
s.ui.Time(time.Now().Add(eta)))
|
|
} else {
|
|
s.ui.Progress("Snapshot source files enumeration: %s files (~%s), %s changed or new, %.0f files/sec, enumeration elapsed %s.",
|
|
s.ui.Count(int(filesScanned)),
|
|
s.ui.Percent(pct),
|
|
s.ui.Count(changedCount),
|
|
rate,
|
|
s.ui.Duration(elapsed))
|
|
}
|
|
} else {
|
|
s.ui.Progress("Snapshot source files enumeration: %s files seen, %s changed or new, %.0f files/sec, enumeration elapsed %s.",
|
|
s.ui.Count(int(filesScanned)),
|
|
s.ui.Count(changedCount),
|
|
rate,
|
|
s.ui.Duration(elapsed))
|
|
}
|
|
}
|
|
|
|
// buildSymlinkEntry creates a File record for a symlink.
|
|
// Returns nil if the link target cannot be read.
|
|
func (s *Scanner) buildSymlinkEntry(path string, info os.FileInfo) *database.File {
|
|
target, err := os.Readlink(path)
|
|
if err != nil {
|
|
log.Debug("Cannot read symlink target", "path", path, "error", err)
|
|
return nil
|
|
}
|
|
|
|
var uid, gid uint32
|
|
if stat, ok := info.Sys().(interface {
|
|
Uid() uint32
|
|
Gid() uint32
|
|
}); ok {
|
|
uid = stat.Uid()
|
|
gid = stat.Gid()
|
|
}
|
|
|
|
return &database.File{
|
|
ID: types.NewFileID(),
|
|
Path: types.FilePath(path),
|
|
SourcePath: types.SourcePath(s.currentSourcePath),
|
|
MTime: info.ModTime(),
|
|
Size: 0,
|
|
Mode: uint32(info.Mode()),
|
|
UID: uid,
|
|
GID: gid,
|
|
LinkTarget: types.FilePath(target),
|
|
}
|
|
}
|
|
|
|
// buildDirectoryEntry creates a File record for a directory.
|
|
func (s *Scanner) buildDirectoryEntry(path string, info os.FileInfo) *database.File {
|
|
var uid, gid uint32
|
|
if stat, ok := info.Sys().(interface {
|
|
Uid() uint32
|
|
Gid() uint32
|
|
}); ok {
|
|
uid = stat.Uid()
|
|
gid = stat.Gid()
|
|
}
|
|
|
|
return &database.File{
|
|
ID: types.NewFileID(),
|
|
Path: types.FilePath(path),
|
|
SourcePath: types.SourcePath(s.currentSourcePath),
|
|
MTime: info.ModTime(),
|
|
Size: 0,
|
|
Mode: uint32(info.Mode()),
|
|
UID: uid,
|
|
GID: gid,
|
|
}
|
|
}
|
|
|
|
// recordNonRegularFile writes a symlink or directory entry to the database
|
|
// and associates it with the current snapshot. No chunking is performed.
|
|
func (s *Scanner) recordNonRegularFile(ctx context.Context, ftp *FileToProcess) error {
|
|
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
|
if err := s.repos.Files.Create(txCtx, tx, ftp.File); err != nil {
|
|
return fmt.Errorf("creating non-regular file record: %w", err)
|
|
}
|
|
return s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, ftp.File.ID)
|
|
})
|
|
}
|
|
|
|
// checkFileInMemory checks if a file needs processing using the in-memory map
|
|
// No database access is performed - this is purely CPU/memory work
|
|
func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
|
|
// Get file stats
|
|
stat, ok := info.Sys().(interface {
|
|
Uid() uint32
|
|
Gid() uint32
|
|
})
|
|
|
|
var uid, gid uint32
|
|
if ok {
|
|
uid = stat.Uid()
|
|
gid = stat.Gid()
|
|
}
|
|
|
|
// Check against in-memory map first to get existing ID if available
|
|
existingFile, exists := knownFiles[path]
|
|
|
|
// Create file record with ID set upfront
|
|
// For new files, generate UUID immediately so it's available for chunk associations
|
|
// For existing files, reuse the existing ID
|
|
var fileID types.FileID
|
|
if exists {
|
|
fileID = existingFile.ID
|
|
} else {
|
|
fileID = types.NewFileID()
|
|
}
|
|
|
|
file := &database.File{
|
|
ID: fileID,
|
|
Path: types.FilePath(path),
|
|
SourcePath: types.SourcePath(s.currentSourcePath), // Store source directory for restore path stripping
|
|
MTime: info.ModTime(),
|
|
Size: info.Size(),
|
|
Mode: uint32(info.Mode()),
|
|
UID: uid,
|
|
GID: gid,
|
|
}
|
|
|
|
// New file - needs processing
|
|
if !exists {
|
|
return file, true
|
|
}
|
|
|
|
// Check if file has changed
|
|
if existingFile.Size != file.Size ||
|
|
existingFile.MTime.Unix() != file.MTime.Unix() ||
|
|
existingFile.Mode != file.Mode ||
|
|
existingFile.UID != file.UID ||
|
|
existingFile.GID != file.GID {
|
|
return file, true
|
|
}
|
|
|
|
// File unchanged
|
|
return file, false
|
|
}
|
|
|
|
// batchAddFilesToSnapshot adds existing file IDs to the snapshot association table
|
|
// This is used for unchanged files that already have records in the database
|
|
func (s *Scanner) batchAddFilesToSnapshot(ctx context.Context, fileIDs []types.FileID) error {
|
|
const batchSize = 1000
|
|
|
|
startTime := time.Now()
|
|
lastStatusTime := time.Now()
|
|
statusInterval := 5 * time.Second
|
|
|
|
for i := 0; i < len(fileIDs); i += batchSize {
|
|
// Check context cancellation
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
end := i + batchSize
|
|
if end > len(fileIDs) {
|
|
end = len(fileIDs)
|
|
}
|
|
batch := fileIDs[i:end]
|
|
|
|
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
|
for _, fileID := range batch {
|
|
if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, fileID); err != nil {
|
|
return fmt.Errorf("adding file to snapshot: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Periodic status
|
|
if time.Since(lastStatusTime) >= statusInterval {
|
|
elapsed := time.Since(startTime)
|
|
rate := float64(end) / elapsed.Seconds()
|
|
pct := float64(end) / float64(len(fileIDs)) * 100
|
|
s.ui.Progress("Snapshot unchanged-file association: %s/%s (%s), %.0f files/sec.",
|
|
s.ui.Count(end), s.ui.Count(len(fileIDs)), s.ui.Percent(pct), rate)
|
|
lastStatusTime = time.Now()
|
|
}
|
|
}
|
|
|
|
elapsed := time.Since(startTime)
|
|
rate := float64(len(fileIDs)) / elapsed.Seconds()
|
|
s.ui.Complete("Associated %s unchanged files with the snapshot in %s (%.0f files/sec).",
|
|
s.ui.Count(len(fileIDs)), s.ui.Duration(elapsed), rate)
|
|
|
|
return nil
|
|
}
|
|
|
|
// processPhase processes the files that need backing up
|
|
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
|
|
// Calculate total bytes to process
|
|
var totalBytes int64
|
|
for _, f := range filesToProcess {
|
|
totalBytes += f.FileInfo.Size()
|
|
}
|
|
|
|
// Set up periodic status output
|
|
lastStatusTime := time.Now()
|
|
statusInterval := 15 * time.Second
|
|
startTime := time.Now()
|
|
filesProcessed := 0
|
|
var bytesProcessed int64
|
|
totalFiles := len(filesToProcess)
|
|
|
|
// Process each file
|
|
for _, fileToProcess := range filesToProcess {
|
|
// Update progress
|
|
if s.progress != nil {
|
|
s.progress.GetStats().CurrentFile.Store(fileToProcess.Path)
|
|
}
|
|
|
|
// Process file with error handling for deleted files and skip-errors mode
|
|
skipped, err := s.processFileWithErrorHandling(ctx, fileToProcess, result)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if skipped {
|
|
continue
|
|
}
|
|
|
|
// Update files processed counter
|
|
if s.progress != nil {
|
|
s.progress.GetStats().FilesProcessed.Add(1)
|
|
}
|
|
|
|
filesProcessed++
|
|
bytesProcessed += fileToProcess.FileInfo.Size()
|
|
|
|
// Output periodic status
|
|
if time.Since(lastStatusTime) >= statusInterval {
|
|
s.printProcessingProgress(filesProcessed, totalFiles, bytesProcessed, totalBytes, startTime)
|
|
lastStatusTime = time.Now()
|
|
}
|
|
}
|
|
|
|
// Finalize: flush packer, pending files, and handle local blobs
|
|
return s.finalizeProcessPhase(ctx, result)
|
|
}
|
|
|
|
// processFileWithErrorHandling wraps processFileStreaming with error recovery for
|
|
// deleted files and skip-errors mode. Returns (skipped, error).
|
|
func (s *Scanner) processFileWithErrorHandling(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) (bool, error) {
|
|
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
|
|
// Handle files that were deleted between scan and process phases
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
log.Warn("File was deleted during backup, skipping", "path", fileToProcess.Path)
|
|
result.FilesSkipped++
|
|
return true, nil
|
|
}
|
|
// Skip file read errors if --skip-errors is enabled
|
|
if s.skipErrors {
|
|
log.Error("Failed to process file (skipping due to --skip-errors)", "path", fileToProcess.Path, "error", err)
|
|
s.ui.Error("Failed to process %s: %v. Skipping (--skip-errors).", s.ui.Path(fileToProcess.Path), err)
|
|
result.FilesSkipped++
|
|
return true, nil
|
|
}
|
|
return false, fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// printProcessingProgress prints a periodic progress line during the process phase,
|
|
// showing files processed, bytes transferred, throughput, and ETA
|
|
func (s *Scanner) printProcessingProgress(filesProcessed, totalFiles int, bytesProcessed, totalBytes int64, startTime time.Time) {
|
|
elapsed := time.Since(startTime)
|
|
pct := float64(bytesProcessed) / float64(totalBytes) * 100
|
|
byteRate := float64(bytesProcessed) / elapsed.Seconds()
|
|
fileRate := float64(filesProcessed) / elapsed.Seconds()
|
|
|
|
// Calculate ETA based on bytes (more accurate than files)
|
|
remainingBytes := totalBytes - bytesProcessed
|
|
var eta time.Duration
|
|
if byteRate > 0 {
|
|
eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second
|
|
}
|
|
|
|
if eta > 0 {
|
|
s.ui.Progress("Snapshot file processing: %s/%s files (%s), %s/%s, %s, %.0f files/sec, processing elapsed %s, processing estimated remaining time (%s), finish at %s.",
|
|
s.ui.Count(filesProcessed),
|
|
s.ui.Count(totalFiles),
|
|
s.ui.Percent(pct),
|
|
s.ui.Size(bytesProcessed),
|
|
s.ui.Size(totalBytes),
|
|
s.ui.Speed(byteRate),
|
|
fileRate,
|
|
s.ui.Duration(elapsed),
|
|
s.ui.Duration(eta),
|
|
s.ui.Time(time.Now().Add(eta)))
|
|
} else {
|
|
s.ui.Progress("Snapshot file processing: %s/%s files (%s), %s/%s, %s, %.0f files/sec, processing elapsed %s.",
|
|
s.ui.Count(filesProcessed),
|
|
s.ui.Count(totalFiles),
|
|
s.ui.Percent(pct),
|
|
s.ui.Size(bytesProcessed),
|
|
s.ui.Size(totalBytes),
|
|
s.ui.Speed(byteRate),
|
|
fileRate,
|
|
s.ui.Duration(elapsed))
|
|
}
|
|
}
|
|
|
|
// finalizeProcessPhase flushes the packer, writes remaining pending files to the database,
|
|
// and handles local blob storage when no remote storage is configured
|
|
func (s *Scanner) finalizeProcessPhase(ctx context.Context, result *ScanResult) error {
|
|
// Final packer flush first - this commits remaining chunks to DB
|
|
// and handleBlobReady will flush files whose chunks are now committed
|
|
s.packerMu.Lock()
|
|
if err := s.packer.Flush(); err != nil {
|
|
s.packerMu.Unlock()
|
|
return fmt.Errorf("flushing packer: %w", err)
|
|
}
|
|
s.packerMu.Unlock()
|
|
|
|
// Flush any remaining pending files (e.g., files with only pre-existing chunks
|
|
// that didn't trigger a blob finalize)
|
|
if err := s.flushAllPending(ctx); err != nil {
|
|
return fmt.Errorf("flushing remaining pending files: %w", err)
|
|
}
|
|
|
|
// If no storage configured, store any remaining blobs locally
|
|
if s.storage == nil {
|
|
blobs := s.packer.GetFinishedBlobs()
|
|
for _, b := range blobs {
|
|
// Blob metadata is already stored incrementally during packing
|
|
// Just add the blob to the snapshot
|
|
blobID, err := types.ParseBlobID(b.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("parsing blob ID: %w", err)
|
|
}
|
|
err = s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
|
return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobID, types.BlobHash(b.Hash))
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("storing blob metadata: %w", err)
|
|
}
|
|
}
|
|
result.BlobsCreated += len(blobs)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleBlobReady is called by the packer when a blob is finalized
|
|
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
|
startTime := time.Now().UTC()
|
|
finishedBlob := blobWithReader.FinishedBlob
|
|
|
|
if s.progress != nil {
|
|
s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
|
|
s.progress.GetStats().BlobsCreated.Add(1)
|
|
}
|
|
|
|
ctx := s.scanCtx
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
|
|
blobExists, err := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime)
|
|
if err != nil {
|
|
s.cleanupBlobTempFile(blobWithReader)
|
|
return fmt.Errorf("uploading blob %s: %w", finishedBlob.Hash, err)
|
|
}
|
|
|
|
if err := s.recordBlobMetadata(ctx, finishedBlob, blobExists, startTime); err != nil {
|
|
s.cleanupBlobTempFile(blobWithReader)
|
|
return err
|
|
}
|
|
|
|
s.cleanupBlobTempFile(blobWithReader)
|
|
|
|
// Chunks from this blob are now committed to DB - remove from pending set
|
|
s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)
|
|
|
|
// Flush files whose chunks are now all committed
|
|
if err := s.flushCompletedPendingFiles(ctx); err != nil {
|
|
return fmt.Errorf("flushing completed files: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// uploadBlobIfNeeded uploads the blob to storage if it doesn't already exist, returns whether it existed
|
|
func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) (bool, error) {
|
|
finishedBlob := blobWithReader.FinishedBlob
|
|
|
|
// Check if blob already exists (deduplication after restart)
|
|
if _, err := s.storage.Stat(ctx, blobPath); err == nil {
|
|
log.Info("Blob already exists in storage, skipping upload",
|
|
"hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
|
s.ui.Info("Blob %s (%s) already exists in backup destination store. Skipping upload.",
|
|
s.ui.Hex(finishedBlob.Hash), s.ui.Size(finishedBlob.Compressed))
|
|
return true, nil
|
|
}
|
|
|
|
s.ui.Begin("Uploading blob %s (%s) to backup destination store.",
|
|
s.ui.Hex(finishedBlob.Hash), s.ui.Size(finishedBlob.Compressed))
|
|
|
|
progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob, startTime)
|
|
|
|
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
|
log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err)
|
|
return false, fmt.Errorf("uploading blob to storage: %w", err)
|
|
}
|
|
|
|
uploadDuration := time.Since(startTime)
|
|
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
|
|
|
|
s.ui.Complete("Uploaded blob %s (%s) in %s at %s.",
|
|
s.ui.Hex(finishedBlob.Hash),
|
|
s.ui.Size(finishedBlob.Compressed),
|
|
s.ui.Duration(uploadDuration),
|
|
s.ui.Speed(uploadSpeedBps))
|
|
|
|
log.Info("Successfully uploaded blob to storage",
|
|
"path", blobPath,
|
|
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
|
"duration", uploadDuration,
|
|
"speed", humanize.SI(uploadSpeedBps*8, "bps"))
|
|
|
|
if s.progress != nil {
|
|
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
|
|
stats := s.progress.GetStats()
|
|
stats.BlobsUploaded.Add(1)
|
|
stats.BytesUploaded.Add(finishedBlob.Compressed)
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// makeUploadProgressCallback creates a progress callback for blob uploads.
|
|
// It updates the live progress reporter ~twice/sec for ETAs and prints a
|
|
// human-readable status line to s.output at most every 15 seconds.
|
|
func (s *Scanner) makeUploadProgressCallback(ctx context.Context, finishedBlob *blob.FinishedBlob, uploadStart time.Time) func(int64) error {
|
|
lastProgressTime := time.Now()
|
|
lastProgressBytes := int64(0)
|
|
lastStdoutTime := time.Now()
|
|
const stdoutInterval = 15 * time.Second
|
|
|
|
return func(uploaded int64) error {
|
|
now := time.Now()
|
|
elapsed := now.Sub(lastProgressTime).Seconds()
|
|
if elapsed > 0.5 {
|
|
bytesSinceLastUpdate := uploaded - lastProgressBytes
|
|
speed := float64(bytesSinceLastUpdate) / elapsed
|
|
if s.progress != nil {
|
|
s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed)
|
|
}
|
|
lastProgressTime = now
|
|
lastProgressBytes = uploaded
|
|
}
|
|
|
|
// Periodic stdout status line so the user knows the upload is alive.
|
|
if now.Sub(lastStdoutTime) >= stdoutInterval {
|
|
totalElapsed := now.Sub(uploadStart)
|
|
pct := float64(uploaded) / float64(finishedBlob.Compressed) * 100
|
|
avgSpeed := float64(uploaded) / totalElapsed.Seconds()
|
|
var eta time.Duration
|
|
if avgSpeed > 0 {
|
|
eta = time.Duration(float64(finishedBlob.Compressed-uploaded)/avgSpeed) * time.Second
|
|
}
|
|
s.ui.Progress("Blob upload %s: %s / %s (%s) at %s, blob upload elapsed %s, blob upload estimated remaining time (%s), finish at %s.",
|
|
s.ui.Hex(finishedBlob.Hash),
|
|
s.ui.Size(uploaded),
|
|
s.ui.Size(finishedBlob.Compressed),
|
|
s.ui.Percent(pct),
|
|
s.ui.Speed(avgSpeed),
|
|
s.ui.Duration(totalElapsed),
|
|
s.ui.Duration(eta),
|
|
s.ui.Time(now.Add(eta)))
|
|
lastStdoutTime = now
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// recordBlobMetadata stores blob upload metadata in the database
|
|
func (s *Scanner) recordBlobMetadata(ctx context.Context, finishedBlob *blob.FinishedBlob, blobExists bool, startTime time.Time) error {
|
|
finishedBlobID, err := types.ParseBlobID(finishedBlob.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("parsing finished blob ID: %w", err)
|
|
}
|
|
|
|
uploadDuration := time.Since(startTime)
|
|
|
|
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
|
if err := s.repos.Blobs.UpdateUploaded(txCtx, tx, finishedBlob.ID); err != nil {
|
|
return fmt.Errorf("updating blob upload timestamp: %w", err)
|
|
}
|
|
|
|
if err := s.repos.Snapshots.AddBlob(txCtx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil {
|
|
return fmt.Errorf("adding blob to snapshot: %w", err)
|
|
}
|
|
|
|
if !blobExists {
|
|
upload := &database.Upload{
|
|
BlobHash: finishedBlob.Hash,
|
|
SnapshotID: s.snapshotID,
|
|
UploadedAt: startTime,
|
|
Size: finishedBlob.Compressed,
|
|
DurationMs: uploadDuration.Milliseconds(),
|
|
}
|
|
if err := s.repos.Uploads.Create(txCtx, tx, upload); err != nil {
|
|
return fmt.Errorf("recording upload metrics: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// cleanupBlobTempFile closes and removes the blob's temporary file
|
|
func (s *Scanner) cleanupBlobTempFile(blobWithReader *blob.BlobWithReader) {
|
|
if blobWithReader.TempFile != nil {
|
|
tempName := blobWithReader.TempFile.Name()
|
|
if err := blobWithReader.TempFile.Close(); err != nil {
|
|
log.Fatal("Failed to close temp file", "file", tempName, "error", err)
|
|
}
|
|
if err := s.fs.Remove(tempName); err != nil {
|
|
log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// streamingChunkInfo tracks chunk metadata collected during streaming
|
|
type streamingChunkInfo struct {
|
|
fileChunk database.FileChunk
|
|
offset int64
|
|
size int64
|
|
}
|
|
|
|
// processFileStreaming processes a file by streaming chunks directly to the packer
|
|
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
|
|
// Symlinks and directories have no data to chunk — just record them in the DB.
|
|
mode := os.FileMode(fileToProcess.File.Mode)
|
|
if mode&os.ModeSymlink != 0 || mode.IsDir() {
|
|
return s.recordNonRegularFile(ctx, fileToProcess)
|
|
}
|
|
|
|
file, err := s.fs.Open(fileToProcess.Path)
|
|
if err != nil {
|
|
return fmt.Errorf("opening file: %w", wrapPermissionError(fileToProcess.Path, err))
|
|
}
|
|
defer func() { _ = file.Close() }()
|
|
|
|
var chunks []streamingChunkInfo
|
|
chunkIndex := 0
|
|
|
|
fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
chunkExists := s.chunkExists(chunk.Hash)
|
|
if !chunkExists {
|
|
s.packer.AddPendingChunk(chunk.Hash, chunk.Size)
|
|
s.addKnownChunk(chunk.Hash)
|
|
s.addPendingChunkHash(chunk.Hash)
|
|
}
|
|
|
|
chunks = append(chunks, streamingChunkInfo{
|
|
fileChunk: database.FileChunk{
|
|
FileID: fileToProcess.File.ID,
|
|
Idx: chunkIndex,
|
|
ChunkHash: types.ChunkHash(chunk.Hash),
|
|
},
|
|
offset: chunk.Offset,
|
|
size: chunk.Size,
|
|
})
|
|
|
|
s.updateChunkStats(chunkExists, chunk.Size, result)
|
|
|
|
if !chunkExists {
|
|
if err := s.addChunkToPacker(chunk); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
chunk.Data = nil
|
|
chunkIndex++
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("chunking file: %w", err)
|
|
}
|
|
|
|
log.Debug("Completed snapshotting file",
|
|
"path", fileToProcess.Path, "file_hash", fileHash, "chunks", len(chunks))
|
|
|
|
s.queueFileForBatchInsert(ctx, fileToProcess, chunks)
|
|
return nil
|
|
}
|
|
|
|
// updateChunkStats updates scan result and progress stats for a processed chunk
|
|
func (s *Scanner) updateChunkStats(chunkExists bool, chunkSize int64, result *ScanResult) {
|
|
if chunkExists {
|
|
result.FilesSkipped++
|
|
result.BytesSkipped += chunkSize
|
|
if s.progress != nil {
|
|
s.progress.GetStats().BytesSkipped.Add(chunkSize)
|
|
}
|
|
} else {
|
|
result.ChunksCreated++
|
|
result.BytesScanned += chunkSize
|
|
if s.progress != nil {
|
|
s.progress.GetStats().ChunksCreated.Add(1)
|
|
s.progress.GetStats().BytesProcessed.Add(chunkSize)
|
|
s.progress.UpdateChunkingActivity()
|
|
}
|
|
}
|
|
}
|
|
|
|
// addChunkToPacker adds a chunk to the blob packer, finalizing the current blob if needed
|
|
func (s *Scanner) addChunkToPacker(chunk chunker.Chunk) error {
|
|
s.packerMu.Lock()
|
|
err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data})
|
|
if err == blob.ErrBlobSizeLimitExceeded {
|
|
if err := s.packer.FinalizeBlob(); err != nil {
|
|
s.packerMu.Unlock()
|
|
return fmt.Errorf("finalizing blob: %w", err)
|
|
}
|
|
if err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data}); err != nil {
|
|
s.packerMu.Unlock()
|
|
return fmt.Errorf("adding chunk after finalize: %w", err)
|
|
}
|
|
} else if err != nil {
|
|
s.packerMu.Unlock()
|
|
return fmt.Errorf("adding chunk to packer: %w", err)
|
|
}
|
|
s.packerMu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// queueFileForBatchInsert builds file/chunk associations and queues the file for batch DB insert
|
|
func (s *Scanner) queueFileForBatchInsert(ctx context.Context, fileToProcess *FileToProcess, chunks []streamingChunkInfo) {
|
|
fileChunks := make([]database.FileChunk, len(chunks))
|
|
chunkFiles := make([]database.ChunkFile, len(chunks))
|
|
for i, ci := range chunks {
|
|
fileChunks[i] = database.FileChunk{
|
|
FileID: fileToProcess.File.ID,
|
|
Idx: ci.fileChunk.Idx,
|
|
ChunkHash: ci.fileChunk.ChunkHash,
|
|
}
|
|
chunkFiles[i] = database.ChunkFile{
|
|
ChunkHash: ci.fileChunk.ChunkHash,
|
|
FileID: fileToProcess.File.ID,
|
|
FileOffset: ci.offset,
|
|
Length: ci.size,
|
|
}
|
|
}
|
|
|
|
s.addPendingFile(ctx, pendingFileData{
|
|
file: fileToProcess.File,
|
|
fileChunks: fileChunks,
|
|
chunkFiles: chunkFiles,
|
|
})
|
|
}
|
|
|
|
// GetProgress returns the progress reporter for this scanner
|
|
func (s *Scanner) GetProgress() *ProgressReporter {
|
|
return s.progress
|
|
}
|
|
|
|
// detectDeletedFilesFromMap finds files that existed in previous snapshots but no longer exist
|
|
// Uses pre-loaded maps to avoid any filesystem or database access
|
|
func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[string]*database.File, existingFiles map[string]struct{}, result *ScanResult) error {
|
|
if len(knownFiles) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Check each known file against the enumerated set (no filesystem access needed)
|
|
for path, file := range knownFiles {
|
|
// Check context cancellation periodically
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
// Check if the file exists in our enumerated set
|
|
if _, exists := existingFiles[path]; !exists {
|
|
// File has been deleted
|
|
result.FilesDeleted++
|
|
result.BytesDeleted += file.Size
|
|
log.Debug("Detected deleted file", "path", path, "size", file.Size)
|
|
}
|
|
}
|
|
|
|
if result.FilesDeleted > 0 {
|
|
s.ui.Info("Snapshot source files enumeration detected %s deleted files.", s.ui.Count(result.FilesDeleted))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// wrapPermissionError returns err unchanged unless errors.Is(err,
// os.ErrPermission) holds. Permission errors are wrapped with the path and a
// remediation hint, and the original error stays reachable through %w.
//
// On macOS the hint explains how to grant Full Disk Access. Without it,
// TCC-protected locations such as Calendars, Reminders or Photos fail with
// EPERM. On other platforms the hint points at file permissions and
// --skip-errors.
func wrapPermissionError(path string, err error) error {
	switch {
	case !errors.Is(err, os.ErrPermission):
		return err
	case runtime.GOOS == "darwin":
		return fmt.Errorf("cannot read %s: %w\n\n"+
			"macOS is blocking access to this path. Grant Full Disk Access to your\n"+
			"terminal application (or the app running vaultik):\n\n"+
			" System Settings → Privacy & Security → Full Disk Access\n\n"+
			"then quit and reopen the terminal and re-run the backup", path, err)
	default:
		return fmt.Errorf("cannot read %s: %w (check file permissions, or run with --skip-errors to continue past unreadable files)", path, err)
	}
}
|
|
|
|
// compileExcludePatterns compiles the exclude patterns into glob matchers
|
|
func compileExcludePatterns(patterns []string) []compiledPattern {
|
|
var compiled []compiledPattern
|
|
for _, p := range patterns {
|
|
if p == "" {
|
|
continue
|
|
}
|
|
|
|
// Check if pattern is anchored (starts with /)
|
|
anchored := strings.HasPrefix(p, "/")
|
|
pattern := p
|
|
if anchored {
|
|
pattern = p[1:] // Remove leading /
|
|
}
|
|
|
|
// Remove trailing slash if present (directory indicator)
|
|
pattern = strings.TrimSuffix(pattern, "/")
|
|
|
|
// Compile the glob pattern
|
|
// For patterns without path separators, we need to match them as components
|
|
// e.g., ".git" should match ".git" anywhere in the path
|
|
g, err := glob.Compile(pattern, '/')
|
|
if err != nil {
|
|
log.Warn("Invalid exclude pattern, skipping", "pattern", p, "error", err)
|
|
continue
|
|
}
|
|
|
|
compiled = append(compiled, compiledPattern{
|
|
pattern: g,
|
|
anchored: anchored,
|
|
original: p,
|
|
})
|
|
}
|
|
return compiled
|
|
}
|
|
|
|
// shouldExclude checks if a path should be excluded based on exclude patterns
|
|
// filePath is the full path to the file
|
|
// rootPath is the root of the backup source directory
|
|
func (s *Scanner) shouldExclude(filePath, rootPath string) bool {
|
|
if len(s.compiledExclude) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Get the relative path from root
|
|
relPath, err := filepath.Rel(rootPath, filePath)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
// Never exclude the root directory itself
|
|
if relPath == "." {
|
|
return false
|
|
}
|
|
|
|
// Normalize path separators
|
|
relPath = filepath.ToSlash(relPath)
|
|
|
|
// Check each pattern
|
|
for _, cp := range s.compiledExclude {
|
|
if cp.anchored {
|
|
// Anchored pattern: must match from the root
|
|
// Match the relative path directly
|
|
if cp.pattern.Match(relPath) {
|
|
return true
|
|
}
|
|
// Also check if any prefix of the path matches (for directory patterns)
|
|
parts := strings.Split(relPath, "/")
|
|
for i := 1; i <= len(parts); i++ {
|
|
prefix := strings.Join(parts[:i], "/")
|
|
if cp.pattern.Match(prefix) {
|
|
return true
|
|
}
|
|
}
|
|
} else {
|
|
// Unanchored pattern: can match anywhere in path
|
|
// Check the full relative path
|
|
if cp.pattern.Match(relPath) {
|
|
return true
|
|
}
|
|
// Check each path component and subpath
|
|
parts := strings.Split(relPath, "/")
|
|
for i := range parts {
|
|
// Match individual component (e.g., ".git" matches ".git" directory)
|
|
if cp.pattern.Match(parts[i]) {
|
|
return true
|
|
}
|
|
// Match subpath from this component onwards
|
|
subpath := strings.Join(parts[i:], "/")
|
|
if cp.pattern.Match(subpath) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|