speed optimizations
This commit is contained in:
parent
2d46dc70a9
commit
cf79e008b5
@ -10,8 +10,21 @@ const (
|
|||||||
WALFilename = "firehose.db-wal"
|
WALFilename = "firehose.db-wal"
|
||||||
SHMFilename = "firehose.db-shm"
|
SHMFilename = "firehose.db-shm"
|
||||||
|
|
||||||
MinTmpFreeBytes = 500 * 1024 * 1024 * 1024 // 500 GB
|
// Disk space requirements
|
||||||
MinDailiesFreeBytes = 20 * 1024 * 1024 * 1024 // 20 GB
|
bytesPerGB = 1024 * 1024 * 1024
|
||||||
|
MinTmpFreeGB = 500
|
||||||
|
MinDailiesFreeGB = 20
|
||||||
|
MinTmpFreeBytes = MinTmpFreeGB * bytesPerGB
|
||||||
|
MinDailiesFreeBytes = MinDailiesFreeGB * bytesPerGB
|
||||||
|
|
||||||
|
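// SQLite cache size in KB (200MB).
// NOTE(review): ExtractDay formats this value into cache_size(%d) as a positive number, while the previous code passed -200000. SQLite reads a positive cache_size as a page count and only a negative one as KiB — confirm which is intended.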
|
||||||
|
sqliteCacheSizeKB = 200000
|
||||||
|
|
||||||
|
// Compression level for zstd
|
||||||
|
zstdCompressionLevel = 15
|
||||||
|
|
||||||
|
// Verification sample lines
|
||||||
|
verificationHeadLines = 20
|
||||||
)
|
)
|
||||||
|
|
||||||
var snapshotPattern = regexp.MustCompile(`^zfs-auto-snap_daily-(\d{4}-\d{2}-\d{2})-\d{4}$`)
|
var snapshotPattern = regexp.MustCompile(`^zfs-auto-snap_daily-(\d{4}-\d{2}-\d{2})-\d{4}$`)
|
||||||
|
|||||||
@ -7,6 +7,8 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// copyBufferSize is the size of the buffer CopyFile allocates and passes to
// io.CopyBuffer.
const copyBufferSize = 4 * 1024 * 1024 // 4MB buffer for large file copies
|
||||||
|
|
||||||
func CopyFile(src, dst string) (err error) {
|
func CopyFile(src, dst string) (err error) {
|
||||||
slog.Info("copying file", "src", src, "dst", dst)
|
slog.Info("copying file", "src", src, "dst", dst)
|
||||||
|
|
||||||
@ -31,7 +33,9 @@ func CopyFile(src, dst string) (err error) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
written, err := io.Copy(dstFile, srcFile)
|
// Use a larger buffer for better performance with large database files
|
||||||
|
buf := make([]byte, copyBufferSize)
|
||||||
|
written, err := io.CopyBuffer(dstFile, srcFile, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("copying data: %w", err)
|
return fmt.Errorf("copying data: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -13,8 +13,8 @@ func CheckFreeSpace(path string, minBytes uint64, label string) error {
|
|||||||
return fmt.Errorf("statfs %s (%s): %w", path, label, err)
|
return fmt.Errorf("statfs %s (%s): %w", path, label, err)
|
||||||
}
|
}
|
||||||
free := uint64(stat.Bavail) * uint64(stat.Bsize)
|
free := uint64(stat.Bavail) * uint64(stat.Bsize)
|
||||||
freeGB := float64(free) / (1024 * 1024 * 1024)
|
freeGB := float64(free) / float64(bytesPerGB)
|
||||||
minGB := float64(minBytes) / (1024 * 1024 * 1024)
|
minGB := float64(minBytes) / float64(bytesPerGB)
|
||||||
slog.Info("disk space check", "label", label, "path", path,
|
slog.Info("disk space check", "label", label, "path", path,
|
||||||
"free_gb", fmt.Sprintf("%.1f", freeGB),
|
"free_gb", fmt.Sprintf("%.1f", freeGB),
|
||||||
"required_gb", fmt.Sprintf("%.1f", minGB))
|
"required_gb", fmt.Sprintf("%.1f", minGB))
|
||||||
|
|||||||
@ -31,7 +31,7 @@ func DumpAndCompress(dbPath, outputPath string) (err error) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
dumpCmd := exec.Command("sqlite3", dbPath, ".dump")
|
dumpCmd := exec.Command("sqlite3", dbPath, ".dump")
|
||||||
zstdCmd := exec.Command("zstdmt", "-15")
|
zstdCmd := exec.Command("zstdmt", fmt.Sprintf("-%d", zstdCompressionLevel))
|
||||||
|
|
||||||
pipe, err := dumpCmd.StdoutPipe()
|
pipe, err := dumpCmd.StdoutPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -68,8 +68,9 @@ func DumpAndCompress(dbPath, outputPath string) (err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("stat output: %w", err)
|
return fmt.Errorf("stat output: %w", err)
|
||||||
}
|
}
|
||||||
|
const bytesPerMB = 1024 * 1024
|
||||||
slog.Info("compressed output written", "path", outputPath,
|
slog.Info("compressed output written", "path", outputPath,
|
||||||
"size_bytes", info.Size(), "size_mb", info.Size()/(1024*1024))
|
"size_bytes", info.Size(), "size_mb", info.Size()/bytesPerMB)
|
||||||
|
|
||||||
if info.Size() == 0 {
|
if info.Size() == 0 {
|
||||||
return fmt.Errorf("compressed output is empty")
|
return fmt.Errorf("compressed output is empty")
|
||||||
|
|||||||
@ -22,7 +22,9 @@ func ExtractDay(srcDBPath, dstDBPath string, targetDay time.Time) error {
|
|||||||
|
|
||||||
slog.Info("extracting day", "from", dayStart, "until", dayEnd)
|
slog.Info("extracting day", "from", dayStart, "until", dayEnd)
|
||||||
|
|
||||||
db, err := sql.Open("sqlite", dstDBPath+"?_pragma=journal_mode(MEMORY)&_pragma=synchronous(OFF)&_pragma=cache_size(-200000)&_pragma=foreign_keys(OFF)")
|
// Maximum performance pragmas - we don't care about crash safety for temp files
|
||||||
|
pragmas := fmt.Sprintf("?_pragma=journal_mode(OFF)&_pragma=synchronous(OFF)&_pragma=cache_size(%d)&_pragma=foreign_keys(OFF)&_pragma=locking_mode(EXCLUSIVE)&_pragma=temp_store(MEMORY)", sqliteCacheSizeKB)
|
||||||
|
db, err := sql.Open("sqlite", dstDBPath+pragmas)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("opening destination database: %w", err)
|
return fmt.Errorf("opening destination database: %w", err)
|
||||||
}
|
}
|
||||||
@ -59,9 +61,20 @@ func ExtractDay(srcDBPath, dstDBPath string, targetDay time.Time) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Begin transaction for bulk inserts
|
||||||
|
tx, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("beginning transaction: %w", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Insert target day's data
|
// Insert target day's data
|
||||||
slog.Info("inserting posts for target day")
|
slog.Info("inserting posts for target day")
|
||||||
result, err := db.Exec("INSERT INTO posts SELECT * FROM src.posts WHERE timestamp >= ? AND timestamp < ?", dayStart, dayEnd)
|
result, err := tx.Exec("INSERT INTO posts SELECT * FROM src.posts WHERE timestamp >= ? AND timestamp < ?", dayStart, dayEnd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("inserting posts: %w", err)
|
return fmt.Errorf("inserting posts: %w", err)
|
||||||
}
|
}
|
||||||
@ -74,26 +87,43 @@ func ExtractDay(srcDBPath, dstDBPath string, targetDay time.Time) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("inserting junction and lookup tables")
|
slog.Info("inserting junction and lookup tables")
|
||||||
if _, err := db.Exec("INSERT INTO posts_hashtags SELECT * FROM src.posts_hashtags WHERE post_id IN (SELECT id FROM posts)"); err != nil {
|
if _, err := tx.Exec("INSERT INTO posts_hashtags SELECT * FROM src.posts_hashtags WHERE post_id IN (SELECT id FROM posts)"); err != nil {
|
||||||
return fmt.Errorf("inserting posts_hashtags: %w", err)
|
return fmt.Errorf("inserting posts_hashtags: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := db.Exec("INSERT INTO posts_urls SELECT * FROM src.posts_urls WHERE post_id IN (SELECT id FROM posts)"); err != nil {
|
if _, err := tx.Exec("INSERT INTO posts_urls SELECT * FROM src.posts_urls WHERE post_id IN (SELECT id FROM posts)"); err != nil {
|
||||||
return fmt.Errorf("inserting posts_urls: %w", err)
|
return fmt.Errorf("inserting posts_urls: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := db.Exec("INSERT INTO hashtags SELECT * FROM src.hashtags WHERE id IN (SELECT hashtag_id FROM posts_hashtags)"); err != nil {
|
if _, err := tx.Exec("INSERT INTO hashtags SELECT * FROM src.hashtags WHERE id IN (SELECT hashtag_id FROM posts_hashtags)"); err != nil {
|
||||||
return fmt.Errorf("inserting hashtags: %w", err)
|
return fmt.Errorf("inserting hashtags: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := db.Exec("INSERT INTO urls SELECT * FROM src.urls WHERE id IN (SELECT url_id FROM posts_urls)"); err != nil {
|
if _, err := tx.Exec("INSERT INTO urls SELECT * FROM src.urls WHERE id IN (SELECT url_id FROM posts_urls)"); err != nil {
|
||||||
return fmt.Errorf("inserting urls: %w", err)
|
return fmt.Errorf("inserting urls: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := db.Exec("INSERT INTO users SELECT * FROM src.users WHERE did IN (SELECT user_did FROM posts)"); err != nil {
|
if _, err := tx.Exec("INSERT INTO users SELECT * FROM src.users WHERE did IN (SELECT user_did FROM posts)"); err != nil {
|
||||||
return fmt.Errorf("inserting users: %w", err)
|
return fmt.Errorf("inserting users: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if media table exists in source and copy if present
|
||||||
|
var mediaTableExists int
|
||||||
|
if err := db.QueryRow("SELECT COUNT(*) FROM src.sqlite_master WHERE type='table' AND name='media'").Scan(&mediaTableExists); err != nil {
|
||||||
|
slog.Warn("checking for media table", "error", err)
|
||||||
|
} else if mediaTableExists > 0 {
|
||||||
|
slog.Info("inserting media entries")
|
||||||
|
// Get post blob_cids for this day's posts
|
||||||
|
if _, err := tx.Exec("INSERT INTO media SELECT * FROM src.media WHERE content_hash IN (SELECT blob_cids FROM posts WHERE blob_cids IS NOT NULL)"); err != nil {
|
||||||
|
slog.Warn("inserting media (may not have matching entries)", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit the transaction
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return fmt.Errorf("committing transaction: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Create indexes after bulk insert for speed
|
// Create indexes after bulk insert for speed
|
||||||
slog.Info("creating indexes")
|
slog.Info("creating indexes")
|
||||||
idxRows, err := db.Query("SELECT sql FROM src.sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%' AND sql IS NOT NULL ORDER BY name")
|
idxRows, err := db.Query("SELECT sql FROM src.sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%' AND sql IS NOT NULL ORDER BY name")
|
||||||
|
|||||||
148
internal/bsdaily/parallel.go
Normal file
148
internal/bsdaily/parallel.go
Normal file
@ -0,0 +1,148 @@
|
|||||||
|
package bsdaily
|
||||||
|
|
||||||
|
import (
	"errors"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"time"
)
|
||||||
|
|
||||||
|
// maxParallelExtractions is the upper bound processParallel applies to its
// worker count (the count starts from runtime.NumCPU()).
const maxParallelExtractions = 2 // Limited to minimize seek thrashing on spinning disks
|
||||||
|
|
||||||
|
// extractionJob is one unit of work handed to a processParallel worker: the
// day to extract plus the paths processSingleDay needs to do it.
type extractionJob struct {
	srcDB     string    // source database path passed to ExtractDay
	tmpDir    string    // directory in which the per-day extracted database is created
	targetDay time.Time // day whose data is extracted
}
|
||||||
|
|
||||||
|
// extractionResult is what processSingleDay reports back for one day.
// A result with a nil err and skipped == false counts as processed.
type extractionResult struct {
	targetDay time.Time // day the result refers to
	skipped   bool      // set when the output already existed or the day had no posts
	err       error     // non-nil when processing the day failed
}
|
||||||
|
|
||||||
|
func processParallel(jobs []extractionJob) (processed, skipped int, firstErr error) {
|
||||||
|
// Use limited parallelism - too many concurrent SQLite operations can be counterproductive
|
||||||
|
workers := runtime.NumCPU()
|
||||||
|
if workers > maxParallelExtractions {
|
||||||
|
workers = maxParallelExtractions
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("processing days in parallel", "workers", workers, "total_days", len(jobs))
|
||||||
|
|
||||||
|
jobsChan := make(chan extractionJob, len(jobs))
|
||||||
|
resultsChan := make(chan extractionResult, len(jobs))
|
||||||
|
|
||||||
|
// Start workers
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < workers; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(workerID int) {
|
||||||
|
defer wg.Done()
|
||||||
|
for job := range jobsChan {
|
||||||
|
result := processSingleDay(job)
|
||||||
|
resultsChan <- result
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send all jobs
|
||||||
|
for _, job := range jobs {
|
||||||
|
jobsChan <- job
|
||||||
|
}
|
||||||
|
close(jobsChan)
|
||||||
|
|
||||||
|
// Wait for workers to finish
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(resultsChan)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Collect results
|
||||||
|
for result := range resultsChan {
|
||||||
|
if result.err != nil {
|
||||||
|
if firstErr == nil {
|
||||||
|
firstErr = result.err
|
||||||
|
}
|
||||||
|
slog.Error("day processing failed", "date", result.targetDay.Format("2006-01-02"), "error", result.err)
|
||||||
|
} else if result.skipped {
|
||||||
|
skipped++
|
||||||
|
} else {
|
||||||
|
processed++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return processed, skipped, firstErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func processSingleDay(job extractionJob) extractionResult {
|
||||||
|
targetDay := job.targetDay
|
||||||
|
dayStr := targetDay.Format("2006-01-02")
|
||||||
|
|
||||||
|
slog.Info("processing day", "date", dayStr)
|
||||||
|
|
||||||
|
// Check if output already exists
|
||||||
|
outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01"))
|
||||||
|
outputFinal := filepath.Join(outputDir, dayStr+".sql.zst")
|
||||||
|
if _, err := os.Stat(outputFinal); err == nil {
|
||||||
|
slog.Info("output already exists, skipping", "path", outputFinal)
|
||||||
|
return extractionResult{targetDay: targetDay, skipped: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract target day into a per-day database
|
||||||
|
extractedDB := filepath.Join(job.tmpDir, "extracted-"+dayStr+".db")
|
||||||
|
slog.Info("extracting target day", "src", job.srcDB, "dst", extractedDB)
|
||||||
|
if err := ExtractDay(job.srcDB, extractedDB, targetDay); err != nil {
|
||||||
|
if err == ErrNoPosts {
|
||||||
|
slog.Warn("no posts found, skipping day", "date", dayStr)
|
||||||
|
os.Remove(extractedDB)
|
||||||
|
return extractionResult{targetDay: targetDay, skipped: true}
|
||||||
|
}
|
||||||
|
return extractionResult{targetDay: targetDay, err: fmt.Errorf("extracting day %s: %w", dayStr, err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dump to SQL and compress
|
||||||
|
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
||||||
|
os.Remove(extractedDB)
|
||||||
|
return extractionResult{targetDay: targetDay, err: fmt.Errorf("creating output directory %s: %w", outputDir, err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp")
|
||||||
|
|
||||||
|
slog.Info("dumping and compressing", "tmp_output", outputTmp)
|
||||||
|
if err := DumpAndCompress(extractedDB, outputTmp); err != nil {
|
||||||
|
os.Remove(outputTmp)
|
||||||
|
os.Remove(extractedDB)
|
||||||
|
return extractionResult{targetDay: targetDay, err: fmt.Errorf("dump and compress for %s: %w", dayStr, err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("verifying compressed output", "date", dayStr)
|
||||||
|
if err := VerifyOutput(outputTmp); err != nil {
|
||||||
|
os.Remove(outputTmp)
|
||||||
|
os.Remove(extractedDB)
|
||||||
|
return extractionResult{targetDay: targetDay, err: fmt.Errorf("verification failed for %s: %w", dayStr, err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Atomic rename to final path
|
||||||
|
slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
|
||||||
|
if err := os.Rename(outputTmp, outputFinal); err != nil {
|
||||||
|
os.Remove(outputTmp)
|
||||||
|
os.Remove(extractedDB)
|
||||||
|
return extractionResult{targetDay: targetDay, err: fmt.Errorf("atomic rename for %s: %w", dayStr, err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := os.Stat(outputFinal)
|
||||||
|
if err != nil {
|
||||||
|
os.Remove(extractedDB)
|
||||||
|
return extractionResult{targetDay: targetDay, err: fmt.Errorf("stat final output: %w", err)}
|
||||||
|
}
|
||||||
|
slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size())
|
||||||
|
|
||||||
|
// Remove extracted DB to reclaim space
|
||||||
|
os.Remove(extractedDB)
|
||||||
|
|
||||||
|
return extractionResult{targetDay: targetDay}
|
||||||
|
}
|
||||||
@ -76,70 +76,88 @@ func Run(targetDates []time.Time) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process each day
|
// Process days - use parallel processing for multiple days
|
||||||
processed := 0
|
var processed, skipped int
|
||||||
skipped := 0
|
var processErr error
|
||||||
|
|
||||||
for _, targetDay := range targetDates {
|
if len(targetDates) > 1 {
|
||||||
dayStr := targetDay.Format("2006-01-02")
|
// Prepare jobs for parallel processing
|
||||||
slog.Info("processing day", "date", dayStr)
|
jobs := make([]extractionJob, len(targetDates))
|
||||||
|
for i, targetDay := range targetDates {
|
||||||
// Check if output already exists
|
jobs[i] = extractionJob{
|
||||||
outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01"))
|
targetDay: targetDay,
|
||||||
outputFinal := filepath.Join(outputDir, dayStr+".sql.zst")
|
srcDB: dstDB,
|
||||||
if _, err := os.Stat(outputFinal); err == nil {
|
tmpDir: tmpDir,
|
||||||
slog.Info("output already exists, skipping", "path", outputFinal)
|
}
|
||||||
skipped++
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
processed, skipped, processErr = processParallel(jobs)
|
||||||
|
} else {
|
||||||
|
// Single day - process sequentially (simpler error handling)
|
||||||
|
for _, targetDay := range targetDates {
|
||||||
|
dayStr := targetDay.Format("2006-01-02")
|
||||||
|
slog.Info("processing day", "date", dayStr)
|
||||||
|
|
||||||
// Extract target day into a per-day database
|
// Check if output already exists
|
||||||
extractedDB := filepath.Join(tmpDir, "extracted-"+dayStr+".db")
|
outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01"))
|
||||||
slog.Info("extracting target day", "src", dstDB, "dst", extractedDB)
|
outputFinal := filepath.Join(outputDir, dayStr+".sql.zst")
|
||||||
if err := ExtractDay(dstDB, extractedDB, targetDay); err != nil {
|
if _, err := os.Stat(outputFinal); err == nil {
|
||||||
if errors.Is(err, ErrNoPosts) {
|
slog.Info("output already exists, skipping", "path", outputFinal)
|
||||||
slog.Warn("no posts found, skipping day", "date", dayStr)
|
|
||||||
os.Remove(extractedDB)
|
|
||||||
skipped++
|
skipped++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return fmt.Errorf("extracting day %s: %w", dayStr, err)
|
|
||||||
|
// Extract target day into a per-day database
|
||||||
|
extractedDB := filepath.Join(tmpDir, "extracted-"+dayStr+".db")
|
||||||
|
slog.Info("extracting target day", "src", dstDB, "dst", extractedDB)
|
||||||
|
if err := ExtractDay(dstDB, extractedDB, targetDay); err != nil {
|
||||||
|
if errors.Is(err, ErrNoPosts) {
|
||||||
|
slog.Warn("no posts found, skipping day", "date", dayStr)
|
||||||
|
os.Remove(extractedDB)
|
||||||
|
skipped++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return fmt.Errorf("extracting day %s: %w", dayStr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dump to SQL and compress
|
||||||
|
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
||||||
|
return fmt.Errorf("creating output directory %s: %w", outputDir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp")
|
||||||
|
|
||||||
|
slog.Info("dumping and compressing", "tmp_output", outputTmp)
|
||||||
|
if err := DumpAndCompress(extractedDB, outputTmp); err != nil {
|
||||||
|
os.Remove(outputTmp)
|
||||||
|
return fmt.Errorf("dump and compress for %s: %w", dayStr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("verifying compressed output")
|
||||||
|
if err := VerifyOutput(outputTmp); err != nil {
|
||||||
|
os.Remove(outputTmp)
|
||||||
|
return fmt.Errorf("verification failed for %s: %w", dayStr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Atomic rename to final path
|
||||||
|
slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
|
||||||
|
if err := os.Rename(outputTmp, outputFinal); err != nil {
|
||||||
|
return fmt.Errorf("atomic rename for %s: %w", dayStr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := os.Stat(outputFinal)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("stat final output: %w", err)
|
||||||
|
}
|
||||||
|
slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size())
|
||||||
|
|
||||||
|
// Remove extracted DB to reclaim space
|
||||||
|
os.Remove(extractedDB)
|
||||||
|
processed++
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Dump to SQL and compress
|
if processErr != nil {
|
||||||
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
return processErr
|
||||||
return fmt.Errorf("creating output directory %s: %w", outputDir, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp")
|
|
||||||
|
|
||||||
slog.Info("dumping and compressing", "tmp_output", outputTmp)
|
|
||||||
if err := DumpAndCompress(extractedDB, outputTmp); err != nil {
|
|
||||||
os.Remove(outputTmp)
|
|
||||||
return fmt.Errorf("dump and compress for %s: %w", dayStr, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
slog.Info("verifying compressed output")
|
|
||||||
if err := VerifyOutput(outputTmp); err != nil {
|
|
||||||
os.Remove(outputTmp)
|
|
||||||
return fmt.Errorf("verification failed for %s: %w", dayStr, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Atomic rename to final path
|
|
||||||
slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
|
|
||||||
if err := os.Rename(outputTmp, outputFinal); err != nil {
|
|
||||||
return fmt.Errorf("atomic rename for %s: %w", dayStr, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
info, err := os.Stat(outputFinal)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("stat final output: %w", err)
|
|
||||||
}
|
|
||||||
slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size())
|
|
||||||
|
|
||||||
// Remove extracted DB to reclaim space
|
|
||||||
os.Remove(extractedDB)
|
|
||||||
processed++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("run summary", "processed", processed, "skipped", skipped, "total", len(targetDates))
|
slog.Info("run summary", "processed", processed, "skipped", skipped, "total", len(targetDates))
|
||||||
|
|||||||
@ -19,7 +19,7 @@ func VerifyOutput(path string) error {
|
|||||||
|
|
||||||
slog.Info("verifying SQL content")
|
slog.Info("verifying SQL content")
|
||||||
catCmd := exec.Command("zstdcat", path)
|
catCmd := exec.Command("zstdcat", path)
|
||||||
headCmd := exec.Command("head", "-20")
|
headCmd := exec.Command("head", fmt.Sprintf("-%d", verificationHeadLines))
|
||||||
|
|
||||||
pipe, err := catCmd.StdoutPipe()
|
pipe, err := catCmd.StdoutPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -30,16 +30,23 @@ func VerifyOutput(path string) error {
|
|||||||
var headOut strings.Builder
|
var headOut strings.Builder
|
||||||
headCmd.Stdout = &headOut
|
headCmd.Stdout = &headOut
|
||||||
|
|
||||||
if err := headCmd.Start(); err != nil {
|
|
||||||
return fmt.Errorf("starting head: %w", err)
|
|
||||||
}
|
|
||||||
if err := catCmd.Start(); err != nil {
|
if err := catCmd.Start(); err != nil {
|
||||||
return fmt.Errorf("starting zstdcat: %w", err)
|
return fmt.Errorf("starting zstdcat: %w", err)
|
||||||
}
|
}
|
||||||
|
if err := headCmd.Start(); err != nil {
|
||||||
|
catCmd.Process.Kill() // Clean up if head fails to start
|
||||||
|
return fmt.Errorf("starting head: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// head closes pipe early, zstdcat gets SIGPIPE - expected
|
// Wait for head first (it will exit when it has enough lines)
|
||||||
_ = headCmd.Wait()
|
if err := headCmd.Wait(); err != nil {
|
||||||
_ = catCmd.Wait()
|
catCmd.Process.Kill()
|
||||||
|
return fmt.Errorf("head command failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Kill zstdcat since head closed the pipe (expected SIGPIPE)
|
||||||
|
catCmd.Process.Kill()
|
||||||
|
_ = catCmd.Wait() // Reap the process
|
||||||
|
|
||||||
content := headOut.String()
|
content := headOut.String()
|
||||||
if len(content) == 0 {
|
if len(content) == 0 {
|
||||||
@ -53,9 +60,10 @@ func VerifyOutput(path string) error {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
const verificationSampleBytes = 200
|
||||||
if !hasSQLMarker {
|
if !hasSQLMarker {
|
||||||
return fmt.Errorf("decompressed content does not look like SQL; first 200 bytes: %s",
|
return fmt.Errorf("decompressed content does not look like SQL; first %d bytes: %s",
|
||||||
content[:min(200, len(content))])
|
verificationSampleBytes, content[:min(verificationSampleBytes, len(content))])
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("SQL content verification passed")
|
slog.Info("SQL content verification passed")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user