diff --git a/internal/database/database.go b/internal/database/database.go index 7234186..833b98d 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -1986,3 +1986,14 @@ func (d *Database) Analyze(ctx context.Context) error { return nil } + +// Checkpoint runs a WAL checkpoint to transfer data from the WAL to the main database. +// Uses PASSIVE mode which doesn't block writers but may not checkpoint all frames. +func (d *Database) Checkpoint(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "PRAGMA wal_checkpoint(PASSIVE)") + if err != nil { + return fmt.Errorf("failed to checkpoint WAL: %w", err) + } + + return nil +} diff --git a/internal/database/interface.go b/internal/database/interface.go index 2c01173..5cd6050 100644 --- a/internal/database/interface.go +++ b/internal/database/interface.go @@ -87,6 +87,7 @@ type Store interface { // Maintenance operations Vacuum(ctx context.Context) error Analyze(ctx context.Context) error + Checkpoint(ctx context.Context) error } // Ensure Database implements Store diff --git a/internal/routewatch/app_integration_test.go b/internal/routewatch/app_integration_test.go index 96e9eb7..097872e 100644 --- a/internal/routewatch/app_integration_test.go +++ b/internal/routewatch/app_integration_test.go @@ -415,6 +415,11 @@ func (m *mockStore) Analyze(ctx context.Context) error { return nil } +// Checkpoint mock implementation +func (m *mockStore) Checkpoint(ctx context.Context) error { + return nil +} + func TestRouteWatchLiveFeed(t *testing.T) { // Create mock database diff --git a/internal/routewatch/dbmaintainer.go b/internal/routewatch/dbmaintainer.go index 570cfcb..653301f 100644 --- a/internal/routewatch/dbmaintainer.go +++ b/internal/routewatch/dbmaintainer.go @@ -12,6 +12,10 @@ import ( // Database maintenance configuration constants. const ( + // checkpointInterval is how often to run WAL checkpoint. + // Frequent checkpoints keep the WAL small, improving read performance. + checkpointInterval = 30 * time.Second + // vacuumInterval is how often to run incremental vacuum. // Since incremental vacuum only frees ~1000 pages (~4MB) per run, // we run it frequently to keep up with deletions. @@ -20,6 +24,9 @@ const ( // analyzeInterval is how often to run ANALYZE. analyzeInterval = 1 * time.Hour + // checkpointTimeout is the max time for WAL checkpoint. + checkpointTimeout = 10 * time.Second + // vacuumTimeout is the max time for incremental vacuum (should be quick). vacuumTimeout = 30 * time.Second @@ -35,13 +42,16 @@ type DBMaintainer struct { wg sync.WaitGroup // Stats tracking - statsMu sync.Mutex - lastVacuum time.Time - lastAnalyze time.Time - vacuumCount int - analyzeCount int - lastVacuumError error - lastAnalyzeError error + statsMu sync.Mutex + lastCheckpoint time.Time + lastVacuum time.Time + lastAnalyze time.Time + checkpointCount int + vacuumCount int + analyzeCount int + lastCheckpointError error + lastVacuumError error + lastAnalyzeError error } // NewDBMaintainer creates a new database maintainer. @@ -58,6 +68,7 @@ func (m *DBMaintainer) Start() { m.wg.Add(1) go m.run() m.logger.Info("Database maintainer started", + "checkpoint_interval", checkpointInterval, "vacuum_interval", vacuumInterval, "analyze_interval", analyzeInterval, ) @@ -75,8 +86,10 @@ func (m *DBMaintainer) run() { defer m.wg.Done() // Use different timers for each task + checkpointTimer := time.NewTimer(checkpointInterval) vacuumTimer := time.NewTimer(vacuumInterval) analyzeTimer := time.NewTimer(analyzeInterval) + defer checkpointTimer.Stop() defer vacuumTimer.Stop() defer analyzeTimer.Stop() @@ -85,6 +98,10 @@ func (m *DBMaintainer) run() { case <-m.stopCh: return + case <-checkpointTimer.C: + m.runCheckpoint() + checkpointTimer.Reset(checkpointInterval) + case <-vacuumTimer.C: m.runVacuum() vacuumTimer.Reset(vacuumInterval) @@ -96,6 +113,30 @@ func (m *DBMaintainer) run() { } } +// runCheckpoint performs a WAL checkpoint to keep the WAL file small. +func (m *DBMaintainer) runCheckpoint() { + ctx, cancel := context.WithTimeout(context.Background(), checkpointTimeout) + defer cancel() + + startTime := time.Now() + + err := m.db.Checkpoint(ctx) + + m.statsMu.Lock() + m.lastCheckpoint = time.Now() + m.lastCheckpointError = err + if err == nil { + m.checkpointCount++ + } + m.statsMu.Unlock() + + if err != nil { + m.logger.Error("WAL checkpoint failed", "error", err, "duration", time.Since(startTime)) + } else { + m.logger.Debug("WAL checkpoint completed", "duration", time.Since(startTime)) + } +} + // runVacuum performs an incremental vacuum operation on the database. func (m *DBMaintainer) runVacuum() { ctx, cancel := context.WithTimeout(context.Background(), vacuumTimeout)