Major refactoring: Updated manifest format and renamed backup to snapshot
- Created manifest.go with proper Manifest structure including blob sizes
- Updated manifest generation to include compressed size for each blob
- Added TotalCompressedSize field to manifest for quick access
- Renamed backup package to snapshot for clarity
- Updated snapshot list to show all remote snapshots
- Remote snapshots not in local DB fetch manifest to get size
- Local snapshots not in remote are automatically deleted
- Removed backwards compatibility code (pre-1.0, no users)
- Fixed prune command to use new manifest format
- Updated all imports and references from backup to snapshot
This commit is contained in:
856
internal/snapshot/scanner.go
Normal file
856
internal/snapshot/scanner.go
Normal file
@@ -0,0 +1,856 @@
|
||||
package snapshot
|
||||
|
||||
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
	"time"

	"git.eeqj.de/sneak/vaultik/internal/blob"
	"git.eeqj.de/sneak/vaultik/internal/chunker"
	"git.eeqj.de/sneak/vaultik/internal/database"
	"git.eeqj.de/sneak/vaultik/internal/log"
	"git.eeqj.de/sneak/vaultik/internal/s3"
	"github.com/dustin/go-humanize"
	"github.com/spf13/afero"
)
|
||||
|
||||
// FileToProcess holds information about a file that needs processing
|
||||
type FileToProcess struct {
|
||||
Path string
|
||||
FileInfo os.FileInfo
|
||||
File *database.File
|
||||
}
|
||||
|
||||
// Scanner scans directories and populates the database with file and chunk information
|
||||
type Scanner struct {
|
||||
fs afero.Fs
|
||||
chunker *chunker.Chunker
|
||||
packer *blob.Packer
|
||||
repos *database.Repositories
|
||||
s3Client S3Client
|
||||
maxBlobSize int64
|
||||
compressionLevel int
|
||||
ageRecipient string
|
||||
snapshotID string // Current snapshot being processed
|
||||
progress *ProgressReporter
|
||||
|
||||
// Mutex for coordinating blob creation
|
||||
packerMu sync.Mutex // Blocks chunk production during blob creation
|
||||
|
||||
// Context for cancellation
|
||||
scanCtx context.Context
|
||||
}
|
||||
|
||||
// S3Client interface for blob storage operations
|
||||
type S3Client interface {
|
||||
PutObject(ctx context.Context, key string, data io.Reader) error
|
||||
PutObjectWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress s3.ProgressCallback) error
|
||||
StatObject(ctx context.Context, key string) (*s3.ObjectInfo, error)
|
||||
}
|
||||
|
||||
// ScannerConfig contains configuration for the scanner
|
||||
type ScannerConfig struct {
|
||||
FS afero.Fs
|
||||
ChunkSize int64
|
||||
Repositories *database.Repositories
|
||||
S3Client S3Client
|
||||
MaxBlobSize int64
|
||||
CompressionLevel int
|
||||
AgeRecipients []string // Optional, empty means no encryption
|
||||
EnableProgress bool // Enable progress reporting
|
||||
}
|
||||
|
||||
// ScanResult accumulates the counters of a single Scan call.
type ScanResult struct {
	// FilesScanned is incremented once for every filesystem entry whose
	// metadata was recorded during the scan phase.
	FilesScanned int
	// FilesSkipped counts non-empty regular files whose metadata was
	// unchanged. The processing phase also increments it once for every
	// chunk that was already known to the database.
	FilesSkipped int
	// BytesScanned sums the sizes of entries not counted as skipped, plus
	// the sizes of newly created chunks.
	BytesScanned int64
	// BytesSkipped sums the sizes of skipped files and already-known chunks.
	BytesSkipped int64
	// ChunksCreated counts chunks that were new to the database.
	ChunksCreated int
	// BlobsCreated is the number of blobs produced for this snapshot.
	BlobsCreated int
	// StartTime and EndTime bracket the Scan call, in UTC.
	StartTime time.Time
	EndTime   time.Time
}
|
||||
|
||||
// NewScanner creates a new scanner instance
|
||||
func NewScanner(cfg ScannerConfig) *Scanner {
|
||||
// Create encryptor (required for blob packing)
|
||||
if len(cfg.AgeRecipients) == 0 {
|
||||
log.Error("No age recipients configured - encryption is required")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create blob packer with encryption
|
||||
packerCfg := blob.PackerConfig{
|
||||
MaxBlobSize: cfg.MaxBlobSize,
|
||||
CompressionLevel: cfg.CompressionLevel,
|
||||
Recipients: cfg.AgeRecipients,
|
||||
Repositories: cfg.Repositories,
|
||||
}
|
||||
packer, err := blob.NewPacker(packerCfg)
|
||||
if err != nil {
|
||||
log.Error("Failed to create packer", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var progress *ProgressReporter
|
||||
if cfg.EnableProgress {
|
||||
progress = NewProgressReporter()
|
||||
}
|
||||
|
||||
return &Scanner{
|
||||
fs: cfg.FS,
|
||||
chunker: chunker.NewChunker(cfg.ChunkSize),
|
||||
packer: packer,
|
||||
repos: cfg.Repositories,
|
||||
s3Client: cfg.S3Client,
|
||||
maxBlobSize: cfg.MaxBlobSize,
|
||||
compressionLevel: cfg.CompressionLevel,
|
||||
ageRecipient: strings.Join(cfg.AgeRecipients, ","),
|
||||
progress: progress,
|
||||
}
|
||||
}
|
||||
|
||||
// Scan scans a directory and populates the database
|
||||
func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*ScanResult, error) {
|
||||
s.snapshotID = snapshotID
|
||||
s.scanCtx = ctx
|
||||
result := &ScanResult{
|
||||
StartTime: time.Now().UTC(),
|
||||
}
|
||||
|
||||
// Set blob handler for concurrent upload
|
||||
if s.s3Client != nil {
|
||||
log.Debug("Setting blob handler for S3 uploads")
|
||||
s.packer.SetBlobHandler(s.handleBlobReady)
|
||||
} else {
|
||||
log.Debug("No S3 client configured, blobs will not be uploaded")
|
||||
}
|
||||
|
||||
// Start progress reporting if enabled
|
||||
if s.progress != nil {
|
||||
s.progress.Start()
|
||||
defer s.progress.Stop()
|
||||
}
|
||||
|
||||
// Phase 1: Scan directory and collect files to process
|
||||
log.Info("Phase 1/3: Scanning directory structure")
|
||||
filesToProcess, err := s.scanPhase(ctx, path, result)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scan phase failed: %w", err)
|
||||
}
|
||||
|
||||
// Calculate total size to process
|
||||
var totalSizeToProcess int64
|
||||
for _, file := range filesToProcess {
|
||||
totalSizeToProcess += file.FileInfo.Size()
|
||||
}
|
||||
|
||||
// Update progress with total size and file count
|
||||
if s.progress != nil {
|
||||
s.progress.SetTotalSize(totalSizeToProcess)
|
||||
s.progress.GetStats().TotalFiles.Store(int64(len(filesToProcess)))
|
||||
}
|
||||
|
||||
log.Info("Phase 1 complete",
|
||||
"total_files", len(filesToProcess),
|
||||
"total_size", humanize.Bytes(uint64(totalSizeToProcess)),
|
||||
"files_skipped", result.FilesSkipped,
|
||||
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
|
||||
|
||||
// Print detailed scan summary
|
||||
fmt.Printf("\n=== Scan Summary ===\n")
|
||||
fmt.Printf("Total files examined: %d\n", result.FilesScanned)
|
||||
fmt.Printf("Files with content changes: %d\n", len(filesToProcess))
|
||||
fmt.Printf("Files with unchanged content: %d\n", result.FilesSkipped)
|
||||
fmt.Printf("Total size of changed files: %s\n", humanize.Bytes(uint64(totalSizeToProcess)))
|
||||
fmt.Printf("Total size of unchanged files: %s\n", humanize.Bytes(uint64(result.BytesSkipped)))
|
||||
if len(filesToProcess) > 0 {
|
||||
fmt.Printf("\nStarting snapshot of %d changed files...\n\n", len(filesToProcess))
|
||||
} else {
|
||||
fmt.Printf("\nNo file contents have changed.\n")
|
||||
fmt.Printf("Creating metadata-only snapshot to capture current state...\n\n")
|
||||
}
|
||||
|
||||
// Phase 2: Process files and create chunks
|
||||
if len(filesToProcess) > 0 {
|
||||
log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)")
|
||||
if err := s.processPhase(ctx, filesToProcess, result); err != nil {
|
||||
return nil, fmt.Errorf("process phase failed: %w", err)
|
||||
}
|
||||
} else {
|
||||
log.Info("Phase 2/3: Skipping (no file contents changed, metadata-only snapshot)")
|
||||
}
|
||||
|
||||
// Get final stats from packer
|
||||
blobs := s.packer.GetFinishedBlobs()
|
||||
result.BlobsCreated += len(blobs)
|
||||
|
||||
// Query database for actual blob count created during this snapshot
|
||||
// The database is authoritative, especially for concurrent blob uploads
|
||||
// We count uploads rather than all snapshot_blobs to get only NEW blobs
|
||||
if s.snapshotID != "" {
|
||||
uploadCount, err := s.repos.Uploads.GetCountBySnapshot(ctx, s.snapshotID)
|
||||
if err != nil {
|
||||
log.Warn("Failed to query upload count from database", "error", err)
|
||||
} else {
|
||||
result.BlobsCreated = int(uploadCount)
|
||||
}
|
||||
}
|
||||
|
||||
result.EndTime = time.Now().UTC()
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// scanPhase performs the initial directory scan to identify files to process
|
||||
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult) ([]*FileToProcess, error) {
|
||||
var filesToProcess []*FileToProcess
|
||||
var mu sync.Mutex
|
||||
|
||||
// Set up periodic status output
|
||||
lastStatusTime := time.Now()
|
||||
statusInterval := 15 * time.Second
|
||||
var filesScanned int64
|
||||
var bytesScanned int64
|
||||
|
||||
log.Debug("Starting directory walk", "path", path)
|
||||
err := afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error {
|
||||
log.Debug("Scanning filesystem entry", "path", path)
|
||||
if err != nil {
|
||||
log.Debug("Error accessing filesystem entry", "path", path, "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Check context cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// Check file and update metadata
|
||||
file, needsProcessing, err := s.checkFileAndUpdateMetadata(ctx, path, info, result)
|
||||
if err != nil {
|
||||
// Don't log context cancellation as an error
|
||||
if err == context.Canceled {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("failed to check %s: %w", path, err)
|
||||
}
|
||||
|
||||
// If file needs processing, add to list
|
||||
if needsProcessing && info.Mode().IsRegular() && info.Size() > 0 {
|
||||
mu.Lock()
|
||||
filesToProcess = append(filesToProcess, &FileToProcess{
|
||||
Path: path,
|
||||
FileInfo: info,
|
||||
File: file,
|
||||
})
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
// Update scan statistics
|
||||
if info.Mode().IsRegular() {
|
||||
filesScanned++
|
||||
bytesScanned += info.Size()
|
||||
}
|
||||
|
||||
// Output periodic status
|
||||
if time.Since(lastStatusTime) >= statusInterval {
|
||||
mu.Lock()
|
||||
changedCount := len(filesToProcess)
|
||||
mu.Unlock()
|
||||
|
||||
fmt.Printf("Scan progress: %d files examined, %s total size, %d files changed\n",
|
||||
filesScanned,
|
||||
humanize.Bytes(uint64(bytesScanned)),
|
||||
changedCount)
|
||||
lastStatusTime = time.Now()
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return filesToProcess, nil
|
||||
}
|
||||
|
||||
// processPhase processes the files that need backing up
|
||||
func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProcess, result *ScanResult) error {
|
||||
// Set up periodic status output
|
||||
lastStatusTime := time.Now()
|
||||
statusInterval := 15 * time.Second
|
||||
startTime := time.Now()
|
||||
filesProcessed := 0
|
||||
totalFiles := len(filesToProcess)
|
||||
|
||||
// Process each file
|
||||
for _, fileToProcess := range filesToProcess {
|
||||
// Update progress
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().CurrentFile.Store(fileToProcess.Path)
|
||||
}
|
||||
|
||||
// Process file in streaming fashion
|
||||
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
|
||||
return fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
|
||||
}
|
||||
|
||||
// Update files processed counter
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().FilesProcessed.Add(1)
|
||||
}
|
||||
|
||||
filesProcessed++
|
||||
|
||||
// Output periodic status
|
||||
if time.Since(lastStatusTime) >= statusInterval {
|
||||
elapsed := time.Since(startTime)
|
||||
remaining := totalFiles - filesProcessed
|
||||
var eta time.Duration
|
||||
if filesProcessed > 0 {
|
||||
eta = elapsed / time.Duration(filesProcessed) * time.Duration(remaining)
|
||||
}
|
||||
|
||||
fmt.Printf("Snapshot progress: %d/%d files processed, %d chunks created, %d blobs uploaded",
|
||||
filesProcessed, totalFiles, result.ChunksCreated, result.BlobsCreated)
|
||||
if remaining > 0 && eta > 0 {
|
||||
fmt.Printf(", ETA: %s", eta.Round(time.Second))
|
||||
}
|
||||
fmt.Println()
|
||||
lastStatusTime = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
// Final flush (outside any transaction)
|
||||
s.packerMu.Lock()
|
||||
if err := s.packer.Flush(); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("flushing packer: %w", err)
|
||||
}
|
||||
s.packerMu.Unlock()
|
||||
|
||||
// If no S3 client, store any remaining blobs
|
||||
if s.s3Client == nil {
|
||||
blobs := s.packer.GetFinishedBlobs()
|
||||
for _, b := range blobs {
|
||||
// Blob metadata is already stored incrementally during packing
|
||||
// Just add the blob to the snapshot
|
||||
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||
return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, b.ID, b.Hash)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("storing blob metadata: %w", err)
|
||||
}
|
||||
}
|
||||
result.BlobsCreated += len(blobs)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkFileAndUpdateMetadata checks if a file needs processing and updates metadata
|
||||
func (s *Scanner) checkFileAndUpdateMetadata(ctx context.Context, path string, info os.FileInfo, result *ScanResult) (*database.File, bool, error) {
|
||||
// Check context cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, false, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// Process file without holding a long transaction
|
||||
return s.checkFile(ctx, path, info, result)
|
||||
}
|
||||
|
||||
// checkFile checks if a file needs processing and updates metadata
|
||||
func (s *Scanner) checkFile(ctx context.Context, path string, info os.FileInfo, result *ScanResult) (*database.File, bool, error) {
|
||||
// Get file stats
|
||||
stat, ok := info.Sys().(interface {
|
||||
Uid() uint32
|
||||
Gid() uint32
|
||||
})
|
||||
|
||||
var uid, gid uint32
|
||||
if ok {
|
||||
uid = stat.Uid()
|
||||
gid = stat.Gid()
|
||||
}
|
||||
|
||||
// Check if it's a symlink
|
||||
var linkTarget string
|
||||
if info.Mode()&os.ModeSymlink != 0 {
|
||||
// Read the symlink target
|
||||
if linker, ok := s.fs.(afero.LinkReader); ok {
|
||||
linkTarget, _ = linker.ReadlinkIfPossible(path)
|
||||
}
|
||||
}
|
||||
|
||||
// Create file record
|
||||
file := &database.File{
|
||||
Path: path,
|
||||
MTime: info.ModTime(),
|
||||
CTime: info.ModTime(), // afero doesn't provide ctime
|
||||
Size: info.Size(),
|
||||
Mode: uint32(info.Mode()),
|
||||
UID: uid,
|
||||
GID: gid,
|
||||
LinkTarget: linkTarget,
|
||||
}
|
||||
|
||||
// Check if file has changed since last backup (no transaction needed for read)
|
||||
log.Debug("Querying database for existing file record", "path", path)
|
||||
existingFile, err := s.repos.Files.GetByPath(ctx, path)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("checking existing file: %w", err)
|
||||
}
|
||||
|
||||
fileChanged := existingFile == nil || s.hasFileChanged(existingFile, file)
|
||||
|
||||
// Update file metadata and add to snapshot in a single transaction
|
||||
log.Debug("Updating file record in database and adding to snapshot", "path", path, "changed", fileChanged, "snapshot", s.snapshotID)
|
||||
err = s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||
// First create/update the file
|
||||
if err := s.repos.Files.Create(ctx, tx, file); err != nil {
|
||||
return fmt.Errorf("creating file: %w", err)
|
||||
}
|
||||
// Then add it to the snapshot using the file ID
|
||||
if err := s.repos.Snapshots.AddFileByID(ctx, tx, s.snapshotID, file.ID); err != nil {
|
||||
return fmt.Errorf("adding file to snapshot: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
log.Debug("File record added to snapshot association", "path", path)
|
||||
|
||||
result.FilesScanned++
|
||||
|
||||
// Update progress
|
||||
if s.progress != nil {
|
||||
stats := s.progress.GetStats()
|
||||
stats.FilesScanned.Add(1)
|
||||
stats.CurrentFile.Store(path)
|
||||
}
|
||||
|
||||
// Track skipped files
|
||||
if info.Mode().IsRegular() && info.Size() > 0 && !fileChanged {
|
||||
result.FilesSkipped++
|
||||
result.BytesSkipped += info.Size()
|
||||
if s.progress != nil {
|
||||
stats := s.progress.GetStats()
|
||||
stats.FilesSkipped.Add(1)
|
||||
stats.BytesSkipped.Add(info.Size())
|
||||
}
|
||||
// File hasn't changed, but we still need to associate existing chunks with this snapshot
|
||||
log.Debug("File content unchanged, reusing existing chunks and blobs", "path", path)
|
||||
if err := s.associateExistingChunks(ctx, path); err != nil {
|
||||
return nil, false, fmt.Errorf("associating existing chunks: %w", err)
|
||||
}
|
||||
log.Debug("Existing chunks and blobs associated with snapshot", "path", path)
|
||||
} else {
|
||||
// File changed or is not a regular file
|
||||
result.BytesScanned += info.Size()
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesScanned.Add(info.Size())
|
||||
}
|
||||
}
|
||||
|
||||
return file, fileChanged, nil
|
||||
}
|
||||
|
||||
// hasFileChanged determines if a file has changed since last backup
|
||||
func (s *Scanner) hasFileChanged(existingFile, newFile *database.File) bool {
|
||||
// Check if any metadata has changed
|
||||
if existingFile.Size != newFile.Size {
|
||||
return true
|
||||
}
|
||||
if existingFile.MTime.Unix() != newFile.MTime.Unix() {
|
||||
return true
|
||||
}
|
||||
if existingFile.Mode != newFile.Mode {
|
||||
return true
|
||||
}
|
||||
if existingFile.UID != newFile.UID {
|
||||
return true
|
||||
}
|
||||
if existingFile.GID != newFile.GID {
|
||||
return true
|
||||
}
|
||||
if existingFile.LinkTarget != newFile.LinkTarget {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// associateExistingChunks links existing chunks from an unchanged file to the current snapshot
|
||||
func (s *Scanner) associateExistingChunks(ctx context.Context, path string) error {
|
||||
log.Debug("associateExistingChunks start", "path", path)
|
||||
|
||||
// Get existing file chunks (no transaction needed for read)
|
||||
log.Debug("Querying database for file's chunk associations", "path", path)
|
||||
fileChunks, err := s.repos.FileChunks.GetByFile(ctx, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting existing file chunks: %w", err)
|
||||
}
|
||||
log.Debug("Retrieved file chunk associations from database", "path", path, "count", len(fileChunks))
|
||||
|
||||
// Collect unique blob IDs that need to be added to snapshot
|
||||
blobsToAdd := make(map[string]string) // blob ID -> blob hash
|
||||
for i, fc := range fileChunks {
|
||||
log.Debug("Looking up blob containing chunk", "path", path, "chunk_index", i, "chunk_hash", fc.ChunkHash)
|
||||
|
||||
// Find which blob contains this chunk (no transaction needed for read)
|
||||
log.Debug("Querying database for blob containing chunk", "chunk_hash", fc.ChunkHash)
|
||||
blobChunk, err := s.repos.BlobChunks.GetByChunkHash(ctx, fc.ChunkHash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("finding blob for chunk %s: %w", fc.ChunkHash, err)
|
||||
}
|
||||
if blobChunk == nil {
|
||||
log.Warn("Chunk record exists in database but not associated with any blob", "chunk", fc.ChunkHash, "file", path)
|
||||
continue
|
||||
}
|
||||
log.Debug("Found blob record containing chunk", "chunk_hash", fc.ChunkHash, "blob_id", blobChunk.BlobID)
|
||||
|
||||
// Track blob ID for later processing
|
||||
if _, exists := blobsToAdd[blobChunk.BlobID]; !exists {
|
||||
blobsToAdd[blobChunk.BlobID] = "" // We'll get the hash later
|
||||
}
|
||||
}
|
||||
|
||||
// Now get blob hashes outside of transaction operations
|
||||
for blobID := range blobsToAdd {
|
||||
blob, err := s.repos.Blobs.GetByID(ctx, blobID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting blob %s: %w", blobID, err)
|
||||
}
|
||||
if blob == nil {
|
||||
log.Warn("Blob record missing from database", "blob_id", blobID)
|
||||
delete(blobsToAdd, blobID)
|
||||
continue
|
||||
}
|
||||
blobsToAdd[blobID] = blob.Hash
|
||||
}
|
||||
|
||||
// Add blobs to snapshot using short transactions
|
||||
for blobID, blobHash := range blobsToAdd {
|
||||
log.Debug("Adding blob reference to snapshot association", "blob_id", blobID, "blob_hash", blobHash, "snapshot", s.snapshotID)
|
||||
err := s.repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||
return s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, blobID, blobHash)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("adding existing blob to snapshot: %w", err)
|
||||
}
|
||||
log.Debug("Created snapshot-blob association in database", "blob_id", blobID)
|
||||
}
|
||||
|
||||
log.Debug("associateExistingChunks complete", "path", path, "blobs_processed", len(blobsToAdd))
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleBlobReady is called by the packer when a blob is finalized
|
||||
func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
log.Debug("Invoking blob upload handler", "blob_hash", blobWithReader.Hash[:8]+"...")
|
||||
|
||||
startTime := time.Now().UTC()
|
||||
finishedBlob := blobWithReader.FinishedBlob
|
||||
|
||||
// Report upload start
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
|
||||
}
|
||||
|
||||
// Upload to S3 first (without holding any locks)
|
||||
// Use scan context for cancellation support
|
||||
ctx := s.scanCtx
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
// Track bytes uploaded for accurate speed calculation
|
||||
lastProgressTime := time.Now()
|
||||
lastProgressBytes := int64(0)
|
||||
|
||||
progressCallback := func(uploaded int64) error {
|
||||
|
||||
// Calculate instantaneous speed
|
||||
now := time.Now()
|
||||
elapsed := now.Sub(lastProgressTime).Seconds()
|
||||
if elapsed > 0.5 { // Update speed every 0.5 seconds
|
||||
bytesSinceLastUpdate := uploaded - lastProgressBytes
|
||||
speed := float64(bytesSinceLastUpdate) / elapsed
|
||||
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadProgress(finishedBlob.Hash, uploaded, finishedBlob.Compressed, speed)
|
||||
}
|
||||
|
||||
lastProgressTime = now
|
||||
lastProgressBytes = uploaded
|
||||
}
|
||||
|
||||
// Check for cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Create sharded path: blobs/ca/fe/cafebabe...
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
|
||||
if err := s.s3Client.PutObjectWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||
return fmt.Errorf("uploading blob %s to S3: %w", finishedBlob.Hash, err)
|
||||
}
|
||||
|
||||
uploadDuration := time.Since(startTime)
|
||||
|
||||
// Log upload stats
|
||||
uploadSpeed := float64(finishedBlob.Compressed) * 8 / uploadDuration.Seconds() // bits per second
|
||||
log.Info("Successfully uploaded blob to S3 storage",
|
||||
"path", blobPath,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
"duration", uploadDuration,
|
||||
"speed", humanize.SI(uploadSpeed, "bps"))
|
||||
|
||||
// Report upload complete
|
||||
if s.progress != nil {
|
||||
s.progress.ReportUploadComplete(finishedBlob.Hash, finishedBlob.Compressed, uploadDuration)
|
||||
}
|
||||
|
||||
// Update progress
|
||||
if s.progress != nil {
|
||||
stats := s.progress.GetStats()
|
||||
stats.BlobsUploaded.Add(1)
|
||||
stats.BytesUploaded.Add(finishedBlob.Compressed)
|
||||
stats.BlobsCreated.Add(1)
|
||||
}
|
||||
|
||||
// Store metadata in database (after upload is complete)
|
||||
dbCtx := s.scanCtx
|
||||
if dbCtx == nil {
|
||||
dbCtx = context.Background()
|
||||
}
|
||||
err := s.repos.WithTx(dbCtx, func(ctx context.Context, tx *sql.Tx) error {
|
||||
// Update blob upload timestamp
|
||||
if err := s.repos.Blobs.UpdateUploaded(ctx, tx, finishedBlob.ID); err != nil {
|
||||
return fmt.Errorf("updating blob upload timestamp: %w", err)
|
||||
}
|
||||
|
||||
// Add the blob to the snapshot
|
||||
if err := s.repos.Snapshots.AddBlob(ctx, tx, s.snapshotID, finishedBlob.ID, finishedBlob.Hash); err != nil {
|
||||
return fmt.Errorf("adding blob to snapshot: %w", err)
|
||||
}
|
||||
|
||||
// Record upload metrics
|
||||
upload := &database.Upload{
|
||||
BlobHash: finishedBlob.Hash,
|
||||
SnapshotID: s.snapshotID,
|
||||
UploadedAt: startTime,
|
||||
Size: finishedBlob.Compressed,
|
||||
DurationMs: uploadDuration.Milliseconds(),
|
||||
}
|
||||
if err := s.repos.Uploads.Create(ctx, tx, upload); err != nil {
|
||||
return fmt.Errorf("recording upload metrics: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
// Cleanup temp file if needed
|
||||
if blobWithReader.TempFile != nil {
|
||||
tempName := blobWithReader.TempFile.Name()
|
||||
if err := blobWithReader.TempFile.Close(); err != nil {
|
||||
log.Fatal("Failed to close temp file", "file", tempName, "error", err)
|
||||
}
|
||||
if err := os.Remove(tempName); err != nil {
|
||||
log.Fatal("Failed to remove temp file", "file", tempName, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// processFileStreaming processes a file by streaming chunks directly to the packer
|
||||
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
|
||||
// Open the file
|
||||
file, err := s.fs.Open(fileToProcess.Path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening file: %w", err)
|
||||
}
|
||||
defer func() { _ = file.Close() }()
|
||||
|
||||
// We'll collect file chunks for database storage
|
||||
// but process them for packing as we go
|
||||
type chunkInfo struct {
|
||||
fileChunk database.FileChunk
|
||||
offset int64
|
||||
size int64
|
||||
}
|
||||
var chunks []chunkInfo
|
||||
chunkIndex := 0
|
||||
|
||||
// Process chunks in streaming fashion and get full file hash
|
||||
fileHash, err := s.chunker.ChunkReaderStreaming(file, func(chunk chunker.Chunk) error {
|
||||
// Check for cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
log.Debug("Processing content-defined chunk from file",
|
||||
"file", fileToProcess.Path,
|
||||
"chunk_index", chunkIndex,
|
||||
"hash", chunk.Hash,
|
||||
"size", chunk.Size)
|
||||
|
||||
// Check if chunk already exists (outside of transaction)
|
||||
existing, err := s.repos.Chunks.GetByHash(ctx, chunk.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("checking chunk existence: %w", err)
|
||||
}
|
||||
chunkExists := (existing != nil)
|
||||
|
||||
// Store chunk if new
|
||||
if !chunkExists {
|
||||
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
dbChunk := &database.Chunk{
|
||||
ChunkHash: chunk.Hash,
|
||||
Size: chunk.Size,
|
||||
}
|
||||
if err := s.repos.Chunks.Create(txCtx, tx, dbChunk); err != nil {
|
||||
return fmt.Errorf("creating chunk: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("storing chunk: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Track file chunk association for later storage
|
||||
chunks = append(chunks, chunkInfo{
|
||||
fileChunk: database.FileChunk{
|
||||
FileID: fileToProcess.File.ID,
|
||||
Idx: chunkIndex,
|
||||
ChunkHash: chunk.Hash,
|
||||
},
|
||||
offset: chunk.Offset,
|
||||
size: chunk.Size,
|
||||
})
|
||||
|
||||
// Update stats
|
||||
if chunkExists {
|
||||
result.FilesSkipped++ // Track as skipped for now
|
||||
result.BytesSkipped += chunk.Size
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().BytesSkipped.Add(chunk.Size)
|
||||
}
|
||||
} else {
|
||||
result.ChunksCreated++
|
||||
result.BytesScanned += chunk.Size
|
||||
if s.progress != nil {
|
||||
s.progress.GetStats().ChunksCreated.Add(1)
|
||||
s.progress.GetStats().BytesProcessed.Add(chunk.Size)
|
||||
s.progress.UpdateChunkingActivity()
|
||||
}
|
||||
}
|
||||
|
||||
// Add chunk to packer immediately (streaming)
|
||||
// This happens outside the database transaction
|
||||
if !chunkExists {
|
||||
s.packerMu.Lock()
|
||||
err := s.packer.AddChunk(&blob.ChunkRef{
|
||||
Hash: chunk.Hash,
|
||||
Data: chunk.Data,
|
||||
})
|
||||
if err == blob.ErrBlobSizeLimitExceeded {
|
||||
// Finalize current blob and retry
|
||||
if err := s.packer.FinalizeBlob(); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("finalizing blob: %w", err)
|
||||
}
|
||||
// Retry adding the chunk
|
||||
if err := s.packer.AddChunk(&blob.ChunkRef{
|
||||
Hash: chunk.Hash,
|
||||
Data: chunk.Data,
|
||||
}); err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk after finalize: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
s.packerMu.Unlock()
|
||||
return fmt.Errorf("adding chunk to packer: %w", err)
|
||||
}
|
||||
s.packerMu.Unlock()
|
||||
}
|
||||
|
||||
// Clear chunk data from memory immediately after use
|
||||
chunk.Data = nil
|
||||
|
||||
chunkIndex++
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("chunking file: %w", err)
|
||||
}
|
||||
|
||||
log.Debug("Completed snapshotting file",
|
||||
"path", fileToProcess.Path,
|
||||
"file_hash", fileHash,
|
||||
"chunks", len(chunks))
|
||||
|
||||
// Store file-chunk associations and chunk-file mappings in database
|
||||
err = s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
// First, delete all existing file_chunks and chunk_files for this file
|
||||
// This ensures old chunks are no longer associated when file content changes
|
||||
if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
|
||||
return fmt.Errorf("deleting old file chunks: %w", err)
|
||||
}
|
||||
if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, fileToProcess.File.ID); err != nil {
|
||||
return fmt.Errorf("deleting old chunk files: %w", err)
|
||||
}
|
||||
|
||||
for _, ci := range chunks {
|
||||
// Create file-chunk mapping
|
||||
if err := s.repos.FileChunks.Create(txCtx, tx, &ci.fileChunk); err != nil {
|
||||
return fmt.Errorf("creating file chunk: %w", err)
|
||||
}
|
||||
|
||||
// Create chunk-file mapping
|
||||
chunkFile := &database.ChunkFile{
|
||||
ChunkHash: ci.fileChunk.ChunkHash,
|
||||
FileID: fileToProcess.File.ID,
|
||||
FileOffset: ci.offset,
|
||||
Length: ci.size,
|
||||
}
|
||||
if err := s.repos.ChunkFiles.Create(txCtx, tx, chunkFile); err != nil {
|
||||
return fmt.Errorf("creating chunk file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Add file to snapshot
|
||||
if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, fileToProcess.File.ID); err != nil {
|
||||
return fmt.Errorf("adding file to snapshot: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// GetProgress returns the progress reporter for this scanner
|
||||
func (s *Scanner) GetProgress() *ProgressReporter {
|
||||
return s.progress
|
||||
}
|
||||
Reference in New Issue
Block a user