vaultik/internal/snapshot/scanner.go
sneak 43a69c2cfb Fix FK constraint errors in batched file insertion
Generate file UUIDs upfront in checkFileInMemory() rather than
deferring to Files.Create(). This ensures file_chunks and chunk_files
records have valid FileID values when constructed during file
processing, before the batch insert transaction.

Root cause: For new files, file.ID was empty when building the
fileChunks and chunkFiles slices. The ID was only generated later
in Files.Create(), but by then the slices already had empty FileID
values, causing FK constraint failures.

Also adds PROCESS.md documenting the snapshot creation lifecycle,
database transactions, and FK dependency ordering.
2025-12-19 19:48:48 +07:00
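
The essence of the corrected ordering, condensed from checkFileInMemory() and flushPendingFiles() below (abbreviated sketch, not verbatim code):

// During the walk: new files get a UUID immediately, so file.ID is valid
// when the fileChunks and chunkFiles slices are built during processing.
if existing, ok := knownFiles[path]; ok {
fileID = existing.ID
} else {
fileID = uuid.New().String()
}

// At flush time, a single transaction writes parents before children:
// files, then file_chunks and chunk_files, then snapshot_files.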


package snapshot
import (
"context"
"database/sql"
"fmt"
"os"
"strings"
"sync"
"time"
"git.eeqj.de/sneak/vaultik/internal/blob"
"git.eeqj.de/sneak/vaultik/internal/chunker"
"git.eeqj.de/sneak/vaultik/internal/database"
"git.eeqj.de/sneak/vaultik/internal/log"
"git.eeqj.de/sneak/vaultik/internal/storage"
"github.com/dustin/go-humanize"
"github.com/google/uuid"
"github.com/spf13/afero"
)
// FileToProcess holds information about a file that needs processing
type FileToProcess struct {
Path string
FileInfo os.FileInfo
File *database.File
}
// pendingFileData holds all data needed to commit a file to the database
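// The file's ID must already be populated when fileChunks and chunkFiles are
// built, because both reference files.id. Files.Create, FileChunks.Create and
// ChunkFiles.Create are later issued together inside one transaction in
// flushPendingFiles, parents before children.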
type pendingFileData struct {
file *database.File
fileChunks []database.FileChunk
chunkFiles []database.ChunkFile
}
// Scanner scans directories and populates the database with file and chunk information
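// A Scanner is not safe for concurrent Scan calls: snapshotID and scanCtx are
// set without synchronization at the start of each Scan. The mutexes below
// guard the known-chunk cache, the pending-file buffer, and packer access.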
type Scanner struct {
fs afero.Fs
chunker *chunker.Chunker
packer *blob.Packer
repos *database.Repositories
storage storage.Storer
maxBlobSize int64
compressionLevel int
ageRecipient string
snapshotID string // Current snapshot being processed
progress *ProgressReporter
// In-memory cache of known chunk hashes for fast existence checks
knownChunks map[string]struct{}
knownChunksMu sync.RWMutex
// Pending file data buffer for batch insertion
pendingFiles []pendingFileData
pendingFilesMu sync.Mutex
// Mutex for coordinating blob creation
packerMu sync.Mutex // Blocks chunk production during blob creation
// Context for cancellation
scanCtx context.Context
}
const (
// Batch size for file database operations
fileBatchSize = 100
)
// ScannerConfig contains configuration for the scanner
type ScannerConfig struct {
FS afero.Fs
ChunkSize int64
Repositories *database.Repositories
Storage storage.Storer
MaxBlobSize int64
CompressionLevel int
AgeRecipients []string // Optional, empty means no encryption
EnableProgress bool // Enable progress reporting
}
// ScanResult contains the results of a scan operation
type ScanResult struct {
FilesScanned int
FilesSkipped int
FilesDeleted int
BytesScanned int64
BytesSkipped int64
BytesDeleted int64
ChunksCreated int
BlobsCreated int
StartTime time.Time
EndTime time.Time
}
// NewScanner creates a new scanner instance. It returns nil if no age
// recipients are configured or if the blob packer cannot be created.
func NewScanner(cfg ScannerConfig) *Scanner {
// Encryption is mandatory for blob packing, so at least one age recipient must be configured
if len(cfg.AgeRecipients) == 0 {
log.Error("No age recipients configured - encryption is required")
return nil
}
// Create blob packer with encryption
packerCfg := blob.PackerConfig{
MaxBlobSize: cfg.MaxBlobSize,
CompressionLevel: cfg.CompressionLevel,
Recipients: cfg.AgeRecipients,
Repositories: cfg.Repositories,
Fs: cfg.FS,
}
packer, err := blob.NewPacker(packerCfg)
if err != nil {
log.Error("Failed to create packer", "error", err)
return nil
}
var progress *ProgressReporter
if cfg.EnableProgress {
progress = NewProgressReporter()
}
return &Scanner{
fs: cfg.FS,
chunker: chunker.NewChunker(cfg.ChunkSize),
packer: packer,
repos: cfg.Repositories,
storage: cfg.Storage,
maxBlobSize: cfg.MaxBlobSize,
compressionLevel: cfg.CompressionLevel,
ageRecipient: strings.Join(cfg.AgeRecipients, ","),
progress: progress,
}
}
// Scan scans a directory and populates the database
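// Scan proceeds in phases:
//   0. load known files and chunks from the database into memory
//   1. walk the filesystem, detect deleted files, and associate unchanged
//      files with the snapshot
//   2. chunk, pack, encrypt, and upload changed files, then flush pending
//      database work and finalize any remaining blobs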
func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*ScanResult, error) {
s.snapshotID = snapshotID
s.scanCtx = ctx
result := &ScanResult{
StartTime: time.Now().UTC(),
}
// Set blob handler for concurrent upload
if s.storage != nil {
log.Debug("Setting blob handler for storage uploads")
s.packer.SetBlobHandler(s.handleBlobReady)
} else {
log.Debug("No storage configured, blobs will not be uploaded")
}
// Start progress reporting if enabled
if s.progress != nil {
s.progress.Start()
defer s.progress.Stop()
}
// Phase 0: Load known files and chunks from database into memory for fast lookup
fmt.Println("Loading known files from database...")
knownFiles, err := s.loadKnownFiles(ctx, path)
if err != nil {
return nil, fmt.Errorf("loading known files: %w", err)
}
fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
fmt.Println("Loading known chunks from database...")
if err := s.loadKnownChunks(ctx); err != nil {
return nil, fmt.Errorf("loading known chunks: %w", err)
}
fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
// Phase 1: Scan directory, collect files to process, and track existing files
// (builds existingFiles map during walk to avoid double traversal)
log.Info("Phase 1/3: Scanning directory structure")
existingFiles := make(map[string]struct{})
scanResult, err := s.scanPhase(ctx, path, result, existingFiles, knownFiles)
if err != nil {
return nil, fmt.Errorf("scan phase failed: %w", err)
}
filesToProcess := scanResult.FilesToProcess
// Phase 1b: Detect deleted files by comparing DB against scanned files
if err := s.detectDeletedFilesFromMap(ctx, knownFiles, existingFiles, result); err != nil {
return nil, fmt.Errorf("detecting deleted files: %w", err)
}
// Phase 1c: Associate unchanged files with this snapshot (no new records needed)
if len(scanResult.UnchangedFileIDs) > 0 {
fmt.Printf("Associating %s unchanged files with snapshot...\n", formatNumber(len(scanResult.UnchangedFileIDs)))
if err := s.batchAddFilesToSnapshot(ctx, scanResult.UnchangedFileIDs); err != nil {
return nil, fmt.Errorf("associating unchanged files: %w", err)
}
}
// Calculate total size to process
var totalSizeToProcess int64
for _, file := range filesToProcess {
totalSizeToProcess += file.FileInfo.Size()
}
// Update progress with total size and file count
if s.progress != nil {
s.progress.SetTotalSize(totalSizeToProcess)
s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
}
log.Info("Phase 1 complete",
"total_files", len(filesToProcess),
"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
"files_skipped", result.FilesSkipped,
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
// Print scan summary
fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)",
formatNumber(result.FilesScanned),
humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)),
formatNumber(len(filesToProcess)),
humanize.Bytes(uint64(totalSizeToProcess)))
if result.FilesDeleted > 0 {
fmt.Printf(", %s deleted (%s)",
formatNumber(result.FilesDeleted),
humanize.Bytes(uint64(result.BytesDeleted)))
}
fmt.Println()
// Phase 2: Process files and create chunks
if len(filesToProcess) > 0 {
fmt.Printf("Processing %s files...\n", formatNumber(len(filesToProcess)))
log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)")
if err := s.processPhase(ctx, filesToProcess, result); err != nil {
return nil, fmt.Errorf("process phase failed: %w", err)
}
} else {
fmt.Printf("No files need processing. Creating metadata-only snapshot.\n")
log.Info("Phase 2/3: Skipping (no files need processing, metadata-only snapshot)")
}
// Get final stats from packer
blobs := s.packer.GetFinishedBlobs()
result.BlobsCreated += len(blobs)
// Query database for actual blob count created during this snapshot
// The database is authoritative, especially for concurrent blob uploads
// We count uploads rather than all snapshot_blobs to get only NEW blobs
if s.snapshotID != "" {
uploadCount, err := s.repos.Uploads.GetCountBySnapshot(ctx, s.snapshotID)
if err != nil {
log.Warn("Failed to query upload count from database", "error", err)
} else {
result.BlobsCreated = int(uploadCount)
}
}
result.EndTime = time.Now().UTC()
return result, nil
}
// loadKnownFiles loads all known files from the database into a map for fast lookup
// This avoids per-file database queries during the scan phase
func (s *Scanner) loadKnownFiles(ctx context.Context, path string) (map[string]*database.File, error) {
files, err := s.repos.Files.ListByPrefix(ctx, path)
if err != nil {
return nil, fmt.Errorf("listing files by prefix: %w", err)
}
result := make(map[string]*database.File, len(files))
for _, f := range files {
result[f.Path] = f
}
return result, nil
}
// loadKnownChunks loads all known chunk hashes from the database into a map for fast lookup
// This avoids per-chunk database queries during file processing
func (s *Scanner) loadKnownChunks(ctx context.Context) error {
chunks, err := s.repos.Chunks.List(ctx)
if err != nil {
return fmt.Errorf("listing chunks: %w", err)
}
s.knownChunksMu.Lock()
s.knownChunks = make(map[string]struct{}, len(chunks))
for _, c := range chunks {
s.knownChunks[c.ChunkHash] = struct{}{}
}
s.knownChunksMu.Unlock()
return nil
}
// chunkExists checks if a chunk hash exists in the in-memory cache
func (s *Scanner) chunkExists(hash string) bool {
s.knownChunksMu.RLock()
_, exists := s.knownChunks[hash]
s.knownChunksMu.RUnlock()
return exists
}
// addKnownChunk adds a chunk hash to the in-memory cache
func (s *Scanner) addKnownChunk(hash string) {
s.knownChunksMu.Lock()
s.knownChunks[hash] = struct{}{}
s.knownChunksMu.Unlock()
}
// addPendingFile adds a file to the pending buffer and flushes if needed
func (s *Scanner) addPendingFile(ctx context.Context, data pendingFileData) error {
s.pendingFilesMu.Lock()
s.pendingFiles = append(s.pendingFiles, data)
needsFlush := len(s.pendingFiles) >= fileBatchSize
s.pendingFilesMu.Unlock()
if needsFlush {
return s.flushPendingFiles(ctx)
}
return nil
}
// flushPendingFiles writes all pending files to the database in a single transaction
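// Rows are written in FK dependency order within the transaction: the file
// record first, then its file_chunks and chunk_files mappings (which reference
// files.id), and finally the snapshot association. Stale mappings from a
// previous version of the file are deleted before the new ones are inserted.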
func (s *Scanner) flushPendingFiles(ctx context.Context) error {
s.pendingFilesMu.Lock()
files := s.pendingFiles
s.pendingFiles = nil
s.pendingFilesMu.Unlock()
if len(files) == 0 {
return nil
}
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
for _, data := range files {
// Create or update the file record
if err := s.repos.Files.Create(txCtx, tx, data.file); err != nil {
return fmt.Errorf("creating file record: %w", err)
}
// Delete any existing file_chunks and chunk_files for this file
if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
return fmt.Errorf("deleting old file chunks: %w", err)
}
if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
return fmt.Errorf("deleting old chunk files: %w", err)
}
// Create file-chunk mappings
for i := range data.fileChunks {
if err := s.repos.FileChunks.Create(txCtx, tx, &data.fileChunks[i]); err != nil {
return fmt.Errorf("creating file chunk: %w", err)
}
}
// Create chunk-file mappings
for i := range data.chunkFiles {
if err := s.repos.ChunkFiles.Create(txCtx, tx, &data.chunkFiles[i]); err != nil {
return fmt.Errorf("creating chunk file: %w", err)
}
}
// Add file to snapshot
if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID); err != nil {
return fmt.Errorf("adding file to snapshot: %w", err)
}
}
return nil
})
}
// flushAllPending flushes all pending files to the database
func (s *Scanner) flushAllPending(ctx context.Context) error {
return s.flushPendingFiles(ctx)
}
// ScanPhaseResult contains the results of the scan phase
type ScanPhaseResult struct {
FilesToProcess []*FileToProcess
UnchangedFileIDs []string // IDs of unchanged files to associate with snapshot
}
// scanPhase performs the initial directory scan to identify files to process
// It uses the pre-loaded knownFiles map for fast change detection without DB queries
// It also populates existingFiles map for deletion detection
// Returns files needing processing and IDs of unchanged files for snapshot association
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}, knownFiles map[string]*database.File) (*ScanPhaseResult, error) {
// Use known file count as estimate for progress (accurate for subsequent backups)
estimatedTotal := int64(len(knownFiles))
var filesToProcess []*FileToProcess
var unchangedFileIDs []string // Just IDs - no new records needed
var mu sync.Mutex
// Set up periodic status output
startTime := time.Now()
lastStatusTime := time.Now()
statusInterval := 15 * time.Second
var filesScanned int64
log.Debug("Starting directory walk", "path", path)
err := afero.Walk(s.fs, path, func(filePath string, info os.FileInfo, err error) error {
if err != nil {
log.Debug("Error accessing filesystem entry", "path", filePath, "error", err)
return err
}
// Check context cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Skip non-regular files for processing (but still count them)
if !info.Mode().IsRegular() {
return nil
}
// Track this file as existing (for deletion detection)
existingFiles[filePath] = struct{}{}
// Check file against in-memory map (no DB query!)
file, needsProcessing := s.checkFileInMemory(filePath, info, knownFiles)
mu.Lock()
if needsProcessing {
// New or changed file - will create record after processing
filesToProcess = append(filesToProcess, &FileToProcess{
Path: filePath,
FileInfo: info,
File: file,
})
} else if file.ID != "" {
// Unchanged file with existing ID - just need snapshot association
unchangedFileIDs = append(unchangedFileIDs, file.ID)
}
filesScanned++
changedCount := len(filesToProcess)
mu.Unlock()
// Update result stats
if needsProcessing {
result.BytesScanned += info.Size()
} else {
result.FilesSkipped++
result.BytesSkipped += info.Size()
}
result.FilesScanned++
// Output periodic status
if time.Since(lastStatusTime) >= statusInterval {
elapsed := time.Since(startTime)
rate := float64(filesScanned) / elapsed.Seconds()
// Build status line - use estimate if available (not first backup)
if estimatedTotal > 0 {
// Show actual scanned vs estimate (may exceed estimate if files were added)
pct := float64(filesScanned) / float64(estimatedTotal) * 100
if pct > 100 {
pct = 100 // Cap at 100% for display
}
remaining := estimatedTotal - filesScanned
if remaining < 0 {
remaining = 0
}
var eta time.Duration
if rate > 0 && remaining > 0 {
eta = time.Duration(float64(remaining)/rate) * time.Second
}
fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
formatNumber(int(filesScanned)),
pct,
formatNumber(changedCount),
rate,
elapsed.Round(time.Second))
if eta > 0 {
fmt.Printf(", ETA %s", eta.Round(time.Second))
}
fmt.Println()
} else {
// First backup - no estimate available
fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
formatNumber(int(filesScanned)),
formatNumber(changedCount),
rate,
elapsed.Round(time.Second))
}
lastStatusTime = time.Now()
}
return nil
})
if err != nil {
return nil, err
}
return &ScanPhaseResult{
FilesToProcess: filesToProcess,
UnchangedFileIDs: unchangedFileIDs,
}, nil
}
// checkFileInMemory checks if a file needs processing using the in-memory map
// No database access is performed - this is purely CPU/memory work
func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
// Get file stats
stat, ok := info.Sys().(interface {
Uid() uint32
Gid() uint32
})
var uid, gid uint32
if ok {
uid = stat.Uid()
gid = stat.Gid()
}
// Check against in-memory map first to get existing ID if available
existingFile, exists := knownFiles[path]
// Create file record with ID set upfront
// For new files, generate UUID immediately so it's available for chunk associations
// For existing files, reuse the existing ID
var fileID string
if exists {
fileID = existingFile.ID
} else {
fileID = uuid.New().String()
}
file := &database.File{
ID: fileID,
Path: path,
MTime: info.ModTime(),
CTime: info.ModTime(), // afero doesn't provide ctime
Size: info.Size(),
Mode: uint32(info.Mode()),
UID: uid,
GID: gid,
}
// New file - needs processing
if !exists {
return file, true
}
// Check if file has changed
if existingFile.Size != file.Size ||
existingFile.MTime.Unix() != file.MTime.Unix() ||
existingFile.Mode != file.Mode ||
existingFile.UID != file.UID ||
existingFile.GID != file.GID {
return file, true
}
// File unchanged
return file, false
}
// batchAddFilesToSnapshot adds existing file IDs to the snapshot association table
// This is used for unchanged files that already have records in the database
func (s *Scanner) batchAddFilesToSnapshot(ctx context.Context, fileIDs []string) error {
const batchSize = 1000
startTime := time.Now()
lastStatusTime := time.Now()
statusInterval := 5 * time.Second
for i := 0; i < len(fileIDs); i += batchSize {
// Check context cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
end := i + batchSize
if end > len(fileIDs) {
end = len(fileIDs)
}
batch := fileIDs[i:end]
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
for _, fileID := range batch {
if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, fileID); err != nil {
return fmt.Errorf("adding file to snapshot: %w", err)
}
}
return nil
})
if err != nil {
return err
}
// Periodic status
if time.Since(lastStatusTime) >= statusInterval {
elapsed := time.Since(startTime)
rate := float64(end) / elapsed.Seconds()
pct := float64(end) / float64(len(fileIDs)) * 100
fmt.Printf("Associating files: %s/%s (%.1f%%), %.0f files/sec\n",
formatNumber(end), formatNumber(len(fileIDs)), pct, rate)
lastStatusTime = time.Now()
}
}
elapsed := time.Since(startTime)
rate := float64(len(fileIDs)) / elapsed.Seconds()
fmt.Printf("Associated %s unchanged files in %s (%.0f files/sec)\n",
formatNumber(len(fileIDs)), elapsed.Round(time.Second), rate)
return nil
}
// processPhase processes the files that need backing up
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
// Calculate total bytes to process
var totalBytes int64
for _, f := range filesToProcess {
totalBytes += f.FileInfo.Size()
}
// Set up periodic status output
lastStatusTime := time.Now()
statusInterval := 15 * time.Second
startTime := time.Now()
filesProcessed := 0
var bytesProcessed int64
totalFiles := len(filesToProcess)
// Process each file
for _, fileToProcess := range filesToProcess {
// Update progress
if s.progress != nil {
s.progress.GetStats().CurrentFile.Store(fileToProcess.Path)
}
// Process file in streaming fashion
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
return fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
}
// Update files processed counter
if s.progress != nil {
s.progress.GetStats().FilesProcessed.Add(1)
}
filesProcessed++
bytesProcessed += fileToProcess.FileInfo.Size()
// Output periodic status
if time.Since(lastStatusTime) >= statusInterval {
elapsed := time.Since(startTime)
pct := float64(bytesProcessed) / float64(totalBytes) * 100
byteRate := float64(bytesProcessed) / elapsed.Seconds()
fileRate := float64(filesProcessed) / elapsed.Seconds()
// Calculate ETA based on bytes (more accurate than files)
remainingBytes := totalBytes - bytesProcessed
var eta time.Duration
if byteRate > 0 {
eta = time.Duration(float64(remainingBytes)/byteRate) * time.Second
}
// Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s
fmt.Printf("Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s",
formatCompact(filesProcessed),
formatCompact(totalFiles),
humanize.Bytes(uint64(bytesProcessed)),
humanize.Bytes(uint64(totalBytes)),
pct,
humanize.Bytes(uint64(byteRate)),
fileRate,
elapsed.Round(time.Second))
if eta > 0 {
fmt.Printf(", ETA: %s", eta.Round(time.Second))
}
fmt.Println()
lastStatusTime = time.Now()
}
}
// Flush any remaining pending files to the database (chunk rows were written as they were produced)
if err := s.flushAllPending(ctx); err != nil {
return fmt.Errorf("flushing pending database operations: %w", err)
}
// Final flush (outside any transaction)
s.packerMu.Lock()
if err := s.packer.Flush(); err != nil {
s.packerMu.Unlock()
return fmt.Errorf("flushing packer: %w", err)
}
s.packerMu.Unlock()
// If no storage is configured, associate any remaining finished blobs with the snapshot (metadata only)
if s.storage == nil {
blobs := s.packer.GetFinishedBlobs()
for _, b := range blobs {
// Blob metadata is already stored incrementally during packing
// Just add the blob to the snapshot
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, b.ID, b.Hash)
})
if err != nil {
return fmt.Errorf("storing blob metadata: %w", err)
}
}
result.BlobsCreated += len(blobs)
}
return nil
}
// handleBlobReady is called by the packer when a blob is finalized
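// The upload is performed first, outside any database transaction; only after
// it succeeds are the blob's uploaded timestamp, the snapshot association, and
// the upload metrics recorded, so a blob is never marked as uploaded unless it
// is actually in storage. The temp file backing the blob is removed at the end.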
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
startTime := time.Now().UTC()
finishedBlob := blobWithReader.FinishedBlob
// Report upload start
if s.progress != nil {
s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
}
// Upload to storage first (without holding any locks)
// Use scan context for cancellation support
ctx := s.scanCtx
if ctx == nil {
ctx = context.Background()
}
// Track bytes uploaded for accurate speed calculation
lastProgressTime := time.Now()
lastProgressBytes := int64(0)
progressCallback := func(uploaded int64) error {
// Calculate instantaneous speed
now := time.Now()
elapsed := now.Sub(lastProgressTime).Seconds()
if elapsed > 0.5 { // Update speed every 0.5 seconds
bytesSinceLastUpdate := uploaded - lastProgressBytes
speed := float64(bytesSinceLastUpdate) / elapsed
if s.progress != nil {
s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed)
}
lastProgressTime = now
lastProgressBytes = uploaded
}
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
// Create sharded path: blobs/ca/fe/cafebabe...
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err)
}
uploadDuration := time.Since(startTime)
// Calculate upload speed
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
// Print blob stored message
fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
finishedBlob.Hash[:12]+"...",
humanize.Bytes(uint64(finishedBlob.Compressed)),
humanize.Bytes(uint64(uploadSpeedBps)),
uploadDuration.Round(time.Millisecond))
// Log upload stats
uploadSpeedBits := uploadSpeedBps * 8 // bits per second
log.Info("Successfully uploaded blob to storage",
"path", blobPath,
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
"duration", uploadDuration,
"speed", humanize.SI(uploadSpeedBits, "bps"))
// Report upload complete
if s.progress != nil {
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
}
// Update progress
if s.progress != nil {
stats := s.progress.GetStats()
stats.BlobsUploaded.Add(1)
stats.BytesUploaded.Add(finishedBlob.Compressed)
stats.BlobsCreated.Add(1)
}
// Store metadata in database (after upload is complete)
dbCtx := s.scanCtx
if dbCtx == nil {
dbCtx = context.Background()
}
err := s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error {
// Update blob upload timestamp
if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil {
return fmt.Errorf("updating blob upload timestamp: %w", err)
}
// Add the blob to the snapshot
if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, finishedBlob.ID, finishedBlob.Hash); err != nil {
return fmt.Errorf("adding blob to snapshot: %w", err)
}
// Record upload metrics
upload := &database.Upload{
BlobHash: finishedBlob.Hash,
SnapshotID: s.snapshotID,
UploadedAt: startTime,
Size: finishedBlob.Compressed,
DurationMs: uploadDuration.Milliseconds(),
}
if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
return fmt.Errorf("recording upload metrics: %w", err)
}
return nil
})
// Cleanup temp file if needed
if blobWithReader.TempFile != nil {
tempName := blobWithReader.TempFile.Name()
if err := blobWithReader.TempFile.Close(); err != nil {
log.Fatal("Failed to close temp file", "file", tempName, "error", err)
}
if err := s.fs.Remove(tempName); err != nil {
log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
}
}
return err
}
// processFileStreaming processes a file by streaming chunks directly to the packer
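// For each content-defined chunk it checks the in-memory cache, inserts a
// chunk row for new chunks (before packer.AddChunk, which references it from
// blob_chunks), streams new chunk data into the packer, and records the
// file/chunk associations. The file itself is queued via addPendingFile and
// committed later in a batched transaction.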
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
// Open the file
file, err := s.fs.Open(fileToProcess.Path)
if err != nil {
return fmt.Errorf("opening file: %w", err)
}
defer func() { _ = file.Close() }()
// We'll collect file chunks for database storage
// but process them for packing as we go
type chunkInfo struct {
fileChunk database.FileChunk
offset int64
size int64
}
var chunks []chunkInfo
chunkIndex := 0
// Process chunks in streaming fashion and get full file hash
fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
log.Debug("Processing content-defined chunk from file",
"file", fileToProcess.Path,
"chunk_index", chunkIndex,
"hash", chunk.Hash,
"size", chunk.Size)
// Check if chunk already exists (fast in-memory lookup)
chunkExists := s.chunkExists(chunk.Hash)
// Store chunk in database if new (must happen before packer.AddChunk
// because packer creates blob_chunk entries that reference chunks)
if !chunkExists {
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
dbChunk := &database.Chunk{
ChunkHash: chunk.Hash,
Size: chunk.Size,
}
return s.repos.Chunks.Create(txCtx, tx, dbChunk)
})
if err != nil {
return fmt.Errorf("creating chunk: %w", err)
}
// Add to in-memory cache for fast duplicate detection
s.addKnownChunk(chunk.Hash)
}
// Track file chunk association for later storage
chunks = append(chunks, chunkInfo{
fileChunk: database.FileChunk{
FileID: fileToProcess.File.ID,
Idx: chunkIndex,
ChunkHash: chunk.Hash,
},
offset: chunk.Offset,
size: chunk.Size,
})
// Update stats
if chunkExists {
result.FilesSkipped++ // NOTE: counts deduplicated chunks in a file-level counter; kept as a rough dedup indicator for now
result.BytesSkipped += chunk.Size
if s.progress != nil {
s.progress.GetStats().BytesSkipped.Add(chunk.Size)
}
} else {
result.ChunksCreated++
result.BytesScanned += chunk.Size
if s.progress != nil {
s.progress.GetStats().ChunksCreated.Add(1)
s.progress.GetStats().BytesProcessed.Add(chunk.Size)
s.progress.UpdateChunkingActivity()
}
}
// Add chunk to packer immediately (streaming)
// This happens outside the database transaction
if !chunkExists {
s.packerMu.Lock()
err := s.packer.AddChunk(&blob.ChunkRef{
Hash: chunk.Hash,
Data: chunk.Data,
})
if err == blob.ErrBlobSizeLimitExceeded {
// Finalize current blob and retry
if err := s.packer.FinalizeBlob(); err != nil {
s.packerMu.Unlock()
return fmt.Errorf("finalizing blob: %w", err)
}
// Retry adding the chunk
if err := s.packer.AddChunk(&blob.ChunkRef{
Hash: chunk.Hash,
Data: chunk.Data,
}); err != nil {
s.packerMu.Unlock()
return fmt.Errorf("adding chunk after finalize: %w", err)
}
} else if err != nil {
s.packerMu.Unlock()
return fmt.Errorf("adding chunk to packer: %w", err)
}
s.packerMu.Unlock()
}
// Clear chunk data from memory immediately after use
chunk.Data = nil
chunkIndex++
return nil
})
if err != nil {
return fmt.Errorf("chunking file: %w", err)
}
log.Debug("Completed snapshotting file",
"path", fileToProcess.Path,
"file_hash", fileHash,
"chunks", len(chunks))
// Build file data for batch insertion
// Update chunk associations with the file ID
fileChunks := make([]database.FileChunk, len(chunks))
chunkFiles := make([]database.ChunkFile, len(chunks))
for i, ci := range chunks {
fileChunks[i] = database.FileChunk{
FileID: fileToProcess.File.ID,
Idx: ci.fileChunk.Idx,
ChunkHash: ci.fileChunk.ChunkHash,
}
chunkFiles[i] = database.ChunkFile{
ChunkHash: ci.fileChunk.ChunkHash,
FileID: fileToProcess.File.ID,
FileOffset: ci.offset,
Length: ci.size,
}
}
// Queue file for batch insertion
return s.addPendingFile(ctx, pendingFileData{
file: fileToProcess.File,
fileChunks: fileChunks,
chunkFiles: chunkFiles,
})
}
// GetProgress returns the progress reporter for this scanner
func (s *Scanner) GetProgress() *ProgressReporter {
return s.progress
}
// detectDeletedFilesFromMap finds files that existed in previous snapshots but no longer exist
// Uses pre-loaded maps to avoid any filesystem or database access
func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[string]*database.File, existingFiles map[string]struct{}, result *ScanResult) error {
if len(knownFiles) == 0 {
return nil
}
// Check each known file against the enumerated set (no filesystem access needed)
for path, file := range knownFiles {
// Check context cancellation periodically
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Check if the file exists in our enumerated set
if _, exists := existingFiles[path]; !exists {
// File has been deleted
result.FilesDeleted++
result.BytesDeleted += file.Size
log.Debug("Detected deleted file", "path", path, "size", file.Size)
}
}
if result.FilesDeleted > 0 {
fmt.Printf("Found %s deleted files\n", formatNumber(result.FilesDeleted))
}
return nil
}
// formatNumber formats a number with comma separators
func formatNumber(n int) string {
if n < 1000 {
return fmt.Sprintf("%d", n)
}
return humanize.Comma(int64(n))
}
// formatCompact formats a number compactly with k/M suffixes (e.g., 5.7k, 1.2M)
func formatCompact(n int) string {
if n < 1000 {
return fmt.Sprintf("%d", n)
}
if n < 10000 {
return fmt.Sprintf("%.1fk", float64(n)/1000)
}
if n < 1000000 {
return fmt.Sprintf("%.0fk", float64(n)/1000)
}
return fmt.Sprintf("%.1fM", float64(n)/1000000)
}
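// Example wiring (illustrative sketch only, not part of the original file).
// It assumes repos and store have been constructed elsewhere and that a
// snapshot row with the given ID already exists; chunk and blob size values
// are placeholders.
//
//	func runScan(ctx context.Context, repos *database.Repositories, store storage.Storer, root, snapshotID string) (*ScanResult, error) {
//		s := NewScanner(ScannerConfig{
//			FS:               afero.NewOsFs(),
//			ChunkSize:        1 << 20, // placeholder average chunk size
//			Repositories:     repos,
//			Storage:          store,
//			MaxBlobSize:      1 << 30, // placeholder blob size cap
//			CompressionLevel: 3,
//			AgeRecipients:    []string{"age1..."},
//			EnableProgress:   true,
//		})
//		if s == nil {
//			return nil, fmt.Errorf("scanner construction failed (see log output)")
//		}
//		return s.Scan(ctx, root, snapshotID)
//	}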