diff --git a/internal/bsdaily/parallel.go b/internal/bsdaily/parallel.go
deleted file mode 100644
index 8c5d2cd..0000000
--- a/internal/bsdaily/parallel.go
+++ /dev/null
@@ -1,148 +0,0 @@
-package bsdaily
-
-import (
-	"fmt"
-	"log/slog"
-	"os"
-	"path/filepath"
-	"runtime"
-	"sync"
-	"time"
-)
-
-const maxParallelExtractions = 2 // Limited to minimize seek thrashing on spinning disks
-
-type extractionJob struct {
-	targetDay time.Time
-	srcDB     string
-	tmpDir    string
-}
-
-type extractionResult struct {
-	targetDay time.Time
-	err       error
-	skipped   bool
-}
-
-func processParallel(jobs []extractionJob) (processed, skipped int, firstErr error) {
-	// Use limited parallelism - too many concurrent SQLite operations can be counterproductive
-	workers := runtime.NumCPU()
-	if workers > maxParallelExtractions {
-		workers = maxParallelExtractions
-	}
-
-	slog.Info("processing days in parallel", "workers", workers, "total_days", len(jobs))
-
-	jobsChan := make(chan extractionJob, len(jobs))
-	resultsChan := make(chan extractionResult, len(jobs))
-
-	// Start workers
-	var wg sync.WaitGroup
-	for i := 0; i < workers; i++ {
-		wg.Add(1)
-		go func(workerID int) {
-			defer wg.Done()
-			for job := range jobsChan {
-				result := processSingleDay(job)
-				resultsChan <- result
-			}
-		}(i)
-	}
-
-	// Send all jobs
-	for _, job := range jobs {
-		jobsChan <- job
-	}
-	close(jobsChan)
-
-	// Wait for workers to finish
-	go func() {
-		wg.Wait()
-		close(resultsChan)
-	}()
-
-	// Collect results
-	for result := range resultsChan {
-		if result.err != nil {
-			if firstErr == nil {
-				firstErr = result.err
-			}
-			slog.Error("day processing failed", "date", result.targetDay.Format("2006-01-02"), "error", result.err)
-		} else if result.skipped {
-			skipped++
-		} else {
-			processed++
-		}
-	}
-
-	return processed, skipped, firstErr
-}
-
-func processSingleDay(job extractionJob) extractionResult {
-	targetDay := job.targetDay
-	dayStr := targetDay.Format("2006-01-02")
-
-	slog.Info("processing day", "date", dayStr)
-
-	// Check if output already exists
-	outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01"))
-	outputFinal := filepath.Join(outputDir, dayStr+".sql.zst")
-	if _, err := os.Stat(outputFinal); err == nil {
-		slog.Info("output already exists, skipping", "path", outputFinal)
-		return extractionResult{targetDay: targetDay, skipped: true}
-	}
-
-	// Extract target day into a per-day database
-	extractedDB := filepath.Join(job.tmpDir, "extracted-"+dayStr+".db")
-	slog.Info("extracting target day", "src", job.srcDB, "dst", extractedDB)
-	if err := ExtractDay(job.srcDB, extractedDB, targetDay); err != nil {
-		if err == ErrNoPosts {
-			slog.Warn("no posts found, skipping day", "date", dayStr)
-			os.Remove(extractedDB)
-			return extractionResult{targetDay: targetDay, skipped: true}
-		}
-		return extractionResult{targetDay: targetDay, err: fmt.Errorf("extracting day %s: %w", dayStr, err)}
-	}
-
-	// Dump to SQL and compress
-	if err := os.MkdirAll(outputDir, 0755); err != nil {
-		os.Remove(extractedDB)
-		return extractionResult{targetDay: targetDay, err: fmt.Errorf("creating output directory %s: %w", outputDir, err)}
-	}
-
-	outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp")
-
-	slog.Info("dumping and compressing", "tmp_output", outputTmp)
-	if err := DumpAndCompress(extractedDB, outputTmp); err != nil {
-		os.Remove(outputTmp)
-		os.Remove(extractedDB)
-		return extractionResult{targetDay: targetDay, err: fmt.Errorf("dump and compress for %s: %w", dayStr, err)}
-	}
-
-	slog.Info("verifying compressed output", "date", dayStr)
-	if err := VerifyOutput(outputTmp); err != nil {
-		os.Remove(outputTmp)
-		os.Remove(extractedDB)
-		return extractionResult{targetDay: targetDay, err: fmt.Errorf("verification failed for %s: %w", dayStr, err)}
-	}
-
-	// Atomic rename to final path
-	slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
-	if err := os.Rename(outputTmp, outputFinal); err != nil {
-		os.Remove(outputTmp)
-		os.Remove(extractedDB)
-		return extractionResult{targetDay: targetDay, err: fmt.Errorf("atomic rename for %s: %w", dayStr, err)}
-	}
-
-	info, err := os.Stat(outputFinal)
-	if err != nil {
-		os.Remove(extractedDB)
-		return extractionResult{targetDay: targetDay, err: fmt.Errorf("stat final output: %w", err)}
-	}
-	slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size())
-
-	// Remove extracted DB to reclaim space
-	os.Remove(extractedDB)
-
-	return extractionResult{targetDay: targetDay}
-}
\ No newline at end of file
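Worth noting between the two files: the deleted processSingleDay compared the sentinel directly (`err == ErrNoPosts`), while the surviving loop in run.go below uses `errors.Is(err, ErrNoPosts)`. The distinction matters as soon as ExtractDay wraps the sentinel with `%w`. A minimal sketch of the difference (the wrapping shown here is illustrative, not necessarily what ExtractDay does):

```go
package main

import (
	"errors"
	"fmt"
)

var ErrNoPosts = errors.New("no posts found")

func main() {
	// Callees commonly add context by wrapping sentinels with %w.
	err := fmt.Errorf("extracting day 2024-01-01: %w", ErrNoPosts)

	fmt.Println(err == ErrNoPosts)          // false: direct comparison misses the wrapped sentinel
	fmt.Println(errors.Is(err, ErrNoPosts)) // true: errors.Is walks the wrap chain
}
```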
"date", dayStr) - if err := VerifyOutput(outputTmp); err != nil { - os.Remove(outputTmp) - os.Remove(extractedDB) - return extractionResult{targetDay: targetDay, err: fmt.Errorf("verification failed for %s: %w", dayStr, err)} - } - - // Atomic rename to final path - slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal) - if err := os.Rename(outputTmp, outputFinal); err != nil { - os.Remove(outputTmp) - os.Remove(extractedDB) - return extractionResult{targetDay: targetDay, err: fmt.Errorf("atomic rename for %s: %w", dayStr, err)} - } - - info, err := os.Stat(outputFinal) - if err != nil { - os.Remove(extractedDB) - return extractionResult{targetDay: targetDay, err: fmt.Errorf("stat final output: %w", err)} - } - slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size()) - - // Remove extracted DB to reclaim space - os.Remove(extractedDB) - - return extractionResult{targetDay: targetDay} -} \ No newline at end of file diff --git a/internal/bsdaily/run.go b/internal/bsdaily/run.go index 3c216ae..fa27311 100644 --- a/internal/bsdaily/run.go +++ b/internal/bsdaily/run.go @@ -76,88 +76,77 @@ func Run(targetDates []time.Time) error { } } - // Process days - use parallel processing for multiple days - var processed, skipped int - var processErr error + // Process each day completely before moving to the next + // This ensures we don't have multiple SQLite operations competing for the same source database + processed := 0 + skipped := 0 - if len(targetDates) > 1 { - // Prepare jobs for parallel processing - jobs := make([]extractionJob, len(targetDates)) - for i, targetDay := range targetDates { - jobs[i] = extractionJob{ - targetDay: targetDay, - srcDB: dstDB, - tmpDir: tmpDir, - } + for _, targetDay := range targetDates { + dayStr := targetDay.Format("2006-01-02") + slog.Info("processing day", "date", dayStr) + + // Check if output already exists + outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01")) + outputFinal := filepath.Join(outputDir, dayStr+".sql.zst") + if _, err := os.Stat(outputFinal); err == nil { + slog.Info("output already exists, skipping", "path", outputFinal) + skipped++ + continue } - processed, skipped, processErr = processParallel(jobs) - } else { - // Single day - process sequentially (simpler error handling) - for _, targetDay := range targetDates { - dayStr := targetDay.Format("2006-01-02") - slog.Info("processing day", "date", dayStr) - // Check if output already exists - outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01")) - outputFinal := filepath.Join(outputDir, dayStr+".sql.zst") - if _, err := os.Stat(outputFinal); err == nil { - slog.Info("output already exists, skipping", "path", outputFinal) + // Extract target day into a per-day database + extractedDB := filepath.Join(tmpDir, "extracted-"+dayStr+".db") + slog.Info("extracting target day", "src", dstDB, "dst", extractedDB) + if err := ExtractDay(dstDB, extractedDB, targetDay); err != nil { + if errors.Is(err, ErrNoPosts) { + slog.Warn("no posts found, skipping day", "date", dayStr) + os.Remove(extractedDB) skipped++ continue } - - // Extract target day into a per-day database - extractedDB := filepath.Join(tmpDir, "extracted-"+dayStr+".db") - slog.Info("extracting target day", "src", dstDB, "dst", extractedDB) - if err := ExtractDay(dstDB, extractedDB, targetDay); err != nil { - if errors.Is(err, ErrNoPosts) { - slog.Warn("no posts found, skipping day", "date", dayStr) - os.Remove(extractedDB) - skipped++ - continue - } - return 
fmt.Errorf("extracting day %s: %w", dayStr, err) - } - - // Dump to SQL and compress - if err := os.MkdirAll(outputDir, 0755); err != nil { - return fmt.Errorf("creating output directory %s: %w", outputDir, err) - } - - outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp") - - slog.Info("dumping and compressing", "tmp_output", outputTmp) - if err := DumpAndCompress(extractedDB, outputTmp); err != nil { - os.Remove(outputTmp) - return fmt.Errorf("dump and compress for %s: %w", dayStr, err) - } - - slog.Info("verifying compressed output") - if err := VerifyOutput(outputTmp); err != nil { - os.Remove(outputTmp) - return fmt.Errorf("verification failed for %s: %w", dayStr, err) - } - - // Atomic rename to final path - slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal) - if err := os.Rename(outputTmp, outputFinal); err != nil { - return fmt.Errorf("atomic rename for %s: %w", dayStr, err) - } - - info, err := os.Stat(outputFinal) - if err != nil { - return fmt.Errorf("stat final output: %w", err) - } - slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size()) - - // Remove extracted DB to reclaim space - os.Remove(extractedDB) - processed++ + return fmt.Errorf("extracting day %s: %w", dayStr, err) } - } - if processErr != nil { - return processErr + // Dump to SQL and compress + if err := os.MkdirAll(outputDir, 0755); err != nil { + os.Remove(extractedDB) + return fmt.Errorf("creating output directory %s: %w", outputDir, err) + } + + outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp") + + slog.Info("dumping and compressing", "tmp_output", outputTmp) + if err := DumpAndCompress(extractedDB, outputTmp); err != nil { + os.Remove(outputTmp) + os.Remove(extractedDB) + return fmt.Errorf("dump and compress for %s: %w", dayStr, err) + } + + slog.Info("verifying compressed output") + if err := VerifyOutput(outputTmp); err != nil { + os.Remove(outputTmp) + os.Remove(extractedDB) + return fmt.Errorf("verification failed for %s: %w", dayStr, err) + } + + // Atomic rename to final path + slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal) + if err := os.Rename(outputTmp, outputFinal); err != nil { + os.Remove(outputTmp) + os.Remove(extractedDB) + return fmt.Errorf("atomic rename for %s: %w", dayStr, err) + } + + info, err := os.Stat(outputFinal) + if err != nil { + os.Remove(extractedDB) + return fmt.Errorf("stat final output: %w", err) + } + slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size()) + + // Remove extracted DB to reclaim space immediately + os.Remove(extractedDB) + processed++ } slog.Info("run summary", "processed", processed, "skipped", skipped, "total", len(targetDates))