Remove RoutingTableHandler and snapshotter, use database for route stats

- Remove RoutingTableHandler as PrefixHandler maintains live_routes table
- Update server to get route counts from database instead of in-memory routing table
- Add GetLiveRouteCounts method to database for IPv4/IPv6 route counts
- Use metrics tracker in PrefixHandler for route update rates
- Remove snapshotter entirely as database contains all information
- Update tests to work without routing table
This commit is contained in:
Jeffrey Paul 2025-07-28 03:02:44 +02:00
parent cb1f4d9052
commit d929f24f80
6 changed files with 77 additions and 265 deletions

View File

@ -522,3 +522,20 @@ func (d *Database) GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []Pr
return ipv4, ipv6, nil
}
// GetLiveRouteCounts returns the count of IPv4 and IPv6 routes
func (d *Database) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) {
// Get IPv4 count
err = d.db.QueryRow("SELECT COUNT(*) FROM live_routes WHERE ip_version = 4").Scan(&ipv4Count)
if err != nil {
return 0, 0, fmt.Errorf("failed to count IPv4 routes: %w", err)
}
// Get IPv6 count
err = d.db.QueryRow("SELECT COUNT(*) FROM live_routes WHERE ip_version = 6").Scan(&ipv6Count)
if err != nil {
return 0, 0, fmt.Errorf("failed to count IPv6 routes: %w", err)
}
return ipv4Count, ipv6Count, nil
}

View File

@ -41,6 +41,7 @@ type Store interface {
UpsertLiveRoute(route *LiveRoute) error
DeleteLiveRoute(prefix string, originASN int, peerIP string) error
GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error)
// Lifecycle
Close() error

View File

