routewatch/internal/routewatch/app.go
sneak cb1f4d9052 Add route update metrics tracking to PrefixHandler
- Add RecordIPv4Update and RecordIPv6Update to metrics package
- Add SetMetricsTracker method to PrefixHandler
- Track IPv4/IPv6 route updates when processing announcements
- Add GetMetricsTracker method to Streamer to expose metrics
2025-07-28 02:55:27 +02:00

268 lines
7.3 KiB
Go

// Package routewatch contains the primary RouteWatch type that represents a running instance
// of the application and contains pointers to its core dependencies, and is responsible for initialization.
package routewatch
import (
"context"
"fmt"
"os"
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/config"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/metrics"
"git.eeqj.de/sneak/routewatch/internal/routingtable"
"git.eeqj.de/sneak/routewatch/internal/server"
"git.eeqj.de/sneak/routewatch/internal/snapshotter"
"git.eeqj.de/sneak/routewatch/internal/streamer"
"go.uber.org/fx"
)
const (
// routingTableStatsInterval is how often we log routing table statistics
routingTableStatsInterval = 15 * time.Second
)
// Dependencies contains all dependencies for RouteWatch
type Dependencies struct {
fx.In
DB database.Store
RoutingTable *routingtable.RoutingTable
Streamer *streamer.Streamer
Server *server.Server
Logger *logger.Logger
Config *config.Config
}
// RouteWatch represents the main application instance
type RouteWatch struct {
db database.Store
routingTable *routingtable.RoutingTable
streamer *streamer.Streamer
server *server.Server
snapshotter *snapshotter.Snapshotter
logger *logger.Logger
maxRuntime time.Duration
shutdown bool
mu sync.Mutex
config *config.Config
dbHandler *ASHandler
peerHandler *PeerHandler
prefixHandler *PrefixHandler
peeringHandler *PeeringHandler
}
// isTruthy returns true if the value is considered truthy
// Empty string, "0", and "false" are considered falsy, everything else is truthy
func isTruthy(value string) bool {
return value != "" && value != "0" && value != "false"
}
// isSnapshotterEnabled checks if the snapshotter should be enabled based on environment variable
func isSnapshotterEnabled() bool {
return !isTruthy(os.Getenv("ROUTEWATCH_DISABLE_SNAPSHOTTER"))
}
// New creates a new RouteWatch instance
func New(deps Dependencies) *RouteWatch {
rw := &RouteWatch{
db: deps.DB,
routingTable: deps.RoutingTable,
streamer: deps.Streamer,
server: deps.Server,
logger: deps.Logger,
maxRuntime: deps.Config.MaxRuntime,
config: deps.Config,
}
// Create snapshotter if enabled
if isSnapshotterEnabled() {
snap, err := snapshotter.New(deps.RoutingTable, deps.Config, deps.Logger)
if err != nil {
deps.Logger.Error("Failed to create snapshotter", "error", err)
// Continue without snapshotter
} else {
rw.snapshotter = snap
}
}
return rw
}
// Run starts the RouteWatch application
func (rw *RouteWatch) Run(ctx context.Context) error {
rw.logger.Info("Starting RouteWatch")
// Apply runtime limit if specified
if rw.maxRuntime > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, rw.maxRuntime)
defer cancel()
rw.logger.Info("Running with time limit", "max_runtime", rw.maxRuntime)
}
// Register database handler to process BGP UPDATE messages
if rw.config.EnableBatchedDatabaseWrites {
rw.logger.Info("Using batched database handlers for improved performance")
// ASHandler maintains the asns table
rw.dbHandler = NewASHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.dbHandler)
// PeerHandler maintains the bgp_peers table
rw.peerHandler = NewPeerHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.peerHandler)
// PrefixHandler maintains the prefixes and live_routes tables
rw.prefixHandler = NewPrefixHandler(rw.db, rw.logger)
rw.prefixHandler.SetMetricsTracker(rw.streamer.GetMetricsTracker())
rw.streamer.RegisterHandler(rw.prefixHandler)
// PeeringHandler maintains the asn_peerings table
rw.peeringHandler = NewPeeringHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.peeringHandler)
} else {
// Non-batched handlers not implemented yet
rw.logger.Error("Non-batched handlers not implemented")
return fmt.Errorf("non-batched handlers not implemented")
}
// Register routing table handler to maintain in-memory routing table
rtHandler := NewRoutingTableHandler(rw.routingTable, rw.logger)
rw.streamer.RegisterHandler(rtHandler)
// Start periodic routing table stats logging
go rw.logRoutingTableStats(ctx)
// Start snapshotter if available
if rw.snapshotter != nil {
rw.snapshotter.Start(ctx)
}
// Start streaming
if err := rw.streamer.Start(); err != nil {
return err
}
// Start HTTP server
if err := rw.server.Start(); err != nil {
return err
}
// Wait for context cancellation
<-ctx.Done()
return nil
}
// Shutdown performs graceful shutdown of all services
func (rw *RouteWatch) Shutdown() {
rw.mu.Lock()
if rw.shutdown {
rw.mu.Unlock()
return
}
rw.shutdown = true
rw.mu.Unlock()
// Stop batched handlers first to flush remaining batches
if rw.dbHandler != nil {
rw.logger.Info("Flushing database handler")
rw.dbHandler.Stop()
}
if rw.peerHandler != nil {
rw.logger.Info("Flushing peer handler")
rw.peerHandler.Stop()
}
if rw.prefixHandler != nil {
rw.logger.Info("Flushing prefix handler")
rw.prefixHandler.Stop()
}
// Stop services
rw.streamer.Stop()
// Stop routing table expiration
rw.routingTable.Stop()
// Stop HTTP server with a timeout
const serverStopTimeout = 5 * time.Second
stopCtx, cancel := context.WithTimeout(context.Background(), serverStopTimeout)
defer cancel()
if err := rw.server.Stop(stopCtx); err != nil {
rw.logger.Error("Failed to stop HTTP server gracefully", "error", err)
}
// Log final metrics
metrics := rw.streamer.GetMetrics()
rw.logger.Info("Final metrics",
"total_messages", metrics.TotalMessages,
"total_bytes", metrics.TotalBytes,
"messages_per_sec", metrics.MessagesPerSec,
"bits_per_sec", metrics.BitsPerSec,
"duration", time.Since(metrics.ConnectedSince),
)
// Take final snapshot before shutdown if snapshotter is available
if rw.snapshotter != nil {
rw.logger.Info("Taking final snapshot before shutdown")
if err := rw.snapshotter.Shutdown(); err != nil {
rw.logger.Error("Failed to shutdown snapshotter", "error", err)
} else {
rw.logger.Info("Final snapshot completed")
}
} else {
rw.logger.Info("No snapshotter available")
}
}
// logRoutingTableStats periodically logs routing table statistics
func (rw *RouteWatch) logRoutingTableStats(ctx context.Context) {
// Log stats periodically
ticker := time.NewTicker(routingTableStatsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stats := rw.routingTable.GetDetailedStats()
rw.logger.Info("Routing table statistics",
"ipv4_routes", stats.IPv4Routes,
"ipv6_routes", stats.IPv6Routes,
"ipv4_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv4UpdatesRate),
"ipv6_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv6UpdatesRate),
"total_routes", stats.TotalRoutes,
"unique_prefixes", stats.UniquePrefixes,
"unique_origins", stats.UniqueOrigins,
"unique_peers", stats.UniquePeers,
)
}
}
}
// getModule provides all fx dependencies
func getModule() fx.Option {
return fx.Options(
fx.Provide(
logger.New,
config.New,
metrics.New,
fx.Annotate(
database.New,
fx.As(new(database.Store)),
),
routingtable.New,
streamer.New,
server.New,
New,
),
)
}