Remove parallel processing to fix SQLite locking issues

- Remove parallel.go entirely
- Process days strictly sequentially to avoid SQLITE_BUSY errors
- Complete each day fully (extract, dump, compress, verify, write) before starting next
- Clean up temp files immediately after each day to save disk space
- SQLite cannot handle multiple concurrent ATTACH operations on same database
This commit is contained in:
Jeffrey Paul 2026-02-12 13:26:55 -08:00
parent 56c3fe7804
commit 8e3a868b7a
2 changed files with 63 additions and 222 deletions

View File

@ -1,148 +0,0 @@
package bsdaily
import (
	"errors"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"time"
)
const maxParallelExtractions = 2 // Limited to minimize seek thrashing on spinning disks
// extractionJob describes one day's worth of extraction work.
type extractionJob struct {
targetDay time.Time // calendar day to extract
srcDB string // path to the source database the day is extracted from
tmpDir string // directory where the per-day extracted database is written
}
// extractionResult reports the outcome of processing a single day.
type extractionResult struct {
targetDay time.Time // the day this result refers to
err error // non-nil when processing the day failed
skipped bool // true when output already existed or the day had no posts
}
// processParallel runs the extraction jobs across a small, bounded worker
// pool and aggregates per-day outcomes.
//
// It returns the number of days processed, the number skipped, and the first
// error encountered. All jobs are still attempted after a failure; later
// errors are logged but not returned.
func processParallel(jobs []extractionJob) (processed, skipped int, firstErr error) {
	// Use limited parallelism - too many concurrent SQLite operations can be
	// counterproductive (and thrash seeks on spinning disks).
	workers := min(runtime.NumCPU(), maxParallelExtractions)
	slog.Info("processing days in parallel", "workers", workers, "total_days", len(jobs))

	jobsChan := make(chan extractionJob, len(jobs))
	// Buffered to len(jobs) so workers never block when sending results.
	resultsChan := make(chan extractionResult, len(jobs))

	// Start workers.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobsChan {
				resultsChan <- processSingleDay(job)
			}
		}()
	}

	// Queue every job, then signal the workers that no more are coming.
	for _, job := range jobs {
		jobsChan <- job
	}
	close(jobsChan)

	// Close the results channel once all workers finish so the collection
	// loop below terminates.
	go func() {
		wg.Wait()
		close(resultsChan)
	}()

	// Collect results: keep only the first error, but log every failure.
	for result := range resultsChan {
		switch {
		case result.err != nil:
			if firstErr == nil {
				firstErr = result.err
			}
			slog.Error("day processing failed", "date", result.targetDay.Format("2006-01-02"), "error", result.err)
		case result.skipped:
			skipped++
		default:
			processed++
		}
	}
	return processed, skipped, firstErr
}
// processSingleDay extracts one day's posts from job.srcDB into a temporary
// per-day database under job.tmpDir, dumps it to zstd-compressed SQL,
// verifies the archive, and atomically renames it into place under
// DailiesBase.
//
// The result has skipped=true when the final output already exists or the day
// has no posts, and a non-nil err on any failure. The temporary extracted
// database is removed on every exit path once it has been created.
func processSingleDay(job extractionJob) extractionResult {
	targetDay := job.targetDay
	dayStr := targetDay.Format("2006-01-02")
	slog.Info("processing day", "date", dayStr)

	// Check if output already exists; if so there is nothing to do.
	outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01"))
	outputFinal := filepath.Join(outputDir, dayStr+".sql.zst")
	if _, err := os.Stat(outputFinal); err == nil {
		slog.Info("output already exists, skipping", "path", outputFinal)
		return extractionResult{targetDay: targetDay, skipped: true}
	}

	// Extract target day into a per-day database.
	extractedDB := filepath.Join(job.tmpDir, "extracted-"+dayStr+".db")
	slog.Info("extracting target day", "src", job.srcDB, "dst", extractedDB)
	if err := ExtractDay(job.srcDB, extractedDB, targetDay); err != nil {
		// errors.Is so a wrapped ErrNoPosts is still recognized.
		if errors.Is(err, ErrNoPosts) {
			slog.Warn("no posts found, skipping day", "date", dayStr)
			os.Remove(extractedDB)
			return extractionResult{targetDay: targetDay, skipped: true}
		}
		return extractionResult{targetDay: targetDay, err: fmt.Errorf("extracting day %s: %w", dayStr, err)}
	}
	// From here on the extracted DB exists: remove it on every exit path
	// (success included) to reclaim temp disk space promptly.
	defer os.Remove(extractedDB)

	if err := os.MkdirAll(outputDir, 0755); err != nil {
		return extractionResult{targetDay: targetDay, err: fmt.Errorf("creating output directory %s: %w", outputDir, err)}
	}

	// Dump to SQL and compress into a hidden temp file in the destination
	// directory, so the rename below stays on one filesystem and is atomic.
	outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp")
	slog.Info("dumping and compressing", "tmp_output", outputTmp)
	if err := DumpAndCompress(extractedDB, outputTmp); err != nil {
		os.Remove(outputTmp)
		return extractionResult{targetDay: targetDay, err: fmt.Errorf("dump and compress for %s: %w", dayStr, err)}
	}

	slog.Info("verifying compressed output", "date", dayStr)
	if err := VerifyOutput(outputTmp); err != nil {
		os.Remove(outputTmp)
		return extractionResult{targetDay: targetDay, err: fmt.Errorf("verification failed for %s: %w", dayStr, err)}
	}

	// Atomic rename to final path publishes the verified archive.
	slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
	if err := os.Rename(outputTmp, outputFinal); err != nil {
		os.Remove(outputTmp)
		return extractionResult{targetDay: targetDay, err: fmt.Errorf("atomic rename for %s: %w", dayStr, err)}
	}

	info, err := os.Stat(outputFinal)
	if err != nil {
		return extractionResult{targetDay: targetDay, err: fmt.Errorf("stat final output: %w", err)}
	}
	slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size())
	return extractionResult{targetDay: targetDay}
}

