Add exclude patterns, snapshot prune, and other improvements
- Implement exclude patterns with anchored pattern support:
  - Patterns starting with / only match from root of source dir
  - Unanchored patterns match anywhere in path
  - Support for glob patterns (*.log, .*, **/*.pack)
  - Directory patterns skip entire subtrees
  - Add gobwas/glob dependency for pattern matching
  - Add 16 comprehensive tests for exclude functionality
- Add snapshot prune command to clean orphaned data:
  - Removes incomplete snapshots from database
  - Cleans orphaned files, chunks, and blobs
  - Runs automatically at backup start for consistency
- Add snapshot remove command for deleting snapshots
- Add VAULTIK_AGE_SECRET_KEY environment variable support
- Fix duplicate fx module provider in restore command
- Change snapshot ID format to hostname_YYYY-MM-DDTHH:MM:SSZ
This commit is contained in:
@@ -3,8 +3,10 @@ package snapshot
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -15,6 +17,7 @@ import (
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
"git.eeqj.de/sneak/vaultik/internal/storage"
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/gobwas/glob"
|
||||
"github.com/google/uuid"
|
||||
"github.com/spf13/afero"
|
||||
)
|
||||
@@ -33,6 +36,13 @@ type pendingFileData struct {
|
||||
chunkFiles []database.ChunkFile
|
||||
}
|
||||
|
||||
// compiledPattern holds a compiled glob pattern and whether it's anchored.
// One of these is built per user-supplied exclude pattern by
// compileExcludePatterns and consulted by shouldExclude during the scan.
type compiledPattern struct {
	pattern  glob.Glob // compiled matcher (built with '/' as the separator rune)
	anchored bool      // If true, only matches from root of source dir
	original string    // pattern text as supplied by the user, kept for logging
}
|
||||
|
||||
// Scanner scans directories and populates the database with file and chunk information
|
||||
type Scanner struct {
|
||||
fs afero.Fs
|
||||
@@ -43,7 +53,9 @@ type Scanner struct {
|
||||
maxBlobSize int64
|
||||
compressionLevel int
|
||||
ageRecipient string
|
||||
snapshotID string // Current snapshot being processed
|
||||
snapshotID string // Current snapshot being processed
|
||||
exclude []string // Glob patterns for files/directories to exclude
|
||||
compiledExclude []compiledPattern // Compiled glob patterns
|
||||
progress *ProgressReporter
|
||||
|
||||
// In-memory cache of known chunk hashes for fast existence checks
|
||||
@@ -77,6 +89,7 @@ type ScannerConfig struct {
|
||||
CompressionLevel int
|
||||
AgeRecipients []string // Optional, empty means no encryption
|
||||
EnableProgress bool // Enable progress reporting
|
||||
Exclude []string // Glob patterns for files/directories to exclude
|
||||
}
|
||||
|
||||
// ScanResult contains the results of a scan operation
|
||||
@@ -120,6 +133,9 @@ func NewScanner(cfg ScannerConfig) *Scanner {
|
||||
progress = NewProgressReporter()
|
||||
}
|
||||
|
||||
// Compile exclude patterns
|
||||
compiledExclude := compileExcludePatterns(cfg.Exclude)
|
||||
|
||||
return &Scanner{
|
||||
fs: cfg.FS,
|
||||
chunker: chunker.NewChunker(cfg.ChunkSize),
|
||||
@@ -129,6 +145,8 @@ func NewScanner(cfg ScannerConfig) *Scanner {
|
||||
maxBlobSize: cfg.MaxBlobSize,
|
||||
compressionLevel: cfg.CompressionLevel,
|
||||
ageRecipient: strings.Join(cfg.AgeRecipients, ","),
|
||||
exclude: cfg.Exclude,
|
||||
compiledExclude: compiledExclude,
|
||||
progress: progress,
|
||||
pendingChunkHashes: make(map[string]struct{}),
|
||||
}
|
||||
@@ -314,11 +332,14 @@ func (s *Scanner) addPendingChunkHash(hash string) {
|
||||
|
||||
// removePendingChunkHashes removes committed chunk hashes from the pending set
|
||||
func (s *Scanner) removePendingChunkHashes(hashes []string) {
|
||||
log.Debug("removePendingChunkHashes: starting", "count", len(hashes))
|
||||
start := time.Now()
|
||||
s.pendingChunkHashesMu.Lock()
|
||||
for _, hash := range hashes {
|
||||
delete(s.pendingChunkHashes, hash)
|
||||
}
|
||||
s.pendingChunkHashesMu.Unlock()
|
||||
log.Debug("removePendingChunkHashes: done", "count", len(hashes), "duration", time.Since(start))
|
||||
}
|
||||
|
||||
// isChunkPending returns true if the chunk is still pending (not yet committed to DB)
|
||||
@@ -395,12 +416,19 @@ func (s *Scanner) flushAllPending(ctx context.Context) error {
|
||||
// flushCompletedPendingFiles flushes only files whose chunks are all committed to DB
|
||||
// Files with pending chunks are kept in the queue for later flushing
|
||||
func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
|
||||
flushStart := time.Now()
|
||||
log.Debug("flushCompletedPendingFiles: starting")
|
||||
|
||||
log.Debug("flushCompletedPendingFiles: acquiring pendingFilesMu lock")
|
||||
s.pendingFilesMu.Lock()
|
||||
log.Debug("flushCompletedPendingFiles: acquired lock", "pending_files", len(s.pendingFiles))
|
||||
|
||||
// Separate files into complete (can flush) and incomplete (keep pending)
|
||||
var canFlush []pendingFileData
|
||||
var stillPending []pendingFileData
|
||||
|
||||
log.Debug("flushCompletedPendingFiles: checking which files can flush")
|
||||
checkStart := time.Now()
|
||||
for _, data := range s.pendingFiles {
|
||||
allChunksCommitted := true
|
||||
for _, fc := range data.fileChunks {
|
||||
@@ -415,11 +443,14 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
|
||||
stillPending = append(stillPending, data)
|
||||
}
|
||||
}
|
||||
log.Debug("flushCompletedPendingFiles: check done", "duration", time.Since(checkStart), "can_flush", len(canFlush), "still_pending", len(stillPending))
|
||||
|
||||
s.pendingFiles = stillPending
|
||||
s.pendingFilesMu.Unlock()
|
||||
log.Debug("flushCompletedPendingFiles: released lock")
|
||||
|
||||
if len(canFlush) == 0 {
|
||||
log.Debug("flushCompletedPendingFiles: nothing to flush")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -427,43 +458,85 @@ func (s *Scanner) flushCompletedPendingFiles(ctx context.Context) error {
|
||||
"files_to_flush", len(canFlush),
|
||||
"files_still_pending", len(stillPending))
|
||||
|
||||
// Flush the complete files
|
||||
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
for _, data := range canFlush {
|
||||
// Create or update the file record
|
||||
if err := s.repos.Files.Create(txCtx, tx, data.file); err != nil {
|
||||
return fmt.Errorf("creating file record: %w", err)
|
||||
}
|
||||
// Collect all data for batch operations
|
||||
log.Debug("flushCompletedPendingFiles: collecting data for batch ops")
|
||||
collectStart := time.Now()
|
||||
var allFileChunks []database.FileChunk
|
||||
var allChunkFiles []database.ChunkFile
|
||||
var allFileIDs []string
|
||||
var allFiles []*database.File
|
||||
|
||||
// Delete any existing file_chunks and chunk_files for this file
|
||||
if err := s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
|
||||
return fmt.Errorf("deleting old file chunks: %w", err)
|
||||
}
|
||||
if err := s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID); err != nil {
|
||||
return fmt.Errorf("deleting old chunk files: %w", err)
|
||||
}
|
||||
for _, data := range canFlush {
|
||||
allFileChunks = append(allFileChunks, data.fileChunks...)
|
||||
allChunkFiles = append(allChunkFiles, data.chunkFiles...)
|
||||
allFileIDs = append(allFileIDs, data.file.ID)
|
||||
allFiles = append(allFiles, data.file)
|
||||
}
|
||||
log.Debug("flushCompletedPendingFiles: collected data",
|
||||
"duration", time.Since(collectStart),
|
||||
"file_chunks", len(allFileChunks),
|
||||
"chunk_files", len(allChunkFiles),
|
||||
"files", len(allFiles))
|
||||
|
||||
// Create file-chunk mappings
|
||||
for i := range data.fileChunks {
|
||||
if err := s.repos.FileChunks.Create(txCtx, tx, &data.fileChunks[i]); err != nil {
|
||||
return fmt.Errorf("creating file chunk: %w", err)
|
||||
}
|
||||
}
|
||||
// Flush the complete files using batch operations
|
||||
log.Debug("flushCompletedPendingFiles: starting transaction")
|
||||
txStart := time.Now()
|
||||
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
log.Debug("flushCompletedPendingFiles: inside transaction")
|
||||
|
||||
// Create chunk-file mappings
|
||||
for i := range data.chunkFiles {
|
||||
if err := s.repos.ChunkFiles.Create(txCtx, tx, &data.chunkFiles[i]); err != nil {
|
||||
return fmt.Errorf("creating chunk file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Add file to snapshot
|
||||
if err := s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID); err != nil {
|
||||
return fmt.Errorf("adding file to snapshot: %w", err)
|
||||
}
|
||||
// Batch delete old file_chunks and chunk_files
|
||||
log.Debug("flushCompletedPendingFiles: deleting old file_chunks")
|
||||
opStart := time.Now()
|
||||
if err := s.repos.FileChunks.DeleteByFileIDs(txCtx, tx, allFileIDs); err != nil {
|
||||
return fmt.Errorf("batch deleting old file chunks: %w", err)
|
||||
}
|
||||
log.Debug("flushCompletedPendingFiles: deleted file_chunks", "duration", time.Since(opStart))
|
||||
|
||||
log.Debug("flushCompletedPendingFiles: deleting old chunk_files")
|
||||
opStart = time.Now()
|
||||
if err := s.repos.ChunkFiles.DeleteByFileIDs(txCtx, tx, allFileIDs); err != nil {
|
||||
return fmt.Errorf("batch deleting old chunk files: %w", err)
|
||||
}
|
||||
log.Debug("flushCompletedPendingFiles: deleted chunk_files", "duration", time.Since(opStart))
|
||||
|
||||
// Batch create/update file records
|
||||
log.Debug("flushCompletedPendingFiles: creating files")
|
||||
opStart = time.Now()
|
||||
if err := s.repos.Files.CreateBatch(txCtx, tx, allFiles); err != nil {
|
||||
return fmt.Errorf("batch creating file records: %w", err)
|
||||
}
|
||||
log.Debug("flushCompletedPendingFiles: created files", "duration", time.Since(opStart))
|
||||
|
||||
// Batch insert file_chunks
|
||||
log.Debug("flushCompletedPendingFiles: inserting file_chunks")
|
||||
opStart = time.Now()
|
||||
if err := s.repos.FileChunks.CreateBatch(txCtx, tx, allFileChunks); err != nil {
|
||||
return fmt.Errorf("batch creating file chunks: %w", err)
|
||||
}
|
||||
log.Debug("flushCompletedPendingFiles: inserted file_chunks", "duration", time.Since(opStart))
|
||||
|
||||
// Batch insert chunk_files
|
||||
log.Debug("flushCompletedPendingFiles: inserting chunk_files")
|
||||
opStart = time.Now()
|
||||
if err := s.repos.ChunkFiles.CreateBatch(txCtx, tx, allChunkFiles); err != nil {
|
||||
return fmt.Errorf("batch creating chunk files: %w", err)
|
||||
}
|
||||
log.Debug("flushCompletedPendingFiles: inserted chunk_files", "duration", time.Since(opStart))
|
||||
|
||||
// Batch add files to snapshot
|
||||
log.Debug("flushCompletedPendingFiles: adding files to snapshot")
|
||||
opStart = time.Now()
|
||||
if err := s.repos.Snapshots.AddFilesByIDBatch(txCtx, tx, s.snapshotID, allFileIDs); err != nil {
|
||||
return fmt.Errorf("batch adding files to snapshot: %w", err)
|
||||
}
|
||||
log.Debug("flushCompletedPendingFiles: added files to snapshot", "duration", time.Since(opStart))
|
||||
|
||||
log.Debug("flushCompletedPendingFiles: transaction complete")
|
||||
return nil
|
||||
})
|
||||
log.Debug("flushCompletedPendingFiles: transaction done", "duration", time.Since(txStart))
|
||||
log.Debug("flushCompletedPendingFiles: total duration", "duration", time.Since(flushStart))
|
||||
return err
|
||||
}
|
||||
|
||||
// ScanPhaseResult contains the results of the scan phase
|
||||
@@ -504,6 +577,14 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
||||
default:
|
||||
}
|
||||
|
||||
// Check exclude patterns - for directories, skip the entire subtree
|
||||
if s.shouldExclude(filePath, path) {
|
||||
if info.IsDir() {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Skip non-regular files for processing (but still count them)
|
||||
if !info.Mode().IsRegular() {
|
||||
return nil
|
||||
@@ -730,6 +811,12 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
|
||||
|
||||
// Process file in streaming fashion
|
||||
if err := s.processFileStreaming(ctx, fileToProcess, result); err != nil {
|
||||
// Handle files that were deleted between scan and process phases
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
log.Warn("File was deleted during backup, skipping", "path", fileToProcess.Path)
|
||||
result.FilesSkipped++
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("processing file %s: %w", fileToProcess.Path, err)
|
||||
}
|
||||
|
||||
@@ -939,14 +1026,19 @@ func (s *Scanner) handleBlobReady(blobWithReader *blob.BlobWithReader) error {
|
||||
}
|
||||
|
||||
// Chunks from this blob are now committed to DB - remove from pending set
|
||||
log.Debug("handleBlobReady: removing pending chunk hashes")
|
||||
s.removePendingChunkHashes(blobWithReader.InsertedChunkHashes)
|
||||
log.Debug("handleBlobReady: removed pending chunk hashes")
|
||||
|
||||
// Flush files whose chunks are now all committed
|
||||
// This maintains database consistency after each blob
|
||||
log.Debug("handleBlobReady: calling flushCompletedPendingFiles")
|
||||
if err := s.flushCompletedPendingFiles(dbCtx); err != nil {
|
||||
return fmt.Errorf("flushing completed files: %w", err)
|
||||
}
|
||||
log.Debug("handleBlobReady: flushCompletedPendingFiles returned")
|
||||
|
||||
log.Debug("handleBlobReady: complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1135,6 +1227,105 @@ func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[
|
||||
return nil
|
||||
}
|
||||
|
||||
// compileExcludePatterns compiles the exclude patterns into glob matchers
|
||||
func compileExcludePatterns(patterns []string) []compiledPattern {
|
||||
var compiled []compiledPattern
|
||||
for _, p := range patterns {
|
||||
if p == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if pattern is anchored (starts with /)
|
||||
anchored := strings.HasPrefix(p, "/")
|
||||
pattern := p
|
||||
if anchored {
|
||||
pattern = p[1:] // Remove leading /
|
||||
}
|
||||
|
||||
// Remove trailing slash if present (directory indicator)
|
||||
pattern = strings.TrimSuffix(pattern, "/")
|
||||
|
||||
// Compile the glob pattern
|
||||
// For patterns without path separators, we need to match them as components
|
||||
// e.g., ".git" should match ".git" anywhere in the path
|
||||
g, err := glob.Compile(pattern, '/')
|
||||
if err != nil {
|
||||
log.Warn("Invalid exclude pattern, skipping", "pattern", p, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
compiled = append(compiled, compiledPattern{
|
||||
pattern: g,
|
||||
anchored: anchored,
|
||||
original: p,
|
||||
})
|
||||
}
|
||||
return compiled
|
||||
}
|
||||
|
||||
// shouldExclude checks if a path should be excluded based on exclude patterns
|
||||
// filePath is the full path to the file
|
||||
// rootPath is the root of the backup source directory
|
||||
func (s *Scanner) shouldExclude(filePath, rootPath string) bool {
|
||||
if len(s.compiledExclude) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// Get the relative path from root
|
||||
relPath, err := filepath.Rel(rootPath, filePath)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Never exclude the root directory itself
|
||||
if relPath == "." {
|
||||
return false
|
||||
}
|
||||
|
||||
// Normalize path separators
|
||||
relPath = filepath.ToSlash(relPath)
|
||||
|
||||
// Check each pattern
|
||||
for _, cp := range s.compiledExclude {
|
||||
if cp.anchored {
|
||||
// Anchored pattern: must match from the root
|
||||
// Match the relative path directly
|
||||
if cp.pattern.Match(relPath) {
|
||||
return true
|
||||
}
|
||||
// Also check if any prefix of the path matches (for directory patterns)
|
||||
parts := strings.Split(relPath, "/")
|
||||
for i := 1; i <= len(parts); i++ {
|
||||
prefix := strings.Join(parts[:i], "/")
|
||||
if cp.pattern.Match(prefix) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Unanchored pattern: can match anywhere in path
|
||||
// Check the full relative path
|
||||
if cp.pattern.Match(relPath) {
|
||||
return true
|
||||
}
|
||||
// Check each path component and subpath
|
||||
parts := strings.Split(relPath, "/")
|
||||
for i := range parts {
|
||||
// Match individual component (e.g., ".git" matches ".git" directory)
|
||||
if cp.pattern.Match(parts[i]) {
|
||||
return true
|
||||
}
|
||||
// Match subpath from this component onwards
|
||||
subpath := strings.Join(parts[i:], "/")
|
||||
if cp.pattern.Match(subpath) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// formatNumber formats a number with comma separators
|
||||
func formatNumber(n int) string {
|
||||
if n < 1000 {
|
||||
|
||||
Reference in New Issue
Block a user