Remove the separate enumerateFiles() function, which did a second full directory walk using Readdir() and therefore stat()ed every file. Instead, build the existingFiles map during the scan-phase walk and detect deleted files afterward by diffing it against the known files loaded from the database. This eliminates one full filesystem traversal and significantly speeds up the scan phase for large directories.
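The shape of the change, as a condensed sketch (countDeleted is a hypothetical helper used only for illustration; the actual logic lives in scanPhase and detectDeletedFilesFromMap below):

	// countDeleted shows the single-walk pattern: the same walk that selects
	// files for backup records every present path, and deletion detection
	// becomes a pure in-memory diff against the paths already in the DB.
	func countDeleted(fs afero.Fs, root string, known map[string]*database.File) (deleted int, bytes int64, err error) {
		existing := make(map[string]struct{})
		err = afero.Walk(fs, root, func(p string, info os.FileInfo, walkErr error) error {
			if walkErr != nil {
				return walkErr
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			existing[p] = struct{}{} // recorded during the one and only traversal
			return nil
		})
		if err != nil {
			return 0, 0, err
		}
		for p, f := range known {
			if _, ok := existing[p]; !ok {
				deleted++
				bytes += f.Size
			}
		}
		return deleted, bytes, nil
	}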
package snapshot

import (
	"context"
	"database/sql"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	"git.eeqj.de/sneak/vaultik/internal/blob"
	"git.eeqj.de/sneak/vaultik/internal/chunker"
	"git.eeqj.de/sneak/vaultik/internal/database"
	"git.eeqj.de/sneak/vaultik/internal/log"
	"git.eeqj.de/sneak/vaultik/internal/storage"
	"github.com/dustin/go-humanize"
	"github.com/spf13/afero"
)

// FileToProcess holds information about a file that needs processing
type FileToProcess struct {
	Path     string
	FileInfo os.FileInfo
	File     *database.File
}

// Scanner scans directories and populates the database with file and chunk information
type Scanner struct {
	fs               afero.Fs
	chunker          *chunker.Chunker
	packer           *blob.Packer
	repos            *database.Repositories
	storage          storage.Storer
	maxBlobSize      int64
	compressionLevel int
	ageRecipient     string
	snapshotID       string // Current snapshot being processed
	progress         *ProgressReporter

	// Mutex for coordinating blob creation
	packerMu sync.Mutex // Blocks chunk production during blob creation

	// Context for cancellation
	scanCtx context.Context
}

// ScannerConfig contains configuration for the scanner
type ScannerConfig struct {
	FS               afero.Fs
	ChunkSize        int64
	Repositories     *database.Repositories
	Storage          storage.Storer
	MaxBlobSize      int64
	CompressionLevel int
	AgeRecipients    []string // Optional, empty means no encryption
	EnableProgress   bool     // Enable progress reporting
}

// ScanResult contains the results of a scan operation
type ScanResult struct {
	FilesScanned  int
	FilesSkipped  int
	FilesDeleted  int
	BytesScanned  int64
	BytesSkipped  int64
	BytesDeleted  int64
	ChunksCreated int
	BlobsCreated  int
	StartTime     time.Time
	EndTime       time.Time
}

// NewScanner creates a new scanner instance
func NewScanner(cfg ScannerConfig) *Scanner {
	// Create encryptor (required for blob packing)
	if len(cfg.AgeRecipients) == 0 {
		log.Error("No age recipients configured - encryption is required")
		return nil
	}

	// Create blob packer with encryption
	packerCfg := blob.PackerConfig{
		MaxBlobSize:      cfg.MaxBlobSize,
		CompressionLevel: cfg.CompressionLevel,
		Recipients:       cfg.AgeRecipients,
		Repositories:     cfg.Repositories,
		Fs:               cfg.FS,
	}
	packer, err := blob.NewPacker(packerCfg)
	if err != nil {
		log.Error("Failed to create packer", "error", err)
		return nil
	}

	var progress *ProgressReporter
	if cfg.EnableProgress {
		progress = NewProgressReporter()
	}

	return &Scanner{
		fs:               cfg.FS,
		chunker:          chunker.NewChunker(cfg.ChunkSize),
		packer:           packer,
		repos:            cfg.Repositories,
		storage:          cfg.Storage,
		maxBlobSize:      cfg.MaxBlobSize,
		compressionLevel: cfg.CompressionLevel,
		ageRecipient:     strings.Join(cfg.AgeRecipients, ","),
		progress:         progress,
	}
}

// Scan scans a directory and populates the database
func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*ScanResult, error) {
	s.snapshotID = snapshotID
	s.scanCtx = ctx
	result := &ScanResult{
		StartTime: time.Now().UTC(),
	}

	// Set blob handler for concurrent upload
	if s.storage != nil {
		log.Debug("Setting blob handler for storage uploads")
		s.packer.SetBlobHandler(s.handleBlobReady)
	} else {
		log.Debug("No storage configured, blobs will not be uploaded")
	}

	// Start progress reporting if enabled
	if s.progress != nil {
		s.progress.Start()
		defer s.progress.Stop()
	}

	// Phase 0: Load known files from database into memory for fast lookup
	fmt.Println("Loading known files from database...")
	knownFiles, err := s.loadKnownFiles(ctx, path)
	if err != nil {
		return nil, fmt.Errorf("loading known files: %w", err)
	}
	fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))

	// Phase 1: Scan directory, collect files to process, and track existing files
	// (builds existingFiles map during walk to avoid double traversal)
	log.Info("Phase 1/3: Scanning directory structure")
	existingFiles := make(map[string]struct{})
	filesToProcess, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles)
	if err != nil {
		return nil, fmt.Errorf("scan phase failed: %w", err)
	}

	// Phase 1b: Detect deleted files by comparing DB against scanned files
	if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil {
		return nil, fmt.Errorf("detecting deleted files: %w", err)
	}

	// Calculate total size to process
	var totalSizeToProcess int64
	for _, file := range filesToProcess {
		totalSizeToProcess += file.FileInfo.Size()
	}

	// Update progress with total size and file count
	if s.progress != nil {
		s.progress.SetTotalSize(totalSizeToProcess)
		s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
	}

	log.Info("Phase 1 complete",
		"total_files", len(filesToProcess),
		"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
		"files_skipped", result.FilesSkipped,
		"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))

	// Print scan summary
	fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)",
		formatNumber(result.FilesScanned),
		humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)),
		formatNumber(len(filesToProcess)),
		humanize.Bytes(uint64(totalSizeToProcess)))
	if result.FilesDeleted > 0 {
		fmt.Printf(", %s deleted (%s)",
			formatNumber(result.FilesDeleted),
			humanize.Bytes(uint64(result.BytesDeleted)))
	}
	fmt.Println()

	// Phase 2: Process files and create chunks
	if len(filesToProcess) > 0 {
		fmt.Printf("Processing %s files...\n", formatNumber(len(filesToProcess)))
		log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)")
		if err := s.processPhase(ctx, filesToProcess, result); err != nil {
			return nil, fmt.Errorf("process phase failed: %w", err)
		}
	} else {
		fmt.Printf("No files need processing. Creating metadata-only snapshot.\n")
		log.Info("Phase 2/3: Skipping (no files need processing, metadata-only snapshot)")
	}

	// Get final stats from packer
	blobs := s.packer.GetFinishedBlobs()
	result.BlobsCreated += len(blobs)

	// Query database for actual blob count created during this snapshot
	// The database is authoritative, especially for concurrent blob uploads
	// We count uploads rather than all snapshot_blobs to get only NEW blobs
	if s.snapshotID != "" {
		uploadCount, err := s.repos.Uploads.GetCountBySnapshot(ctx, s.snapshotID)
		if err != nil {
			log.Warn("Failed to query upload count from database", "error", err)
		} else {
			result.BlobsCreated = int(uploadCount)
		}
	}

	result.EndTime = time.Now().UTC()
	return result, nil
}

// loadKnownFiles loads all known files from the database into a map for fast lookup
// This avoids per-file database queries during the scan phase
func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*database.File, error) {
	files, err := s.repos.Files.ListByPrefix(ctx, path)
	if err != nil {
		return nil, fmt.Errorf("listing files by prefix: %w", err)
	}

	result := make(map[string]*database.File, len(files))
	for _, f := range files {
		result[f.Path] = f
	}

	return result, nil
}

// scanPhase performs the initial directory scan to identify files to process
// It uses the pre-loaded knownFiles map for fast change detection without DB queries
// It also populates existingFiles map for deletion detection
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) ([]*FileToProcess, error) {
	// Use known file count as estimate for progress (accurate for subsequent backups)
	estimatedTotal := int64(len(knownFiles))

	var filesToProcess []*FileToProcess
	var allFiles []*database.File // Collect all files for batch insert
	var mu sync.Mutex

	// Set up periodic status output
	startTime := time.Now()
	lastStatusTime := time.Now()
	statusInterval := 15 * time.Second
	var filesScanned int64

	log.Debug("Starting directory walk", "path", path)
	err := afero.Walk(s.fs, path, func(filePath string, info os.FileInfo, err error) error {
		if err != nil {
			log.Debug("Error accessing filesystem entry", "path", filePath, "error", err)
			return err
		}

		// Check context cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		// Skip non-regular files; only regular files are recorded and processed
		if !info.Mode().IsRegular() {
			return nil
		}

		// Track this file as existing (for deletion detection)
		existingFiles[filePath] = struct{}{}

		// Check file against in-memory map (no DB query!)
		file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles)

		mu.Lock()
		allFiles = append(allFiles, file)
		if needsProcessing && info.Size() > 0 {
			filesToProcess = append(filesToProcess, &FileToProcess{
				Path:     filePath,
				FileInfo: info,
				File:     file,
			})
		}
		filesScanned++
		changedCount := len(filesToProcess)
		mu.Unlock()

		// Update result stats
		if needsProcessing {
			result.BytesScanned += info.Size()
		} else {
			result.FilesSkipped++
			result.BytesSkipped += info.Size()
		}
		result.FilesScanned++

		// Output periodic status
		if time.Since(lastStatusTime) >= statusInterval {
			elapsed := time.Since(startTime)
			rate := float64(filesScanned) / elapsed.Seconds()

			// Build status line - use estimate if available (not first backup)
			if estimatedTotal > 0 {
				// Show actual scanned vs estimate (may exceed estimate if files were added)
				pct := float64(filesScanned) / float64(estimatedTotal) * 100
				if pct > 100 {
					pct = 100 // Cap at 100% for display
				}
				remaining := estimatedTotal - filesScanned
				if remaining < 0 {
					remaining = 0
				}
				var eta time.Duration
				if rate > 0 && remaining > 0 {
					eta = time.Duration(float64(remaining)/rate) * time.Second
				}
				fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
					formatNumber(int(filesScanned)),
					pct,
					formatNumber(changedCount),
					rate,
					elapsed.Round(time.Second))
				if eta > 0 {
					fmt.Printf(", ETA %s", eta.Round(time.Second))
				}
				fmt.Println()
			} else {
				// First backup - no estimate available
				fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
					formatNumber(int(filesScanned)),
					formatNumber(changedCount),
					rate,
					elapsed.Round(time.Second))
			}
			lastStatusTime = time.Now()
		}

		return nil
	})

	if err != nil {
		return nil, err
	}

	// Batch insert all files and snapshot associations
	if len(allFiles) > 0 {
		fmt.Printf("Writing %s file records to database...\n", formatNumber(len(allFiles)))
		if err := s.batchInsertFiles(ctx, allFiles); err != nil {
			return nil, fmt.Errorf("batch inserting files: %w", err)
		}
	}

	return filesToProcess, nil
}

// checkFileInMemory checks if a file needs processing using the in-memory map
// No database access is performed - this is purely CPU/memory work
func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
	// Get file stats
	stat, ok := info.Sys().(interface {
		Uid() uint32
		Gid() uint32
	})

	var uid, gid uint32
	if ok {
		uid = stat.Uid()
		gid = stat.Gid()
	}

	// Create file record
	file := &database.File{
		Path:  path,
		MTime: info.ModTime(),
		CTime: info.ModTime(), // afero doesn't provide ctime
		Size:  info.Size(),
		Mode:  uint32(info.Mode()),
		UID:   uid,
		GID:   gid,
	}

	// Check against in-memory map
	existingFile, exists := knownFiles[path]
	if !exists {
		// New file
		return file, true
	}

	// Reuse existing ID
	file.ID = existingFile.ID

	// Check if file has changed
	if existingFile.Size != file.Size ||
		existingFile.MTime.Unix() != file.MTime.Unix() ||
		existingFile.Mode != file.Mode ||
		existingFile.UID != file.UID ||
		existingFile.GID != file.GID {
		return file, true
	}

	// File unchanged
	return file, false
}

// batchInsertFiles inserts files and snapshot associations in batches
func (s *Scanner) batchInsertFiles(ctx context.Context, files []*database.File) error {
	const batchSize = 1000

	startTime := time.Now()
	lastStatusTime := time.Now()
	statusInterval := 5 * time.Second

	for i := 0; i < len(files); i += batchSize {
		// Check context cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		end := i + batchSize
		if end > len(files) {
			end = len(files)
		}
		batch := files[i:end]

		err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
			for _, file := range batch {
				if err := s.repos.Files.Create(ctx, tx, file); err != nil {
					return fmt.Errorf("creating file %s: %w", file.Path, err)
				}
				if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, file.ID); err != nil {
					return fmt.Errorf("adding file to snapshot: %w", err)
				}
			}
			return nil
		})
		if err != nil {
			return err
		}

		// Periodic status
		if time.Since(lastStatusTime) >= statusInterval {
			elapsed := time.Since(startTime)
			rate := float64(end) / elapsed.Seconds()
			pct := float64(end) / float64(len(files)) * 100
			fmt.Printf("Database write: %s/%s files (%.1f%%), %.0f files/sec\n",
				formatNumber(end), formatNumber(len(files)), pct, rate)
			lastStatusTime = time.Now()
		}
	}

	elapsed := time.Since(startTime)
	rate := float64(len(files)) / elapsed.Seconds()
	fmt.Printf("Database write complete: %s files in %s (%.0f files/sec)\n",
		formatNumber(len(files)), elapsed.Round(time.Second), rate)

	return nil
}

// processPhase processes the files that need backing up
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
	// Set up periodic status output
	lastStatusTime := time.Now()
	statusInterval := 15 * time.Second
	startTime := time.Now()
	filesProcessed := 0
	totalFiles := len(filesToProcess)

	// Process each file
	for _, fileToProcess := range filesToProcess {
		// Update progress
		if s.progress != nil {
			s.progress.GetStats().CurrentFile.Store(fileToProcess.Path)
		}

		// Process file in streaming fashion
		if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
			return fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
		}

		// Update files processed counter
		if s.progress != nil {
			s.progress.GetStats().FilesProcessed.Add(1)
		}

		filesProcessed++

		// Output periodic status
		if time.Since(lastStatusTime) >= statusInterval {
			elapsed := time.Since(startTime)
			remaining := totalFiles - filesProcessed
			var eta time.Duration
			if filesProcessed > 0 {
				eta = elapsed / time.Duration(filesProcessed) * time.Duration(remaining)
			}

			fmt.Printf("Progress: %s/%s files", formatNumber(filesProcessed), formatNumber(totalFiles))
			if remaining > 0 && eta > 0 {
				fmt.Printf(", ETA: %s", eta.Round(time.Second))
			}
			fmt.Println()
			lastStatusTime = time.Now()
		}
	}

	// Final flush (outside any transaction)
	s.packerMu.Lock()
	if err := s.packer.Flush(); err != nil {
		s.packerMu.Unlock()
		return fmt.Errorf("flushing packer: %w", err)
	}
	s.packerMu.Unlock()

	// If no storage configured, store any remaining blobs locally
	if s.storage == nil {
		blobs := s.packer.GetFinishedBlobs()
		for _, b := range blobs {
			// Blob metadata is already stored incrementally during packing
			// Just add the blob to the snapshot
			err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
				return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, b.ID, b.Hash)
			})
			if err != nil {
				return fmt.Errorf("storing blob metadata: %w", err)
			}
		}
		result.BlobsCreated += len(blobs)
	}

	return nil
}

// handleBlobReady is called by the packer when a blob is finalized
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
	startTime := time.Now().UTC()
	finishedBlob := blobWithReader.FinishedBlob

	// Report upload start
	if s.progress != nil {
		s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
	}

	// Upload to storage first (without holding any locks)
	// Use scan context for cancellation support
	ctx := s.scanCtx
	if ctx == nil {
		ctx = context.Background()
	}

	// Track bytes uploaded for accurate speed calculation
	lastProgressTime := time.Now()
	lastProgressBytes := int64(0)

	progressCallback := func(uploaded int64) error {
		// Calculate instantaneous speed
		now := time.Now()
		elapsed := now.Sub(lastProgressTime).Seconds()
		if elapsed > 0.5 { // Update speed every 0.5 seconds
			bytesSinceLastUpdate := uploaded - lastProgressBytes
			speed := float64(bytesSinceLastUpdate) / elapsed

			if s.progress != nil {
				s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed)
			}

			lastProgressTime = now
			lastProgressBytes = uploaded
		}

		// Check for cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			return nil
		}
	}

	// Create sharded path: blobs/ca/fe/cafebabe...
	blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
	if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
		return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err)
	}

	uploadDuration := time.Since(startTime)

	// Log upload stats
	uploadSpeed := float64(finishedBlob.Compressed) * 8 / uploadDuration.Seconds() // bits per second
	log.Info("Successfully uploaded blob to storage",
		"path", blobPath,
		"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
		"duration", uploadDuration,
		"speed", humanize.SI(uploadSpeed, "bps"))

	// Report upload complete
	if s.progress != nil {
		s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
	}

	// Update progress
	if s.progress != nil {
		stats := s.progress.GetStats()
		stats.BlobsUploaded.Add(1)
		stats.BytesUploaded.Add(finishedBlob.Compressed)
		stats.BlobsCreated.Add(1)
	}

	// Store metadata in database (after upload is complete)
	dbCtx := s.scanCtx
	if dbCtx == nil {
		dbCtx = context.Background()
	}
	err := s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error {
		// Update blob upload timestamp
		if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil {
			return fmt.Errorf("updating blob upload timestamp: %w", err)
		}

		// Add the blob to the snapshot
		if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, finishedBlob.ID, finishedBlob.Hash); err != nil {
			return fmt.Errorf("adding blob to snapshot: %w", err)
		}

		// Record upload metrics
		upload := &database.Upload{
			BlobHash:   finishedBlob.Hash,
			SnapshotID: s.snapshotID,
			UploadedAt: startTime,
			Size:       finishedBlob.Compressed,
			DurationMs: uploadDuration.Milliseconds(),
		}
		if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
			return fmt.Errorf("recording upload metrics: %w", err)
		}

		return nil
	})

	// Cleanup temp file if needed
	if blobWithReader.TempFile != nil {
		tempName := blobWithReader.TempFile.Name()
		if err := blobWithReader.TempFile.Close(); err != nil {
			log.Fatal("Failed to close temp file", "file", tempName, "error", err)
		}
		if err := s.fs.Remove(tempName); err != nil {
			log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
		}
	}

	return err
}

// processFileStreaming processes a file by streaming chunks directly to the packer
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
	// Open the file
	file, err := s.fs.Open(fileToProcess.Path)
	if err != nil {
		return fmt.Errorf("opening file: %w", err)
	}
	defer func() { _ = file.Close() }()

	// We'll collect file chunks for database storage
	// but process them for packing as we go
	type chunkInfo struct {
		fileChunk database.FileChunk
		offset    int64
		size      int64
	}
	var chunks []chunkInfo
	chunkIndex := 0

	// Process chunks in streaming fashion and get full file hash
	fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
		// Check for cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		log.Debug("Processing content-defined chunk from file",
			"file", fileToProcess.Path,
			"chunk_index", chunkIndex,
			"hash", chunk.Hash,
			"size", chunk.Size)

		// Check if chunk already exists (outside of transaction)
		existing, err := s.repos.Chunks.GetByHash(ctx, chunk.Hash)
		if err != nil {
			return fmt.Errorf("checking chunk existence: %w", err)
		}
		chunkExists := (existing != nil)

		// Store chunk if new
		if !chunkExists {
			err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
				dbChunk := &database.Chunk{
					ChunkHash: chunk.Hash,
					Size:      chunk.Size,
				}
				if err := s.repos.Chunks.Create(txCtx, tx, dbChunk); err != nil {
					return fmt.Errorf("creating chunk: %w", err)
				}
				return nil
			})
			if err != nil {
				return fmt.Errorf("storing chunk: %w", err)
			}
		}

		// Track file chunk association for later storage
		chunks = append(chunks, chunkInfo{
			fileChunk: database.FileChunk{
				FileID:    fileToProcess.File.ID,
				Idx:       chunkIndex,
				ChunkHash: chunk.Hash,
			},
			offset: chunk.Offset,
			size:   chunk.Size,
		})

		// Update stats
		if chunkExists {
			result.FilesSkipped++ // Track as skipped for now
			result.BytesSkipped += chunk.Size
			if s.progress != nil {
				s.progress.GetStats().BytesSkipped.Add(chunk.Size)
			}
		} else {
			result.ChunksCreated++
			result.BytesScanned += chunk.Size
			if s.progress != nil {
				s.progress.GetStats().ChunksCreated.Add(1)
				s.progress.GetStats().BytesProcessed.Add(chunk.Size)
				s.progress.UpdateChunkingActivity()
			}
		}

		// Add chunk to packer immediately (streaming)
		// This happens outside the database transaction
		if !chunkExists {
			s.packerMu.Lock()
			err := s.packer.AddChunk(&blob.ChunkRef{
				Hash: chunk.Hash,
				Data: chunk.Data,
			})
			if err == blob.ErrBlobSizeLimitExceeded {
				// Finalize current blob and retry
				if err := s.packer.FinalizeBlob(); err != nil {
					s.packerMu.Unlock()
					return fmt.Errorf("finalizing blob: %w", err)
				}
				// Retry adding the chunk
				if err := s.packer.AddChunk(&blob.ChunkRef{
					Hash: chunk.Hash,
					Data: chunk.Data,
				}); err != nil {
					s.packerMu.Unlock()
					return fmt.Errorf("adding chunk after finalize: %w", err)
				}
			} else if err != nil {
				s.packerMu.Unlock()
				return fmt.Errorf("adding chunk to packer: %w", err)
			}
			s.packerMu.Unlock()
		}

		// Clear chunk data from memory immediately after use
		chunk.Data = nil

		chunkIndex++
		return nil
	})

	if err != nil {
		return fmt.Errorf("chunking file: %w", err)
	}

	log.Debug("Completed snapshotting file",
		"path", fileToProcess.Path,
		"file_hash", fileHash,
		"chunks", len(chunks))

	// Store file-chunk associations and chunk-file mappings in database
	err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
		// First, delete all existing file_chunks and chunk_files for this file
		// This ensures old chunks are no longer associated when file content changes
		if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
			return fmt.Errorf("deleting old file chunks: %w", err)
		}
		if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
			return fmt.Errorf("deleting old chunk files: %w", err)
		}

		for _, ci := range chunks {
			// Create file-chunk mapping
			if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil {
				return fmt.Errorf("creating file chunk: %w", err)
			}

			// Create chunk-file mapping
			chunkFile := &database.ChunkFile{
				ChunkHash:  ci.fileChunk.ChunkHash,
				FileID:     fileToProcess.File.ID,
				FileOffset: ci.offset,
				Length:     ci.size,
			}
			if err := s.repos.ChunkFiles.Create(txCtx, tx, chunkFile); err != nil {
				return fmt.Errorf("creating chunk file: %w", err)
			}
		}

		// Add file to snapshot
		if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, fileToProcess.File.ID); err != nil {
			return fmt.Errorf("adding file to snapshot: %w", err)
		}

		return nil
	})

	return err
}

// GetProgress returns the progress reporter for this scanner
func (s *Scanner) GetProgress() *ProgressReporter {
	return s.progress
}

// detectDeletedFilesFromMap finds files that existed in previous snapshots but no longer exist
// Uses pre-loaded maps to avoid any filesystem or database access
func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[string]*database.File, existingFiles map[string]struct{}, result *ScanResult) error {
	if len(knownFiles) == 0 {
		return nil
	}

	// Check each known file against the enumerated set (no filesystem access needed)
	for path, file := range knownFiles {
		// Check context cancellation periodically
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		// Check if the file exists in our enumerated set
		if _, exists := existingFiles[path]; !exists {
			// File has been deleted
			result.FilesDeleted++
			result.BytesDeleted += file.Size
			log.Debug("Detected deleted file", "path", path, "size", file.Size)
		}
	}

	if result.FilesDeleted > 0 {
		fmt.Printf("Found %s deleted files\n", formatNumber(result.FilesDeleted))
	}

	return nil
}

// formatNumber formats a number with comma separators
func formatNumber(n int) string {
	if n < 1000 {
		return fmt.Sprintf("%d", n)
	}
	return humanize.Comma(int64(n))
}