From cf79e008b51116745377c2e903036a37d48e9a06 Mon Sep 17 00:00:00 2001 From: sneak Date: Thu, 12 Feb 2026 13:01:52 -0800 Subject: [PATCH] speed optimizations --- internal/bsdaily/config.go | 17 +++- internal/bsdaily/copy.go | 6 +- internal/bsdaily/disk.go | 4 +- internal/bsdaily/dump.go | 5 +- internal/bsdaily/extract.go | 44 +++++++++-- internal/bsdaily/parallel.go | 148 +++++++++++++++++++++++++++++++++++ internal/bsdaily/run.go | 130 +++++++++++++++++------------- internal/bsdaily/verify.go | 26 +++--- 8 files changed, 301 insertions(+), 79 deletions(-) create mode 100644 internal/bsdaily/parallel.go diff --git a/internal/bsdaily/config.go b/internal/bsdaily/config.go index df532ce..34da70e 100644 --- a/internal/bsdaily/config.go +++ b/internal/bsdaily/config.go @@ -10,8 +10,21 @@ const ( WALFilename = "firehose.db-wal" SHMFilename = "firehose.db-shm" - MinTmpFreeBytes = 500 * 1024 * 1024 * 1024 // 500 GB - MinDailiesFreeBytes = 20 * 1024 * 1024 * 1024 // 20 GB + // Disk space requirements + bytesPerGB = 1024 * 1024 * 1024 + MinTmpFreeGB = 500 + MinDailiesFreeGB = 20 + MinTmpFreeBytes = MinTmpFreeGB * bytesPerGB + MinDailiesFreeBytes = MinDailiesFreeGB * bytesPerGB + + // SQLite cache size in KB (200MB) + sqliteCacheSizeKB = 200000 + + // Compression level for zstd + zstdCompressionLevel = 15 + + // Verification sample lines + verificationHeadLines = 20 ) var snapshotPattern = regexp.MustCompile(`^zfs-auto-snap_daily-(\d{4}-\d{2}-\d{2})-\d{4}$`) diff --git a/internal/bsdaily/copy.go b/internal/bsdaily/copy.go index 2ae1b45..471332c 100644 --- a/internal/bsdaily/copy.go +++ b/internal/bsdaily/copy.go @@ -7,6 +7,8 @@ import ( "os" ) +const copyBufferSize = 4 * 1024 * 1024 // 4MB buffer for large file copies + func CopyFile(src, dst string) (err error) { slog.Info("copying file", "src", src, "dst", dst) @@ -31,7 +33,9 @@ func CopyFile(src, dst string) (err error) { } }() - written, err := io.Copy(dstFile, srcFile) + // Use a larger buffer for better performance 
with large database files + buf := make([]byte, copyBufferSize) + written, err := io.CopyBuffer(dstFile, srcFile, buf) if err != nil { return fmt.Errorf("copying data: %w", err) } diff --git a/internal/bsdaily/disk.go b/internal/bsdaily/disk.go index 51245db..60cd32e 100644 --- a/internal/bsdaily/disk.go +++ b/internal/bsdaily/disk.go @@ -13,8 +13,8 @@ func CheckFreeSpace(path string, minBytes uint64, label string) error { return fmt.Errorf("statfs %s (%s): %w", path, label, err) } free := uint64(stat.Bavail) * uint64(stat.Bsize) - freeGB := float64(free) / (1024 * 1024 * 1024) - minGB := float64(minBytes) / (1024 * 1024 * 1024) + freeGB := float64(free) / float64(bytesPerGB) + minGB := float64(minBytes) / float64(bytesPerGB) slog.Info("disk space check", "label", label, "path", path, "free_gb", fmt.Sprintf("%.1f", freeGB), "required_gb", fmt.Sprintf("%.1f", minGB)) diff --git a/internal/bsdaily/dump.go b/internal/bsdaily/dump.go index 549070d..9a835a7 100644 --- a/internal/bsdaily/dump.go +++ b/internal/bsdaily/dump.go @@ -31,7 +31,7 @@ func DumpAndCompress(dbPath, outputPath string) (err error) { }() dumpCmd := exec.Command("sqlite3", dbPath, ".dump") - zstdCmd := exec.Command("zstdmt", "-15") + zstdCmd := exec.Command("zstdmt", fmt.Sprintf("-%d", zstdCompressionLevel)) pipe, err := dumpCmd.StdoutPipe() if err != nil { @@ -68,8 +68,9 @@ func DumpAndCompress(dbPath, outputPath string) (err error) { if err != nil { return fmt.Errorf("stat output: %w", err) } + const bytesPerMB = 1024 * 1024 slog.Info("compressed output written", "path", outputPath, - "size_bytes", info.Size(), "size_mb", info.Size()/(1024*1024)) + "size_bytes", info.Size(), "size_mb", info.Size()/bytesPerMB) if info.Size() == 0 { return fmt.Errorf("compressed output is empty") diff --git a/internal/bsdaily/extract.go b/internal/bsdaily/extract.go index 6444b22..6947f17 100644 --- a/internal/bsdaily/extract.go +++ b/internal/bsdaily/extract.go @@ -22,7 +22,9 @@ func ExtractDay(srcDBPath, dstDBPath 
string, targetDay time.Time) error { slog.Info("extracting day", "from", dayStart, "until", dayEnd) - db, err := sql.Open("sqlite", dstDBPath+"?_pragma=journal_mode(MEMORY)&_pragma=synchronous(OFF)&_pragma=cache_size(-200000)&_pragma=foreign_keys(OFF)") + // Maximum performance pragmas - we don't care about crash safety for temp files. cache_size is negated because SQLite reads a negative value as KiB and a positive one as a page count. + pragmas := fmt.Sprintf("?_pragma=journal_mode(OFF)&_pragma=synchronous(OFF)&_pragma=cache_size(-%d)&_pragma=foreign_keys(OFF)&_pragma=locking_mode(EXCLUSIVE)&_pragma=temp_store(MEMORY)", sqliteCacheSizeKB) + db, err := sql.Open("sqlite", dstDBPath+pragmas) if err != nil { return fmt.Errorf("opening destination database: %w", err) } @@ -59,9 +61,20 @@ func ExtractDay(srcDBPath, dstDBPath string, targetDay time.Time) error { } } + // Begin transaction for bulk inserts + tx, err := db.Begin() + if err != nil { + return fmt.Errorf("beginning transaction: %w", err) + } + defer func() { + // Roll back unconditionally: failures in the "if _, err :=" inserts below never reach + // the outer err. After a successful Commit this only returns sql.ErrTxDone. + _ = tx.Rollback() + }() + // Insert target day's data slog.Info("inserting posts for target day") - result, err := db.Exec("INSERT INTO posts SELECT * FROM src.posts WHERE timestamp >= ? AND timestamp < ?", dayStart, dayEnd) + result, err := tx.Exec("INSERT INTO posts SELECT * FROM src.posts WHERE timestamp >= ? 
AND timestamp < ?", dayStart, dayEnd) if err != nil { return fmt.Errorf("inserting posts: %w", err) } @@ -74,26 +87,43 @@ func ExtractDay(srcDBPath, dstDBPath string, targetDay time.Time) error { } slog.Info("inserting junction and lookup tables") - if _, err := db.Exec("INSERT INTO posts_hashtags SELECT * FROM src.posts_hashtags WHERE post_id IN (SELECT id FROM posts)"); err != nil { + if _, err := tx.Exec("INSERT INTO posts_hashtags SELECT * FROM src.posts_hashtags WHERE post_id IN (SELECT id FROM posts)"); err != nil { return fmt.Errorf("inserting posts_hashtags: %w", err) } - if _, err := db.Exec("INSERT INTO posts_urls SELECT * FROM src.posts_urls WHERE post_id IN (SELECT id FROM posts)"); err != nil { + if _, err := tx.Exec("INSERT INTO posts_urls SELECT * FROM src.posts_urls WHERE post_id IN (SELECT id FROM posts)"); err != nil { return fmt.Errorf("inserting posts_urls: %w", err) } - if _, err := db.Exec("INSERT INTO hashtags SELECT * FROM src.hashtags WHERE id IN (SELECT hashtag_id FROM posts_hashtags)"); err != nil { + if _, err := tx.Exec("INSERT INTO hashtags SELECT * FROM src.hashtags WHERE id IN (SELECT hashtag_id FROM posts_hashtags)"); err != nil { return fmt.Errorf("inserting hashtags: %w", err) } - if _, err := db.Exec("INSERT INTO urls SELECT * FROM src.urls WHERE id IN (SELECT url_id FROM posts_urls)"); err != nil { + if _, err := tx.Exec("INSERT INTO urls SELECT * FROM src.urls WHERE id IN (SELECT url_id FROM posts_urls)"); err != nil { return fmt.Errorf("inserting urls: %w", err) } - if _, err := db.Exec("INSERT INTO users SELECT * FROM src.users WHERE did IN (SELECT user_did FROM posts)"); err != nil { + if _, err := tx.Exec("INSERT INTO users SELECT * FROM src.users WHERE did IN (SELECT user_did FROM posts)"); err != nil { return fmt.Errorf("inserting users: %w", err) } + // Check if media table exists in source and copy if present (queried on tx: db.QueryRow would use another pooled connection while the transaction is open) + var mediaTableExists int + if err := tx.QueryRow("SELECT COUNT(*) FROM src.sqlite_master WHERE type='table' 
AND name='media'").Scan(&mediaTableExists); err != nil { + slog.Warn("checking for media table", "error", err) + } else if mediaTableExists > 0 { + slog.Info("inserting media entries") + // Get post blob_cids for this day's posts + if _, err := tx.Exec("INSERT INTO media SELECT * FROM src.media WHERE content_hash IN (SELECT blob_cids FROM posts WHERE blob_cids IS NOT NULL)"); err != nil { + slog.Warn("inserting media (may not have matching entries)", "error", err) + } + } + + // Commit the transaction + if err := tx.Commit(); err != nil { + return fmt.Errorf("committing transaction: %w", err) + } + // Create indexes after bulk insert for speed slog.Info("creating indexes") idxRows, err := db.Query("SELECT sql FROM src.sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%' AND sql IS NOT NULL ORDER BY name") diff --git a/internal/bsdaily/parallel.go b/internal/bsdaily/parallel.go new file mode 100644 index 0000000..8c5d2cd --- /dev/null +++ b/internal/bsdaily/parallel.go @@ -0,0 +1,148 @@ +package bsdaily + +import ( + "errors" + "fmt" + "log/slog" + "os" + "path/filepath" + "runtime" + "sync" + "time" +) + +const maxParallelExtractions = 2 // Limited to minimize seek thrashing on spinning disks + +type extractionJob struct { + targetDay time.Time + srcDB string + tmpDir string +} + +type extractionResult struct { + targetDay time.Time + err error + skipped bool +} + +// processParallel runs every job through processSingleDay on a bounded worker pool; all jobs are attempted and only the first error received is returned. +func processParallel(jobs []extractionJob) (processed, skipped int, firstErr error) { + // Use limited parallelism - too many concurrent SQLite operations can be counterproductive + workers := min(runtime.NumCPU(), maxParallelExtractions) + + slog.Info("processing days in parallel", "workers", workers, "total_days", len(jobs)) + + jobsChan := make(chan extractionJob, len(jobs)) + resultsChan := make(chan extractionResult, len(jobs)) + + // Start workers + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func(workerID int) { + defer 
wg.Done() + for job := range jobsChan { + result := processSingleDay(job) + resultsChan <- result + } + }(i) + } + + // Send all jobs + for _, job := range jobs { + jobsChan <- job + } + close(jobsChan) + + // Wait for workers to finish + go func() { + wg.Wait() + close(resultsChan) + }() + + // Collect results + for result := range resultsChan { + if result.err != nil { + if firstErr == nil { + firstErr = result.err + } + slog.Error("day processing failed", "date", result.targetDay.Format("2006-01-02"), "error", result.err) + } else if result.skipped { + skipped++ + } else { + processed++ + } + } + + return processed, skipped, firstErr +} + +// processSingleDay extracts, dumps, compresses and verifies one day, then renames the temp output into place; existing output or a day with no posts counts as skipped. +func processSingleDay(job extractionJob) extractionResult { + targetDay := job.targetDay + dayStr := targetDay.Format("2006-01-02") + + slog.Info("processing day", "date", dayStr) + + // Check if output already exists + outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01")) + outputFinal := filepath.Join(outputDir, dayStr+".sql.zst") + if _, err := os.Stat(outputFinal); err == nil { + slog.Info("output already exists, skipping", "path", outputFinal) + return extractionResult{targetDay: targetDay, skipped: true} + } + + // Extract target day into a per-day database + extractedDB := filepath.Join(job.tmpDir, "extracted-"+dayStr+".db") + slog.Info("extracting target day", "src", job.srcDB, "dst", extractedDB) + if err := ExtractDay(job.srcDB, extractedDB, targetDay); err != nil { + if errors.Is(err, ErrNoPosts) { + slog.Warn("no posts found, skipping day", "date", dayStr) + os.Remove(extractedDB) + return extractionResult{targetDay: targetDay, skipped: true} + } + return extractionResult{targetDay: targetDay, err: fmt.Errorf("extracting day %s: %w", dayStr, err)} + } + + // Dump to SQL and compress + if err := os.MkdirAll(outputDir, 0755); err != nil { + os.Remove(extractedDB) + return extractionResult{targetDay: targetDay, err: fmt.Errorf("creating output directory %s: %w", outputDir, err)} + } + + outputTmp := 
filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp") + + slog.Info("dumping and compressing", "tmp_output", outputTmp) + if err := DumpAndCompress(extractedDB, outputTmp); err != nil { + os.Remove(outputTmp) + os.Remove(extractedDB) + return extractionResult{targetDay: targetDay, err: fmt.Errorf("dump and compress for %s: %w", dayStr, err)} + } + + slog.Info("verifying compressed output", "date", dayStr) + if err := VerifyOutput(outputTmp); err != nil { + os.Remove(outputTmp) + os.Remove(extractedDB) + return extractionResult{targetDay: targetDay, err: fmt.Errorf("verification failed for %s: %w", dayStr, err)} + } + + // Atomic rename to final path + slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal) + if err := os.Rename(outputTmp, outputFinal); err != nil { + os.Remove(outputTmp) + os.Remove(extractedDB) + return extractionResult{targetDay: targetDay, err: fmt.Errorf("atomic rename for %s: %w", dayStr, err)} + } + + info, err := os.Stat(outputFinal) + if err != nil { + os.Remove(extractedDB) + return extractionResult{targetDay: targetDay, err: fmt.Errorf("stat final output: %w", err)} + } + slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size()) + + // Remove extracted DB to reclaim space + os.Remove(extractedDB) + + return extractionResult{targetDay: targetDay} +} \ No newline at end of file diff --git a/internal/bsdaily/run.go b/internal/bsdaily/run.go index ad9d9ec..3c216ae 100644 --- a/internal/bsdaily/run.go +++ b/internal/bsdaily/run.go @@ -76,70 +76,88 @@ func Run(targetDates []time.Time) error { } } - // Process each day - processed := 0 - skipped := 0 + // Process days - use parallel processing for multiple days + var processed, skipped int + var processErr error - for _, targetDay := range targetDates { - dayStr := targetDay.Format("2006-01-02") - slog.Info("processing day", "date", dayStr) - - // Check if output already exists - outputDir := filepath.Join(DailiesBase, 
targetDay.Format("2006-01")) - outputFinal := filepath.Join(outputDir, dayStr+".sql.zst") - if _, err := os.Stat(outputFinal); err == nil { - slog.Info("output already exists, skipping", "path", outputFinal) - skipped++ - continue + if len(targetDates) > 1 { + // Prepare jobs for parallel processing + jobs := make([]extractionJob, len(targetDates)) + for i, targetDay := range targetDates { + jobs[i] = extractionJob{ + targetDay: targetDay, + srcDB: dstDB, + tmpDir: tmpDir, + } } + processed, skipped, processErr = processParallel(jobs) + } else { + // Single day - process sequentially (simpler error handling) + for _, targetDay := range targetDates { + dayStr := targetDay.Format("2006-01-02") + slog.Info("processing day", "date", dayStr) - // Extract target day into a per-day database - extractedDB := filepath.Join(tmpDir, "extracted-"+dayStr+".db") - slog.Info("extracting target day", "src", dstDB, "dst", extractedDB) - if err := ExtractDay(dstDB, extractedDB, targetDay); err != nil { - if errors.Is(err, ErrNoPosts) { - slog.Warn("no posts found, skipping day", "date", dayStr) - os.Remove(extractedDB) + // Check if output already exists + outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01")) + outputFinal := filepath.Join(outputDir, dayStr+".sql.zst") + if _, err := os.Stat(outputFinal); err == nil { + slog.Info("output already exists, skipping", "path", outputFinal) skipped++ continue } - return fmt.Errorf("extracting day %s: %w", dayStr, err) + + // Extract target day into a per-day database + extractedDB := filepath.Join(tmpDir, "extracted-"+dayStr+".db") + slog.Info("extracting target day", "src", dstDB, "dst", extractedDB) + if err := ExtractDay(dstDB, extractedDB, targetDay); err != nil { + if errors.Is(err, ErrNoPosts) { + slog.Warn("no posts found, skipping day", "date", dayStr) + os.Remove(extractedDB) + skipped++ + continue + } + return fmt.Errorf("extracting day %s: %w", dayStr, err) + } + + // Dump to SQL and compress + if err := 
os.MkdirAll(outputDir, 0755); err != nil { + return fmt.Errorf("creating output directory %s: %w", outputDir, err) + } + + outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp") + + slog.Info("dumping and compressing", "tmp_output", outputTmp) + if err := DumpAndCompress(extractedDB, outputTmp); err != nil { + os.Remove(outputTmp) + return fmt.Errorf("dump and compress for %s: %w", dayStr, err) + } + + slog.Info("verifying compressed output") + if err := VerifyOutput(outputTmp); err != nil { + os.Remove(outputTmp) + return fmt.Errorf("verification failed for %s: %w", dayStr, err) + } + + // Atomic rename to final path + slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal) + if err := os.Rename(outputTmp, outputFinal); err != nil { + return fmt.Errorf("atomic rename for %s: %w", dayStr, err) + } + + info, err := os.Stat(outputFinal) + if err != nil { + return fmt.Errorf("stat final output: %w", err) + } + slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size()) + + // Remove extracted DB to reclaim space + os.Remove(extractedDB) + processed++ } + } - // Dump to SQL and compress - if err := os.MkdirAll(outputDir, 0755); err != nil { - return fmt.Errorf("creating output directory %s: %w", outputDir, err) - } - - outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp") - - slog.Info("dumping and compressing", "tmp_output", outputTmp) - if err := DumpAndCompress(extractedDB, outputTmp); err != nil { - os.Remove(outputTmp) - return fmt.Errorf("dump and compress for %s: %w", dayStr, err) - } - - slog.Info("verifying compressed output") - if err := VerifyOutput(outputTmp); err != nil { - os.Remove(outputTmp) - return fmt.Errorf("verification failed for %s: %w", dayStr, err) - } - - // Atomic rename to final path - slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal) - if err := os.Rename(outputTmp, outputFinal); err != nil { - return fmt.Errorf("atomic rename for %s: %w", 
dayStr, err) - } - - info, err := os.Stat(outputFinal) - if err != nil { - return fmt.Errorf("stat final output: %w", err) - } - slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size()) - - // Remove extracted DB to reclaim space - os.Remove(extractedDB) - processed++ + if processErr != nil { + return processErr } slog.Info("run summary", "processed", processed, "skipped", skipped, "total", len(targetDates)) diff --git a/internal/bsdaily/verify.go b/internal/bsdaily/verify.go index bcac24b..2e60558 100644 --- a/internal/bsdaily/verify.go +++ b/internal/bsdaily/verify.go @@ -19,7 +19,7 @@ func VerifyOutput(path string) error { slog.Info("verifying SQL content") catCmd := exec.Command("zstdcat", path) - headCmd := exec.Command("head", "-20") + headCmd := exec.Command("head", fmt.Sprintf("-%d", verificationHeadLines)) pipe, err := catCmd.StdoutPipe() if err != nil { @@ -30,16 +30,23 @@ func VerifyOutput(path string) error { var headOut strings.Builder headCmd.Stdout = &headOut - if err := headCmd.Start(); err != nil { - return fmt.Errorf("starting head: %w", err) - } if err := catCmd.Start(); err != nil { return fmt.Errorf("starting zstdcat: %w", err) } + if err := headCmd.Start(); err != nil { + catCmd.Process.Kill() // Clean up if head fails to start + return fmt.Errorf("starting head: %w", err) + } - // head closes pipe early, zstdcat gets SIGPIPE - expected - _ = headCmd.Wait() - _ = catCmd.Wait() + // Wait for head first (it will exit when it has enough lines) + if err := headCmd.Wait(); err != nil { + catCmd.Process.Kill() + return fmt.Errorf("head command failed: %w", err) + } + + // Kill zstdcat since head closed the pipe (expected SIGPIPE) + catCmd.Process.Kill() + _ = catCmd.Wait() // Reap the process content := headOut.String() if len(content) == 0 { @@ -53,9 +60,10 @@ func VerifyOutput(path string) error { break } } + const verificationSampleBytes = 200 if !hasSQLMarker { - return fmt.Errorf("decompressed content does not 
look like SQL; first 200 bytes: %s", - content[:min(200, len(content))]) + return fmt.Errorf("decompressed content does not look like SQL; first %d bytes: %s", + verificationSampleBytes, content[:min(verificationSampleBytes, len(content))]) } slog.Info("SQL content verification passed")