vaultik/internal/snapshot/progress.go
sneak e29a995120 Refactor: Move Vaultik struct and methods to internal/vaultik package
- Created new internal/vaultik package with unified Vaultik struct
- Moved all command methods (snapshot, info, prune, verify) from CLI to vaultik package
- Implemented single constructor that handles crypto capabilities automatically
- Added CanDecrypt() method to check if decryption is available
- Updated all CLI commands to use the new vaultik.Vaultik struct
- Removed old fragmented App structs and WithCrypto wrapper
- Fixed context management - Vaultik now owns its context lifecycle
- Cleaned up package imports and dependencies

This creates a cleaner separation between CLI/Cobra code and business logic,
with all vaultik operations now centralized in the internal/vaultik package.
2025-07-26 14:47:26 +02:00

420 lines
13 KiB
Go

package snapshot
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
"git.eeqj.de/sneak/vaultik/internal/log"
"github.com/dustin/go-humanize"
)
const (
	// SummaryInterval defines how often one-line status updates are printed.
	// These updates show current progress, ETA, and the file being processed.
	SummaryInterval = 10 * time.Second
	// DetailInterval defines how often multi-line detailed status reports are printed.
	// These reports include comprehensive statistics about files, chunks, blobs, and uploads.
	DetailInterval = 60 * time.Second
	// UploadProgressInterval defines how often upload progress messages are logged.
	// ReportUploadProgress rate-limits its log output to this interval.
	UploadProgressInterval = 15 * time.Second
)
// ProgressStats holds atomic counters for progress tracking.
// The atomic fields may be updated concurrently from worker goroutines;
// the non-atomic time fields are guarded by mu.
type ProgressStats struct {
	FilesScanned     atomic.Int64 // Total files seen during scan (includes skipped)
	FilesProcessed   atomic.Int64 // Files actually processed in phase 2
	FilesSkipped     atomic.Int64 // Files skipped due to no changes
	BytesScanned     atomic.Int64 // Bytes from new/changed files only
	BytesSkipped     atomic.Int64 // Bytes from unchanged files
	BytesProcessed   atomic.Int64 // Actual bytes processed (for ETA calculation)
	ChunksCreated    atomic.Int64 // Content-defined chunks produced so far
	BlobsCreated     atomic.Int64 // Packed blobs produced so far
	BlobsUploaded    atomic.Int64 // Blobs successfully uploaded to S3
	BytesUploaded    atomic.Int64 // Total bytes uploaded to S3 (post-compression)
	UploadDurationMs atomic.Int64 // Total milliseconds spent uploading to S3
	CurrentFile      atomic.Value // stores string; initialized to "" by NewProgressReporter
	TotalSize        atomic.Int64 // Total size to process (set after scan phase)
	TotalFiles       atomic.Int64 // Total files to process in phase 2
	ProcessStartTime atomic.Value // stores time.Time when processing starts
	StartTime        time.Time    // Wall-clock start of the whole snapshot (set once, read-only after)
	mu               sync.RWMutex // guards lastDetailTime and lastChunkingTime
	lastDetailTime   time.Time    // last time a detailed report was printed
	// Upload tracking
	CurrentUpload    atomic.Value // stores *UploadInfo; a typed nil marks "no upload in progress"
	lastChunkingTime time.Time    // Track when we last showed chunking progress (guarded by mu)
}
// UploadInfo tracks current upload progress for a single blob.
type UploadInfo struct {
	BlobHash    string    // content hash of the blob being uploaded
	Size        int64     // total size of the blob in bytes
	StartTime   time.Time // when the upload began
	LastLogTime time.Time // last time a progress line was logged (rate limiting)
}
// ProgressReporter handles periodic progress reporting. It owns a
// background goroutine (see run) that prints summary and detail reports
// on ticker intervals and on SIGUSR1. Create with NewProgressReporter,
// then call Start and eventually Stop.
type ProgressReporter struct {
	stats         *ProgressStats     // shared counters updated by callers via GetStats
	ctx           context.Context    // cancelled by Stop to end the run loop
	cancel        context.CancelFunc // cancels ctx
	wg            sync.WaitGroup     // tracks the run goroutine
	detailTicker  *time.Ticker       // fires every DetailInterval
	summaryTicker *time.Ticker       // fires every SummaryInterval
	sigChan       chan os.Signal     // receives SIGUSR1 for on-demand detail reports
}
// NewProgressReporter constructs a reporter with fresh counters, creates
// its summary and detail tickers, and registers for SIGUSR1 so a detailed
// report can be requested on demand. Call Start to begin reporting and
// Stop to shut everything down.
func NewProgressReporter() *ProgressReporter {
	now := time.Now().UTC()
	reporterCtx, cancelFn := context.WithCancel(context.Background())

	st := &ProgressStats{
		StartTime:      now,
		lastDetailTime: now,
	}
	// Seed CurrentFile so later type assertions on the atomic.Value succeed.
	st.CurrentFile.Store("")

	reporter := &ProgressReporter{
		stats:         st,
		ctx:           reporterCtx,
		cancel:        cancelFn,
		summaryTicker: time.NewTicker(SummaryInterval),
		detailTicker:  time.NewTicker(DetailInterval),
		sigChan:       make(chan os.Signal, 1),
	}

	// Ask the runtime to deliver SIGUSR1 on sigChan.
	signal.Notify(reporter.sigChan, syscall.SIGUSR1)
	return reporter
}
// Start launches the background reporting goroutine and then prints an
// initial multi-line status report so the operator sees immediate output.
func (pr *ProgressReporter) Start() {
	pr.wg.Add(1) // paired with wg.Done inside run; Stop waits on it
	go pr.run()
	pr.printDetailedStatus()
}
// Stop stops the progress reporting: it cancels the run loop, stops both
// tickers, unregisters from signal delivery, and waits for the background
// goroutine to exit before releasing the signal channel.
func (pr *ProgressReporter) Stop() {
	pr.cancel()
	pr.summaryTicker.Stop()
	pr.detailTicker.Stop()
	// Unregister first so the runtime will never send on sigChan again.
	signal.Stop(pr.sigChan)
	// Wait for run() to exit BEFORE closing sigChan. A closed channel is
	// always ready to receive, so closing it earlier let run()'s select
	// spuriously pick the sigChan case and print bogus "SIGUSR1 received"
	// reports until the ctx.Done case happened to win the race.
	pr.wg.Wait()
	close(pr.sigChan)
}
// GetStats exposes the shared counters so callers can update them
// directly while the reporter is running.
func (pr *ProgressReporter) GetStats() *ProgressStats {
	return pr.stats
}
// SetTotalSize records the total number of bytes to be processed (known
// once the scan phase completes) and marks the start of the processing
// phase, which anchors all subsequent ETA calculations.
func (pr *ProgressReporter) SetTotalSize(size int64) {
	pr.stats.ProcessStartTime.Store(time.Now().UTC())
	pr.stats.TotalSize.Store(size)
}
// run is the main progress reporting loop. It prints a one-line summary
// every SummaryInterval, a detailed report every DetailInterval, and a
// detailed report on demand when SIGUSR1 arrives. It exits when the
// reporter's context is cancelled (via Stop) or when sigChan is closed.
func (pr *ProgressReporter) run() {
	defer pr.wg.Done()
	for {
		select {
		case <-pr.ctx.Done():
			return
		case <-pr.summaryTicker.C:
			pr.printSummaryStatus()
		case <-pr.detailTicker.C:
			pr.printDetailedStatus()
		case _, ok := <-pr.sigChan:
			if !ok {
				// Channel closed during shutdown. A closed channel is
				// always ready, so without this guard the loop could
				// spin printing reports until ctx.Done won the select.
				return
			}
			// SIGUSR1 received, print detailed status
			log.Info("SIGUSR1 received, printing detailed status")
			pr.printDetailedStatus()
		}
	}
}
// printSummaryStatus prints a one-line status update. During an active
// upload it delegates to printUploadProgress; when chunking has been idle
// for more than two summary intervals it prints nothing at all.
func (pr *ProgressReporter) printSummaryStatus() {
	// Check if we're currently uploading (typed nil means "not uploading").
	if uploadInfo, ok := pr.stats.CurrentUpload.Load().(*UploadInfo); ok && uploadInfo != nil {
		pr.printUploadProgress(uploadInfo)
		return
	}

	// Only show chunking progress if we've done chunking recently.
	pr.stats.mu.RLock()
	timeSinceLastChunk := time.Since(pr.stats.lastChunkingTime)
	pr.stats.mu.RUnlock()
	if timeSinceLastChunk > SummaryInterval*2 {
		return
	}

	elapsed := time.Since(pr.stats.StartTime)
	bytesScanned := pr.stats.BytesScanned.Load()
	bytesSkipped := pr.stats.BytesSkipped.Load()
	bytesProcessed := pr.stats.BytesProcessed.Load()
	totalSize := pr.stats.TotalSize.Load()
	// Comma-ok assertion: don't panic if CurrentFile was never stored.
	currentFile, _ := pr.stats.CurrentFile.Load().(string)

	// Calculate ETA once the total is known and processing has begun.
	etaStr := ""
	if totalSize > 0 && bytesProcessed > 0 {
		processStart, ok := pr.stats.ProcessStartTime.Load().(time.Time)
		if ok && !processStart.IsZero() {
			processElapsed := time.Since(processStart)
			rate := float64(bytesProcessed) / processElapsed.Seconds()
			if rate > 0 {
				remainingBytes := totalSize - bytesProcessed
				remainingSeconds := float64(remainingBytes) / rate
				eta := time.Duration(remainingSeconds * float64(time.Second))
				etaStr = fmt.Sprintf(" | ETA: %s", formatDuration(eta))
			}
		}
	}

	rate := float64(bytesScanned+bytesSkipped) / elapsed.Seconds()

	// Guard the percentage against a zero TotalSize (before SetTotalSize
	// is called); the original division printed "NaN%" in that window.
	percent := 0.0
	if totalSize > 0 {
		percent = float64(bytesProcessed) / float64(totalSize) * 100
	}

	// Show files processed / total files to process.
	filesProcessed := pr.stats.FilesProcessed.Load()
	totalFiles := pr.stats.TotalFiles.Load()
	status := fmt.Sprintf("Snapshot progress: %d/%d files, %s/%s (%.1f%%), %s/s%s",
		filesProcessed,
		totalFiles,
		humanize.Bytes(uint64(bytesProcessed)),
		humanize.Bytes(uint64(totalSize)),
		percent,
		humanize.Bytes(uint64(rate)),
		etaStr,
	)
	if currentFile != "" {
		status += fmt.Sprintf(" | Current: %s", truncatePath(currentFile, 40))
	}
	log.Info(status)
}
// printDetailedStatus prints a multi-line detailed status report covering
// elapsed time, overall progress/ETA, file and byte counters, chunk and
// blob counts, and upload totals. It is called on the detail ticker, on
// SIGUSR1, and once at Start.
func (pr *ProgressReporter) printDetailedStatus() {
	// Record when we last emitted a detailed report (guarded by mu).
	pr.stats.mu.Lock()
	pr.stats.lastDetailTime = time.Now().UTC()
	pr.stats.mu.Unlock()
	elapsed := time.Since(pr.stats.StartTime)
	filesScanned := pr.stats.FilesScanned.Load()
	filesSkipped := pr.stats.FilesSkipped.Load()
	bytesScanned := pr.stats.BytesScanned.Load()
	bytesSkipped := pr.stats.BytesSkipped.Load()
	bytesProcessed := pr.stats.BytesProcessed.Load()
	totalSize := pr.stats.TotalSize.Load()
	chunksCreated := pr.stats.ChunksCreated.Load()
	blobsCreated := pr.stats.BlobsCreated.Load()
	blobsUploaded := pr.stats.BlobsUploaded.Load()
	bytesUploaded := pr.stats.BytesUploaded.Load()
	// NOTE(review): bare type assertion; relies on NewProgressReporter
	// having stored "" — panics if a reporter is built any other way.
	currentFile := pr.stats.CurrentFile.Load().(string)
	totalBytes := bytesScanned + bytesSkipped
	rate := float64(totalBytes) / elapsed.Seconds()
	log.Notice("=== Snapshot Progress Report ===")
	log.Info("Elapsed time", "duration", formatDuration(elapsed))
	// Calculate and show ETA only when the processing phase has begun
	// (TotalSize set and at least one byte processed).
	if totalSize > 0 && bytesProcessed > 0 {
		processStart, ok := pr.stats.ProcessStartTime.Load().(time.Time)
		if ok && !processStart.IsZero() {
			processElapsed := time.Since(processStart)
			processRate := float64(bytesProcessed) / processElapsed.Seconds()
			if processRate > 0 {
				remainingBytes := totalSize - bytesProcessed
				remainingSeconds := float64(remainingBytes) / processRate
				eta := time.Duration(remainingSeconds * float64(time.Second))
				percentComplete := float64(bytesProcessed) / float64(totalSize) * 100
				log.Info("Overall progress",
					"percent", fmt.Sprintf("%.1f%%", percentComplete),
					"processed", humanize.Bytes(uint64(bytesProcessed)),
					"total", humanize.Bytes(uint64(totalSize)),
					"rate", humanize.Bytes(uint64(processRate))+"/s",
					"eta", formatDuration(eta))
			}
		}
	}
	// "total" equals "scanned" here because FilesScanned already includes
	// skipped files (per the field comment on ProgressStats).
	log.Info("Files processed",
		"scanned", filesScanned,
		"skipped", filesSkipped,
		"total", filesScanned,
		"skip_rate", formatPercent(filesSkipped, filesScanned))
	log.Info("Data scanned",
		"new", humanize.Bytes(uint64(bytesScanned)),
		"skipped", humanize.Bytes(uint64(bytesSkipped)),
		"total", humanize.Bytes(uint64(totalBytes)),
		"scan_rate", humanize.Bytes(uint64(rate))+"/s")
	log.Info("Chunks created", "count", chunksCreated)
	log.Info("Blobs status",
		"created", blobsCreated,
		"uploaded", blobsUploaded,
		"pending", blobsCreated-blobsUploaded)
	// compression_ratio compares uploaded (compressed) bytes against newly
	// scanned bytes; < 1.00 means the data compressed/deduplicated well.
	log.Info("Total uploaded to S3",
		"uploaded", humanize.Bytes(uint64(bytesUploaded)),
		"compression_ratio", formatRatio(bytesUploaded, bytesScanned))
	if currentFile != "" {
		log.Info("Current file", "path", currentFile)
	}
	log.Notice("=============================")
}
// Helper functions
func formatDuration(d time.Duration) string {
if d < 0 {
return "unknown"
}
if d < time.Minute {
return fmt.Sprintf("%ds", int(d.Seconds()))
}
if d < time.Hour {
return fmt.Sprintf("%dm%ds", int(d.Minutes()), int(d.Seconds())%60)
}
return fmt.Sprintf("%dh%dm", int(d.Hours()), int(d.Minutes())%60)
}
// formatPercent renders numerator/denominator as a percentage with one
// decimal place; a zero denominator is reported as "0.0%" rather than
// dividing by zero.
func formatPercent(numerator, denominator int64) string {
	if denominator == 0 {
		return "0.0%"
	}
	pct := 100 * float64(numerator) / float64(denominator)
	return fmt.Sprintf("%.1f%%", pct)
}
// formatRatio renders compressed/uncompressed with two decimal places;
// a zero uncompressed size yields the neutral ratio "1.00".
func formatRatio(compressed, uncompressed int64) string {
	if uncompressed == 0 {
		return "1.00"
	}
	return fmt.Sprintf("%.2f", float64(compressed)/float64(uncompressed))
}
// truncatePath shortens path to at most maxLen bytes by keeping the tail
// and prepending "...". Paths already within maxLen are returned
// unchanged. If maxLen is too small to fit the "..." prefix plus one
// character, the bare tail of the path is returned instead (the original
// code sliced with a negative keep length and panicked for maxLen <= 3).
func truncatePath(path string, maxLen int) string {
	if len(path) <= maxLen {
		return path
	}
	keep := maxLen - 3
	if keep < 1 {
		if maxLen < 1 {
			return ""
		}
		return path[len(path)-maxLen:]
	}
	// Keep the last maxLen-3 characters and prepend "...".
	return "..." + path[len(path)-keep:]
}
// printUploadProgress is invoked from the summary ticker while an upload
// is in flight. It is intentionally a no-op: per-upload progress lines
// are emitted by ReportUploadProgress (rate-limited to
// UploadProgressInterval), so the summary tick merely suppresses the
// normal chunking status line during uploads.
func (pr *ProgressReporter) printUploadProgress(info *UploadInfo) {
	// This function is called repeatedly during upload, not just at start
	// Don't print anything here - the actual progress is shown by ReportUploadProgress
}
// ReportUploadStart marks the beginning of a blob upload: it records the
// upload as current (so printSummaryStatus switches modes) and logs the
// blob hash and size.
func (pr *ProgressReporter) ReportUploadStart(blobHash string, size int64) {
	info := &UploadInfo{
		BlobHash:  blobHash,
		Size:      size,
		StartTime: time.Now().UTC(),
	}
	pr.stats.CurrentUpload.Store(info)

	// Abbreviate the hash for logging; guard the slice so a hash shorter
	// than 8 bytes no longer panics (the original used blobHash[:8]
	// unconditionally).
	hashDisplay := blobHash
	if len(blobHash) >= 8 {
		hashDisplay = blobHash[:8] + "..."
	}
	log.Info("Starting blob upload to S3",
		"hash", hashDisplay,
		"size", humanize.Bytes(uint64(size)))
}
// ReportUploadComplete marks the completion of a blob upload: it clears
// the current-upload marker, accumulates total upload time, and logs the
// result with an average transfer speed in bits per second.
func (pr *ProgressReporter) ReportUploadComplete(blobHash string, size int64, duration time.Duration) {
	// Clear current upload. This stores a typed nil (*UploadInfo)(nil);
	// readers must nil-check after the type assertion.
	pr.stats.CurrentUpload.Store((*UploadInfo)(nil))

	// Add to total upload duration.
	pr.stats.UploadDurationMs.Add(duration.Milliseconds())

	// Clamp so sub-millisecond uploads don't divide by (near) zero.
	if duration < time.Millisecond {
		duration = time.Millisecond
	}
	bitsPerSec := float64(size) / duration.Seconds() * 8

	// Pick the largest SI unit that keeps the number readable.
	var speedStr string
	switch {
	case bitsPerSec >= 1e9:
		speedStr = fmt.Sprintf("%.1fGbit/sec", bitsPerSec/1e9)
	case bitsPerSec >= 1e6:
		speedStr = fmt.Sprintf("%.0fMbit/sec", bitsPerSec/1e6)
	case bitsPerSec >= 1e3:
		speedStr = fmt.Sprintf("%.0fKbit/sec", bitsPerSec/1e3)
	default:
		speedStr = fmt.Sprintf("%.0fbit/sec", bitsPerSec)
	}

	// Abbreviate the hash safely; the original blobHash[:8] panicked on
	// hashes shorter than 8 bytes.
	hashDisplay := blobHash
	if len(blobHash) >= 8 {
		hashDisplay = blobHash[:8] + "..."
	}
	log.Info("Blob upload completed",
		"hash", hashDisplay,
		"size", humanize.Bytes(uint64(size)),
		"duration", formatDuration(duration),
		"speed", speedStr)
}
// UpdateChunkingActivity records that chunking work happened just now.
// printSummaryStatus uses this timestamp to suppress progress lines once
// chunking has been idle for more than two summary intervals.
func (pr *ProgressReporter) UpdateChunkingActivity() {
	pr.stats.mu.Lock()
	defer pr.stats.mu.Unlock()
	pr.stats.lastChunkingTime = time.Now().UTC()
}
// ReportUploadProgress reports current upload progress with instantaneous speed.
// blobHash identifies the blob, bytesUploaded/totalSize give completion,
// and instantSpeed is the current transfer rate in bytes per second as
// measured by the caller. Output is rate-limited to UploadProgressInterval.
func (pr *ProgressReporter) ReportUploadProgress(blobHash string, bytesUploaded, totalSize int64, instantSpeed float64) {
	// Update the current upload info with progress (typed nil means no
	// upload is in flight, in which case nothing is logged).
	if uploadInfo, ok := pr.stats.CurrentUpload.Load().(*UploadInfo); ok && uploadInfo != nil {
		now := time.Now()
		// Only log at the configured interval.
		// NOTE(review): LastLogTime is read and written here without a
		// lock; safe only if a single goroutine reports progress per
		// upload — confirm against the uploader.
		if now.Sub(uploadInfo.LastLogTime) >= UploadProgressInterval {
			// Format speed in bits/second using humanize.
			bitsPerSec := instantSpeed * 8
			speedStr := humanize.SI(bitsPerSec, "bit/sec")
			percent := float64(bytesUploaded) / float64(totalSize) * 100
			// Calculate ETA based on current speed.
			etaStr := "unknown"
			if instantSpeed > 0 && bytesUploaded < totalSize {
				remainingBytes := totalSize - bytesUploaded
				remainingSeconds := float64(remainingBytes) / instantSpeed
				eta := time.Duration(remainingSeconds * float64(time.Second))
				etaStr = formatDuration(eta)
			}
			// NOTE(review): blobHash[:8] panics if the hash is shorter
			// than 8 bytes; callers appear to pass full content hashes.
			log.Info("Blob upload progress",
				"hash", blobHash[:8]+"...",
				"progress", fmt.Sprintf("%.1f%%", percent),
				"uploaded", humanize.Bytes(uint64(bytesUploaded)),
				"total", humanize.Bytes(uint64(totalSize)),
				"speed", speedStr,
				"eta", etaStr)
			uploadInfo.LastLogTime = now
		}
	}
}