@ -5,7 +5,6 @@ package routewatch
import (
"context"
"fmt"
"os"
"sync"
"time"
@ -13,38 +12,28 @@ import (
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/metrics"
"git.eeqj.de/sneak/routewatch/internal/routingtable"
"git.eeqj.de/sneak/routewatch/internal/server"
"git.eeqj.de/sneak/routewatch/internal/snapshotter"
"git.eeqj.de/sneak/routewatch/internal/streamer"
"go.uber.org/fx"
)
const (
// routingTableStatsInterval is how often we log routing table statistics
routingTableStatsInterval = 15 * time.Second
)
// Dependencies contains all dependencies for RouteWatch
type Dependencies struct {
fx.In
DB database.Store
RoutingTable *routingtable.RoutingTable
Streamer *streamer.Streamer
Server *server.Server
Logger *logger.Logger
Config *config.Config
DB database.Store
Streamer *streamer.Streamer
Server *server.Server
Logger *logger.Logger
Config *config.Config
}
// RouteWatch represents the main application instance
type RouteWatch struct {
db database.Store
routingTable *routingtable.RoutingTable
streamer *streamer.Streamer
server *server.Server
snapshotter *snapshotter.Snapshotter
logger *logger.Logger
maxRuntime time.Duration
shutdown bool
@ -56,38 +45,15 @@ type RouteWatch struct {
peeringHandler *PeeringHandler
}
// isTruthy returns true if the value is considered truthy
// Empty string, "0", and "false" are considered falsy, everything else is truthy
func isTruthy(value string) bool {
return value != "" && value != "0" && value != "false"
}
// isSnapshotterEnabled checks if the snapshotter should be enabled based on environment variable
func isSnapshotterEnabled() bool {
return !isTruthy(os.Getenv("ROUTEWATCH_DISABLE_SNAPSHOTTER"))
}
// New creates a new RouteWatch instance
func New(deps Dependencies) *RouteWatch {
rw := &RouteWatch{
db: deps.DB,
routingTable: deps.RoutingTable,
streamer: deps.Streamer,
server: deps.Server,
logger: deps.Logger,
maxRuntime: deps.Config.MaxRuntime,
config: deps.Config,
}
// Create snapshotter if enabled
if isSnapshotterEnabled() {
snap, err := snapshotter.New(deps.RoutingTable, deps.Config, deps.Logger)
if err != nil {
deps.Logger.Error("Failed to create snapshotter", "error", err)
// Continue without snapshotter
} else {
rw.snapshotter = snap
}
db: deps.DB,
streamer: deps.Streamer,
server: deps.Server,
logger: deps.Logger,
maxRuntime: deps.Config.MaxRuntime,
config: deps.Config,
}
return rw
@ -131,17 +97,7 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
return fmt.Errorf("non-batched handlers not implemented")
}
// Register routing table handler to maintain in-memory routing table
rtHandler := NewRoutingTableHandler(rw.routingTable, rw.logger)
rw.streamer.RegisterHandler(rtHandler)
// Start periodic routing table stats logging
go rw.logRoutingTableStats(ctx)
// Start snapshotter if available
if rw.snapshotter != nil {
rw.snapshotter.Start(ctx)
}
// No longer need routing table handler - PrefixHandler maintains live_routes table
// Start streaming
if err := rw.streamer.Start(); err != nil {
@ -187,9 +143,6 @@ func (rw *RouteWatch) Shutdown() {
// Stop services
rw.streamer.Stop()
// Stop routing table expiration
rw.routingTable.Stop()
// Stop HTTP server with a timeout
const serverStopTimeout = 5 * time.Second
stopCtx, cancel := context.WithTimeout(context.Background(), serverStopTimeout)
@ -208,43 +161,6 @@ func (rw *RouteWatch) Shutdown() {
"duration", time.Since(metrics.ConnectedSince),
)
// Take final snapshot before shutdown if snapshotter is available
if rw.snapshotter != nil {
rw.logger.Info("Taking final snapshot before shutdown")
if err := rw.snapshotter.Shutdown(); err != nil {
rw.logger.Error("Failed to shutdown snapshotter", "error", err)
} else {
rw.logger.Info("Final snapshot completed")
}
} else {
rw.logger.Info("No snapshotter available")
}
}
// logRoutingTableStats periodically logs routing table statistics
func (rw *RouteWatch) logRoutingTableStats(ctx context.Context) {
// Log stats periodically
ticker := time.NewTicker(routingTableStatsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stats := rw.routingTable.GetDetailedStats()
rw.logger.Info("Routing table statistics",
"ipv4_routes", stats.IPv4Routes,
"ipv6_routes", stats.IPv6Routes,
"ipv4_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv4UpdatesRate),
"ipv6_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv6UpdatesRate),
"total_routes", stats.TotalRoutes,
"unique_prefixes", stats.UniquePrefixes,
"unique_origins", stats.UniqueOrigins,
"unique_peers", stats.UniquePeers,
)
}
}
}
// getModule provides all fx dependencies
@ -258,7 +174,6 @@ func getModule() fx.Option {
database.New,
fx.As(new(database.Store)),
),
routingtable.New,
streamer.New,
server.New,
New,

View File

@ -12,7 +12,6 @@ import (
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/metrics"
"git.eeqj.de/sneak/routewatch/internal/routingtable"
"git.eeqj.de/sneak/routewatch/internal/server"
"git.eeqj.de/sneak/routewatch/internal/streamer"
"github.com/google/uuid"
@ -181,9 +180,13 @@ func (m *mockStore) GetPrefixDistribution() (ipv4 []database.PrefixDistribution,
return nil, nil, nil
}
// GetLiveRouteCounts mock implementation
func (m *mockStore) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) {
// Return mock counts
return m.RouteCount / 2, m.RouteCount / 2, nil
}
func TestRouteWatchLiveFeed(t *testing.T) {
// Disable snapshotter for tests
t.Setenv("ROUTEWATCH_DISABLE_SNAPSHOTTER", "1")
// Create mock database
mockDB := newMockStore()
@ -204,20 +207,16 @@ func TestRouteWatchLiveFeed(t *testing.T) {
EnableBatchedDatabaseWrites: true,
}
// Create routing table
rt := routingtable.New(cfg, logger)
// Create server
srv := server.New(mockDB, rt, s, logger)
srv := server.New(mockDB, s, logger)
// Create RouteWatch with 5 second limit
deps := Dependencies{
DB: mockDB,
RoutingTable: rt,
Streamer: s,
Server: srv,
Logger: logger,
Config: cfg,
DB: mockDB,
Streamer: s,
Server: srv,
Logger: logger,
Config: cfg,
}
rw := New(deps)

View File

@ -1,131 +0,0 @@
package routewatch
import (
"strconv"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
"git.eeqj.de/sneak/routewatch/internal/routingtable"
"github.com/google/uuid"
)
const (
// routingTableHandlerQueueSize is the queue capacity for in-memory routing table operations
routingTableHandlerQueueSize = 100000
)
// RoutingTableHandler handles BGP messages and updates the in-memory routing table
type RoutingTableHandler struct {
rt *routingtable.RoutingTable
logger *logger.Logger
}
// NewRoutingTableHandler creates a new routing table handler
func NewRoutingTableHandler(rt *routingtable.RoutingTable, logger *logger.Logger) *RoutingTableHandler {
return &RoutingTableHandler{
rt: rt,
logger: logger,
}
}
// WantsMessage returns true if this handler wants to process messages of the given type
func (h *RoutingTableHandler) WantsMessage(messageType string) bool {
// We only care about UPDATE messages for the routing table
return messageType == "UPDATE"
}
// QueueCapacity returns the desired queue capacity for this handler
func (h *RoutingTableHandler) QueueCapacity() int {
// In-memory operations are very fast, so use a large queue
return routingTableHandlerQueueSize
}
// HandleMessage processes a RIS message and updates the routing table
func (h *RoutingTableHandler) HandleMessage(msg *ristypes.RISMessage) {
// Use the pre-parsed timestamp
timestamp := msg.ParsedTimestamp
// Parse peer ASN
peerASN, err := strconv.Atoi(msg.PeerASN)
if err != nil {
h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err)
return
}
// Get origin ASN from path (last element)
var originASN int
if len(msg.Path) > 0 {
originASN = msg.Path[len(msg.Path)-1]
}
// Process announcements
for _, announcement := range msg.Announcements {
for _, prefix := range announcement.Prefixes {
// Generate deterministic UUIDs based on the prefix and origin ASN
// This ensures consistency across restarts
prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix))
originASNID := uuid.NewSHA1(uuid.NameSpaceOID, []byte(strconv.Itoa(originASN)))
// Create route for the routing table
route := &routingtable.Route{
PrefixID: prefixID,
Prefix: prefix,
OriginASNID: originASNID,
OriginASN: originASN,
PeerASN: peerASN,
ASPath: msg.Path,
NextHop: announcement.NextHop,
AnnouncedAt: timestamp,
}
// Add route to routing table
h.rt.AddRoute(route)
}
}
// Process withdrawals
for _, prefix := range msg.Withdrawals {
// Generate deterministic UUID for the prefix
prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix))
// Withdraw all routes for this prefix from this peer
h.rt.WithdrawRoutesByPrefixAndPeer(prefixID, peerASN)
}
}
// GetRoutingTableStats returns statistics about the routing table
func (h *RoutingTableHandler) GetRoutingTableStats() map[string]int {
return h.rt.Stats()
}
// GetActiveRouteCount returns the number of active routes
func (h *RoutingTableHandler) GetActiveRouteCount() int {
return h.rt.Size()
}
// GetRoutesByPrefix returns all routes for a specific prefix
func (h *RoutingTableHandler) GetRoutesByPrefix(prefixID uuid.UUID) []*routingtable.Route {
return h.rt.GetRoutesByPrefix(prefixID)
}
// GetRoutesByOriginASN returns all routes originated by a specific ASN
func (h *RoutingTableHandler) GetRoutesByOriginASN(originASNID uuid.UUID) []*routingtable.Route {
return h.rt.GetRoutesByOriginASN(originASNID)
}
// GetRoutesByPeerASN returns all routes received from a specific peer ASN
func (h *RoutingTableHandler) GetRoutesByPeerASN(peerASN int) []*routingtable.Route {
return h.rt.GetRoutesByPeerASN(peerASN)
}
// GetAllRoutes returns all active routes
func (h *RoutingTableHandler) GetAllRoutes() []*routingtable.Route {
return h.rt.GetAllRoutes()
}
// ClearRoutingTable clears all routes from the routing table
func (h *RoutingTableHandler) ClearRoutingTable() {
h.rt.Clear()
h.logger.Info("Cleared routing table")
}

