diff --git a/internal/database/database.go b/internal/database/database.go index a143b71..5558ff2 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -522,3 +522,20 @@ func (d *Database) GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []Pr return ipv4, ipv6, nil } + +// GetLiveRouteCounts returns the count of IPv4 and IPv6 routes +func (d *Database) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) { + // Get IPv4 count + err = d.db.QueryRow("SELECT COUNT(*) FROM live_routes WHERE ip_version = 4").Scan(&ipv4Count) + if err != nil { + return 0, 0, fmt.Errorf("failed to count IPv4 routes: %w", err) + } + + // Get IPv6 count + err = d.db.QueryRow("SELECT COUNT(*) FROM live_routes WHERE ip_version = 6").Scan(&ipv6Count) + if err != nil { + return 0, 0, fmt.Errorf("failed to count IPv6 routes: %w", err) + } + + return ipv4Count, ipv6Count, nil +} diff --git a/internal/database/interface.go b/internal/database/interface.go index c2d4a3a..3e5b2f4 100644 --- a/internal/database/interface.go +++ b/internal/database/interface.go @@ -41,6 +41,7 @@ type Store interface { UpsertLiveRoute(route *LiveRoute) error DeleteLiveRoute(prefix string, originASN int, peerIP string) error GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error) + GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) // Lifecycle Close() error diff --git a/internal/routewatch/app.go b/internal/routewatch/app.go index 1e4974f..063e3f4 100644 --- a/internal/routewatch/app.go +++ b/internal/routewatch/app.go @@ -5,7 +5,6 @@ package routewatch import ( "context" "fmt" - "os" "sync" "time" @@ -13,38 +12,28 @@ import ( "git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/logger" "git.eeqj.de/sneak/routewatch/internal/metrics" - "git.eeqj.de/sneak/routewatch/internal/routingtable" "git.eeqj.de/sneak/routewatch/internal/server" - "git.eeqj.de/sneak/routewatch/internal/snapshotter" "git.eeqj.de/sneak/routewatch/internal/streamer" "go.uber.org/fx" ) -const ( - // routingTableStatsInterval is how often we log routing table statistics - routingTableStatsInterval = 15 * time.Second -) - // Dependencies contains all dependencies for RouteWatch type Dependencies struct { fx.In - DB database.Store - RoutingTable *routingtable.RoutingTable - Streamer *streamer.Streamer - Server *server.Server - Logger *logger.Logger - Config *config.Config + DB database.Store + Streamer *streamer.Streamer + Server *server.Server + Logger *logger.Logger + Config *config.Config } // RouteWatch represents the main application instance type RouteWatch struct { db database.Store - routingTable *routingtable.RoutingTable streamer *streamer.Streamer server *server.Server - snapshotter *snapshotter.Snapshotter logger *logger.Logger maxRuntime time.Duration shutdown bool @@ -56,38 +45,15 @@ type RouteWatch struct { peeringHandler *PeeringHandler } -// isTruthy returns true if the value is considered truthy -// Empty string, "0", and "false" are considered falsy, everything else is truthy -func isTruthy(value string) bool { - return value != "" && value != "0" && value != "false" -} - -// isSnapshotterEnabled checks if the snapshotter should be enabled based on environment variable -func isSnapshotterEnabled() bool { - return !isTruthy(os.Getenv("ROUTEWATCH_DISABLE_SNAPSHOTTER")) -} - // New creates a new RouteWatch instance func New(deps Dependencies) *RouteWatch { rw := &RouteWatch{ - db: deps.DB, - routingTable: deps.RoutingTable, - streamer: deps.Streamer, - server: deps.Server, - logger: deps.Logger, - maxRuntime: deps.Config.MaxRuntime, - config: deps.Config, - } - - // Create snapshotter if enabled - if isSnapshotterEnabled() { - snap, err := snapshotter.New(deps.RoutingTable, deps.Config, deps.Logger) - if err != nil { - deps.Logger.Error("Failed to create snapshotter", "error", err) - // Continue without snapshotter - } else { - rw.snapshotter = snap - } + db: deps.DB, + streamer: deps.Streamer, + server: deps.Server, + logger: deps.Logger, + maxRuntime: deps.Config.MaxRuntime, + config: deps.Config, } return rw @@ -131,17 +97,7 @@ func (rw *RouteWatch) Run(ctx context.Context) error { return fmt.Errorf("non-batched handlers not implemented") } - // Register routing table handler to maintain in-memory routing table - rtHandler := NewRoutingTableHandler(rw.routingTable, rw.logger) - rw.streamer.RegisterHandler(rtHandler) - - // Start periodic routing table stats logging - go rw.logRoutingTableStats(ctx) - - // Start snapshotter if available - if rw.snapshotter != nil { - rw.snapshotter.Start(ctx) - } + // No longer need routing table handler - PrefixHandler maintains live_routes table // Start streaming if err := rw.streamer.Start(); err != nil { @@ -187,9 +143,6 @@ func (rw *RouteWatch) Shutdown() { // Stop services rw.streamer.Stop() - // Stop routing table expiration - rw.routingTable.Stop() - // Stop HTTP server with a timeout const serverStopTimeout = 5 * time.Second stopCtx, cancel := context.WithTimeout(context.Background(), serverStopTimeout) @@ -208,43 +161,6 @@ func (rw *RouteWatch) Shutdown() { "duration", time.Since(metrics.ConnectedSince), ) - // Take final snapshot before shutdown if snapshotter is available - if rw.snapshotter != nil { - rw.logger.Info("Taking final snapshot before shutdown") - if err := rw.snapshotter.Shutdown(); err != nil { - rw.logger.Error("Failed to shutdown snapshotter", "error", err) - } else { - rw.logger.Info("Final snapshot completed") - } - } else { - rw.logger.Info("No snapshotter available") - } -} - -// logRoutingTableStats periodically logs routing table statistics -func (rw *RouteWatch) logRoutingTableStats(ctx context.Context) { - // Log stats periodically - ticker := time.NewTicker(routingTableStatsInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - stats := rw.routingTable.GetDetailedStats() - rw.logger.Info("Routing table statistics", - "ipv4_routes", stats.IPv4Routes, - "ipv6_routes", stats.IPv6Routes, - "ipv4_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv4UpdatesRate), - "ipv6_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv6UpdatesRate), - "total_routes", stats.TotalRoutes, - "unique_prefixes", stats.UniquePrefixes, - "unique_origins", stats.UniqueOrigins, - "unique_peers", stats.UniquePeers, - ) - } - } } // getModule provides all fx dependencies @@ -258,7 +174,6 @@ func getModule() fx.Option { database.New, fx.As(new(database.Store)), ), - routingtable.New, streamer.New, server.New, New, diff --git a/internal/routewatch/app_integration_test.go b/internal/routewatch/app_integration_test.go index cb655bc..bc962cf 100644 --- a/internal/routewatch/app_integration_test.go +++ b/internal/routewatch/app_integration_test.go @@ -12,7 +12,6 @@ import ( "git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/logger" "git.eeqj.de/sneak/routewatch/internal/metrics" - "git.eeqj.de/sneak/routewatch/internal/routingtable" "git.eeqj.de/sneak/routewatch/internal/server" "git.eeqj.de/sneak/routewatch/internal/streamer" "github.com/google/uuid" @@ -181,9 +180,13 @@ func (m *mockStore) GetPrefixDistribution() (ipv4 []database.PrefixDistribution, return nil, nil, nil } +// GetLiveRouteCounts mock implementation +func (m *mockStore) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) { + // Return mock counts + return m.RouteCount / 2, m.RouteCount / 2, nil +} + func TestRouteWatchLiveFeed(t *testing.T) { - // Disable snapshotter for tests - t.Setenv("ROUTEWATCH_DISABLE_SNAPSHOTTER", "1") // Create mock database mockDB := newMockStore() @@ -204,20 +207,16 @@ func TestRouteWatchLiveFeed(t *testing.T) { EnableBatchedDatabaseWrites: true, } - // Create routing table - rt := routingtable.New(cfg, logger) - // Create server - srv := server.New(mockDB, rt, s, logger) + srv := server.New(mockDB, s, logger) // Create RouteWatch with 5 second limit deps := Dependencies{ - DB: mockDB, - RoutingTable: rt, - Streamer: s, - Server: srv, - Logger: logger, - Config: cfg, + DB: mockDB, + Streamer: s, + Server: srv, + Logger: logger, + Config: cfg, } rw := New(deps) diff --git a/internal/routewatch/routingtablehandler.go b/internal/routewatch/routingtablehandler.go deleted file mode 100644 index 9bf8b0c..0000000 --- a/internal/routewatch/routingtablehandler.go +++ /dev/null @@ -1,131 +0,0 @@ -package routewatch - -import ( - "strconv" - - "git.eeqj.de/sneak/routewatch/internal/logger" - "git.eeqj.de/sneak/routewatch/internal/ristypes" - "git.eeqj.de/sneak/routewatch/internal/routingtable" - "github.com/google/uuid" -) - -const ( - // routingTableHandlerQueueSize is the queue capacity for in-memory routing table operations - routingTableHandlerQueueSize = 100000 -) - -// RoutingTableHandler handles BGP messages and updates the in-memory routing table -type RoutingTableHandler struct { - rt *routingtable.RoutingTable - logger *logger.Logger -} - -// NewRoutingTableHandler creates a new routing table handler -func NewRoutingTableHandler(rt *routingtable.RoutingTable, logger *logger.Logger) *RoutingTableHandler { - return &RoutingTableHandler{ - rt: rt, - logger: logger, - } -} - -// WantsMessage returns true if this handler wants to process messages of the given type -func (h *RoutingTableHandler) WantsMessage(messageType string) bool { - // We only care about UPDATE messages for the routing table - return messageType == "UPDATE" -} - -// QueueCapacity returns the desired queue capacity for this handler -func (h *RoutingTableHandler) QueueCapacity() int { - // In-memory operations are very fast, so use a large queue - return routingTableHandlerQueueSize -} - -// HandleMessage processes a RIS message and updates the routing table -func (h *RoutingTableHandler) HandleMessage(msg *ristypes.RISMessage) { - // Use the pre-parsed timestamp - timestamp := msg.ParsedTimestamp - - // Parse peer ASN - peerASN, err := strconv.Atoi(msg.PeerASN) - if err != nil { - h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err) - - return - } - - // Get origin ASN from path (last element) - var originASN int - if len(msg.Path) > 0 { - originASN = msg.Path[len(msg.Path)-1] - } - - // Process announcements - for _, announcement := range msg.Announcements { - for _, prefix := range announcement.Prefixes { - // Generate deterministic UUIDs based on the prefix and origin ASN - // This ensures consistency across restarts - prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix)) - originASNID := uuid.NewSHA1(uuid.NameSpaceOID, []byte(strconv.Itoa(originASN))) - - // Create route for the routing table - route := &routingtable.Route{ - PrefixID: prefixID, - Prefix: prefix, - OriginASNID: originASNID, - OriginASN: originASN, - PeerASN: peerASN, - ASPath: msg.Path, - NextHop: announcement.NextHop, - AnnouncedAt: timestamp, - } - - // Add route to routing table - h.rt.AddRoute(route) - } - } - - // Process withdrawals - for _, prefix := range msg.Withdrawals { - // Generate deterministic UUID for the prefix - prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix)) - - // Withdraw all routes for this prefix from this peer - h.rt.WithdrawRoutesByPrefixAndPeer(prefixID, peerASN) - } -} - -// GetRoutingTableStats returns statistics about the routing table -func (h *RoutingTableHandler) GetRoutingTableStats() map[string]int { - return h.rt.Stats() -} - -// GetActiveRouteCount returns the number of active routes -func (h *RoutingTableHandler) GetActiveRouteCount() int { - return h.rt.Size() -} - -// GetRoutesByPrefix returns all routes for a specific prefix -func (h *RoutingTableHandler) GetRoutesByPrefix(prefixID uuid.UUID) []*routingtable.Route { - return h.rt.GetRoutesByPrefix(prefixID) -} - -// GetRoutesByOriginASN returns all routes originated by a specific ASN -func (h *RoutingTableHandler) GetRoutesByOriginASN(originASNID uuid.UUID) []*routingtable.Route { - return h.rt.GetRoutesByOriginASN(originASNID) -} - -// GetRoutesByPeerASN returns all routes received from a specific peer ASN -func (h *RoutingTableHandler) GetRoutesByPeerASN(peerASN int) []*routingtable.Route { - return h.rt.GetRoutesByPeerASN(peerASN) -} - -// GetAllRoutes returns all active routes -func (h *RoutingTableHandler) GetAllRoutes() []*routingtable.Route { - return h.rt.GetAllRoutes() -} - -// ClearRoutingTable clears all routes from the routing table -func (h *RoutingTableHandler) ClearRoutingTable() { - h.rt.Clear() - h.logger.Info("Cleared routing table") -} diff --git a/internal/server/server.go b/internal/server/server.go index eadcfba..0e04a91 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -10,7 +10,6 @@ import ( "git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/logger" - "git.eeqj.de/sneak/routewatch/internal/routingtable" "git.eeqj.de/sneak/routewatch/internal/streamer" "git.eeqj.de/sneak/routewatch/internal/templates" "github.com/go-chi/chi/v5" @@ -19,21 +18,19 @@ import ( // Server provides HTTP endpoints for status monitoring type Server struct { - router *chi.Mux - db database.Store - routingTable *routingtable.RoutingTable - streamer *streamer.Streamer - logger *logger.Logger - srv *http.Server + router *chi.Mux + db database.Store + streamer *streamer.Streamer + logger *logger.Logger + srv *http.Server } // New creates a new HTTP server -func New(db database.Store, rt *routingtable.RoutingTable, streamer *streamer.Streamer, logger *logger.Logger) *Server { +func New(db database.Store, streamer *streamer.Streamer, logger *logger.Logger) *Server { s := &Server{ - db: db, - routingTable: rt, - streamer: streamer, - logger: logger, + db: db, + streamer: streamer, + logger: logger, } s.setupRoutes() @@ -196,8 +193,15 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { const bitsPerMegabit = 1000000.0 - // Get detailed routing table stats - rtStats := s.routingTable.GetDetailedStats() + // Get route counts from database + ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts() + if err != nil { + s.logger.Warn("Failed to get live route counts", "error", err) + // Continue with zero counts + } + + // Get route update metrics + routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics() stats := Stats{ Uptime: uptime, @@ -213,10 +217,10 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { Peerings: dbStats.Peerings, DatabaseSizeBytes: dbStats.FileSizeBytes, LiveRoutes: dbStats.LiveRoutes, - IPv4Routes: rtStats.IPv4Routes, - IPv6Routes: rtStats.IPv6Routes, - IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate, - IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate, + IPv4Routes: ipv4Routes, + IPv6Routes: ipv6Routes, + IPv4UpdatesPerSec: routeMetrics.IPv4UpdatesPerSec, + IPv6UpdatesPerSec: routeMetrics.IPv6UpdatesPerSec, IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution, IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution, } @@ -324,8 +328,15 @@ func (s *Server) handleStats() http.HandlerFunc { const bitsPerMegabit = 1000000.0 - // Get detailed routing table stats - rtStats := s.routingTable.GetDetailedStats() + // Get route counts from database + ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts() + if err != nil { + s.logger.Warn("Failed to get live route counts", "error", err) + // Continue with zero counts + } + + // Get route update metrics + routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics() // Get handler stats handlerStats := s.streamer.GetHandlerStats() @@ -358,10 +369,10 @@ func (s *Server) handleStats() http.HandlerFunc { Peerings: dbStats.Peerings, DatabaseSizeBytes: dbStats.FileSizeBytes, LiveRoutes: dbStats.LiveRoutes, - IPv4Routes: rtStats.IPv4Routes, - IPv6Routes: rtStats.IPv6Routes, - IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate, - IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate, + IPv4Routes: ipv4Routes, + IPv6Routes: ipv6Routes, + IPv4UpdatesPerSec: routeMetrics.IPv4UpdatesPerSec, + IPv6UpdatesPerSec: routeMetrics.IPv6UpdatesPerSec, HandlerStats: handlerStatsInfo, IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution, IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,