Under heavy write load, 30 seconds between checkpoints is too long: the WAL keeps growing and read queries slow down. More aggressive checkpointing keeps the WAL small and preserves read performance.
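As a point of reference, this is the kind of statement an aggressive checkpoint pass boils down to when the store is SQLite in WAL mode. A minimal sketch, assuming access to the underlying *sql.DB via database/sql (the actual Store implementation in internal/database is not shown here and may use a different checkpoint mode):

    // checkpointNow is an illustrative helper, not part of routewatch.
    // TRUNCATE copies all committed frames back into the main database file
    // and resets the WAL to zero length on success; PASSIVE never blocks but
    // can leave frames behind while readers or writers are active.
    func checkpointNow(ctx context.Context, sqlDB *sql.DB) error {
        _, err := sqlDB.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE);")
        return err
    }

Whichever mode is used, a checkpoint moves committed frames out of the -wal file and into the main database, which is why running it every few seconds keeps the WAL from growing without bound under sustained writes.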
// Package routewatch contains the database maintainer for background maintenance tasks.
package routewatch

import (
	"context"
	"log/slog"
	"sync"
	"time"

	"git.eeqj.de/sneak/routewatch/internal/database"
)

// Database maintenance configuration constants.
const (
	// checkpointInterval is how often to run WAL checkpoint.
	// Frequent checkpoints keep the WAL small, improving read performance.
	// Under heavy write load, we need aggressive checkpointing.
	checkpointInterval = 5 * time.Second

	// vacuumInterval is how often to run incremental vacuum.
	// Since incremental vacuum only frees ~1000 pages (~4MB) per run,
	// we run it frequently to keep up with deletions.
	vacuumInterval = 10 * time.Minute

	// analyzeInterval is how often to run ANALYZE.
	analyzeInterval = 1 * time.Hour

	// checkpointTimeout is the max time for WAL checkpoint.
	checkpointTimeout = 10 * time.Second

	// vacuumTimeout is the max time for incremental vacuum (should be quick).
	vacuumTimeout = 30 * time.Second

	// analyzeTimeout is the max time for ANALYZE.
	analyzeTimeout = 5 * time.Minute
)
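
// The database.Store dependency is expected to provide the three maintenance
// operations invoked below. A minimal sketch of the assumed shape (the real
// interface is defined in internal/database and may carry more methods):
//
//	type Store interface {
//		Checkpoint(ctx context.Context) error
//		Vacuum(ctx context.Context) error
//		Analyze(ctx context.Context) error
//	}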

// DBMaintainer handles background database maintenance tasks.
type DBMaintainer struct {
	db     database.Store
	logger *slog.Logger
	stopCh chan struct{}
	wg     sync.WaitGroup

	// Stats tracking
	statsMu             sync.Mutex
	lastCheckpoint      time.Time
	lastVacuum          time.Time
	lastAnalyze         time.Time
	checkpointCount     int
	vacuumCount         int
	analyzeCount        int
	lastCheckpointError error
	lastVacuumError     error
	lastAnalyzeError    error
}

// NewDBMaintainer creates a new database maintainer.
func NewDBMaintainer(db database.Store, logger *slog.Logger) *DBMaintainer {
	return &DBMaintainer{
		db:     db,
		logger: logger.With("component", "db_maintainer"),
		stopCh: make(chan struct{}),
	}
}
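
// Note: wg and statsMu are usable as zero values and the stats fields start
// zeroed, so only db, logger, and stopCh need explicit initialization in the
// constructor above.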

// Start begins the background maintenance goroutine.
func (m *DBMaintainer) Start() {
	m.wg.Add(1)
	go m.run()
	m.logger.Info("Database maintainer started",
		"checkpoint_interval", checkpointInterval,
		"vacuum_interval", vacuumInterval,
		"analyze_interval", analyzeInterval,
	)
}

// Stop gracefully shuts down the maintainer.
func (m *DBMaintainer) Stop() {
	close(m.stopCh)
	m.wg.Wait()
	m.logger.Info("Database maintainer stopped")
}
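
// Typical wiring, shown only as an illustration (construction of the Store
// and logger happens elsewhere in the application and is assumed here, not
// taken from this file):
//
//	maintainer := NewDBMaintainer(store, slog.Default())
//	maintainer.Start()
//	defer maintainer.Stop()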

// run is the main background loop.
func (m *DBMaintainer) run() {
	defer m.wg.Done()

	// Use different timers for each task
	checkpointTimer := time.NewTimer(checkpointInterval)
	vacuumTimer := time.NewTimer(vacuumInterval)
	analyzeTimer := time.NewTimer(analyzeInterval)
	defer checkpointTimer.Stop()
	defer vacuumTimer.Stop()
	defer analyzeTimer.Stop()

	for {
		select {
		case <-m.stopCh:
			return

		case <-checkpointTimer.C:
			m.runCheckpoint()
			checkpointTimer.Reset(checkpointInterval)

		case <-vacuumTimer.C:
			m.runVacuum()
			vacuumTimer.Reset(vacuumInterval)

		case <-analyzeTimer.C:
			m.runAnalyze()
			analyzeTimer.Reset(analyzeInterval)
		}
	}
}
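
// Each timer is reset only after its task returns, so the next run is
// scheduled relative to completion rather than on a fixed tick: a slow
// checkpoint, vacuum, or ANALYZE delays its own next run instead of letting
// runs pile up. That is the practical difference between this
// time.Timer/Reset pattern and a time.Ticker.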

// runCheckpoint performs a WAL checkpoint to keep the WAL file small.
func (m *DBMaintainer) runCheckpoint() {
	ctx, cancel := context.WithTimeout(context.Background(), checkpointTimeout)
	defer cancel()

	startTime := time.Now()

	err := m.db.Checkpoint(ctx)

	m.statsMu.Lock()
	m.lastCheckpoint = time.Now()
	m.lastCheckpointError = err
	if err == nil {
		m.checkpointCount++
	}
	m.statsMu.Unlock()

	if err != nil {
		m.logger.Error("WAL checkpoint failed", "error", err, "duration", time.Since(startTime))
	} else {
		m.logger.Debug("WAL checkpoint completed", "duration", time.Since(startTime))
	}
}

// runVacuum performs an incremental vacuum operation on the database.
func (m *DBMaintainer) runVacuum() {
	ctx, cancel := context.WithTimeout(context.Background(), vacuumTimeout)
	defer cancel()

	m.logger.Debug("Running incremental vacuum")
	startTime := time.Now()

	err := m.db.Vacuum(ctx)

	m.statsMu.Lock()
	m.lastVacuum = time.Now()
	m.lastVacuumError = err
	if err == nil {
		m.vacuumCount++
	}
	m.statsMu.Unlock()

	if err != nil {
		m.logger.Error("Incremental vacuum failed", "error", err, "duration", time.Since(startTime))
	} else {
		m.logger.Debug("Incremental vacuum completed", "duration", time.Since(startTime))
	}
}
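
// The Vacuum call above is assumed to map to SQLite's incremental vacuum,
// roughly along the lines of:
//
//	_, err := sqlDB.ExecContext(ctx, "PRAGMA incremental_vacuum(1000);")
//
// which only has an effect when the database uses auto_vacuum=INCREMENTAL.
// Freeing up to 1000 pages at the default 4 KiB page size matches the ~4MB
// per run noted in the interval constants.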

// runAnalyze performs an ANALYZE operation on the database.
func (m *DBMaintainer) runAnalyze() {
	ctx, cancel := context.WithTimeout(context.Background(), analyzeTimeout)
	defer cancel()

	m.logger.Info("Starting database ANALYZE")
	startTime := time.Now()

	err := m.db.Analyze(ctx)

	m.statsMu.Lock()
	m.lastAnalyze = time.Now()
	m.lastAnalyzeError = err
	if err == nil {
		m.analyzeCount++
	}
	m.statsMu.Unlock()

	if err != nil {
		m.logger.Error("ANALYZE failed", "error", err, "duration", time.Since(startTime))
	} else {
		m.logger.Info("ANALYZE completed", "duration", time.Since(startTime))
	}
}
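
// The Analyze call above is assumed to issue a plain statistics refresh, for
// example:
//
//	_, err := sqlDB.ExecContext(ctx, "ANALYZE;")
//
// which updates the sqlite_stat tables the query planner consults when
// choosing indexes. The exact statement used by the Store implementation is
// not visible from this file.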