View File

@ -10,7 +10,6 @@ import (
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/routingtable"
"git.eeqj.de/sneak/routewatch/internal/streamer"
"git.eeqj.de/sneak/routewatch/internal/templates"
"github.com/go-chi/chi/v5"
@ -19,21 +18,19 @@ import (
// Server provides HTTP endpoints for status monitoring
type Server struct {
router *chi.Mux
db database.Store
routingTable *routingtable.RoutingTable
streamer *streamer.Streamer
logger *logger.Logger
srv *http.Server
router *chi.Mux
db database.Store
streamer *streamer.Streamer
logger *logger.Logger
srv *http.Server
}
// New creates a new HTTP server
func New(db database.Store, rt *routingtable.RoutingTable, streamer *streamer.Streamer, logger *logger.Logger) *Server {
func New(db database.Store, streamer *streamer.Streamer, logger *logger.Logger) *Server {
s := &Server{
db: db,
routingTable: rt,
streamer: streamer,
logger: logger,
db: db,
streamer: streamer,
logger: logger,
}
s.setupRoutes()
@ -196,8 +193,15 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
const bitsPerMegabit = 1000000.0
// Get detailed routing table stats
rtStats := s.routingTable.GetDetailedStats()
// Get route counts from database
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
if err != nil {
s.logger.Warn("Failed to get live route counts", "error", err)
// Continue with zero counts
}
// Get route update metrics
routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics()
stats := Stats{
Uptime: uptime,
@ -213,10 +217,10 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
Peerings: dbStats.Peerings,
DatabaseSizeBytes: dbStats.FileSizeBytes,
LiveRoutes: dbStats.LiveRoutes,
IPv4Routes: rtStats.IPv4Routes,
IPv6Routes: rtStats.IPv6Routes,
IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
IPv4Routes: ipv4Routes,
IPv6Routes: ipv6Routes,
IPv4UpdatesPerSec: routeMetrics.IPv4UpdatesPerSec,
IPv6UpdatesPerSec: routeMetrics.IPv6UpdatesPerSec,
IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
}
@ -324,8 +328,15 @@ func (s *Server) handleStats() http.HandlerFunc {
const bitsPerMegabit = 1000000.0
// Get detailed routing table stats
rtStats := s.routingTable.GetDetailedStats()
// Get route counts from database
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
if err != nil {
s.logger.Warn("Failed to get live route counts", "error", err)
// Continue with zero counts
}
// Get route update metrics
routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics()
// Get handler stats
handlerStats := s.streamer.GetHandlerStats()
@ -358,10 +369,10 @@ func (s *Server) handleStats() http.HandlerFunc {
Peerings: dbStats.Peerings,
DatabaseSizeBytes: dbStats.FileSizeBytes,
LiveRoutes: dbStats.LiveRoutes,
IPv4Routes: rtStats.IPv4Routes,
IPv6Routes: rtStats.IPv6Routes,
IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
IPv4Routes: ipv4Routes,
IPv6Routes: ipv6Routes,
IPv4UpdatesPerSec: routeMetrics.IPv4UpdatesPerSec,
IPv6UpdatesPerSec: routeMetrics.IPv6UpdatesPerSec,
HandlerStats: handlerStatsInfo,
IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,