View File

@ -76,23 +76,11 @@ func Run(targetDates []time.Time) error {
} }
} }
// Process days - use parallel processing for multiple days // Process each day completely before moving to the next
var processed, skipped int // This ensures we don't have multiple SQLite operations competing for the same source database
var processErr error processed := 0
skipped := 0
if len(targetDates) > 1 {
// Prepare jobs for parallel processing
jobs := make([]extractionJob, len(targetDates))
for i, targetDay := range targetDates {
jobs[i] = extractionJob{
targetDay: targetDay,
srcDB: dstDB,
tmpDir: tmpDir,
}
}
processed, skipped, processErr = processParallel(jobs)
} else {
// Single day - process sequentially (simpler error handling)
for _, targetDay := range targetDates { for _, targetDay := range targetDates {
dayStr := targetDay.Format("2006-01-02") dayStr := targetDay.Format("2006-01-02")
slog.Info("processing day", "date", dayStr) slog.Info("processing day", "date", dayStr)
@ -121,6 +109,7 @@ func Run(targetDates []time.Time) error {
// Dump to SQL and compress // Dump to SQL and compress
if err := os.MkdirAll(outputDir, 0755); err != nil { if err := os.MkdirAll(outputDir, 0755); err != nil {
os.Remove(extractedDB)
return fmt.Errorf("creating output directory %s: %w", outputDir, err) return fmt.Errorf("creating output directory %s: %w", outputDir, err)
} }
@ -129,36 +118,36 @@ func Run(targetDates []time.Time) error {
slog.Info("dumping and compressing", "tmp_output", outputTmp) slog.Info("dumping and compressing", "tmp_output", outputTmp)
if err := DumpAndCompress(extractedDB, outputTmp); err != nil { if err := DumpAndCompress(extractedDB, outputTmp); err != nil {
os.Remove(outputTmp) os.Remove(outputTmp)
os.Remove(extractedDB)
return fmt.Errorf("dump and compress for %s: %w", dayStr, err) return fmt.Errorf("dump and compress for %s: %w", dayStr, err)
} }
slog.Info("verifying compressed output") slog.Info("verifying compressed output")
if err := VerifyOutput(outputTmp); err != nil { if err := VerifyOutput(outputTmp); err != nil {
os.Remove(outputTmp) os.Remove(outputTmp)
os.Remove(extractedDB)
return fmt.Errorf("verification failed for %s: %w", dayStr, err) return fmt.Errorf("verification failed for %s: %w", dayStr, err)
} }
// Atomic rename to final path // Atomic rename to final path
slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal) slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
if err := os.Rename(outputTmp, outputFinal); err != nil { if err := os.Rename(outputTmp, outputFinal); err != nil {
os.Remove(outputTmp)
os.Remove(extractedDB)
return fmt.Errorf("atomic rename for %s: %w", dayStr, err) return fmt.Errorf("atomic rename for %s: %w", dayStr, err)
} }
info, err := os.Stat(outputFinal) info, err := os.Stat(outputFinal)
if err != nil { if err != nil {
os.Remove(extractedDB)
return fmt.Errorf("stat final output: %w", err) return fmt.Errorf("stat final output: %w", err)
} }
slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size()) slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size())
// Remove extracted DB to reclaim space // Remove extracted DB to reclaim space immediately
os.Remove(extractedDB) os.Remove(extractedDB)
processed++ processed++
} }
}
if processErr != nil {
return processErr
}
slog.Info("run summary", "processed", processed, "skipped", skipped, "total", len(targetDates)) slog.Info("run summary", "processed", processed, "skipped", skipped, "total", len(targetDates))