From 8f524485f704eb8c4bb8812d3a6e71d4913ecac0 Mon Sep 17 00:00:00 2001
From: sneak
Date: Thu, 1 Jan 2026 05:42:03 -0800
Subject: [PATCH] Add periodic WAL checkpointing to fix slow queries

The WAL file was growing to 700MB+, which caused COUNT(*) queries to
time out. Reads must scan the WAL to find current page versions, and a
large WAL makes this slow.

Add a Checkpoint method to the database interface and run PASSIVE
checkpoints every 30 seconds via the DBMaintainer. This keeps the WAL
small and maintains fast read performance under heavy write load.
---
 internal/database/database.go               | 11 +++++
 internal/database/interface.go              |  1 +
 internal/routewatch/app_integration_test.go |  5 ++
 internal/routewatch/dbmaintainer.go         | 55 ++++++++++++++++++---
 4 files changed, 65 insertions(+), 7 deletions(-)

diff --git a/internal/database/database.go b/internal/database/database.go
index 7234186..833b98d 100644
--- a/internal/database/database.go
+++ b/internal/database/database.go
@@ -1986,3 +1986,14 @@ func (d *Database) Analyze(ctx context.Context) error {
 
 	return nil
 }
+
+// Checkpoint runs a WAL checkpoint to transfer data from the WAL to the main database.
+// Uses PASSIVE mode, which doesn't block writers but may not checkpoint all frames.
+func (d *Database) Checkpoint(ctx context.Context) error {
+	_, err := d.db.ExecContext(ctx, "PRAGMA wal_checkpoint(PASSIVE)")
+	if err != nil {
+		return fmt.Errorf("failed to checkpoint WAL: %w", err)
+	}
+
+	return nil
+}
diff --git a/internal/database/interface.go b/internal/database/interface.go
index 2c01173..5cd6050 100644
--- a/internal/database/interface.go
+++ b/internal/database/interface.go
@@ -87,6 +87,7 @@ type Store interface {
 	// Maintenance operations
 	Vacuum(ctx context.Context) error
 	Analyze(ctx context.Context) error
+	Checkpoint(ctx context.Context) error
 }
 
 // Ensure Database implements Store
diff --git a/internal/routewatch/app_integration_test.go b/internal/routewatch/app_integration_test.go
index 96e9eb7..097872e 100644
--- a/internal/routewatch/app_integration_test.go
+++ b/internal/routewatch/app_integration_test.go
@@ -415,6 +415,11 @@ func (m *mockStore) Analyze(ctx context.Context) error {
 	return nil
 }
 
+// Checkpoint mock implementation
+func (m *mockStore) Checkpoint(ctx context.Context) error {
+	return nil
+}
+
 func TestRouteWatchLiveFeed(t *testing.T) {
 	// Create mock database
 
diff --git a/internal/routewatch/dbmaintainer.go b/internal/routewatch/dbmaintainer.go
index 570cfcb..653301f 100644
--- a/internal/routewatch/dbmaintainer.go
+++ b/internal/routewatch/dbmaintainer.go
@@ -12,6 +12,10 @@ import (
 
 // Database maintenance configuration constants.
 const (
+	// checkpointInterval is how often to run a WAL checkpoint.
+	// Frequent checkpoints keep the WAL small, improving read performance.
+	checkpointInterval = 30 * time.Second
+
 	// vacuumInterval is how often to run incremental vacuum.
 	// Since incremental vacuum only frees ~1000 pages (~4MB) per run,
 	// we run it frequently to keep up with deletions.
@@ -20,6 +24,9 @@ const (
 	// analyzeInterval is how often to run ANALYZE.
 	analyzeInterval = 1 * time.Hour
 
+	// checkpointTimeout is the max time for a WAL checkpoint.
+	checkpointTimeout = 10 * time.Second
+
 	// vacuumTimeout is the max time for incremental vacuum (should be quick).
 	vacuumTimeout = 30 * time.Second
 
@@ -35,13 +42,16 @@ type DBMaintainer struct {
 	wg sync.WaitGroup
 
 	// Stats tracking
-	statsMu          sync.Mutex
-	lastVacuum       time.Time
-	lastAnalyze      time.Time
-	vacuumCount      int
-	analyzeCount     int
-	lastVacuumError  error
-	lastAnalyzeError error
+	statsMu             sync.Mutex
+	lastCheckpoint      time.Time
+	lastVacuum          time.Time
+	lastAnalyze         time.Time
+	checkpointCount     int
+	vacuumCount         int
+	analyzeCount        int
+	lastCheckpointError error
+	lastVacuumError     error
+	lastAnalyzeError    error
 }
 
 // NewDBMaintainer creates a new database maintainer.
@@ -58,6 +68,7 @@ func (m *DBMaintainer) Start() {
 	m.wg.Add(1)
 	go m.run()
 	m.logger.Info("Database maintainer started",
+		"checkpoint_interval", checkpointInterval,
 		"vacuum_interval", vacuumInterval,
 		"analyze_interval", analyzeInterval,
 	)
@@ -75,8 +86,10 @@ func (m *DBMaintainer) run() {
 	defer m.wg.Done()
 
 	// Use different timers for each task
+	checkpointTimer := time.NewTimer(checkpointInterval)
 	vacuumTimer := time.NewTimer(vacuumInterval)
 	analyzeTimer := time.NewTimer(analyzeInterval)
+	defer checkpointTimer.Stop()
 	defer vacuumTimer.Stop()
 	defer analyzeTimer.Stop()
 
@@ -85,6 +98,10 @@
 		case <-m.stopCh:
 			return
 
+		case <-checkpointTimer.C:
+			m.runCheckpoint()
+			checkpointTimer.Reset(checkpointInterval)
+
 		case <-vacuumTimer.C:
 			m.runVacuum()
 			vacuumTimer.Reset(vacuumInterval)
@@ -96,6 +113,30 @@
 	}
 }
 
+// runCheckpoint performs a WAL checkpoint to keep the WAL file small.
+func (m *DBMaintainer) runCheckpoint() {
+	ctx, cancel := context.WithTimeout(context.Background(), checkpointTimeout)
+	defer cancel()
+
+	startTime := time.Now()
+
+	err := m.db.Checkpoint(ctx)
+
+	m.statsMu.Lock()
+	m.lastCheckpoint = time.Now()
+	m.lastCheckpointError = err
+	if err == nil {
+		m.checkpointCount++
+	}
+	m.statsMu.Unlock()
+
+	if err != nil {
+		m.logger.Error("WAL checkpoint failed", "error", err, "duration", time.Since(startTime))
+	} else {
+		m.logger.Debug("WAL checkpoint completed", "duration", time.Since(startTime))
+	}
+}
+
 // runVacuum performs an incremental vacuum operation on the database.
 func (m *DBMaintainer) runVacuum() {
 	ctx, cancel := context.WithTimeout(context.Background(), vacuumTimeout)
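
Note, not part of the patch above: PRAGMA wal_checkpoint(PASSIVE) returns a single
row of three integers (busy, frames in the WAL, frames checkpointed), so the
Checkpoint method could report whether checkpoints are keeping up with the write
load instead of discarding that row. Below is a rough sketch against the same
*sql.DB handle; the checkpointWithStats helper and checkpointResult type are
illustrative names, not existing code in this repository.

package database

import (
	"context"
	"database/sql"
	"fmt"
)

// checkpointResult captures the row returned by PRAGMA wal_checkpoint:
// Busy is 1 if the checkpoint was blocked by concurrent activity, LogFrames
// is the total number of frames in the WAL, and Checkpointed is how many
// frames were copied into the main database. (Illustrative type only.)
type checkpointResult struct {
	Busy         int
	LogFrames    int
	Checkpointed int
}

// checkpointWithStats is a sketch of a Checkpoint variant that surfaces the
// pragma's result instead of discarding it. It assumes a *sql.DB opened on a
// database already in WAL mode.
func checkpointWithStats(ctx context.Context, db *sql.DB) (checkpointResult, error) {
	var r checkpointResult
	row := db.QueryRowContext(ctx, "PRAGMA wal_checkpoint(PASSIVE)")
	if err := row.Scan(&r.Busy, &r.LogFrames, &r.Checkpointed); err != nil {
		return r, fmt.Errorf("failed to checkpoint WAL: %w", err)
	}

	return r, nil
}

If the WAL keeps growing despite the 30-second timer, a nonzero busy value or a
checkpointed count that stays well below the frame count would point at long-lived
readers or writers preventing PASSIVE checkpoints from completing; an occasional,
less frequent TRUNCATE or RESTART checkpoint is one option sometimes considered in
that situation.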