Add periodic WAL checkpointing to fix slow queries
The WAL file was growing past 700 MB, which caused COUNT(*) queries to time out: in WAL mode every read must consult the WAL for the latest version of each page, so a large WAL slows all queries. This change adds a Checkpoint method to the database interface and has the DBMaintainer run a PASSIVE checkpoint every 30 seconds, keeping the WAL small and read performance steady under heavy write load.
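For context on what the pragma reports: PRAGMA wal_checkpoint(PASSIVE) returns a single row of three integers: busy (1 if the checkpoint was blocked), the number of frames in the WAL, and the number of frames checkpointed. A sketch of a diagnostic helper, with a hypothetical name and any database/sql SQLite driver assumed, useful for confirming PASSIVE checkpoints are keeping up:

import (
	"context"
	"database/sql"
)

// checkpointStats is a hypothetical diagnostic, not part of this commit.
// It scans the row PRAGMA wal_checkpoint returns: busy (1 if blocked),
// total frames in the WAL, and frames actually checkpointed.
func checkpointStats(ctx context.Context, db *sql.DB) (busy, logFrames, checkpointed int, err error) {
	err = db.QueryRowContext(ctx, "PRAGMA wal_checkpoint(PASSIVE)").
		Scan(&busy, &logFrames, &checkpointed)
	return busy, logFrames, checkpointed, err
}

If logFrames stays far above checkpointed across runs, long-lived readers are pinning the WAL and PASSIVE mode is being starved.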
parent c6fa2b0fbd
commit 8f524485f7
@@ -1986,3 +1986,14 @@ func (d *Database) Analyze(ctx context.Context) error {
 	return nil
 }
 
+// Checkpoint runs a WAL checkpoint to transfer data from the WAL to the main database.
+// Uses PASSIVE mode which doesn't block writers but may not checkpoint all frames.
+func (d *Database) Checkpoint(ctx context.Context) error {
+	_, err := d.db.ExecContext(ctx, "PRAGMA wal_checkpoint(PASSIVE)")
+	if err != nil {
+		return fmt.Errorf("failed to checkpoint WAL: %w", err)
+	}
+
+	return nil
+}
+
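PASSIVE is the gentlest checkpoint mode: it never blocks readers or writers, but it can be starved by long-lived readers and leave frames behind. If the WAL still grows despite the 30-second cadence, SQLite's heavier modes are the escape hatch. A sketch of an escalation, assumed rather than part of this diff:

// CheckpointTruncate is a hypothetical escalation, not in this commit:
// TRUNCATE waits for readers to finish, checkpoints every frame, and
// resets the WAL file back to zero bytes.
func (d *Database) CheckpointTruncate(ctx context.Context) error {
	if _, err := d.db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
		return fmt.Errorf("failed to truncate WAL: %w", err)
	}
	return nil
}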
@@ -87,6 +87,7 @@ type Store interface {
 	// Maintenance operations
 	Vacuum(ctx context.Context) error
 	Analyze(ctx context.Context) error
+	Checkpoint(ctx context.Context) error
 }
 
 // Ensure Database implements Store
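The "Ensure Database implements Store" context line points at Go's standard compile-time assertion; assuming it takes the usual form, it is a one-liner:

var _ Store = (*Database)(nil) // compilation fails if Checkpoint (or any Store method) is missing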
@@ -415,6 +415,11 @@ func (m *mockStore) Analyze(ctx context.Context) error {
 	return nil
 }
 
+// Checkpoint mock implementation
+func (m *mockStore) Checkpoint(ctx context.Context) error {
+	return nil
+}
+
 func TestRouteWatchLiveFeed(t *testing.T) {
 
 	// Create mock database
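A mock that only returns nil satisfies the interface but cannot show that the maintainer ever invoked it. If these tests later need to assert on checkpoint activity, a counting variant is a natural extension; a sketch with a hypothetical type (sync/atomic, Go 1.19+):

// countingStore is hypothetical, not part of this commit: it embeds the
// existing mock and records how many times Checkpoint was called.
type countingStore struct {
	mockStore
	checkpoints atomic.Int32
}

func (c *countingStore) Checkpoint(ctx context.Context) error {
	c.checkpoints.Add(1)
	return nil
}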
@@ -12,6 +12,10 @@ import (
 
 // Database maintenance configuration constants.
 const (
+	// checkpointInterval is how often to run WAL checkpoint.
+	// Frequent checkpoints keep the WAL small, improving read performance.
+	checkpointInterval = 30 * time.Second
+
 	// vacuumInterval is how often to run incremental vacuum.
 	// Since incremental vacuum only frees ~1000 pages (~4MB) per run,
 	// we run it frequently to keep up with deletions.
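For scale: at SQLite's default 4 KiB page size, a 700 MB WAL holds roughly 700 × 256 ≈ 179,000 frames, while a checkpoint every 30 seconds only ever has to move the last half-minute of writes, so the WAL stays near zero between bursts.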
@@ -20,6 +24,9 @@ const (
 	// analyzeInterval is how often to run ANALYZE.
 	analyzeInterval = 1 * time.Hour
 
+	// checkpointTimeout is the max time for WAL checkpoint.
+	checkpointTimeout = 10 * time.Second
+
 	// vacuumTimeout is the max time for incremental vacuum (should be quick).
 	vacuumTimeout = 30 * time.Second
 
@@ -36,10 +43,13 @@ type DBMaintainer struct {
 
 	// Stats tracking
 	statsMu             sync.Mutex
+	lastCheckpoint      time.Time
 	lastVacuum          time.Time
 	lastAnalyze         time.Time
+	checkpointCount     int
 	vacuumCount         int
 	analyzeCount        int
+	lastCheckpointError error
 	lastVacuumError     error
 	lastAnalyzeError    error
 }
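runCheckpoint writes these fields from the maintainer goroutine, so any reader must take statsMu. The diff doesn't show a read-side accessor; a sketch of what one would look like, hypothetical method name included:

// CheckpointStats is hypothetical, not in this diff: it returns a consistent
// snapshot of the checkpoint bookkeeping under the same mutex the writer uses.
func (m *DBMaintainer) CheckpointStats() (last time.Time, count int, lastErr error) {
	m.statsMu.Lock()
	defer m.statsMu.Unlock()
	return m.lastCheckpoint, m.checkpointCount, m.lastCheckpointError
}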
@@ -58,6 +68,7 @@ func (m *DBMaintainer) Start() {
 	m.wg.Add(1)
 	go m.run()
 	m.logger.Info("Database maintainer started",
+		"checkpoint_interval", checkpointInterval,
 		"vacuum_interval", vacuumInterval,
 		"analyze_interval", analyzeInterval,
 	)
@@ -75,8 +86,10 @@ func (m *DBMaintainer) run() {
 	defer m.wg.Done()
 
 	// Use different timers for each task
+	checkpointTimer := time.NewTimer(checkpointInterval)
 	vacuumTimer := time.NewTimer(vacuumInterval)
 	analyzeTimer := time.NewTimer(analyzeInterval)
+	defer checkpointTimer.Stop()
 	defer vacuumTimer.Stop()
 	defer analyzeTimer.Stop()
 
@@ -85,6 +98,10 @@ func (m *DBMaintainer) run() {
 		case <-m.stopCh:
 			return
 
+		case <-checkpointTimer.C:
+			m.runCheckpoint()
+			checkpointTimer.Reset(checkpointInterval)
+
 		case <-vacuumTimer.C:
 			m.runVacuum()
 			vacuumTimer.Reset(vacuumInterval)
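Note the ordering in each case: the task runs to completion, then the timer is Reset, so a checkpoint that overruns checkpointInterval delays the next run instead of stacking runs (a time.Ticker would keep firing on schedule regardless). The pattern in isolation, with hypothetical names:

// loop is a distilled sketch of the select above, not code from this commit.
func loop(stop <-chan struct{}, interval time.Duration, task func()) {
	t := time.NewTimer(interval)
	defer t.Stop()
	for {
		select {
		case <-stop:
			return
		case <-t.C:
			task()            // finish the work first...
			t.Reset(interval) // ...then schedule the next tick
		}
	}
}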
@@ -96,6 +113,30 @@ func (m *DBMaintainer) run() {
 	}
 }
 
+// runCheckpoint performs a WAL checkpoint to keep the WAL file small.
+func (m *DBMaintainer) runCheckpoint() {
+	ctx, cancel := context.WithTimeout(context.Background(), checkpointTimeout)
+	defer cancel()
+
+	startTime := time.Now()
+
+	err := m.db.Checkpoint(ctx)
+
+	m.statsMu.Lock()
+	m.lastCheckpoint = time.Now()
+	m.lastCheckpointError = err
+	if err == nil {
+		m.checkpointCount++
+	}
+	m.statsMu.Unlock()
+
+	if err != nil {
+		m.logger.Error("WAL checkpoint failed", "error", err, "duration", time.Since(startTime))
+	} else {
+		m.logger.Debug("WAL checkpoint completed", "duration", time.Since(startTime))
+	}
+}
+
 // runVacuum performs an incremental vacuum operation on the database.
 func (m *DBMaintainer) runVacuum() {
 	ctx, cancel := context.WithTimeout(context.Background(), vacuumTimeout)
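To verify the fix outside of query latency, the WAL's size is directly observable on disk: SQLite keeps it in a sibling file named <database path>-wal. A hypothetical check, not part of this commit:

import (
	"errors"
	"os"
)

// walSize is a hypothetical helper: it reports the WAL file's size so an
// operator can confirm the periodic checkpoints are keeping it small.
func walSize(dbPath string) (int64, error) {
	info, err := os.Stat(dbPath + "-wal")
	if errors.Is(err, os.ErrNotExist) {
		return 0, nil // no WAL yet, or it was checkpointed and removed
	}
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}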