All checks were successful
check / check (push) Successful in 2m38s
Scanner now writes all user-facing output to an io.Writer (os.Stdout when progress is enabled, io.Discard in --cron mode). This fixes the long-standing issue where --cron still printed progress lines. S3 HeadObject now properly distinguishes not-found from other errors instead of swallowing all errors as not-found. Config/CLI error messages include actionable hints (where to find the config, how to generate keys, what storage options exist).
1466 lines
48 KiB
Go
1466 lines
48 KiB
Go
package snapshot
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.eeqj.de/sneak/vaultik/internal/blob"
|
|
"git.eeqj.de/sneak/vaultik/internal/chunker"
|
|
"git.eeqj.de/sneak/vaultik/internal/database"
|
|
"git.eeqj.de/sneak/vaultik/internal/log"
|
|
"git.eeqj.de/sneak/vaultik/internal/storage"
|
|
"git.eeqj.de/sneak/vaultik/internal/types"
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/gobwas/glob"
|
|
"github.com/spf13/afero"
|
|
)
|
|
|
|
// FileToProcess holds information about a file that needs processing
|
|
type FileToProcess struct {
|
|
Path string
|
|
FileInfo os.FileInfo
|
|
File *database.File
|
|
}
|
|
|
|
// pendingFileData holds all data needed to commit a file to the database
|
|
type pendingFileData struct {
|
|
file *database.File
|
|
fileChunks []database.FileChunk
|
|
chunkFiles []database.ChunkFile
|
|
}
|
|
|
|
// compiledPattern holds a compiled glob pattern and whether it's anchored
|
|
type compiledPattern struct {
|
|
pattern glob.Glob
|
|
anchored bool // If true, only matches from root of source dir
|
|
original string
|
|
}
|
|
|
|
// Scanner scans directories and populates the database with file and chunk information
|
|
type Scanner struct {
|
|
fs afero.Fs
|
|
chunker *chunker.Chunker
|
|
packer *blob.Packer
|
|
repos *database.Repositories
|
|
storage storage.Storer
|
|
maxBlobSize int64
|
|
compressionLevel int
|
|
ageRecipient string
|
|
snapshotID string // Current snapshot being processed
|
|
currentSourcePath string // Current source directory being scanned (for restore path stripping)
|
|
exclude []string // Glob patterns for files/directories to exclude
|
|
compiledExclude []compiledPattern // Compiled glob patterns
|
|
progress *ProgressReporter
|
|
skipErrors bool // Skip file read errors (log loudly but continue)
|
|
output io.Writer // User-facing output (os.Stdout or io.Discard in cron mode)
|
|
|
|
// In-memory cache of known chunk hashes for fast existence checks
|
|
knownChunks map[string]struct{}
|
|
knownChunksMu sync.RWMutex
|
|
|
|
// Pending chunk hashes - chunks that have been added to packer but not yet committed to DB
|
|
// When a blob finalizes, the committed chunks are removed from this set
|
|
pendingChunkHashes map[string]struct{}
|
|
pendingChunkHashesMu sync.Mutex
|
|
|
|
// Pending file data buffer for batch insertion
|
|
// Files are flushed when all their chunks have been committed to DB
|
|
pendingFiles []pendingFileData
|
|
pendingFilesMu sync.Mutex
|
|
|
|
// Mutex for coordinating blob creation
|
|
packerMu sync.Mutex // Blocks chunk production during blob creation
|
|
|
|
// Context for cancellation
|
|
scanCtx context.Context
|
|
}
|
|
|
|
// ScannerConfig contains configuration for the scanner
|
|
type ScannerConfig struct {
|
|
FS afero.Fs
|
|
ChunkSize int64
|
|
Repositories *database.Repositories
|
|
Storage storage.Storer
|
|
MaxBlobSize int64
|
|
CompressionLevel int
|
|
AgeRecipients []string // Optional, empty means no encryption
|
|
EnableProgress bool // Enable progress reporting
|
|
Exclude []string // Glob patterns for files/directories to exclude
|
|
SkipErrors bool // Skip file read errors (log loudly but continue)
|
|
}
|
|
|
|
// ScanResult reports the counters and timestamps collected by one scan.
type ScanResult struct {
	// File counters: examined, skipped as unchanged, and detected as deleted.
	FilesScanned, FilesSkipped, FilesDeleted int

	// Byte counters matching the file counters above.
	BytesScanned, BytesSkipped, BytesDeleted int64

	// Number of chunks and blobs created during the scan.
	ChunksCreated, BlobsCreated int

	// Start and end of the scan, recorded in UTC.
	StartTime, EndTime time.Time
}
|
|
|
|
// NewScanner creates a new scanner instance
|
|
func NewScanner(cfg ScannerConfig) *Scanner {
|
|
// Create encryptor (required for blob packing)
|
|
if len(cfg.AgeRecipients) == 0 {
|
|
log.Error("No age recipients configured - encryption is required")
|
|
return nil
|
|
}
|
|
|
|
// Create blob packer with encryption
|
|
packerCfg := blob.PackerConfig{
|
|
MaxBlobSize: cfg.MaxBlobSize,
|
|
CompressionLevel: cfg.CompressionLevel,
|
|
Recipients: cfg.AgeRecipients,
|
|
Repositories: cfg.Repositories,
|
|
Fs: cfg.FS,
|
|
}
|
|
packer, err := blob.NewPacker(packerCfg)
|
|
if err != nil {
|
|
log.Error("Failed to create packer", "error", err)
|
|
return nil
|
|
}
|
|
|
|
var progress *ProgressReporter
|
|
if cfg.EnableProgress {
|
|
progress = NewProgressReporter()
|
|
}
|
|
|
|
// Compile exclude patterns
|
|
compiledExclude := compileExcludePatterns(cfg.Exclude)
|
|
|
|
output := io.Writer(io.Discard)
|
|
if cfg.EnableProgress {
|
|
output = os.Stdout
|
|
}
|
|
|
|
return &Scanner{
|
|
fs: cfg.FS,
|
|
chunker: chunker.NewChunker(cfg.ChunkSize),
|
|
packer: packer,
|
|
repos: cfg.Repositories,
|
|
storage: cfg.Storage,
|
|
maxBlobSize: cfg.MaxBlobSize,
|
|
compressionLevel: cfg.CompressionLevel,
|
|
ageRecipient: strings.Join(cfg.AgeRecipients, ","),
|
|
exclude: cfg.Exclude,
|
|
compiledExclude: compiledExclude,
|
|
progress: progress,
|
|
skipErrors: cfg.SkipErrors,
|
|
output: output,
|
|
pendingChunkHashes: make(map[string]struct{}),
|
|
}
|
|
}
|
|
|
|
// Scan scans a directory and populates the database
|
|
func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*ScanResult, error) {
|
|
s.snapshotID = snapshotID
|
|
s.currentSourcePath = path // Store source path for file records (used during restore)
|
|
s.scanCtx = ctx
|
|
result := &ScanResult{
|
|
StartTime: time.Now().UTC(),
|
|
}
|
|
|
|
// Set blob handler for concurrent upload
|
|
if s.storage != nil {
|
|
log.Debug("Setting blob handler for storage uploads")
|
|
s.packer.SetBlobHandler(s.handleBlobReady)
|
|
} else {
|
|
log.Debug("No storage configured, blobs will not be uploaded")
|
|
}
|
|
|
|
// Start progress reporting if enabled
|
|
if s.progress != nil {
|
|
s.progress.Start()
|
|
defer s.progress.Stop()
|
|
}
|
|
|
|
// Phase 0: Load known files and chunks from database into memory for fast lookup
|
|
knownFiles, err := s.loadDatabaseState(ctx, path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Phase 1: Scan directory, collect files to process, and track existing files
|
|
// (builds existingFiles map during walk to avoid double traversal)
|
|
log.Info("Phase 1/3: Scanning directory structure")
|
|
existingFiles := make(map[string]struct{})
|
|
scanResult, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scan phase failed: %w", err)
|
|
}
|
|
filesToProcess := scanResult.FilesToProcess
|
|
|
|
// Phase 1b: Detect deleted files by comparing DB against scanned files
|
|
if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil {
|
|
return nil, fmt.Errorf("detecting deleted files: %w", err)
|
|
}
|
|
|
|
// Phase 1c: Associate unchanged files with this snapshot (no new records needed)
|
|
if len(scanResult.UnchangedFileIDs) > 0 {
|
|
_, _ = fmt.Fprintf(s.output, "Associating %s unchanged files with snapshot...\n", formatNumber(len(scanResult.UnchangedFileIDs)))
|
|
if err := s.batchAddFilesToSnapshot(ctx, scanResult.UnchangedFileIDs); err != nil {
|
|
return nil, fmt.Errorf("associating unchanged files: %w", err)
|
|
}
|
|
}
|
|
|
|
// Summarize scan phase results and update progress
|
|
s.summarizeScanPhase(result, filesToProcess)
|
|
|
|
// Phase 2: Process files and create chunks
|
|
if len(filesToProcess) > 0 {
|
|
_, _ = fmt.Fprintf(s.output, "Processing %s files...\n", formatNumber(len(filesToProcess)))
|
|
log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)")
|
|
if err := s.processPhase(ctx, filesToProcess, result); err != nil {
|
|
return nil, fmt.Errorf("process phase failed: %w", err)
|
|
}
|
|
} else {
|
|
_, _ = fmt.Fprintf(s.output, "No files need processing. Creating metadata-only snapshot.\n")
|
|
log.Info("Phase 2/3: Skipping (no files need processing, metadata-only snapshot)")
|
|
}
|
|
|
|
// Finalize result with blob statistics
|
|
s.finalizeScanResult(ctx, result)
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// loadDatabaseState loads known files and chunks from the database into memory for fast lookup
|
|
// This avoids per-file and per-chunk database queries during the scan and process phases
|
|
func (s *Scanner) loadDatabaseState(ctx context.Context, path string) (map[string]*database.File, error) {
|
|
_, _ = fmt.Fprintln(s.output, "Loading known files from database...")
|
|
knownFiles, err := s.loadKnownFiles(ctx, path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("loading known files: %w", err)
|
|
}
|
|
_, _ = fmt.Fprintf(s.output, "Loaded %s known files from database\n", formatNumber(len(knownFiles)))
|
|
|
|
_, _ = fmt.Fprintln(s.output, "Loading known chunks from database...")
|
|
if err := s.loadKnownChunks(ctx); err != nil {
|
|
return nil, fmt.Errorf("loading known chunks: %w", err)
|
|
}
|
|
_, _ = fmt.Fprintf(s.output, "Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
|
|
|
|
return knownFiles, nil
|
|
}
|
|
|
|
// summarizeScanPhase calculates total size to process, updates progress tracking,
|
|
// and prints the scan phase summary with file counts and sizes
|
|
func (s *Scanner) summarizeScanPhase(result *ScanResult, filesToProcess []*FileToProcess) {
|
|
var totalSizeToProcess int64
|
|
for _, file := range filesToProcess {
|
|
totalSizeToProcess += file.FileInfo.Size()
|
|
}
|
|
|
|
if s.progress != nil {
|
|
s.progress.SetTotalSize(totalSizeToProcess)
|
|
s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
|
|
}
|
|
|
|
log.Info("Phase 1 complete",
|
|
"total_files", len(filesToProcess),
|
|
"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
|
|
"files_skipped", result.FilesSkipped,
|
|
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
|
|
|
|
_, _ = fmt.Fprintf(s.output, "Scan complete: %s examined (%s), %s to process (%s)",
|
|
formatNumber(result.FilesScanned),
|
|
humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)),
|
|
formatNumber(len(filesToProcess)),
|
|
humanize.Bytes(uint64(totalSizeToProcess)))
|
|
if result.FilesDeleted > 0 {
|
|
_, _ = fmt.Fprintf(s.output, ", %s deleted (%s)",
|
|
formatNumber(result.FilesDeleted),
|
|
humanize.Bytes(uint64(result.BytesDeleted)))
|
|
}
|
|
_, _ = fmt.Fprintln(s.output)
|
|
}
|
|
|
|
// finalizeScanResult populates final blob statistics in the scan result
|
|
// by querying the packer and database for blob/upload counts
|
|
func (s *Scanner) finalizeScanResult(ctx context.Context, result *ScanResult) {
|
|
blobs := s.packer.GetFinishedBlobs()
|
|
result.BlobsCreated += len(blobs)
|
|
|
|
// Query database for actual blob count created during this snapshot
|
|
// The database is authoritative, especially for concurrent blob uploads
|
|
// We count uploads rather than all snapshot_blobs to get only NEW blobs
|
|
if s.snapshotID != "" {
|
|
uploadCount, err := s.repos.Uploads.GetCountBySnapshot(ctx, s.snapshotID)
|
|
if err != nil {
|
|
log.Warn("Failed to query upload count from database", "error", err)
|
|
} else {
|
|
result.BlobsCreated = int(uploadCount)
|
|
}
|
|
}
|
|
|
|
result.EndTime = time.Now().UTC()
|
|
}
|
|
|
|
// loadKnownFiles loads all known files from the database into a map for fast lookup
|
|
// This avoids per-file database queries during the scan phase
|
|
func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*database.File, error) {
|
|
files, err := s.repos.Files.ListByPrefix(ctx, path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing files by prefix: %w", err)
|
|
}
|
|
|
|
result := make(map[string]*database.File, len(files))
|
|
for _, f := range files {
|
|
result[f.Path.String()] = f
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// loadKnownChunks loads all known chunk hashes from the database into a map for fast lookup
|
|
// This avoids per-chunk database queries during file processing
|
|
func (s *Scanner) loadKnownChunks(ctx context.Context) error {
|
|
chunks, err := s.repos.Chunks.List(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("listing chunks: %w", err)
|
|
}
|
|
|
|
s.knownChunksMu.Lock()
|
|
s.knownChunks = make(map[string]struct{}, len(chunks))
|
|
for _, c := range chunks {
|
|
s.knownChunks[c.ChunkHash.String()] = struct{}{}
|
|
}
|
|
s.knownChunksMu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// chunkExists checks if a chunk hash exists in the in-memory cache
|
|
func (s *Scanner) chunkExists(hash string) bool {
|
|
s.knownChunksMu.RLock()
|
|
_, exists := s.knownChunks[hash]
|
|
s.knownChunksMu.RUnlock()
|
|
return exists
|
|
}
|
|
|
|
// addKnownChunk adds a chunk hash to the in-memory cache
|
|
func (s *Scanner) addKnownChunk(hash string) {
|
|
s.knownChunksMu.Lock()
|
|
s.knownChunks[hash] = struct{}{}
|
|
s.knownChunksMu.Unlock()
|
|
}
|
|
|
|
// addPendingChunkHash marks a chunk as pending (not yet committed to DB)
|
|
func (s *Scanner) addPendingChunkHash(hash string) {
|
|
s.pendingChunkHashesMu.Lock()
|
|
s.pendingChunkHashes[hash] = struct{}{}
|
|
s.pendingChunkHashesMu.Unlock()
|
|
}
|
|
|
|
// removePendingChunkHashes removes committed chunk hashes from the pending set
|
|
func (s *Scanner) removePendingChunkHashes(hashes []string) {
|
|
log.Debug("removePendingChunkHashes: starting", "count", len(hashes))
|
|
start := time.Now()
|
|
s.pendingChunkHashesMu.Lock()
|
|
for _, hash := range hashes {
|
|
delete(s.pendingChunkHashes, hash)
|
|
}
|
|
s.pendingChunkHashesMu.Unlock()
|
|
log.Debug("removePendingChunkHashes: done", "count", len(hashes), "duration", time.Since(start))
|
|
}
|
|
|
|
// isChunkPending returns true if the chunk is still pending (not yet committed to DB)
|
|
func (s *Scanner) isChunkPending(hash string) bool {
|
|
s.pendingChunkHashesMu.Lock()
|
|
_, pending := s.pendingChunkHashes[hash]
|
|
s.pendingChunkHashesMu.Unlock()
|
|
return pending
|
|
}
|
|
|
|
// addPendingFile adds a file to the pending buffer
|
|
// Files are NOT auto-flushed here - they are flushed when their chunks are committed
|
|
// (in handleBlobReady after blob finalize)
|
|
func (s *Scanner) addPendingFile(_ context.Context, data pendingFileData) {
|
|
s.pendingFilesMu.Lock()
|
|
s.pendingFiles = append(s.pendingFiles, data)
|
|
s.pendingFilesMu.Unlock()
|
|
}
|
|
|
|
// flushPendingFiles writes all pending files to the database in a single transaction
|
|
func (s *Scanner) flushPendingFiles(ctx context.Context) error {
|
|
s.pendingFilesMu.Lock()
|
|
files := s.pendingFiles
|
|
s.pendingFiles = nil
|
|
s.pendingFilesMu.Unlock()
|
|
|
|
if len(files) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
|
for _, data := range files {
|
|
// Create or update the file record
|
|
if err := s.repos.Files.Create(txCtx, tx, data.file); err != nil {
|
|
return fmt.Errorf("creating file record: %w", err)
|
|
}
|
|
|
|
// Delete any existing file_chunks and chunk_files for this file
|
|
if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
|
|
return fmt.Errorf("deleting old file chunks: %w", err)
|
|
}
|
|
if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
|
|
return fmt.Errorf("deleting old chunk files: %w", err)
|
|
}
|
|
|
|
// Create file-chunk mappings
|
|
for i := range data.fileChunks {
|
|
if err := s.repos.FileChunks.Create(txCtx, tx, &data.fileChunks[i]); err != nil {
|
|
return fmt.Errorf("creating file chunk: %w", err)
|
|
}
|
|
}
|
|
|
|
// Create chunk-file mappings
|
|
for i := range data.chunkFiles {
|
|
if err := s.repos.ChunkFiles.Create(txCtx, tx, &data.chunkFiles[i]); err != nil {
|
|
return fmt.Errorf("creating chunk file: %w", err)
|
|
}
|
|
}
|
|
|
|
// Add file to snapshot
|
|
if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID); err != nil {
|
|
return fmt.Errorf("adding file to snapshot: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// flushAllPending flushes all pending files to the database
|
|
func (s *Scanner) flushAllPending(ctx context.Context) error {
|
|
return s.flushPendingFiles(ctx)
|
|
}
|
|
|
|
// flushCompletedPendingFiles flushes only files whose chunks are all committed to DB
|
|
// Files with pending chunks are kept in the queue for later flushing
|
|
func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
|
|
flushStart := time.Now()
|
|
log.Debug("flushCompletedPendingFiles: starting")
|
|
|
|
// Partition pending files into those ready to flush and those still waiting
|
|
canFlush, stillPendingCount := s.partitionPendingByChunkStatus()
|
|
|
|
if len(canFlush) == 0 {
|
|
log.Debug("flushCompletedPendingFiles: nothing to flush")
|
|
return nil
|
|
}
|
|
|
|
log.Debug("Flushing completed files after blob finalize",
|
|
"files_to_flush", len(canFlush),
|
|
"files_still_pending", stillPendingCount)
|
|
|
|
// Collect all data for batch operations
|
|
allFiles, allFileIDs, allFileChunks, allChunkFiles := s.collectBatchFlushData(canFlush)
|
|
|
|
// Execute the batch flush in a single transaction
|
|
log.Debug("flushCompletedPendingFiles: starting transaction")
|
|
txStart := time.Now()
|
|
err := s.executeBatchFileFlush(ctx, allFiles, allFileIDs, allFileChunks, allChunkFiles)
|
|
log.Debug("flushCompletedPendingFiles: transaction done", "duration", time.Since(txStart))
|
|
log.Debug("flushCompletedPendingFiles: total duration", "duration", time.Since(flushStart))
|
|
return err
|
|
}
|
|
|
|
// partitionPendingByChunkStatus separates pending files into those whose chunks
|
|
// are all committed to DB (ready to flush) and those still waiting on pending chunks.
|
|
// Updates s.pendingFiles to contain only the still-pending files.
|
|
func (s *Scanner) partitionPendingByChunkStatus() (canFlush []pendingFileData, stillPendingCount int) {
|
|
log.Debug("flushCompletedPendingFiles: acquiring pendingFilesMu lock")
|
|
s.pendingFilesMu.Lock()
|
|
log.Debug("flushCompletedPendingFiles: acquired lock", "pending_files", len(s.pendingFiles))
|
|
|
|
var stillPending []pendingFileData
|
|
|
|
log.Debug("flushCompletedPendingFiles: checking which files can flush")
|
|
checkStart := time.Now()
|
|
for _, data := range s.pendingFiles {
|
|
allChunksCommitted := true
|
|
for _, fc := range data.fileChunks {
|
|
if s.isChunkPending(fc.ChunkHash.String()) {
|
|
allChunksCommitted = false
|
|
break
|
|
}
|
|
}
|
|
if allChunksCommitted {
|
|
canFlush = append(canFlush, data)
|
|
} else {
|
|
stillPending = append(stillPending, data)
|
|
}
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: check done", "duration", time.Since(checkStart), "can_flush", len(canFlush), "still_pending", len(stillPending))
|
|
|
|
s.pendingFiles = stillPending
|
|
s.pendingFilesMu.Unlock()
|
|
log.Debug("flushCompletedPendingFiles: released lock")
|
|
|
|
return canFlush, len(stillPending)
|
|
}
|
|
|
|
// collectBatchFlushData aggregates file records, IDs, file-chunk mappings, and chunk-file
|
|
// mappings from the given pending file data for efficient batch database operations
|
|
func (s *Scanner) collectBatchFlushData(canFlush []pendingFileData) ([]*database.File, []types.FileID, []database.FileChunk, []database.ChunkFile) {
|
|
log.Debug("flushCompletedPendingFiles: collecting data for batch ops")
|
|
collectStart := time.Now()
|
|
|
|
var allFileChunks []database.FileChunk
|
|
var allChunkFiles []database.ChunkFile
|
|
var allFileIDs []types.FileID
|
|
var allFiles []*database.File
|
|
|
|
for _, data := range canFlush {
|
|
allFileChunks = append(allFileChunks, data.fileChunks...)
|
|
allChunkFiles = append(allChunkFiles, data.chunkFiles...)
|
|
allFileIDs = append(allFileIDs, data.file.ID)
|
|
allFiles = append(allFiles, data.file)
|
|
}
|
|
|
|
log.Debug("flushCompletedPendingFiles: collected data",
|
|
"duration", time.Since(collectStart),
|
|
"file_chunks", len(allFileChunks),
|
|
"chunk_files", len(allChunkFiles),
|
|
"files", len(allFiles))
|
|
|
|
return allFiles, allFileIDs, allFileChunks, allChunkFiles
|
|
}
|
|
|
|
// executeBatchFileFlush writes all collected file data to the database in a single transaction,
|
|
// including deleting old mappings, creating file records, and adding snapshot associations
|
|
func (s *Scanner) executeBatchFileFlush(ctx context.Context, allFiles []*database.File, allFileIDs []types.FileID, allFileChunks []database.FileChunk, allChunkFiles []database.ChunkFile) error {
|
|
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
|
log.Debug("flushCompletedPendingFiles: inside transaction")
|
|
|
|
// Batch delete old file_chunks and chunk_files
|
|
log.Debug("flushCompletedPendingFiles: deleting old file_chunks")
|
|
opStart := time.Now()
|
|
if err := s.repos.FileChunks.DeleteByFileIDs(txCtx, tx, allFileIDs); err != nil {
|
|
return fmt.Errorf("batch deleting old file chunks: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: deleted file_chunks", "duration", time.Since(opStart))
|
|
|
|
log.Debug("flushCompletedPendingFiles: deleting old chunk_files")
|
|
opStart = time.Now()
|
|
if err := s.repos.ChunkFiles.DeleteByFileIDs(txCtx, tx, allFileIDs); err != nil {
|
|
return fmt.Errorf("batch deleting old chunk files: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: deleted chunk_files", "duration", time.Since(opStart))
|
|
|
|
// Batch create/update file records
|
|
log.Debug("flushCompletedPendingFiles: creating files")
|
|
opStart = time.Now()
|
|
if err := s.repos.Files.CreateBatch(txCtx, tx, allFiles); err != nil {
|
|
return fmt.Errorf("batch creating file records: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: created files", "duration", time.Since(opStart))
|
|
|
|
// Batch insert file_chunks
|
|
log.Debug("flushCompletedPendingFiles: inserting file_chunks")
|
|
opStart = time.Now()
|
|
if err := s.repos.FileChunks.CreateBatch(txCtx, tx, allFileChunks); err != nil {
|
|
return fmt.Errorf("batch creating file chunks: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: inserted file_chunks", "duration", time.Since(opStart))
|
|
|
|
// Batch insert chunk_files
|
|
log.Debug("flushCompletedPendingFiles: inserting chunk_files")
|
|
opStart = time.Now()
|
|
if err := s.repos.ChunkFiles.CreateBatch(txCtx, tx, allChunkFiles); err != nil {
|
|
return fmt.Errorf("batch creating chunk files: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: inserted chunk_files", "duration", time.Since(opStart))
|
|
|
|
// Batch add files to snapshot
|
|
log.Debug("flushCompletedPendingFiles: adding files to snapshot")
|
|
opStart = time.Now()
|
|
if err := s.repos.Snapshots.AddFilesByIDBatch(txCtx, tx, s.snapshotID, allFileIDs); err != nil {
|
|
return fmt.Errorf("batch adding files to snapshot: %w", err)
|
|
}
|
|
log.Debug("flushCompletedPendingFiles: added files to snapshot", "duration", time.Since(opStart))
|
|
|
|
log.Debug("flushCompletedPendingFiles: transaction complete")
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ScanPhaseResult contains the results of the scan phase
|
|
type ScanPhaseResult struct {
|
|
FilesToProcess []*FileToProcess
|
|
UnchangedFileIDs []types.FileID // IDs of unchanged files to associate with snapshot
|
|
}
|
|
|
|
// scanPhase performs the initial directory scan to identify files to process
|
|
// It uses the pre-loaded knownFiles map for fast change detection without DB queries
|
|
// It also populates existingFiles map for deletion detection
|
|
// Returns files needing processing and IDs of unchanged files for snapshot association
|
|
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) (*ScanPhaseResult, error) {
|
|
// Use known file count as estimate for progress (accurate for subsequent backups)
|
|
estimatedTotal := int64(len(knownFiles))
|
|
|
|
var filesToProcess []*FileToProcess
|
|
var unchangedFileIDs []types.FileID // Just IDs - no new records needed
|
|
var mu sync.Mutex
|
|
|
|
// Set up periodic status output
|
|
startTime := time.Now()
|
|
lastStatusTime := time.Now()
|
|
statusInterval := 15 * time.Second
|
|
var filesScanned int64
|
|
|
|
log.Debug("Starting directory walk", "path", path)
|
|
err := afero.Walk(s.fs, path, func(filePath string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
if s.skipErrors {
|
|
log.Error("ERROR: Failed to access file (skipping due to --skip-errors)", "path", filePath, "error", err)
|
|
_, _ = fmt.Fprintf(s.output, "ERROR: Failed to access %s: %v (skipping)\n", filePath, err)
|
|
return nil // Continue scanning
|
|
}
|
|
log.Debug("Error accessing filesystem entry", "path", filePath, "error", err)
|
|
return err
|
|
}
|
|
|
|
// Check context cancellation
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
// Check exclude patterns - for directories, skip the entire subtree
|
|
if s.shouldExclude(filePath, path) {
|
|
if info.IsDir() {
|
|
return filepath.SkipDir
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Skip non-regular files for processing (but still count them)
|
|
if !info.Mode().IsRegular() {
|
|
return nil
|
|
}
|
|
|
|
// Track this file as existing (for deletion detection)
|
|
existingFiles[filePath] = struct{}{}
|
|
|
|
// Check file against in-memory map (no DB query!)
|
|
file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles)
|
|
|
|
mu.Lock()
|
|
if needsProcessing {
|
|
// New or changed file - will create record after processing
|
|
filesToProcess = append(filesToProcess, &FileToProcess{
|
|
Path: filePath,
|
|
FileInfo: info,
|
|
File: file,
|
|
})
|
|
} else if !file.ID.IsZero() {
|
|
// Unchanged file with existing ID - just need snapshot association
|
|
unchangedFileIDs = append(unchangedFileIDs, file.ID)
|
|
}
|
|
filesScanned++
|
|
changedCount := len(filesToProcess)
|
|
mu.Unlock()
|
|
|
|
// Update result stats
|
|
s.updateScanEntryStats(result, needsProcessing, info)
|
|
|
|
// Output periodic status
|
|
if time.Since(lastStatusTime) >= statusInterval {
|
|
s.printScanProgressLine(filesScanned, changedCount, estimatedTotal, startTime)
|
|
lastStatusTime = time.Now()
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &ScanPhaseResult{
|
|
FilesToProcess: filesToProcess,
|
|
UnchangedFileIDs: unchangedFileIDs,
|
|
}, nil
|
|
}
|
|
|
|
// updateScanEntryStats updates the scan result and progress reporter statistics
|
|
// for a single scanned file entry based on whether it needs processing
|
|
func (s *Scanner) updateScanEntryStats(result *ScanResult, needsProcessing bool, info os.FileInfo) {
|
|
if needsProcessing {
|
|
result.BytesScanned += info.Size()
|
|
if s.progress != nil {
|
|
s.progress.GetStats().BytesScanned.Add(info.Size())
|
|
}
|
|
} else {
|
|
result.FilesSkipped++
|
|
result.BytesSkipped += info.Size()
|
|
if s.progress != nil {
|
|
s.progress.GetStats().FilesSkipped.Add(1)
|
|
s.progress.GetStats().BytesSkipped.Add(info.Size())
|
|
}
|
|
}
|
|
result.FilesScanned++
|
|
if s.progress != nil {
|
|
s.progress.GetStats().FilesScanned.Add(1)
|
|
}
|
|
}
|
|
|
|
// printScanProgressLine prints a periodic progress line during the scan phase,
|
|
// showing files scanned, percentage complete (if estimate available), and ETA
|
|
func (s *Scanner) printScanProgressLine(filesScanned int64, changedCount int, estimatedTotal int64, startTime time.Time) {
|
|
elapsed := time.Since(startTime)
|
|
rate := float64(filesScanned) / elapsed.Seconds()
|
|
|
|
if estimatedTotal > 0 {
|
|
// Show actual scanned vs estimate (may exceed estimate if files were added)
|
|
pct := float64(filesScanned) / float64(estimatedTotal) * 100
|
|
if pct > 100 {
|
|
pct = 100 // Cap at 100% for display
|
|
}
|
|
remaining := estimatedTotal - filesScanned
|
|
if remaining < 0 {
|
|
remaining = 0
|
|
}
|
|
var eta time.Duration
|
|
if rate > 0 && remaining > 0 {
|
|
eta = time.Duration(float64(remaining)/rate) * time.Second
|
|
}
|
|
_, _ = fmt.Fprintf(s.output, "Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
|
|
formatNumber(int(filesScanned)),
|
|
pct,
|
|
formatNumber(changedCount),
|
|
rate,
|
|
elapsed.Round(time.Second))
|
|
if eta > 0 {
|
|
_, _ = fmt.Fprintf(s.output, ", ETA %s", eta.Round(time.Second))
|
|
}
|
|
_, _ = fmt.Fprintln(s.output)
|
|
} else {
|
|
// First backup - no estimate available
|
|
_, _ = fmt.Fprintf(s.output, "Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
|
|
formatNumber(int(filesScanned)),
|
|
formatNumber(changedCount),
|
|
rate,
|
|
elapsed.Round(time.Second))
|
|
}
|
|
}
|
|
|
|
// checkFileInMemory checks if a file needs processing using the in-memory map
|
|
// No database access is performed - this is purely CPU/memory work
|
|
func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
|
|
// Get file stats
|
|
stat, ok := info.Sys().(interface {
|
|
Uid() uint32
|
|
Gid() uint32
|
|
})
|
|
|
|
var uid, gid uint32
|
|
if ok {
|
|
uid = stat.Uid()
|
|
gid = stat.Gid()
|
|
}
|
|
|
|
// Check against in-memory map first to get existing ID if available
|
|
existingFile, exists := knownFiles[path]
|
|
|
|
// Create file record with ID set upfront
|
|
// For new files, generate UUID immediately so it's available for chunk associations
|
|
// For existing files, reuse the existing ID
|
|
var fileID types.FileID
|
|
if exists {
|
|
fileID = existingFile.ID
|
|
} else {
|
|
fileID = types.NewFileID()
|
|
}
|
|
|
|
file := &database.File{
|
|
ID: fileID,
|
|
Path: types.FilePath(path),
|
|
SourcePath: types.SourcePath(s.currentSourcePath), // Store source directory for restore path stripping
|
|
MTime: info.ModTime(),
|
|
Size: info.Size(),
|
|
Mode: uint32(info.Mode()),
|
|
UID: uid,
|
|
GID: gid,
|
|
}
|
|
|
|
// New file - needs processing
|
|
if !exists {
|
|
return file, true
|
|
}
|
|
|
|
// Check if file has changed
|
|
if existingFile.Size != file.Size ||
|
|
existingFile.MTime.Unix() != file.MTime.Unix() ||
|
|
existingFile.Mode != file.Mode ||
|
|
existingFile.UID != file.UID ||
|
|
existingFile.GID != file.GID {
|
|
return file, true
|
|
}
|
|
|
|
// File unchanged
|
|
return file, false
|
|
}
|
|
|
|
// batchAddFilesToSnapshot adds existing file IDs to the snapshot association table
|
|
// This is used for unchanged files that already have records in the database
|
|
func (s *Scanner) batchAddFilesToSnapshot(ctx context.Context, fileIDs []types.FileID) error {
|
|
const batchSize = 1000
|
|
|
|
startTime := time.Now()
|
|
lastStatusTime := time.Now()
|
|
statusInterval := 5 * time.Second
|
|
|
|
for i := 0; i < len(fileIDs); i += batchSize {
|
|
// Check context cancellation
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
end := i + batchSize
|
|
if end > len(fileIDs) {
|
|
end = len(fileIDs)
|
|
}
|
|
batch := fileIDs[i:end]
|
|
|
|
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
|
for _, fileID := range batch {
|
|
if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, fileID); err != nil {
|
|
return fmt.Errorf("adding file to snapshot: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Periodic status
|
|
if time.Since(lastStatusTime) >= statusInterval {
|
|
elapsed := time.Since(startTime)
|
|
rate := float64(end) / elapsed.Seconds()
|
|
pct := float64(end) / float64(len(fileIDs)) * 100
|
|
_, _ = fmt.Fprintf(s.output, "Associating files: %s/%s (%.1f%%), %.0f files/sec\n",
|
|
formatNumber(end), formatNumber(len(fileIDs)), pct, rate)
|
|
lastStatusTime = time.Now()
|
|
}
|
|
}
|
|
|
|
elapsed := time.Since(startTime)
|
|
rate := float64(len(fileIDs)) / elapsed.Seconds()
|
|
_, _ = fmt.Fprintf(s.output, "Associated %s unchanged files in %s (%.0f files/sec)\n",
|
|
formatNumber(len(fileIDs)), elapsed.Round(time.Second), rate)
|
|
|
|
return nil
|
|
}
|
|
|
|
// processPhase processes the files that need backing up
|
|
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
|
|
// Calculate total bytes to process
|
|
var totalBytes int64
|
|
for _, f := range filesToProcess {
|
|
totalBytes += f.FileInfo.Size()
|
|
}
|
|
|
|
// Set up periodic status output
|
|
lastStatusTime := time.Now()
|
|
statusInterval := 15 * time.Second
|
|
startTime := time.Now()
|
|
filesProcessed := 0
|
|
var bytesProcessed int64
|
|
totalFiles := len(filesToProcess)
|
|
|
|
// Process each file
|
|
for _, fileToProcess := range filesToProcess {
|
|
// Update progress
|
|
if s.progress != nil {
|
|
s.progress.GetStats().CurrentFile.Store(fileToProcess.Path)
|
|
}
|
|
|
|
// Process file with error handling for deleted files and skip-errors mode
|
|
skipped, err := s.processFileWithErrorHandling(ctx, fileToProcess, result)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if skipped {
|
|
continue
|
|
}
|
|
|
|
// Update files processed counter
|
|
if s.progress != nil {
|
|
s.progress.GetStats().FilesProcessed.Add(1)
|
|
}
|
|
|
|
filesProcessed++
|
|
bytesProcessed += fileToProcess.FileInfo.Size()
|
|
|
|
// Output periodic status
|
|
if time.Since(lastStatusTime) >= statusInterval {
|
|
s.printProcessingProgress(filesProcessed, totalFiles, bytesProcessed, totalBytes, startTime)
|
|
lastStatusTime = time.Now()
|
|
}
|
|
}
|
|
|
|
// Finalize: flush packer, pending files, and handle local blobs
|
|
return s.finalizeProcessPhase(ctx, result)
|
|
}
|
|
|
|
// processFileWithErrorHandling wraps processFileStreaming with error recovery for
|
|
// deleted files and skip-errors mode. Returns (skipped, error).
|
|
func (s *Scanner) processFileWithErrorHandling(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) (bool, error) {
|
|
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
|
|
// Handle files that were deleted between scan and process phases
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
log.Warn("File was deleted during backup, skipping", "path", fileToProcess.Path)
|
|
result.FilesSkipped++
|
|
return true, nil
|
|
}
|
|
// Skip file read errors if --skip-errors is enabled
|
|
if s.skipErrors {
|
|
log.Error("ERROR: Failed to process file (skipping due to --skip-errors)", "path", fileToProcess.Path, "error", err)
|
|
_, _ = fmt.Fprintf(s.output, "ERROR: Failed to process %s: %v (skipping)\n", fileToProcess.Path, err)
|
|
result.FilesSkipped++
|
|
return true, nil
|
|
}
|
|
return false, fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// printProcessingProgress prints a periodic progress line during the process phase,
|
|
// showing files processed, bytes transferred, throughput, and ETA
|
|
func (s *Scanner) printProcessingProgress(filesProcessed, totalFiles int, bytesProcessed, totalBytes int64, startTime time.Time) {
|
|
elapsed := time.Since(startTime)
|
|
pct := float64(bytesProcessed) / float64(totalBytes) * 100
|
|
byteRate := float64(bytesProcessed) / elapsed.Seconds()
|
|
fileRate := float64(filesProcessed) / elapsed.Seconds()
|
|
|
|
// Calculate ETA based on bytes (more accurate than files)
|
|
remainingBytes := totalBytes - bytesProcessed
|
|
var eta time.Duration
|
|
if byteRate > 0 {
|
|
eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second
|
|
}
|
|
|
|
// Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s
|
|
_, _ = fmt.Fprintf(s.output, "Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s",
|
|
formatCompact(filesProcessed),
|
|
formatCompact(totalFiles),
|
|
humanize.Bytes(uint64(bytesProcessed)),
|
|
humanize.Bytes(uint64(totalBytes)),
|
|
pct,
|
|
humanize.Bytes(uint64(byteRate)),
|
|
fileRate,
|
|
elapsed.Round(time.Second))
|
|
if eta > 0 {
|
|
_, _ = fmt.Fprintf(s.output, ", ETA: %s", eta.Round(time.Second))
|
|
}
|
|
_, _ = fmt.Fprintln(s.output)
|
|
}
|
|
|
|
// finalizeProcessPhase flushes the packer, writes remaining pending files to the database,
|
|
// and handles local blob storage when no remote storage is configured
|
|
func (s *Scanner) finalizeProcessPhase(ctx context.Context, result *ScanResult) error {
|
|
// Final packer flush first - this commits remaining chunks to DB
|
|
// and handleBlobReady will flush files whose chunks are now committed
|
|
s.packerMu.Lock()
|
|
if err := s.packer.Flush(); err != nil {
|
|
s.packerMu.Unlock()
|
|
return fmt.Errorf("flushing packer: %w", err)
|
|
}
|
|
s.packerMu.Unlock()
|
|
|
|
// Flush any remaining pending files (e.g., files with only pre-existing chunks
|
|
// that didn't trigger a blob finalize)
|
|
if err := s.flushAllPending(ctx); err != nil {
|
|
return fmt.Errorf("flushing remaining pending files: %w", err)
|
|
}
|
|
|
|
// If no storage configured, store any remaining blobs locally
|
|
if s.storage == nil {
|
|
blobs := s.packer.GetFinishedBlobs()
|
|
for _, b := range blobs {
|
|
// Blob metadata is already stored incrementally during packing
|
|
// Just add the blob to the snapshot
|
|
blobID, err := types.ParseBlobID(b.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("parsing blob ID: %w", err)
|
|
}
|
|
err = s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
|
return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobID, types.BlobHash(b.Hash))
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("storing blob metadata: %w", err)
|
|
}
|
|
}
|
|
result.BlobsCreated += len(blobs)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleBlobReady is called by the packer when a blob is finalized
|
|
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
|
startTime := time.Now().UTC()
|
|
finishedBlob := blobWithReader.FinishedBlob
|
|
|
|
if s.progress != nil {
|
|
s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
|
|
s.progress.GetStats().BlobsCreated.Add(1)
|
|
}
|
|
|
|
ctx := s.scanCtx
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
|
|
blobExists, err := s.uploadBlobIfNeeded(ctx, blobPath, blobWithReader, startTime)
|
|
if err != nil {
|
|
s.cleanupBlobTempFile(blobWithReader)
|
|
return fmt.Errorf("uploading blob %s: %w", finishedBlob.Hash, err)
|
|
}
|
|
|
|
if err := s.recordBlobMetadata(ctx, finishedBlob, blobExists, startTime); err != nil {
|
|
s.cleanupBlobTempFile(blobWithReader)
|
|
return err
|
|
}
|
|
|
|
s.cleanupBlobTempFile(blobWithReader)
|
|
|
|
// Chunks from this blob are now committed to DB - remove from pending set
|
|
s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)
|
|
|
|
// Flush files whose chunks are now all committed
|
|
if err := s.flushCompletedPendingFiles(ctx); err != nil {
|
|
return fmt.Errorf("flushing completed files: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// uploadBlobIfNeeded uploads the blob to storage if it doesn't already exist, returns whether it existed
|
|
func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobWithReader *blob.BlobWithReader, startTime time.Time) (bool, error) {
|
|
finishedBlob := blobWithReader.FinishedBlob
|
|
|
|
// Check if blob already exists (deduplication after restart)
|
|
if _, err := s.storage.Stat(ctx, blobPath); err == nil {
|
|
log.Info("Blob already exists in storage, skipping upload",
|
|
"hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
|
_, _ = fmt.Fprintf(s.output, "Blob exists: %s (%s, skipped upload)\n",
|
|
finishedBlob.Hash[:12]+"...", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
|
return true, nil
|
|
}
|
|
|
|
progressCallback := s.makeUploadProgressCallback(ctx, finishedBlob)
|
|
|
|
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
|
log.Error("Failed to upload blob", "hash", finishedBlob.Hash, "error", err)
|
|
return false, fmt.Errorf("uploading blob to storage: %w", err)
|
|
}
|
|
|
|
uploadDuration := time.Since(startTime)
|
|
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
|
|
|
|
_, _ = fmt.Fprintf(s.output, "Blob stored: %s (%s, %s/sec, %s)\n",
|
|
finishedBlob.Hash[:12]+"...",
|
|
humanize.Bytes(uint64(finishedBlob.Compressed)),
|
|
humanize.Bytes(uint64(uploadSpeedBps)),
|
|
uploadDuration.Round(time.Millisecond))
|
|
|
|
log.Info("Successfully uploaded blob to storage",
|
|
"path", blobPath,
|
|
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
|
"duration", uploadDuration,
|
|
"speed", humanize.SI(uploadSpeedBps*8, "bps"))
|
|
|
|
if s.progress != nil {
|
|
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
|
|
stats := s.progress.GetStats()
|
|
stats.BlobsUploaded.Add(1)
|
|
stats.BytesUploaded.Add(finishedBlob.Compressed)
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// makeUploadProgressCallback creates a progress callback for blob uploads
|
|
func (s *Scanner) makeUploadProgressCallback(ctx context.Context, finishedBlob *blob.FinishedBlob) func(int64) error {
|
|
lastProgressTime := time.Now()
|
|
lastProgressBytes := int64(0)
|
|
|
|
return func(uploaded int64) error {
|
|
now := time.Now()
|
|
elapsed := now.Sub(lastProgressTime).Seconds()
|
|
if elapsed > 0.5 {
|
|
bytesSinceLastUpdate := uploaded - lastProgressBytes
|
|
speed := float64(bytesSinceLastUpdate) / elapsed
|
|
if s.progress != nil {
|
|
s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed)
|
|
}
|
|
lastProgressTime = now
|
|
lastProgressBytes = uploaded
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// recordBlobMetadata stores blob upload metadata in the database
|
|
func (s *Scanner) recordBlobMetadata(ctx context.Context, finishedBlob *blob.FinishedBlob, blobExists bool, startTime time.Time) error {
|
|
finishedBlobID, err := types.ParseBlobID(finishedBlob.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("parsing finished blob ID: %w", err)
|
|
}
|
|
|
|
uploadDuration := time.Since(startTime)
|
|
|
|
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
|
if err := s.repos.Blobs.UpdateUploaded(txCtx, tx, finishedBlob.ID); err != nil {
|
|
return fmt.Errorf("updating blob upload timestamp: %w", err)
|
|
}
|
|
|
|
if err := s.repos.Snapshots.AddBlob(txCtx, tx, s.snapshotID, finishedBlobID, types.BlobHash(finishedBlob.Hash)); err != nil {
|
|
return fmt.Errorf("adding blob to snapshot: %w", err)
|
|
}
|
|
|
|
if !blobExists {
|
|
upload := &database.Upload{
|
|
BlobHash: finishedBlob.Hash,
|
|
SnapshotID: s.snapshotID,
|
|
UploadedAt: startTime,
|
|
Size: finishedBlob.Compressed,
|
|
DurationMs: uploadDuration.Milliseconds(),
|
|
}
|
|
if err := s.repos.Uploads.Create(txCtx, tx, upload); err != nil {
|
|
return fmt.Errorf("recording upload metrics: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// cleanupBlobTempFile closes and removes the blob's temporary file
|
|
func (s *Scanner) cleanupBlobTempFile(blobWithReader *blob.BlobWithReader) {
|
|
if blobWithReader.TempFile != nil {
|
|
tempName := blobWithReader.TempFile.Name()
|
|
if err := blobWithReader.TempFile.Close(); err != nil {
|
|
log.Fatal("Failed to close temp file", "file", tempName, "error", err)
|
|
}
|
|
if err := s.fs.Remove(tempName); err != nil {
|
|
log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// streamingChunkInfo tracks chunk metadata collected during streaming
|
|
type streamingChunkInfo struct {
|
|
fileChunk database.FileChunk
|
|
offset int64
|
|
size int64
|
|
}
|
|
|
|
// processFileStreaming processes a file by streaming chunks directly to the packer
|
|
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
|
|
file, err := s.fs.Open(fileToProcess.Path)
|
|
if err != nil {
|
|
return fmt.Errorf("opening file: %w", err)
|
|
}
|
|
defer func() { _ = file.Close() }()
|
|
|
|
var chunks []streamingChunkInfo
|
|
chunkIndex := 0
|
|
|
|
fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
chunkExists := s.chunkExists(chunk.Hash)
|
|
if !chunkExists {
|
|
s.packer.AddPendingChunk(chunk.Hash, chunk.Size)
|
|
s.addKnownChunk(chunk.Hash)
|
|
s.addPendingChunkHash(chunk.Hash)
|
|
}
|
|
|
|
chunks = append(chunks, streamingChunkInfo{
|
|
fileChunk: database.FileChunk{
|
|
FileID: fileToProcess.File.ID,
|
|
Idx: chunkIndex,
|
|
ChunkHash: types.ChunkHash(chunk.Hash),
|
|
},
|
|
offset: chunk.Offset,
|
|
size: chunk.Size,
|
|
})
|
|
|
|
s.updateChunkStats(chunkExists, chunk.Size, result)
|
|
|
|
if !chunkExists {
|
|
if err := s.addChunkToPacker(chunk); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
chunk.Data = nil
|
|
chunkIndex++
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("chunking file: %w", err)
|
|
}
|
|
|
|
log.Debug("Completed snapshotting file",
|
|
"path", fileToProcess.Path, "file_hash", fileHash, "chunks", len(chunks))
|
|
|
|
s.queueFileForBatchInsert(ctx, fileToProcess, chunks)
|
|
return nil
|
|
}
|
|
|
|
// updateChunkStats updates scan result and progress stats for a processed chunk
|
|
func (s *Scanner) updateChunkStats(chunkExists bool, chunkSize int64, result *ScanResult) {
|
|
if chunkExists {
|
|
result.FilesSkipped++
|
|
result.BytesSkipped += chunkSize
|
|
if s.progress != nil {
|
|
s.progress.GetStats().BytesSkipped.Add(chunkSize)
|
|
}
|
|
} else {
|
|
result.ChunksCreated++
|
|
result.BytesScanned += chunkSize
|
|
if s.progress != nil {
|
|
s.progress.GetStats().ChunksCreated.Add(1)
|
|
s.progress.GetStats().BytesProcessed.Add(chunkSize)
|
|
s.progress.UpdateChunkingActivity()
|
|
}
|
|
}
|
|
}
|
|
|
|
// addChunkToPacker adds a chunk to the blob packer, finalizing the current blob if needed
|
|
func (s *Scanner) addChunkToPacker(chunk chunker.Chunk) error {
|
|
s.packerMu.Lock()
|
|
err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data})
|
|
if err == blob.ErrBlobSizeLimitExceeded {
|
|
if err := s.packer.FinalizeBlob(); err != nil {
|
|
s.packerMu.Unlock()
|
|
return fmt.Errorf("finalizing blob: %w", err)
|
|
}
|
|
if err := s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data}); err != nil {
|
|
s.packerMu.Unlock()
|
|
return fmt.Errorf("adding chunk after finalize: %w", err)
|
|
}
|
|
} else if err != nil {
|
|
s.packerMu.Unlock()
|
|
return fmt.Errorf("adding chunk to packer: %w", err)
|
|
}
|
|
s.packerMu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// queueFileForBatchInsert builds file/chunk associations and queues the file for batch DB insert
|
|
func (s *Scanner) queueFileForBatchInsert(ctx context.Context, fileToProcess *FileToProcess, chunks []streamingChunkInfo) {
|
|
fileChunks := make([]database.FileChunk, len(chunks))
|
|
chunkFiles := make([]database.ChunkFile, len(chunks))
|
|
for i, ci := range chunks {
|
|
fileChunks[i] = database.FileChunk{
|
|
FileID: fileToProcess.File.ID,
|
|
Idx: ci.fileChunk.Idx,
|
|
ChunkHash: ci.fileChunk.ChunkHash,
|
|
}
|
|
chunkFiles[i] = database.ChunkFile{
|
|
ChunkHash: ci.fileChunk.ChunkHash,
|
|
FileID: fileToProcess.File.ID,
|
|
FileOffset: ci.offset,
|
|
Length: ci.size,
|
|
}
|
|
}
|
|
|
|
s.addPendingFile(ctx, pendingFileData{
|
|
file: fileToProcess.File,
|
|
fileChunks: fileChunks,
|
|
chunkFiles: chunkFiles,
|
|
})
|
|
}
|
|
|
|
// GetProgress returns the progress reporter for this scanner
|
|
func (s *Scanner) GetProgress() *ProgressReporter {
|
|
return s.progress
|
|
}
|
|
|
|
// detectDeletedFilesFromMap finds files that existed in previous snapshots but no longer exist
|
|
// Uses pre-loaded maps to avoid any filesystem or database access
|
|
func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[string]*database.File, existingFiles map[string]struct{}, result *ScanResult) error {
|
|
if len(knownFiles) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Check each known file against the enumerated set (no filesystem access needed)
|
|
for path, file := range knownFiles {
|
|
// Check context cancellation periodically
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
// Check if the file exists in our enumerated set
|
|
if _, exists := existingFiles[path]; !exists {
|
|
// File has been deleted
|
|
result.FilesDeleted++
|
|
result.BytesDeleted += file.Size
|
|
log.Debug("Detected deleted file", "path", path, "size", file.Size)
|
|
}
|
|
}
|
|
|
|
if result.FilesDeleted > 0 {
|
|
_, _ = fmt.Fprintf(s.output, "Found %s deleted files\n", formatNumber(result.FilesDeleted))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// compileExcludePatterns compiles the exclude patterns into glob matchers
|
|
func compileExcludePatterns(patterns []string) []compiledPattern {
|
|
var compiled []compiledPattern
|
|
for _, p := range patterns {
|
|
if p == "" {
|
|
continue
|
|
}
|
|
|
|
// Check if pattern is anchored (starts with /)
|
|
anchored := strings.HasPrefix(p, "/")
|
|
pattern := p
|
|
if anchored {
|
|
pattern = p[1:] // Remove leading /
|
|
}
|
|
|
|
// Remove trailing slash if present (directory indicator)
|
|
pattern = strings.TrimSuffix(pattern, "/")
|
|
|
|
// Compile the glob pattern
|
|
// For patterns without path separators, we need to match them as components
|
|
// e.g., ".git" should match ".git" anywhere in the path
|
|
g, err := glob.Compile(pattern, '/')
|
|
if err != nil {
|
|
log.Warn("Invalid exclude pattern, skipping", "pattern", p, "error", err)
|
|
continue
|
|
}
|
|
|
|
compiled = append(compiled, compiledPattern{
|
|
pattern: g,
|
|
anchored: anchored,
|
|
original: p,
|
|
})
|
|
}
|
|
return compiled
|
|
}
|
|
|
|
// shouldExclude checks if a path should be excluded based on exclude patterns
|
|
// filePath is the full path to the file
|
|
// rootPath is the root of the backup source directory
|
|
func (s *Scanner) shouldExclude(filePath, rootPath string) bool {
|
|
if len(s.compiledExclude) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Get the relative path from root
|
|
relPath, err := filepath.Rel(rootPath, filePath)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
// Never exclude the root directory itself
|
|
if relPath == "." {
|
|
return false
|
|
}
|
|
|
|
// Normalize path separators
|
|
relPath = filepath.ToSlash(relPath)
|
|
|
|
// Check each pattern
|
|
for _, cp := range s.compiledExclude {
|
|
if cp.anchored {
|
|
// Anchored pattern: must match from the root
|
|
// Match the relative path directly
|
|
if cp.pattern.Match(relPath) {
|
|
return true
|
|
}
|
|
// Also check if any prefix of the path matches (for directory patterns)
|
|
parts := strings.Split(relPath, "/")
|
|
for i := 1; i <= len(parts); i++ {
|
|
prefix := strings.Join(parts[:i], "/")
|
|
if cp.pattern.Match(prefix) {
|
|
return true
|
|
}
|
|
}
|
|
} else {
|
|
// Unanchored pattern: can match anywhere in path
|
|
// Check the full relative path
|
|
if cp.pattern.Match(relPath) {
|
|
return true
|
|
}
|
|
// Check each path component and subpath
|
|
parts := strings.Split(relPath, "/")
|
|
for i := range parts {
|
|
// Match individual component (e.g., ".git" matches ".git" directory)
|
|
if cp.pattern.Match(parts[i]) {
|
|
return true
|
|
}
|
|
// Match subpath from this component onwards
|
|
subpath := strings.Join(parts[i:], "/")
|
|
if cp.pattern.Match(subpath) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// formatNumber formats a number with comma separators
|
|
func formatNumber(n int) string {
|
|
if n < 1000 {
|
|
return fmt.Sprintf("%d", n)
|
|
}
|
|
return humanize.Comma(int64(n))
|
|
}
|
|
|
|
// formatCompact formats a number compactly with k/M suffixes (e.g., 5.7k, 1.2M).
//
//	n < 1000            plain integer       "999"
//	1000 .. 9949        one decimal, k      "5.7k"
//	9950 .. 999499      whole number, k     "610k"
//	otherwise           one decimal, M      "1.2M"
//
// The bucket boundaries sit where the rounded value would spill into the next
// bucket's width, so 9999 renders as "10k" rather than "10.0k" and 999999 as
// "1.0M" rather than "1000k".
func formatCompact(n int) string {
	switch {
	case n < 1000:
		return fmt.Sprintf("%d", n)
	case n < 9950:
		return fmt.Sprintf("%.1fk", float64(n)/1000)
	case n < 999500:
		return fmt.Sprintf("%.0fk", float64(n)/1000)
	default:
		return fmt.Sprintf("%.1fM", float64(n)/1000000)
	}
}
|