Add pluggable storage backend, PID locking, and improved scan progress
Storage backend:
- Add internal/storage package with Storer interface
- Implement FileStorer for local filesystem storage (file:// URLs)
- Implement S3Storer wrapping existing s3.Client
- Support storage_url config field (s3:// or file://)
- Migrate all consumers to use storage.Storer interface

PID locking:
- Add internal/pidlock package to prevent concurrent instances
- Acquire lock before app start, release on exit
- Detect stale locks from crashed processes

Scan progress improvements:
- Add fast file enumeration pass before stat() phase
- Use enumerated set for deletion detection (no extra filesystem access)
- Show progress with percentage, files/sec, elapsed time, and ETA
- Change "changed" to "changed/new" for clarity

Config improvements:
- Add tilde expansion for paths (~/)
- Use xdg library for platform-specific default index path
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -14,7 +13,7 @@ import (
|
||||
"git.eeqj.de/sneak/vaultik/internal/chunker"
|
||||
"git.eeqj.de/sneak/vaultik/internal/database"
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
"git.eeqj.de/sneak/vaultik/internal/s3"
|
||||
"git.eeqj.de/sneak/vaultik/internal/storage"
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/spf13/afero"
|
||||
)
|
||||
@@ -32,7 +31,7 @@ type Scanner struct {
|
||||
chunker *chunker.Chunker
|
||||
packer *blob.Packer
|
||||
repos *database.Repositories
|
||||
s3Client S3Client
|
||||
storage storage.Storer
|
||||
maxBlobSize int64
|
||||
compressionLevel int
|
||||
ageRecipient string
|
||||
@@ -46,19 +45,12 @@ type Scanner struct {
|
||||
scanCtx context.Context
|
||||
}
|
||||
|
||||
// S3Client interface for blob storage operations
|
||||
type S3Client interface {
|
||||
PutObject(ctx context.Context, key string, data io.Reader) error
|
||||
PutObjectWithProgress(ctx context.Context, key string, data io.Reader, size int64, progress s3.ProgressCallback) error
|
||||
StatObject(ctx context.Context, key string) (*s3.ObjectInfo, error)
|
||||
}
|
||||
|
||||
// ScannerConfig contains configuration for the scanner
|
||||
type ScannerConfig struct {
|
||||
FS afero.Fs
|
||||
ChunkSize int64
|
||||
Repositories *database.Repositories
|
||||
S3Client S3Client
|
||||
Storage storage.Storer
|
||||
MaxBlobSize int64
|
||||
CompressionLevel int
|
||||
AgeRecipients []string // Optional, empty means no encryption
|
||||
@@ -111,7 +103,7 @@ func NewScanner(cfg ScannerConfig) *Scanner {
|
||||
chunker: chunker.NewChunker(cfg.ChunkSize),
|
||||
packer: packer,
|
||||
repos: cfg.Repositories,
|
||||
s3Client: cfg.S3Client,
|
||||
storage: cfg.Storage,
|
||||
maxBlobSize: cfg.MaxBlobSize,
|
||||
compressionLevel: cfg.CompressionLevel,
|
||||
ageRecipient: strings.Join(cfg.AgeRecipients, ","),
|
||||
@@ -128,11 +120,11 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
||||
}
|
||||
|
||||
// Set blob handler for concurrent upload
|
||||
if s.s3Client != nil {
|
||||
log.Debug("Setting blob handler for S3 uploads")
|
||||
if s.storage != nil {
|
||||
log.Debug("Setting blob handler for storage uploads")
|
||||
s.packer.SetBlobHandler(s.handleBlobReady)
|
||||
} else {
|
||||
log.Debug("No S3 client configured, blobs will not be uploaded")
|
||||
log.Debug("No storage configured, blobs will not be uploaded")
|
||||
}
|
||||
|
||||
// Start progress reporting if enabled
|
||||
@@ -141,14 +133,23 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
||||
defer s.progress.Stop()
|
||||
}
|
||||
|
||||
// Phase 0: Check for deleted files from previous snapshots
|
||||
if err := s.detectDeletedFiles(ctx, path, result); err != nil {
|
||||
// Phase 0: Quick enumeration of all files on disk
|
||||
fmt.Println("Enumerating files...")
|
||||
existingFiles, err := s.enumerateFiles(ctx, path)
|
||||
if err != nil && err != context.Canceled {
|
||||
log.Warn("Failed to enumerate files", "error", err)
|
||||
existingFiles = make(map[string]struct{})
|
||||
}
|
||||
fmt.Printf("Found %s files\n", formatNumber(len(existingFiles)))
|
||||
|
||||
// Phase 0b: Check for deleted files by comparing DB against enumerated set (no filesystem access)
|
||||
if err := s.detectDeletedFiles(ctx, path, existingFiles, result); err != nil {
|
||||
return nil, fmt.Errorf("detecting deleted files: %w", err)
|
||||
}
|
||||
|
||||
// Phase 1: Scan directory and collect files to process
|
||||
log.Info("Phase 1/3: Scanning directory structure")
|
||||
filesToProcess, err := s.scanPhase(ctx, path, result)
|
||||
filesToProcess, err := s.scanPhase(ctx, path, result, existingFiles)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scan phase failed: %w", err)
|
||||
}
|
||||
@@ -216,16 +217,78 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// enumerateFiles performs a quick enumeration to get all file paths without expensive stat() calls
|
||||
// Returns a set of all file paths found on disk
|
||||
func (s *Scanner) enumerateFiles(ctx context.Context, path string) (map[string]struct{}, error) {
|
||||
files := make(map[string]struct{})
|
||||
startTime := time.Now()
|
||||
lastStatusTime := time.Now()
|
||||
statusInterval := 5 * time.Second
|
||||
|
||||
var enumDir func(dirPath string) error
|
||||
enumDir = func(dirPath string) error {
|
||||
// Check context cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
f, err := s.fs.Open(dirPath)
|
||||
if err != nil {
|
||||
return nil // Skip directories we can't open
|
||||
}
|
||||
defer func() { _ = f.Close() }()
|
||||
|
||||
for {
|
||||
// Read directory entries in batches
|
||||
entries, err := f.Readdir(1000)
|
||||
if err != nil {
|
||||
break // End of directory or error
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
fullPath := dirPath + "/" + entry.Name()
|
||||
if entry.IsDir() {
|
||||
if err := enumDir(fullPath); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if entry.Mode().IsRegular() {
|
||||
files[fullPath] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// Periodic status update
|
||||
if time.Since(lastStatusTime) >= statusInterval {
|
||||
elapsed := time.Since(startTime).Round(time.Second)
|
||||
fmt.Printf("Enumerating files: %s found (%s elapsed)\n",
|
||||
formatNumber(len(files)), elapsed)
|
||||
lastStatusTime = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := enumDir(path); err != nil {
|
||||
return files, err
|
||||
}
|
||||
|
||||
return files, nil
|
||||
}
|
||||
|
||||
// scanPhase performs the initial directory scan to identify files to process
|
||||
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult) ([]*FileToProcess, error) {
|
||||
func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult, existingFiles map[string]struct{}) ([]*FileToProcess, error) {
|
||||
totalFiles := int64(len(existingFiles))
|
||||
|
||||
var filesToProcess []*FileToProcess
|
||||
var mu sync.Mutex
|
||||
|
||||
// Set up periodic status output
|
||||
startTime := time.Now()
|
||||
lastStatusTime := time.Now()
|
||||
statusInterval := 15 * time.Second
|
||||
var filesScanned int64
|
||||
var bytesScanned int64
|
||||
|
||||
log.Debug("Starting directory walk", "path", path)
|
||||
err := afero.Walk(s.fs, path, func(path string, info os.FileInfo, err error) error {
|
||||
@@ -266,7 +329,6 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
||||
// Update scan statistics
|
||||
if info.Mode().IsRegular() {
|
||||
filesScanned++
|
||||
bytesScanned += info.Size()
|
||||
}
|
||||
|
||||
// Output periodic status
|
||||
@@ -275,9 +337,35 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
||||
changedCount := len(filesToProcess)
|
||||
mu.Unlock()
|
||||
|
||||
fmt.Printf("Scan progress: %s files examined, %s changed\n",
|
||||
formatNumber(int(filesScanned)),
|
||||
formatNumber(changedCount))
|
||||
elapsed := time.Since(startTime)
|
||||
rate := float64(filesScanned) / elapsed.Seconds()
|
||||
|
||||
// Build status line
|
||||
if totalFiles > 0 {
|
||||
pct := float64(filesScanned) / float64(totalFiles) * 100
|
||||
remaining := totalFiles - filesScanned
|
||||
var eta time.Duration
|
||||
if rate > 0 {
|
||||
eta = time.Duration(float64(remaining)/rate) * time.Second
|
||||
}
|
||||
fmt.Printf("Scan: %s/%s files (%.1f%%), %s changed/new, %.0f files/sec, %s elapsed",
|
||||
formatNumber(int(filesScanned)),
|
||||
formatNumber(int(totalFiles)),
|
||||
pct,
|
||||
formatNumber(changedCount),
|
||||
rate,
|
||||
elapsed.Round(time.Second))
|
||||
if eta > 0 {
|
||||
fmt.Printf(", ETA %s", eta.Round(time.Second))
|
||||
}
|
||||
fmt.Println()
|
||||
} else {
|
||||
fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
|
||||
formatNumber(int(filesScanned)),
|
||||
formatNumber(changedCount),
|
||||
rate,
|
||||
elapsed.Round(time.Second))
|
||||
}
|
||||
lastStatusTime = time.Now()
|
||||
}
|
||||
|
||||
@@ -345,8 +433,8 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
|
||||
}
|
||||
s.packerMu.Unlock()
|
||||
|
||||
// If no S3 client, store any remaining blobs
|
||||
if s.s3Client == nil {
|
||||
// If no storage configured, store any remaining blobs locally
|
||||
if s.storage == nil {
|
||||
blobs := s.packer.GetFinishedBlobs()
|
||||
for _, b := range blobs {
|
||||
// Blob metadata is already stored incrementally during packing
|
||||
@@ -573,7 +661,7 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
s.progress.ReportUploadStart(finishedBlob.Hash, finishedBlob.Compressed)
|
||||
}
|
||||
|
||||
// Upload to S3 first (without holding any locks)
|
||||
// Upload to storage first (without holding any locks)
|
||||
// Use scan context for cancellation support
|
||||
ctx := s.scanCtx
|
||||
if ctx == nil {
|
||||
@@ -585,7 +673,6 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
lastProgressBytes := int64(0)
|
||||
|
||||
progressCallback := func(uploaded int64) error {
|
||||
|
||||
// Calculate instantaneous speed
|
||||
now := time.Now()
|
||||
elapsed := now.Sub(lastProgressTime).Seconds()
|
||||
@@ -612,15 +699,15 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
|
||||
// Create sharded path: blobs/ca/fe/cafebabe...
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", finishedBlob.Hash[:2], finishedBlob.Hash[2:4], finishedBlob.Hash)
|
||||
if err := s.s3Client.PutObjectWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||
return fmt.Errorf("uploading blob %s to S3: %w", finishedBlob.Hash, err)
|
||||
if err := s.storage.PutWithProgress(ctx, blobPath, blobWithReader.Reader, finishedBlob.Compressed, progressCallback); err != nil {
|
||||
return fmt.Errorf("uploading blob %s to storage: %w", finishedBlob.Hash, err)
|
||||
}
|
||||
|
||||
uploadDuration := time.Since(startTime)
|
||||
|
||||
// Log upload stats
|
||||
uploadSpeed := float64(finishedBlob.Compressed) * 8 / uploadDuration.Seconds() // bits per second
|
||||
log.Info("Successfully uploaded blob to S3 storage",
|
||||
log.Info("Successfully uploaded blob to storage",
|
||||
"path", blobPath,
|
||||
"size", humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
"duration", uploadDuration,
|
||||
@@ -861,17 +948,31 @@ func (s *Scanner) GetProgress() *ProgressReporter {
|
||||
}
|
||||
|
||||
// detectDeletedFiles finds files that existed in previous snapshots but no longer exist
|
||||
func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, result *ScanResult) error {
|
||||
// Uses the pre-enumerated existingFiles set to avoid additional filesystem access
|
||||
func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, existingFiles map[string]struct{}, result *ScanResult) error {
|
||||
// Get all files with this path prefix from the database
|
||||
files, err := s.repos.Files.ListByPrefix(ctx, path)
|
||||
knownFiles, err := s.repos.Files.ListByPrefix(ctx, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing files by prefix: %w", err)
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
// Check if the file still exists on disk
|
||||
_, err := s.fs.Stat(file.Path)
|
||||
if os.IsNotExist(err) {
|
||||
if len(knownFiles) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
fmt.Printf("Checking %s known files for deletions...\n", formatNumber(len(knownFiles)))
|
||||
|
||||
// Check each known file against the enumerated set (no filesystem access needed)
|
||||
for _, file := range knownFiles {
|
||||
// Check context cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// Check if the file exists in our enumerated set
|
||||
if _, exists := existingFiles[file.Path]; !exists {
|
||||
// File has been deleted
|
||||
result.FilesDeleted++
|
||||
result.BytesDeleted += file.Size
|
||||
@@ -879,6 +980,10 @@ func (s *Scanner) detectDeletedFiles(ctx context.Context, path string, result *S
|
||||
}
|
||||
}
|
||||
|
||||
if result.FilesDeleted > 0 {
|
||||
fmt.Printf("Found %s deleted files\n", formatNumber(result.FilesDeleted))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user