diff --git a/internal/database/database.go b/internal/database/database.go index 659b902..4d1580a 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -1950,3 +1950,23 @@ func (d *Database) getIPv6Info(ctx context.Context, ip string, parsedIP net.IP) return info, nil } + +// Vacuum runs the SQLite VACUUM command to reclaim unused space and defragment the database. +func (d *Database) Vacuum(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "VACUUM") + if err != nil { + return fmt.Errorf("failed to vacuum database: %w", err) + } + + return nil +} + +// Analyze runs the SQLite ANALYZE command to update query planner statistics. +func (d *Database) Analyze(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "ANALYZE") + if err != nil { + return fmt.Errorf("failed to analyze database: %w", err) + } + + return nil +} diff --git a/internal/database/interface.go b/internal/database/interface.go index 9eca3cb..fb0f5c8 100644 --- a/internal/database/interface.go +++ b/internal/database/interface.go @@ -81,6 +81,10 @@ type Store interface { // Lifecycle Close() error + + // Maintenance operations + Vacuum(ctx context.Context) error + Analyze(ctx context.Context) error } // Ensure Database implements Store diff --git a/internal/routewatch/app.go b/internal/routewatch/app.go index 6aaaebb..0c29d75 100644 --- a/internal/routewatch/app.go +++ b/internal/routewatch/app.go @@ -44,6 +44,7 @@ type RouteWatch struct { prefixHandler *PrefixHandler peeringHandler *PeeringHandler asnFetcher *ASNFetcher + dbMaintainer *DBMaintainer } // New creates a new RouteWatch instance @@ -115,6 +116,10 @@ func (rw *RouteWatch) Run(ctx context.Context) error { rw.asnFetcher.Start() rw.server.SetASNFetcher(rw.asnFetcher) + // Start database maintenance goroutine + rw.dbMaintainer = NewDBMaintainer(rw.db, rw.logger.Logger) + rw.dbMaintainer.Start() + // Wait for context cancellation <-ctx.Done() @@ -155,6 +160,11 @@ func (rw *RouteWatch) Shutdown() { rw.asnFetcher.Stop() } + // Stop database maintainer + if rw.dbMaintainer != nil { + rw.dbMaintainer.Stop() + } + // Stop services rw.streamer.Stop() diff --git a/internal/routewatch/app_integration_test.go b/internal/routewatch/app_integration_test.go index 2a176ce..96e9eb7 100644 --- a/internal/routewatch/app_integration_test.go +++ b/internal/routewatch/app_integration_test.go @@ -405,6 +405,16 @@ func (m *mockStore) UpdatePeerBatch(peers map[string]database.PeerUpdate) error return nil } +// Vacuum mock implementation +func (m *mockStore) Vacuum(ctx context.Context) error { + return nil +} + +// Analyze mock implementation +func (m *mockStore) Analyze(ctx context.Context) error { + return nil +} + func TestRouteWatchLiveFeed(t *testing.T) { // Create mock database diff --git a/internal/routewatch/dbmaintainer.go b/internal/routewatch/dbmaintainer.go new file mode 100644 index 0000000..9e4b0c4 --- /dev/null +++ b/internal/routewatch/dbmaintainer.go @@ -0,0 +1,142 @@ +// Package routewatch contains the database maintainer for background maintenance tasks. +package routewatch + +import ( + "context" + "log/slog" + "sync" + "time" + + "git.eeqj.de/sneak/routewatch/internal/database" +) + +// Database maintenance configuration constants. +const ( + // vacuumInterval is how often to run VACUUM. + vacuumInterval = 6 * time.Hour + + // analyzeInterval is how often to run ANALYZE. + analyzeInterval = 1 * time.Hour + + // maintenanceTimeout is the max time for a maintenance operation. + maintenanceTimeout = 5 * time.Minute +) + +// DBMaintainer handles background database maintenance tasks. +type DBMaintainer struct { + db database.Store + logger *slog.Logger + stopCh chan struct{} + wg sync.WaitGroup + + // Stats tracking + statsMu sync.Mutex + lastVacuum time.Time + lastAnalyze time.Time + vacuumCount int + analyzeCount int + lastVacuumError error + lastAnalyzeError error +} + +// NewDBMaintainer creates a new database maintainer. +func NewDBMaintainer(db database.Store, logger *slog.Logger) *DBMaintainer { + return &DBMaintainer{ + db: db, + logger: logger.With("component", "db_maintainer"), + stopCh: make(chan struct{}), + } +} + +// Start begins the background maintenance goroutine. +func (m *DBMaintainer) Start() { + m.wg.Add(1) + go m.run() + m.logger.Info("Database maintainer started", + "vacuum_interval", vacuumInterval, + "analyze_interval", analyzeInterval, + ) +} + +// Stop gracefully shuts down the maintainer. +func (m *DBMaintainer) Stop() { + close(m.stopCh) + m.wg.Wait() + m.logger.Info("Database maintainer stopped") +} + +// run is the main background loop. +func (m *DBMaintainer) run() { + defer m.wg.Done() + + // Use different timers for each task + vacuumTimer := time.NewTimer(vacuumInterval) + analyzeTimer := time.NewTimer(analyzeInterval) + defer vacuumTimer.Stop() + defer analyzeTimer.Stop() + + for { + select { + case <-m.stopCh: + return + + case <-vacuumTimer.C: + m.runVacuum() + vacuumTimer.Reset(vacuumInterval) + + case <-analyzeTimer.C: + m.runAnalyze() + analyzeTimer.Reset(analyzeInterval) + } + } +} + +// runVacuum performs a VACUUM operation on the database. +func (m *DBMaintainer) runVacuum() { + ctx, cancel := context.WithTimeout(context.Background(), maintenanceTimeout) + defer cancel() + + m.logger.Info("Starting database VACUUM") + startTime := time.Now() + + err := m.db.Vacuum(ctx) + + m.statsMu.Lock() + m.lastVacuum = time.Now() + m.lastVacuumError = err + if err == nil { + m.vacuumCount++ + } + m.statsMu.Unlock() + + if err != nil { + m.logger.Error("VACUUM failed", "error", err, "duration", time.Since(startTime)) + } else { + m.logger.Info("VACUUM completed", "duration", time.Since(startTime)) + } +} + +// runAnalyze performs an ANALYZE operation on the database. +func (m *DBMaintainer) runAnalyze() { + ctx, cancel := context.WithTimeout(context.Background(), maintenanceTimeout) + defer cancel() + + m.logger.Info("Starting database ANALYZE") + startTime := time.Now() + + err := m.db.Analyze(ctx) + + m.statsMu.Lock() + m.lastAnalyze = time.Now() + m.lastAnalyzeError = err + if err == nil { + m.analyzeCount++ + } + m.statsMu.Unlock() + + if err != nil { + m.logger.Error("ANALYZE failed", "error", err, "duration", time.Since(startTime)) + } else { + m.logger.Info("ANALYZE completed", "duration", time.Since(startTime)) + } +} diff --git a/internal/server/handlers.go b/internal/server/handlers.go index 58333ab..61b0208 100644 --- a/internal/server/handlers.go +++ b/internal/server/handlers.go @@ -24,8 +24,70 @@ import ( const ( // statsContextTimeout is the timeout for stats API operations. statsContextTimeout = 4 * time.Second + + // healthCheckTimeout is the timeout for health check operations. + healthCheckTimeout = 2 * time.Second ) +// HealthCheckResponse represents the health check response. +type HealthCheckResponse struct { + Status string `json:"status"` + Timestamp string `json:"timestamp"` + Checks map[string]string `json:"checks"` +} + +// handleHealthCheck returns a handler that performs health checks. +// Returns 200 if healthy, 503 if any check fails. +func (s *Server) handleHealthCheck() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) + defer cancel() + + checks := make(map[string]string) + healthy := true + + // Check database connectivity + dbStats, err := s.db.GetStatsContext(ctx) + if err != nil { + checks["database"] = "error: " + err.Error() + healthy = false + } else if dbStats.ASNs == 0 && dbStats.Prefixes == 0 { + checks["database"] = "warning: empty database" + } else { + checks["database"] = "ok" + } + + // Check streamer connection + metrics := s.streamer.GetMetrics() + if metrics.Connected { + checks["ris_live"] = "ok" + } else { + checks["ris_live"] = "disconnected" + healthy = false + } + + // Build response + status := "ok" + if !healthy { + status = "error" + } + + response := HealthCheckResponse{ + Status: status, + Timestamp: time.Now().UTC().Format(time.RFC3339), + Checks: checks, + } + + if !healthy { + w.WriteHeader(http.StatusServiceUnavailable) + } + + if err := writeJSONSuccess(w, response); err != nil { + s.logger.Error("Failed to encode health check response", "error", err) + } + } +} + // handleRoot returns a handler that redirects to /status. func (s *Server) handleRoot() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/server/routes.go b/internal/server/routes.go index b628c88..8cc6696 100644 --- a/internal/server/routes.go +++ b/internal/server/routes.go @@ -24,6 +24,7 @@ func (s *Server) setupRoutes() { r.Get("/", s.handleRoot()) r.Get("/status", s.handleStatusHTML()) r.Get("/status.json", JSONValidationMiddleware(s.handleStatusJSON()).ServeHTTP) + r.Get("/.well-known/healthcheck.json", JSONValidationMiddleware(s.handleHealthCheck()).ServeHTTP) // AS and prefix detail pages r.Get("/as/{asn}", s.handleASDetail()) diff --git a/internal/streamer/streamer.go b/internal/streamer/streamer.go index f4f9bb9..3c1f48b 100644 --- a/internal/streamer/streamer.go +++ b/internal/streamer/streamer.go @@ -627,11 +627,11 @@ func (s *Streamer) stream(ctx context.Context) error { // Peer state changes - silently ignore continue default: - s.logger.Error("Unknown message type", + s.logger.Warn("Unknown message type, skipping", "type", msg.Type, - "line", string(line), ) - panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type)) + + continue } // Dispatch to interested handlers