mfer/internal/scanner/scanner.go
sneak c5ca3e2ced Change FileProgress callback to channel-based progress
Replace callback-based progress reporting in Builder.AddFile with
channel-based FileHashProgress for consistency with EnumerateStatus
and ScanStatus patterns. Update scanner.go to use the new channel API.
2025-12-17 14:30:10 -08:00

425 lines
11 KiB
Go

package scanner
import (
"context"
"io"
"io/fs"
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/spf13/afero"
"sneak.berlin/go/mfer/mfer"
)
// Phase 1: Enumeration
// ---------------------
// Walking directories and calling stat() on files to collect metadata.
// Builds the list of files to be scanned. Relatively fast (metadata only).
// EnumerateStatus contains progress information for the enumeration phase.
type EnumerateStatus struct {
FilesFound int64 // Number of files discovered so far
BytesFound int64 // Total size of discovered files (from stat)
}
// Phase 2: Scan (ToManifest)
// --------------------------
// Reading file contents and computing hashes for manifest generation.
// This is the expensive phase that reads all file data.
// ScanStatus contains progress information for the scan phase.
type ScanStatus struct {
TotalFiles int64 // Total number of files to scan
ScannedFiles int64 // Number of files scanned so far
TotalBytes int64 // Total bytes to read (sum of all file sizes)
ScannedBytes int64 // Bytes read so far
BytesPerSec float64 // Current throughput rate
ETA time.Duration // Estimated time to completion
}
// Options configures scanner behavior.
type Options struct {
IgnoreDotfiles bool // Skip files and directories starting with a dot
FollowSymLinks bool // Resolve symlinks instead of skipping them
Fs afero.Fs // Filesystem to use, defaults to OsFs if nil
}
// FileEntry represents a file that has been enumerated.
type FileEntry struct {
Path string // Relative path (used in manifest)
AbsPath string // Absolute path (used for reading file content)
Size int64 // File size in bytes
Mtime time.Time // Last modification time
Ctime time.Time // Creation time (platform-dependent)
}
// Scanner accumulates files and generates manifests from them.
type Scanner struct {
mu sync.RWMutex
files []*FileEntry
options *Options
fs afero.Fs
}
// New creates a new Scanner with default options.
func New() *Scanner {
return NewWithOptions(nil)
}
// NewWithOptions creates a new Scanner with the given options.
func NewWithOptions(opts *Options) *Scanner {
if opts == nil {
opts = &Options{}
}
fs := opts.Fs
if fs == nil {
fs = afero.NewOsFs()
}
return &Scanner{
files: make([]*FileEntry, 0),
options: opts,
fs: fs,
}
}
// EnumerateFile adds a single file to the scanner, calling stat() to get metadata.
func (s *Scanner) EnumerateFile(filePath string) error {
abs, err := filepath.Abs(filePath)
if err != nil {
return err
}
info, err := s.fs.Stat(abs)
if err != nil {
return err
}
// For single files, use the filename as the relative path
basePath := filepath.Dir(abs)
return s.enumerateFileWithInfo(filepath.Base(abs), basePath, info, nil)
}
// EnumeratePath walks a directory path and adds all files to the scanner.
// If progress is non-nil, status updates are sent as files are discovered.
// The progress channel is closed when the method returns.
func (s *Scanner) EnumeratePath(inputPath string, progress chan<- EnumerateStatus) error {
if progress != nil {
defer close(progress)
}
abs, err := filepath.Abs(inputPath)
if err != nil {
return err
}
afs := afero.NewReadOnlyFs(afero.NewBasePathFs(s.fs, abs))
return s.enumerateFS(afs, abs, progress)
}
// EnumeratePaths walks multiple directory paths and adds all files to the scanner.
// If progress is non-nil, status updates are sent as files are discovered.
// The progress channel is closed when the method returns.
func (s *Scanner) EnumeratePaths(progress chan<- EnumerateStatus, inputPaths ...string) error {
if progress != nil {
defer close(progress)
}
for _, p := range inputPaths {
abs, err := filepath.Abs(p)
if err != nil {
return err
}
afs := afero.NewReadOnlyFs(afero.NewBasePathFs(s.fs, abs))
if err := s.enumerateFS(afs, abs, progress); err != nil {
return err
}
}
return nil
}
// EnumerateFS walks an afero filesystem and adds all files to the scanner.
// If progress is non-nil, status updates are sent as files are discovered.
// The progress channel is closed when the method returns.
// basePath is used to compute absolute paths for file reading.
func (s *Scanner) EnumerateFS(afs afero.Fs, basePath string, progress chan<- EnumerateStatus) error {
if progress != nil {
defer close(progress)
}
return s.enumerateFS(afs, basePath, progress)
}
// enumerateFS is the internal implementation that doesn't close the progress channel.
func (s *Scanner) enumerateFS(afs afero.Fs, basePath string, progress chan<- EnumerateStatus) error {
return afero.Walk(afs, "/", func(p string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
if s.options.IgnoreDotfiles && pathIsHidden(p) {
if info.IsDir() {
return filepath.SkipDir
}
return nil
}
return s.enumerateFileWithInfo(p, basePath, info, progress)
})
}
// enumerateFileWithInfo adds a file with pre-existing fs.FileInfo.
func (s *Scanner) enumerateFileWithInfo(filePath string, basePath string, info fs.FileInfo, progress chan<- EnumerateStatus) error {
if info.IsDir() {
// Manifests contain only files, directories are implied
return nil
}
// Clean the path - remove leading slash if present
cleanPath := filePath
if len(cleanPath) > 0 && cleanPath[0] == '/' {
cleanPath = cleanPath[1:]
}
// Compute absolute path for file reading
absPath := filepath.Join(basePath, cleanPath)
// Handle symlinks
if info.Mode()&fs.ModeSymlink != 0 {
if !s.options.FollowSymLinks {
// Skip symlinks when not following them
return nil
}
// Resolve symlink to get real file info
realPath, err := filepath.EvalSymlinks(absPath)
if err != nil {
// Skip broken symlinks
return nil
}
realInfo, err := s.fs.Stat(realPath)
if err != nil {
return nil
}
// Skip if symlink points to a directory
if realInfo.IsDir() {
return nil
}
// Use resolved path for reading, but keep original path in manifest
absPath = realPath
info = realInfo
}
entry := &FileEntry{
Path: cleanPath,
AbsPath: absPath,
Size: info.Size(),
Mtime: info.ModTime(),
// Note: Ctime not available from fs.FileInfo on all platforms
// Will need platform-specific code to extract it
}
s.mu.Lock()
s.files = append(s.files, entry)
filesFound := int64(len(s.files))
var bytesFound int64
for _, f := range s.files {
bytesFound += f.Size
}
s.mu.Unlock()
sendEnumerateStatus(progress, EnumerateStatus{
FilesFound: filesFound,
BytesFound: bytesFound,
})
return nil
}
// Files returns a copy of all files added to the scanner.
func (s *Scanner) Files() []*FileEntry {
s.mu.RLock()
defer s.mu.RUnlock()
out := make([]*FileEntry, len(s.files))
copy(out, s.files)
return out
}
// FileCount returns the number of files in the scanner.
func (s *Scanner) FileCount() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
return int64(len(s.files))
}
// TotalBytes returns the total size of all files in the scanner.
func (s *Scanner) TotalBytes() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
var total int64
for _, f := range s.files {
total += f.Size
}
return total
}
// ToManifest reads all file contents, computes hashes, and generates a manifest.
// If progress is non-nil, status updates are sent approximately once per second.
// The progress channel is closed when the method returns.
// The manifest is written to the provided io.Writer.
func (s *Scanner) ToManifest(ctx context.Context, w io.Writer, progress chan<- ScanStatus) error {
if progress != nil {
defer close(progress)
}
s.mu.RLock()
files := make([]*FileEntry, len(s.files))
copy(files, s.files)
totalFiles := int64(len(files))
var totalBytes int64
for _, f := range files {
totalBytes += f.Size
}
s.mu.RUnlock()
builder := mfer.NewBuilder()
var scannedFiles int64
var scannedBytes int64
lastProgressTime := time.Now()
startTime := time.Now()
for _, entry := range files {
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Open file
f, err := s.fs.Open(entry.AbsPath)
if err != nil {
return err
}
// Create progress channel for this file
var fileProgress chan mfer.FileHashProgress
var wg sync.WaitGroup
if progress != nil {
fileProgress = make(chan mfer.FileHashProgress, 1)
wg.Add(1)
go func(baseScannedBytes int64) {
defer wg.Done()
for p := range fileProgress {
// Send progress at most once per second
now := time.Now()
if now.Sub(lastProgressTime) >= time.Second {
elapsed := now.Sub(startTime).Seconds()
currentBytes := baseScannedBytes + p.BytesRead
var rate float64
var eta time.Duration
if elapsed > 0 && currentBytes > 0 {
rate = float64(currentBytes) / elapsed
remainingBytes := totalBytes - currentBytes
if rate > 0 {
eta = time.Duration(float64(remainingBytes)/rate) * time.Second
}
}
sendScanStatus(progress, ScanStatus{
TotalFiles: totalFiles,
ScannedFiles: scannedFiles,
TotalBytes: totalBytes,
ScannedBytes: currentBytes,
BytesPerSec: rate,
ETA: eta,
})
lastProgressTime = now
}
}
}(scannedBytes)
}
// Add to manifest with progress channel
bytesRead, err := builder.AddFile(
entry.Path,
entry.Size,
entry.Mtime,
f,
fileProgress,
)
_ = f.Close()
// Close channel and wait for goroutine to finish
if fileProgress != nil {
close(fileProgress)
wg.Wait()
}
if err != nil {
return err
}
scannedFiles++
scannedBytes += bytesRead
}
// Send final progress (ETA is 0 at completion)
if progress != nil {
elapsed := time.Since(startTime).Seconds()
var rate float64
if elapsed > 0 {
rate = float64(scannedBytes) / elapsed
}
sendScanStatus(progress, ScanStatus{
TotalFiles: totalFiles,
ScannedFiles: scannedFiles,
TotalBytes: totalBytes,
ScannedBytes: scannedBytes,
BytesPerSec: rate,
ETA: 0,
})
}
// Build and write manifest
return builder.Build(w)
}
// pathIsHidden returns true if the path or any of its parent directories
// start with a dot (hidden files/directories).
func pathIsHidden(p string) bool {
tp := path.Clean(p)
if strings.HasPrefix(tp, ".") {
return true
}
for {
d, f := path.Split(tp)
if strings.HasPrefix(f, ".") {
return true
}
if d == "" {
return false
}
tp = d[0 : len(d)-1] // trim trailing slash from dir
}
}
// sendEnumerateStatus sends a status update without blocking.
// If the channel is full, the update is dropped.
func sendEnumerateStatus(ch chan<- EnumerateStatus, status EnumerateStatus) {
if ch == nil {
return
}
select {
case ch <- status:
default:
// Channel full, drop this update
}
}
// sendScanStatus sends a status update without blocking.
// If the channel is full, the update is dropped.
func sendScanStatus(ch chan<- ScanStatus, status ScanStatus) {
if ch == nil {
return
}
select {
case ch <- status:
default:
// Channel full, drop this update
}
}