Add production hardening: health check, streamer panic fix, db maintenance
- Add health check endpoint at /.well-known/healthcheck.json that verifies database and RIS Live connectivity and returns 200 when healthy or 503 when any check fails
- Fix panic in streamer when encountering unknown RIS message types by logging a warning and continuing instead of crashing
- Add DBMaintainer for periodic database maintenance:
  - VACUUM every 6 hours to reclaim space
  - ANALYZE every hour to update query statistics
  - Graceful shutdown support
- Add Vacuum() and Analyze() methods to database interface
This commit is contained in:
parent
d2041a5a55
commit
da6d605e4d
@ -1950,3 +1950,23 @@ func (d *Database) getIPv6Info(ctx context.Context, ip string, parsedIP net.IP)
|
|||||||
|
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Vacuum runs the SQLite VACUUM command to reclaim unused space and defragment the database.
|
||||||
|
func (d *Database) Vacuum(ctx context.Context) error {
|
||||||
|
_, err := d.db.ExecContext(ctx, "VACUUM")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to vacuum database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Analyze runs the SQLite ANALYZE command to update query planner statistics.
|
||||||
|
func (d *Database) Analyze(ctx context.Context) error {
|
||||||
|
_, err := d.db.ExecContext(ctx, "ANALYZE")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to analyze database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@ -81,6 +81,10 @@ type Store interface {
|
|||||||
|
|
||||||
// Lifecycle
|
// Lifecycle
|
||||||
Close() error
|
Close() error
|
||||||
|
|
||||||
|
// Maintenance operations
|
||||||
|
Vacuum(ctx context.Context) error
|
||||||
|
Analyze(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure Database implements Store
|
// Ensure Database implements Store
|
||||||
|
|||||||
@ -44,6 +44,7 @@ type RouteWatch struct {
|
|||||||
prefixHandler *PrefixHandler
|
prefixHandler *PrefixHandler
|
||||||
peeringHandler *PeeringHandler
|
peeringHandler *PeeringHandler
|
||||||
asnFetcher *ASNFetcher
|
asnFetcher *ASNFetcher
|
||||||
|
dbMaintainer *DBMaintainer
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new RouteWatch instance
|
// New creates a new RouteWatch instance
|
||||||
@ -115,6 +116,10 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
|
|||||||
rw.asnFetcher.Start()
|
rw.asnFetcher.Start()
|
||||||
rw.server.SetASNFetcher(rw.asnFetcher)
|
rw.server.SetASNFetcher(rw.asnFetcher)
|
||||||
|
|
||||||
|
// Start database maintenance goroutine
|
||||||
|
rw.dbMaintainer = NewDBMaintainer(rw.db, rw.logger.Logger)
|
||||||
|
rw.dbMaintainer.Start()
|
||||||
|
|
||||||
// Wait for context cancellation
|
// Wait for context cancellation
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
|
||||||
@ -155,6 +160,11 @@ func (rw *RouteWatch) Shutdown() {
|
|||||||
rw.asnFetcher.Stop()
|
rw.asnFetcher.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stop database maintainer
|
||||||
|
if rw.dbMaintainer != nil {
|
||||||
|
rw.dbMaintainer.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
// Stop services
|
// Stop services
|
||||||
rw.streamer.Stop()
|
rw.streamer.Stop()
|
||||||
|
|
||||||
|
|||||||
@ -405,6 +405,16 @@ func (m *mockStore) UpdatePeerBatch(peers map[string]database.PeerUpdate) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Vacuum mock implementation
|
||||||
|
func (m *mockStore) Vacuum(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Analyze mock implementation
|
||||||
|
func (m *mockStore) Analyze(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestRouteWatchLiveFeed(t *testing.T) {
|
func TestRouteWatchLiveFeed(t *testing.T) {
|
||||||
|
|
||||||
// Create mock database
|
// Create mock database
|
||||||
|
|||||||
142
internal/routewatch/dbmaintainer.go
Normal file
142
internal/routewatch/dbmaintainer.go
Normal file
@ -0,0 +1,142 @@
|
|||||||
|
// Package routewatch contains the database maintainer for background maintenance tasks.
|
||||||
|
package routewatch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Scheduling and timeout settings for background database maintenance.
const (
	// analyzeInterval is the delay between successive ANALYZE runs.
	analyzeInterval = time.Hour

	// vacuumInterval is the delay between successive VACUUM runs.
	vacuumInterval = 6 * time.Hour

	// maintenanceTimeout is the upper bound on the duration of a single
	// maintenance operation (one VACUUM or one ANALYZE).
	maintenanceTimeout = 5 * time.Minute
)
|
||||||
|
|
||||||
|
// DBMaintainer handles background database maintenance tasks.
|
||||||
|
type DBMaintainer struct {
|
||||||
|
db database.Store
|
||||||
|
logger *slog.Logger
|
||||||
|
stopCh chan struct{}
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Stats tracking
|
||||||
|
statsMu sync.Mutex
|
||||||
|
lastVacuum time.Time
|
||||||
|
lastAnalyze time.Time
|
||||||
|
vacuumCount int
|
||||||
|
analyzeCount int
|
||||||
|
lastVacuumError error
|
||||||
|
lastAnalyzeError error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDBMaintainer creates a new database maintainer.
|
||||||
|
func NewDBMaintainer(db database.Store, logger *slog.Logger) *DBMaintainer {
|
||||||
|
return &DBMaintainer{
|
||||||
|
db: db,
|
||||||
|
logger: logger.With("component", "db_maintainer"),
|
||||||
|
stopCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start begins the background maintenance goroutine.
|
||||||
|
func (m *DBMaintainer) Start() {
|
||||||
|
m.wg.Add(1)
|
||||||
|
go m.run()
|
||||||
|
m.logger.Info("Database maintainer started",
|
||||||
|
"vacuum_interval", vacuumInterval,
|
||||||
|
"analyze_interval", analyzeInterval,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop gracefully shuts down the maintainer.
|
||||||
|
func (m *DBMaintainer) Stop() {
|
||||||
|
close(m.stopCh)
|
||||||
|
m.wg.Wait()
|
||||||
|
m.logger.Info("Database maintainer stopped")
|
||||||
|
}
|
||||||
|
|
||||||
|
// run is the main background loop.
|
||||||
|
func (m *DBMaintainer) run() {
|
||||||
|
defer m.wg.Done()
|
||||||
|
|
||||||
|
// Use different timers for each task
|
||||||
|
vacuumTimer := time.NewTimer(vacuumInterval)
|
||||||
|
analyzeTimer := time.NewTimer(analyzeInterval)
|
||||||
|
defer vacuumTimer.Stop()
|
||||||
|
defer analyzeTimer.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-m.stopCh:
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-vacuumTimer.C:
|
||||||
|
m.runVacuum()
|
||||||
|
vacuumTimer.Reset(vacuumInterval)
|
||||||
|
|
||||||
|
case <-analyzeTimer.C:
|
||||||
|
m.runAnalyze()
|
||||||
|
analyzeTimer.Reset(analyzeInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// runVacuum performs a VACUUM operation on the database.
|
||||||
|
func (m *DBMaintainer) runVacuum() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), maintenanceTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
m.logger.Info("Starting database VACUUM")
|
||||||
|
startTime := time.Now()
|
||||||
|
|
||||||
|
err := m.db.Vacuum(ctx)
|
||||||
|
|
||||||
|
m.statsMu.Lock()
|
||||||
|
m.lastVacuum = time.Now()
|
||||||
|
m.lastVacuumError = err
|
||||||
|
if err == nil {
|
||||||
|
m.vacuumCount++
|
||||||
|
}
|
||||||
|
m.statsMu.Unlock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
m.logger.Error("VACUUM failed", "error", err, "duration", time.Since(startTime))
|
||||||
|
} else {
|
||||||
|
m.logger.Info("VACUUM completed", "duration", time.Since(startTime))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// runAnalyze performs an ANALYZE operation on the database.
|
||||||
|
func (m *DBMaintainer) runAnalyze() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), maintenanceTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
m.logger.Info("Starting database ANALYZE")
|
||||||
|
startTime := time.Now()
|
||||||
|
|
||||||
|
err := m.db.Analyze(ctx)
|
||||||
|
|
||||||
|
m.statsMu.Lock()
|
||||||
|
m.lastAnalyze = time.Now()
|
||||||
|
m.lastAnalyzeError = err
|
||||||
|
if err == nil {
|
||||||
|
m.analyzeCount++
|
||||||
|
}
|
||||||
|
m.statsMu.Unlock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
m.logger.Error("ANALYZE failed", "error", err, "duration", time.Since(startTime))
|
||||||
|
} else {
|
||||||
|
m.logger.Info("ANALYZE completed", "duration", time.Since(startTime))
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -24,8 +24,70 @@ import (
|
|||||||
const (
|
const (
|
||||||
// statsContextTimeout is the timeout for stats API operations.
|
// statsContextTimeout is the timeout for stats API operations.
|
||||||
statsContextTimeout = 4 * time.Second
|
statsContextTimeout = 4 * time.Second
|
||||||
|
|
||||||
|
// healthCheckTimeout is the timeout for health check operations.
|
||||||
|
healthCheckTimeout = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// HealthCheckResponse is the JSON document returned by the health check
// endpoint.
type HealthCheckResponse struct {
	// Status is the overall verdict; the health check handler sets it to "ok"
	// or "error".
	Status string `json:"status"`

	// Timestamp is when the response was built; the handler formats it as a
	// UTC RFC 3339 string.
	Timestamp string `json:"timestamp"`

	// Checks maps each check's name (e.g. "database", "ris_live") to its
	// individual result string.
	Checks map[string]string `json:"checks"`
}
|
||||||
|
|
||||||
|
// handleHealthCheck returns a handler that performs health checks.
|
||||||
|
// Returns 200 if healthy, 503 if any check fails.
|
||||||
|
func (s *Server) handleHealthCheck() http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
checks := make(map[string]string)
|
||||||
|
healthy := true
|
||||||
|
|
||||||
|
// Check database connectivity
|
||||||
|
dbStats, err := s.db.GetStatsContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
checks["database"] = "error: " + err.Error()
|
||||||
|
healthy = false
|
||||||
|
} else if dbStats.ASNs == 0 && dbStats.Prefixes == 0 {
|
||||||
|
checks["database"] = "warning: empty database"
|
||||||
|
} else {
|
||||||
|
checks["database"] = "ok"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check streamer connection
|
||||||
|
metrics := s.streamer.GetMetrics()
|
||||||
|
if metrics.Connected {
|
||||||
|
checks["ris_live"] = "ok"
|
||||||
|
} else {
|
||||||
|
checks["ris_live"] = "disconnected"
|
||||||
|
healthy = false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build response
|
||||||
|
status := "ok"
|
||||||
|
if !healthy {
|
||||||
|
status = "error"
|
||||||
|
}
|
||||||
|
|
||||||
|
response := HealthCheckResponse{
|
||||||
|
Status: status,
|
||||||
|
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
||||||
|
Checks: checks,
|
||||||
|
}
|
||||||
|
|
||||||
|
if !healthy {
|
||||||
|
w.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writeJSONSuccess(w, response); err != nil {
|
||||||
|
s.logger.Error("Failed to encode health check response", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// handleRoot returns a handler that redirects to /status.
|
// handleRoot returns a handler that redirects to /status.
|
||||||
func (s *Server) handleRoot() http.HandlerFunc {
|
func (s *Server) handleRoot() http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|||||||
@ -24,6 +24,7 @@ func (s *Server) setupRoutes() {
|
|||||||
r.Get("/", s.handleRoot())
|
r.Get("/", s.handleRoot())
|
||||||
r.Get("/status", s.handleStatusHTML())
|
r.Get("/status", s.handleStatusHTML())
|
||||||
r.Get("/status.json", JSONValidationMiddleware(s.handleStatusJSON()).ServeHTTP)
|
r.Get("/status.json", JSONValidationMiddleware(s.handleStatusJSON()).ServeHTTP)
|
||||||
|
r.Get("/.well-known/healthcheck.json", JSONValidationMiddleware(s.handleHealthCheck()).ServeHTTP)
|
||||||
|
|
||||||
// AS and prefix detail pages
|
// AS and prefix detail pages
|
||||||
r.Get("/as/{asn}", s.handleASDetail())
|
r.Get("/as/{asn}", s.handleASDetail())
|
||||||
|
|||||||
@ -627,11 +627,11 @@ func (s *Streamer) stream(ctx context.Context) error {
|
|||||||
// Peer state changes - silently ignore
|
// Peer state changes - silently ignore
|
||||||
continue
|
continue
|
||||||
default:
|
default:
|
||||||
s.logger.Error("Unknown message type",
|
s.logger.Warn("Unknown message type, skipping",
|
||||||
"type", msg.Type,
|
"type", msg.Type,
|
||||||
"line", string(line),
|
|
||||||
)
|
)
|
||||||
panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
|
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dispatch to interested handlers
|
// Dispatch to interested handlers
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user